Reputation: 11
We are migrating from ADF V1 to ADF V2. Our pipelines are scheduled to run daily and we have that part of V2 working. A pipeline will start at the appointed time.
My question is how do I pass the time that the scheduled trigger is scheduled to start into the where clause of a select statement in a copy activity? I suspect I've overlooked or misread some documentation, but I just can't figure it out.
No matter what I try, the variable does not seem to evaluate at runtime. So, the where clause looks something like this when it runs: where last_update>=’@pipeline().TriggerTime’ And obviously @pipeline().TriggerTime is not a date.
I’ve pulled this list of parameters to try from https://learn.microsoft.com/en-us/azure/data-factory/control-flow-system-variables
This is what worked in V1: "sqlReaderQuery": "$$Text.Format('select columns from table where last_update>=\'{0:yyyy-MM-dd}\'', WindowStart, WindowEnd)"
Things I've tried:
-- Passing @trigger().scheduledTime in as a parameter in the trigger to @{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')} in the where clause with a parameter in the pipeline defined as startDate of type string.
-- Setting @pipeline().TriggerTime as the default value for the pipeline().parameters.startDate.
-- Calling @pipeline().TriggerTime in the where clause.
Many thanks in advance.
EDIT:
{
"name": "PL_ADLS_RAW_IDMTables_DAILY",
"properties": {
"activities": [
{
"name": "isStartDateNotNull",
"description": "If the startDate parameter == 0 then run the full load. If the startDate parameter != 0 then run for >= startDate parameter.",
"type": "IfCondition",
"typeProperties": {
"expression": {
"value": "@equals(pipeline().parameters.startDate,'0')",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "copy_ENTITY_FULL",
"description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 60,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY] where LAST_UPDATE_DATE >= '@{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')}'",
"type": "Expression"
}
},
"sink": {
"type": "AzureDataLakeStoreSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "DS_IN_SQL_IDM_ENTITY",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
"type": "DatasetReference"
}
]
}
],
"ifTrueActivities": [
{
"name": "copy_ENTITY",
"description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 60,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY]",
"type": "Expression"
}
},
"sink": {
"type": "AzureDataLakeStoreSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "DS_IN_SQL_IDM_ENTITY",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
"type": "DatasetReference"
}
]
}
]
}
}
],
"parameters": {
"startDate": {
"type": "String"
}
}
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
{
"name": "TR_SCHED_0800EST",
"properties": {
"description": "Daily 0800 EST",
"runtimeState": "Stopped",
"pipelines": [
{
"pipelineReference": {
"referenceName": "PL_ADLS_RAW_IDMTables_DAILY",
"type": "PipelineReference"
},
"parameters": {
"startDate": "@trigger().scheduledTime"
}
}
],
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Day",
"interval": 1,
"startTime": "2018-08-30T13:00:00Z",
"timeZone": "UTC",
"schedule": {
"minutes": [
0
],
"hours": [
13
]
}
}
}
}
}
EDIT
This works
This does not work
Upvotes: 0
Views: 1219
Reputation: 2363
Passing @trigger().scheduledTime in as a parameter in the trigger to @{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')} in the where clause with a parameter in the pipeline defined as startDate of type string. is the right way.
When you edit the trigger in the UI,
Make sure you pass value in the following way.
Upvotes: 1