chrischu

Reputation: 3117

Why does Azure Data Factory seemingly insist on inserting DateTimes as string?

I'm trying to set up an Azure Data Factory data flow to copy and denormalize my data from one Azure SQL database to another Azure SQL database for reporting/BI purposes, but I ran into a problem with inserting dates.

This is the definition of my dataflow.

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
        }
    }
}

This is the definition of my source dataset

{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Source_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "BucketId",
                "type": "varchar"
            },
            {
                "name": "StreamId",
                "type": "char"
            },
            {
                "name": "StreamIdOriginal",
                "type": "nvarchar"
            },
            {
                "name": "StreamRevision",
                "type": "int",
                "precision": 10
            },
            {
                "name": "Items",
                "type": "tinyint",
                "precision": 3
            },
            {
                "name": "CommitId",
                "type": "uniqueidentifier"
            },
            {
                "name": "CommitSequence",
                "type": "int",
                "precision": 10
            },
            {
                "name": "CommitStamp",
                "type": "datetime2",
                "scale": 7
            },
            {
                "name": "CheckpointNumber",
                "type": "bigint",
                "precision": 19
            },
            {
                "name": "Dispatched",
                "type": "bit"
            },
            {
                "name": "Headers",
                "type": "varbinary"
            },
            {
                "name": "Payload",
                "type": "varbinary"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[Commits]"
        }
    }
}

and this is my sink dataset

{
    "name": "AzureSqlTable2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "tableName": "dbo.Test2"
        }
    }
}

When running my pipeline with the data flow I get the following error:

Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:124)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

My Azure SQL audit log shows the following statement that failed (which is not a huge surprise considering that it uses VARCHAR(50) as the type for [CommitStamp]):

INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
  [BucketId] NVARCHAR(MAX), 
  [CommitStamp] VARCHAR(50), 
  [StreamId] NVARCHAR(MAX), 
  [StreamIdOriginal] NVARCHAR(MAX),
  [StreamRevision] INT,
  [Items] INT,
  [CommitId] NVARCHAR(MAX),
  [CommitSequence] INT, 
  [CheckpointNumber] BIGINT, 
  [Dispatched] BIT,
  [Headers] VARBINARY(MAX),
  [Payload] VARBINARY(MAX),
  [r8e440f7252bb401b9ead107597de6293] INT) 
with (ROWS_PER_BATCH = 4096, TABLOCK)

I have absolutely no idea why this occurs. The schema information looks correct, but somehow the data factory/data flow seems to want to insert CommitStamp as a string type.

As requested, here is the output from the data flow code/plan view:



source(output(
        BucketId as string,
        StreamId as string,
        StreamIdOriginal as string,
        StreamRevision as integer,
        Items as integer,
        CommitId as string,
        CommitSequence as integer,
        CommitStamp as timestamp,
        CheckpointNumber as long,
        Dispatched as boolean,
        Headers as binary,
        Payload as binary
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[Commits]',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Application',
    user: 'Sign2025Admin',
    password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    mapColumn(
        BucketId,
        CommitStamp
    ),
    schemaName: 'dbo',
    tableName: 'Test2',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Reporting',
    user: 'Sign2025Admin',
    password: '**********') ~> sink1

Upvotes: 5

Views: 2216

Answers (2)

Leon Yue

Reputation: 16401

I created a data flow to copy data from one Azure SQL database to another Azure SQL database. It succeeded in converting datetime2 to VARCHAR(50).

This is the definition of my dataflow:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_sto",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_mex",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(input(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false) ~> sink1"
        }
    }
}

The definition of my source dataset:

{
    "name": "DestinationDataset_sto",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "datetime2",
                "scale": 7
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

My sink settings:

{
    "name": "DestinationDataset_mex",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "varchar"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo1]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Here are my data flow steps (screenshots omitted).

Step 1: Source settings.

Step 2: Sink settings.

Running succeeded.

The tables demo and demo1 have almost the same schema, except for myTime.
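Based on the dataset schemas above, the two tables boil down to roughly the following T-SQL. This is only a sketch; the varchar lengths and nullability are assumptions, since the dataset definitions don't include them:

-- Source table: myTime is a real date/time column.
CREATE TABLE [dbo].[demo] (
    [ID]     INT          NOT NULL,
    [tName]  VARCHAR(50)  NULL,      -- length assumed
    [myTime] DATETIME2(7) NULL
);

-- Sink table: identical except that myTime is a string column,
-- so the copy is a datetime2-to-varchar conversion, which succeeds.
CREATE TABLE [dbo].[demo1] (
    [ID]     INT          NOT NULL,
    [tName]  VARCHAR(50)  NULL,      -- length assumed
    [myTime] VARCHAR(50)  NULL       -- length assumed
);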

My source table and its data (screenshot omitted).

My sink table and the data copied from demo (screenshot omitted).

Data Flow plan:

source(output(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: true,
    isolationLevel: 'SERIALIZABLE',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[demo]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> source1
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    schemaName: '[dbo]',
    tableName: '[demo1]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> sink1

Update 1:

I created the sink table manually and found that:

Data Flow can convert datetime2 to VARCHAR() (maybe NVARCHAR()), date, and datetimeoffset.

When I try the date/time types time, datetime, datetime2, or smalldatetime, Data Flow always gives the error:

"message": "DF-EXEC-1 Conversion failed when converting date and/or time from character 

Update 2019-7-11:

I asked Azure Support for help and they replied to me: this is a bug in Data Flow and there is no solution for now (screenshot of the reply omitted).

Update 2019-7-12:

I tested with Azure Support and they confirmed that this is a bug. Here is the new email (screenshot omitted).

They also told me that the fix has already been made and will be deployed in the next deployment train, which could be at the end of next week.

Hope this helps.

Upvotes: 1

Mark Kromer MSFT

Reputation: 3838

Looks like your Sink dataset defines myTime as a String:

sink(input( ID as integer, tName as string, myTime as string )

Can you change that to timestamp or Date, whichever you'd like to land it as?

Alternatively, you can land the data in a temporary staging table in SQL by setting "Recreate table" on the Sink and letting ADF generate a new table definition on the fly using the data types of your mapped fields in the data flow.
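Either way, the goal is for the sink table's date column to be a real date/time type rather than a string. A minimal T-SQL sketch of such a sink table for the demo data above (illustrative only; the table name and exact types/lengths are assumptions, not necessarily what ADF would generate):

-- Hypothetical sink table whose myTime column matches the data flow's
-- timestamp type, so no string conversion is needed on insert.
CREATE TABLE [dbo].[demo1_dates] (
    [ID]     INT          NULL,
    [tName]  VARCHAR(50)  NULL,      -- length assumed
    [myTime] DATETIME2(7) NULL       -- date/time type instead of varchar
);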

Upvotes: 1
