abs786123

Azure Data Factory: querying MongoDB and filtering on dates

I am querying MongoDB from ADFv2 and trying to filter the data by date. However, the date appears as a seemingly random string of numbers. Is there a way to filter the data dynamically in the source of the Copy activity? I am trying to load the data incrementally.

[
  {
    "_id": {
      "$oid": "5d5123dc8cf0b4453ceb2088"
    },
    "TemplateId": "5d3ac5c77eb20a2cf4bdf46a",
    "LastUpdate": {
      "$date": 1565603495299
    },
    "Answers": [
      {
        "Question": "Q001c",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,In person"
          ]
        }
      },
      {
        "Question": "Q001a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q003a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q006a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "1,No"
          ]
        }
      }
    ]
  }
]

The sink/destination is a SQL database. Any help would be greatly appreciated.
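The "random string of numbers" in the sample is MongoDB Extended JSON for a BSON Date: the number inside "$date" is milliseconds since the Unix epoch (1970-01-01T00:00:00Z), so 1565603495299 is 2019-08-12T09:51:35.299Z. A minimal Python sketch of the conversion in both directions, useful for choosing or checking a watermark value; the helper names from_mongo_millis and to_mongo_millis are hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware UTC datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_mongo_millis(millis: int) -> datetime:
    """Convert the integer inside {"$date": ...} to an aware UTC datetime."""
    # timedelta arithmetic keeps millisecond precision exact (no float rounding).
    return EPOCH + timedelta(milliseconds=millis)


def to_mongo_millis(moment: datetime) -> int:
    """Convert an aware datetime back to milliseconds since the Unix epoch."""
    # Floor division of two timedeltas returns an int.
    return (moment - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    last_update = from_mongo_millis(1565603495299)
    print(last_update.isoformat())  # 2019-08-12T09:51:35.299000+00:00
    print(to_mongo_millis(last_update))  # 1565603495299
```

The functions expect timezone-aware datetimes; subtracting a naive datetime from EPOCH raises a TypeError rather than silently guessing a timezone.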


Aswin

I reproduced incremental loading of data from MongoDB to SQL Database using ADF. As @Nick.Mc.Dermaid suggested, the maximum timestamp value should be used as the watermark for incremental loading. The approach here is to copy all documents whose date is greater than the watermark value and then update the watermark to the maximum date loaded, so that the next pipeline run copies only the delta. The detailed steps are below.
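A minimal in-memory sketch of that watermark logic, assuming LastUpdate holds epoch milliseconds as a plain integer; the function name incremental_copy and the sample documents are hypothetical. It copies every document newer than the watermark and then advances the watermark to the largest LastUpdate copied:

```python
from typing import Any, Dict, List, Tuple

Doc = Dict[str, Any]


def incremental_copy(source: List[Doc], watermark: int) -> Tuple[List[Doc], int]:
    """Return the documents newer than the watermark and the advanced watermark."""
    # Copy step: strictly greater than, like a $gt filter.
    delta = [doc for doc in source if doc["LastUpdate"] > watermark]
    # Watermark step: newest LastUpdate copied; unchanged if nothing was new.
    new_watermark = max((doc["LastUpdate"] for doc in delta), default=watermark)
    return delta, new_watermark


if __name__ == "__main__":
    source = [
        {"id": 1, "LastUpdate": 1565603495299},
        {"id": 2, "LastUpdate": 1565603500000},
    ]
    watermark = 1000000000000  # initial value, older than every document

    loaded, watermark = incremental_copy(source, watermark)
    print([d["id"] for d in loaded], watermark)  # [1, 2] 1565603500000

    source.append({"id": 3, "LastUpdate": 1565603600000})  # new document arrives
    loaded, watermark = incremental_copy(source, watermark)
    print([d["id"] for d in loaded], watermark)  # [3] 1565603600000
```

Taking the maximum over the copied rows gives the same result as taking max(LastUpdate) over the whole target table, as long as the target only accumulates rows. Because the comparison is strict, a document written in the same millisecond as the current watermark would be skipped; using >= avoids that at the cost of re-copying boundary rows.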

  • Using the MongoDB API, two documents are inserted into a container.

  • Then, in the SQL database, a watermark table is created with an initial WatermarkValue of 1000000000000. Read as milliseconds since the Unix epoch, this value is 2001-09-09, so the first run loads all data from the source into the sink as long as every LastUpdate is later than that.

  • A stored procedure is created in the SQL database to update the watermark table with the latest date value:

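create proc usp_update_watermark_table as
begin
    set nocount on;
    -- Move the watermark to the newest LastUpdate already loaded into the sink.
    -- coalesce keeps the old value if tgt_table is empty, so the watermark never becomes NULL.
    update watermark
    set WatermarkValue = coalesce((select max(LastUpdate) from tgt_table), WatermarkValue);
end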
  • In ADF, a Lookup activity reads the current watermark from the watermark table with the query: select WatermarkValue from watermark

  • A Copy activity is added after the Lookup activity. Its source dataset uses the MongoDB API, and the following expression is entered in the Filter field so that only documents whose LastUpdate is greater than the value returned by the Lookup activity are copied:
{"LastUpdate":{$gt:@{activity('LookupLastWaterMark').output.firstRow.WatermarkValue}}}


  • A Stored Procedure activity is added after the Copy activity so that the new value is written to the watermark table.

  • After the pipeline runs, both the output table and the watermark table are updated.

  • A new document is then added to the source.

  • When the pipeline is triggered again, only the delta record is loaded into the sink.
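The watermark table and stored procedure from the steps above can be tried out locally with SQLite as a stand-in for the Azure SQL database. This is a sketch under assumptions: LastUpdate is stored as integer epoch milliseconds, and the tgt_table schema (an id column plus LastUpdate) is hypothetical; the table and column names otherwise follow the answer. SQLite has no stored procedures, so the UPDATE statement is wrapped in a Python function, and a COALESCE guard keeps the watermark from becoming NULL when tgt_table is empty:

```python
import sqlite3


def update_watermark(conn: sqlite3.Connection) -> None:
    """Stand-in for usp_update_watermark_table: move the watermark to max(LastUpdate)."""
    # COALESCE falls back to the current value when tgt_table has no rows,
    # so the watermark is never overwritten with NULL.
    conn.execute(
        """
        UPDATE watermark
        SET WatermarkValue = COALESCE(
            (SELECT MAX(LastUpdate) FROM tgt_table), WatermarkValue)
        """
    )
    conn.commit()


def read_watermark(conn: sqlite3.Connection) -> int:
    """Equivalent of the Lookup query: select WatermarkValue from watermark."""
    return conn.execute("SELECT WatermarkValue FROM watermark").fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    # Hypothetical schemas; LastUpdate is assumed to be integer epoch milliseconds.
    conn.execute("CREATE TABLE watermark (WatermarkValue INTEGER NOT NULL)")
    conn.execute("INSERT INTO watermark VALUES (1000000000000)")
    conn.execute("CREATE TABLE tgt_table (id INTEGER, LastUpdate INTEGER)")

    update_watermark(conn)
    print(read_watermark(conn))  # 1000000000000 (empty target, value kept)

    conn.executemany(
        "INSERT INTO tgt_table VALUES (?, ?)",
        [(1, 1565603495299), (2, 1565603500000)],
    )
    update_watermark(conn)
    print(read_watermark(conn))  # 1565603500000
```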

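The Copy activity's Filter field is text into which ADF substitutes the Lookup result through @{...} string interpolation, so the watermark number lands verbatim inside the query. A minimal sketch of that substitution, assuming the Lookup activity runs with "First row only" so its output has the shape {"firstRow": {...}}; render_filter is a hypothetical helper, not part of ADF:

```python
from typing import Any, Dict


def render_filter(lookup_output: Dict[str, Any]) -> str:
    """Mimic @{activity('LookupLastWaterMark').output.firstRow.WatermarkValue} in the filter text."""
    # Assumed Lookup output shape when "First row only" is enabled.
    watermark = lookup_output["firstRow"]["WatermarkValue"]
    # The value is spliced into the text as-is, so a number stays unquoted.
    return '{"LastUpdate":{$gt:' + str(watermark) + "}}"


if __name__ == "__main__":
    print(render_filter({"firstRow": {"WatermarkValue": 1000000000000}}))
    # {"LastUpdate":{$gt:1000000000000}}
```

This numeric comparison assumes LastUpdate is stored as a number, as in the repro. In the question's data LastUpdate is a BSON Date ({"$date": ...}), and MongoDB comparison operators generally only match values of the same BSON type, so a numeric $gt would likely match none of those documents; the watermark would then need to be passed as a date value instead, which is worth verifying against the connector documentation.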
