Reputation: 43
I am trying to migrate a pipeline that already exists in ADF V1 to ADF V2 and have some issues with the concept of triggers. My pipeline has two activities: the first is an Azure Data Lake Analytics activity and the second a copy activity. The first activity runs a U-SQL script that reads data from the partitioned folder /{yyyy}/{MM}/{dd}/, processes it, and writes it to the folder /{yyyy}-{MM}-{dd}/. Here are some JSON files from my factory (pipeline, trigger and datasets).
Pipeline:
{
    "name": "StreamCompressionBlob2SQL",
    "properties": {
        "activities": [
            {
                "name": "compress",
                "type": "DataLakeAnalyticsU-SQL",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
                    "scriptLinkedService": {
                        "referenceName": "AzureBlobStorage",
                        "type": "LinkedServiceReference"
                    },
                    "parameters": {
                        "Year": {
                            "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                            "type": "Expression"
                        },
                        "Month": {
                            "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                            "type": "Expression"
                        },
                        "Day": {
                            "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                            "type": "Expression"
                        }
                    }
                },
                "linkedServiceName": {
                    "referenceName": "AzureDataLakeAnalytics1",
                    "type": "LinkedServiceReference"
                }
            },
            {
                "name": "Blob2SQL",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "compress",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000
                    },
                    "enableStaging": false,
                    "dataIntegrationUnits": 0,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "tag": "TAG",
                            "device_id": "DEVICE_ID",
                            "system_id": "SYSTEM_ID",
                            "utc": "UTC",
                            "ts": "TS",
                            "median": "MEDIAN",
                            "min": "MIN",
                            "max": "MAX",
                            "avg": "AVG",
                            "stdev": "STDEV",
                            "first_value": "FIRST_VALUE",
                            "last_value": "LAST_VALUE",
                            "message_count": "MESSAGE_COUNT"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ],
        "parameters": {
            "windowStartTime": {
                "type": "String"
            }
        }
    }
}
Trigger:
{
    "name": "trigger1",
    "properties": {
        "runtimeState": "Started",
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "StreamCompressionBlob2SQL",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "windowStartTime": "@trigger().scheduledTime"
                }
            }
        ],
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2018-08-17T10:46:00.000Z",
                "endTime": "2018-11-04T10:46:00.000Z",
                "timeZone": "UTC"
            }
        }
    }
}
Input Dataset for Copy Activity:
{
    "name": "AzureBlobDataset_COMPRESSED_ASA_v1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "Year": {
                "type": "String",
                "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
            },
            "Month": {
                "type": "String",
                "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
            },
            "Day": {
                "type": "String",
                "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
            }
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "tag",
                "type": "String"
            },
            {
                "name": "device_id",
                "type": "String"
            },
            {
                "name": "system_id",
                "type": "String"
            },
            {
                "name": "utc",
                "type": "DateTime"
            },
            {
                "name": "ts",
                "type": "DateTime"
            },
            {
                "name": "median",
                "type": "Double"
            },
            {
                "name": "min",
                "type": "Double"
            },
            {
                "name": "max",
                "type": "Double"
            },
            {
                "name": "avg",
                "type": "Double"
            },
            {
                "name": "stdev",
                "type": "Double"
            },
            {
                "name": "first_value",
                "type": "Double"
            },
            {
                "name": "last_value",
                "type": "Double"
            },
            {
                "name": "message_count",
                "type": "Int16"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ";",
                "nullValue": "\\N",
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": true
            },
            "fileName": "",
            "folderPath": {
                "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
                "type": "Expression"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
Output Dataset for Copy Activity:
{
    "name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureSqlTable",
        "structure": [
            {
                "name": "TAG",
                "type": "String"
            },
            {
                "name": "DEVICE_ID",
                "type": "String"
            },
            {
                "name": "SYSTEM_ID",
                "type": "String"
            },
            {
                "name": "UTC",
                "type": "DateTime"
            },
            {
                "name": "TS",
                "type": "DateTime"
            },
            {
                "name": "MEDIAN",
                "type": "Decimal"
            },
            {
                "name": "MIN",
                "type": "Decimal"
            },
            {
                "name": "MAX",
                "type": "Decimal"
            },
            {
                "name": "AVG",
                "type": "Decimal"
            },
            {
                "name": "STDEV",
                "type": "Decimal"
            },
            {
                "name": "FIRST_VALUE",
                "type": "Decimal"
            },
            {
                "name": "LAST_VALUE",
                "type": "Decimal"
            },
            {
                "name": "MESSAGE_COUNT",
                "type": "Int32"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
My problem is that after publishing, nothing happens. Any suggestions?
Upvotes: 0
Views: 802
Reputation: 646
Schedule triggers do not support backfill scenarios (based on your trigger definition, you are starting from August 17th, 2018). With a schedule trigger, pipeline runs can be kicked off only for time periods from the current time onwards into the future.
For backfill scenarios like yours, use a tumbling window trigger instead.
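As a rough sketch of what that could look like for your pipeline (the trigger name, start/end times and concurrency below are placeholders; tumbling window triggers take a single pipeline reference and support Minute/Hour frequencies, so a daily window is expressed as frequency Hour with interval 24):
{
    "name": "TumblingWindowTrigger1",
    "properties": {
        "type": "TumblingWindowTrigger",
        "typeProperties": {
            "frequency": "Hour",
            "interval": 24,
            "startTime": "2018-08-17T00:00:00Z",
            "endTime": "2018-11-04T00:00:00Z",
            "delay": "00:00:00",
            "maxConcurrency": 10,
            "retryPolicy": {
                "count": 2,
                "intervalInSeconds": 30
            }
        },
        "pipeline": {
            "pipelineReference": {
                "type": "PipelineReference",
                "referenceName": "StreamCompressionBlob2SQL"
            },
            "parameters": {
                "windowStartTime": "@trigger().outputs.windowStartTime"
            }
        }
    }
}
Because the startTime lies in the past, the trigger creates a run for every 24-hour window between startTime and the current time (up to maxConcurrency runs in parallel), and each run receives its own window start through @trigger().outputs.windowStartTime rather than @trigger().scheduledTime.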
Upvotes: 1