Patterson

Reputation: 2757

Azure Data Factory LOOKUP Activity issues

I have the following pipeline with a range of activities, see image below.

I keep getting the following error from my Lookup activity:

Failure happened on 'Source' side. ErrorCode=SqlOperationFailed,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=A database operation failed with the following error: 'Invalid column name 'updated_at'.',Source=,''Type=System.Data.SqlClient.SqlException,Message=Invalid column name 'updated_at'.,Source=.Net SqlClient Data Provider,SqlErrorNumber=207,Class=16,ErrorCode=-2146232060,State=1,Errors=[{Class=16,Number=207,State=1,Message=Invalid column name 'updated_at'.,},],'

I think I know what the problem is: the Lookup isn't looping through the individual tables to find the column 'updated_at'.

But I don't understand why.

The Lookup 'Lookup New Watermark' activity has the following query

SELECT MAX(updated_at) as NewWatermarkvalue FROM @{item().Table_Name}

The ForEach activity 'For Each Table' has the following for Items:

@activity('Find SalesDB Tables').output.value

The Lookup activity 'Find SalesDB Tables' has the following query

SELECT QUOTENAME(table_schema)+'.'+QUOTENAME(table_name) AS Table_Name FROM information_Schema.tables WHERE table_name not in ('watermarktable', 'database_firewall_rules')

The only thing I can see that is wrong with the 'Lookup New Watermark' activity is that it's not looping through the tables. Can someone let me know what is needed?

enter image description here

enter image description here

enter image description here

Just to show that the column exists, I changed the connection from this:

enter image description here

To the following:

enter image description here

With that change, the Lookup was able to find the updated_at column on dbo.Products, but it couldn't locate the updated_at column on the other 4 tables.

So I suspect the problem is that the Lookup activity isn't iterating over the tables automatically.

Upvotes: 0

Views: 1922

Answers (1)

Saideep Arikontham

Reputation: 6104

The error occurs when the following query runs against a table that does not have an updated_at column:

SELECT MAX(updated_at) as NewWatermarkvalue FROM @{item().Table_Name}
  • The Items field of the ForEach activity is set to @activity('Find SalesDB Tables').output.value, which returns the list of table names. Inside the ForEach, the query above is therefore executed once per table:
-- first iteration
SELECT MAX(updated_at) as NewWatermarkvalue FROM <table_1>

-- second iteration
SELECT MAX(updated_at) as NewWatermarkvalue FROM <table_2>
...
  • So the loop is working. The failure comes from an iteration whose table does not have an updated_at column: the query against that table raises this error. The following demonstration reproduces it.

  • I created 2 tables (for demonstration) called t1 and t2.

create table t1(id int, updated_at int)

create table t2(id int, up int)
  • I used a Lookup activity to get the list of table names with the following query:
SELECT QUOTENAME(table_schema)+'.'+QUOTENAME(table_name) AS Table_Name FROM information_Schema.tables WHERE table_name not in ('watermarktable', 'database_firewall_rules','ipv6_database_firewall_rules')
  • Inside the ForEach activity (looping through @activity('lookup1').output.value), I used the same query as in the question:
SELECT MAX(updated_at) as NewWatermarkvalue FROM @{item().Table_Name}

After debugging the pipeline, we can observe that it produces the same error.
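
To make this concrete, the two iterations are roughly equivalent to running the statements below directly against the database. This is only a sketch: it assumes the demo tables t1 and t2 were created in the dbo schema, so that Table_Name resolves to [dbo].[t1] and [dbo].[t2].

```sql
-- Run each statement separately; in a single batch the compile error
-- from the second statement would stop the first one from running too.

-- Iteration for t1: the column exists, so the query succeeds
-- (it returns NULL while the table is still empty).
SELECT MAX(updated_at) AS NewWatermarkvalue FROM [dbo].[t1];

-- Iteration for t2: there is no updated_at column, so SQL Server raises
-- error 207 (Invalid column name 'updated_at') and the Lookup fails.
SELECT MAX(updated_at) AS NewWatermarkvalue FROM [dbo].[t2];
```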

  • For iteration where the table is t1 (has updated_at column):

enter image description here

  • For iteration where the table is t2 (does not have updated_at column):

enter image description here

  • If you publish and run this pipeline, it fails with the same error.

  • Therefore, check whether the updated_at column exists in the current table (the current ForEach item) before querying it. Only query the column if it exists.

  • Inside the ForEach, add a Lookup with the following query. COL_LENGTH returns the length of the column in bytes if the column exists in the table, and NULL otherwise. Use this result with an If Condition activity.

select COL_LENGTH('@{item().Table_Name}','updated_at') as column_exists

enter image description here
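
As a quick sanity check outside the pipeline, the same function can be run by hand against the two demo tables. This sketch assumes t1 and t2 live in the dbo schema; the bracketed names mirror what QUOTENAME produces for Table_Name.

```sql
-- t1 has updated_at declared as int, so COL_LENGTH reports its size in bytes.
SELECT COL_LENGTH('[dbo].[t1]', 'updated_at') AS column_exists;  -- 4

-- t2 has no updated_at column, so COL_LENGTH returns NULL.
SELECT COL_LENGTH('[dbo].[t2]', 'updated_at') AS column_exists;  -- NULL
```

The actual number is not important for the pipeline; only whether the result is NULL matters.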

  • Use the following condition in the If Condition activity. It evaluates to true when the column is missing, so a false result means the current table contains an updated_at column and we can query it.
@equals(activity('check for column in table').output.firstRow['column_exists'],null)

enter image description here

  • The following is the debug output for the t1 and t2 tables:

enter image description here

You can then place the remaining activities, such as the Lookup that reads MAX(updated_at), inside the False branch of the If Condition activity.
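
As a possible alternative, not part of the steps above, the table list itself could be restricted to tables that have the column, so that every ForEach iteration is safe to query. The sketch below is a hypothetical replacement for the first Lookup's query; it reuses the exclusion list from the demonstration and has not been run against the asker's database.

```sql
-- Hypothetical replacement for the table-list Lookup query:
-- return only objects that actually have an updated_at column.
-- Like INFORMATION_SCHEMA.TABLES, this view also lists views.
SELECT QUOTENAME(c.TABLE_SCHEMA) + '.' + QUOTENAME(c.TABLE_NAME) AS Table_Name
FROM INFORMATION_SCHEMA.COLUMNS AS c
WHERE c.COLUMN_NAME = 'updated_at'
  AND c.TABLE_NAME NOT IN ('watermarktable', 'database_firewall_rules', 'ipv6_database_firewall_rules');
```

With this approach the per-table existence check and the If Condition activity would not be needed, at the cost of silently skipping tables that lack the column.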

Upvotes: 2
