Mark Fennell

Reputation: 11

What is the ADF v2 equivalent of WindowStart?

We are migrating from ADF V1 to ADF V2. Our pipelines are scheduled to run daily and we have that part of V2 working. A pipeline will start at the appointed time.

My question is how do I pass the time that the scheduled trigger is scheduled to start into the where clause of a select statement in a copy activity? I suspect I've overlooked or misread some documentation, but I just can't figure it out.

No matter what I try, the variable does not seem to be evaluated at runtime, so the where clause looks something like this when it runs: where last_update>='@pipeline().TriggerTime'. Obviously, the literal text @pipeline().TriggerTime is not a date.

I’ve pulled this list of parameters to try from https://learn.microsoft.com/en-us/azure/data-factory/control-flow-system-variables

This is what worked in V1: "sqlReaderQuery": "$$Text.Format('select columns from table where last_update>=\'{0:yyyy-MM-dd}\'', WindowStart, WindowEnd)"

Things I've tried:

-- Passing @trigger().scheduledTime as a parameter in the trigger and using @{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')} in the where clause, with a pipeline parameter named startDate of type String.

-- Setting @pipeline().TriggerTime as the default value for the pipeline().parameters.startDate.

-- Calling @pipeline().TriggerTime in the where clause.

Many thanks in advance.


EDIT:

Pipeline Source

{
    "name": "PL_ADLS_RAW_IDMTables_DAILY",
    "properties": {
        "activities": [
            {
                "name": "isStartDateNotNull",
                "description": "If the startDate parameter == 0 then run the full load. If the startDate parameter != 0 then run for >= startDate parameter.",
                "type": "IfCondition",
                "typeProperties": {
                    "expression": {
                        "value": "@equals(pipeline().parameters.startDate,'0')",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "copy_ENTITY_FULL",
                            "description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 60,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY] where LAST_UPDATE_DATE >= '@{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')}'",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "AzureDataLakeStoreSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "DS_IN_SQL_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ],
                    "ifTrueActivities": [
                        {
                            "name": "copy_ENTITY",
                            "description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 60,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY]",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "AzureDataLakeStoreSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "DS_IN_SQL_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "startDate": {
                "type": "String"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Trigger Source:

{
    "name": "TR_SCHED_0800EST",
    "properties": {
        "description": "Daily 0800 EST",
        "runtimeState": "Stopped",
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "PL_ADLS_RAW_IDMTables_DAILY",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "startDate": "@trigger().scheduledTime"
                }
            }
        ],
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2018-08-30T13:00:00Z",
                "timeZone": "UTC",
                "schedule": {
                    "minutes": [
                        0
                    ],
                    "hours": [
                        13
                    ]
                }
            }
        }
    }
}

EDIT

This works

enter image description here

This does not work

enter image description here

Upvotes: 0

Views: 1219

Answers (1)

Fang Liu

Reputation: 2363

The first approach you listed is the right way: pass @trigger().scheduledTime as a parameter in the trigger, use @{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')} in the where clause, and define the pipeline parameter startDate as type String.

When you edit the trigger in the UI, make sure you pass the value in the following way:

enter image description here
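To sanity-check what the where clause should look like once the expression has been evaluated, here is a rough Python emulation of the formatDateTime step. It is only a sketch and not ADF code: the helper names format_date and render_where_clause are made up for illustration, and it assumes the scheduled time arrives as an ISO 8601 UTC string without fractional seconds, such as 2018-08-30T13:00:00Z.

```python
from datetime import datetime, timezone


def format_date(iso_timestamp: str) -> str:
    """Rough stand-in for formatDateTime(value, 'yyyy-MM-dd').

    Assumes a timestamp shaped like 2018-08-30T13:00:00Z (an assumption,
    not a guarantee about what the trigger emits).
    """
    parsed = datetime.strptime(iso_timestamp, "%Y-%m-%dT%H:%M:%SZ")
    parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.strftime("%Y-%m-%d")


def render_where_clause(start_date: str) -> str:
    """Build the filter the copy activity is expected to send to SQL."""
    return f"where LAST_UPDATE_DATE >= '{format_date(start_date)}'"


# Value the trigger is expected to hand to the startDate parameter.
print(render_where_clause("2018-08-30T13:00:00Z"))
```

If startDate reaches the pipeline as the literal text @trigger().scheduledTime rather than a timestamp, this sketch raises a ValueError, which is a quick way to tell an unevaluated parameter apart from a formatting problem.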

Upvotes: 1
