Reputation: 609
I am querying MongoDB from ADFv2 and trying to filter the data by date. However, the date appears as a seemingly random string of numbers. Is there a way to filter the data dynamically in the source of the copy activity? I am trying to load the data incrementally.
[
  {
    "_id": {
      "$oid": "5d5123dc8cf0b4453ceb2088"
    },
    "TemplateId": "5d3ac5c77eb20a2cf4bdf46a",
    "LastUpdate": {
      "$date": 1565603495299
    },
    "Answers": [
      {
        "Question": "Q001c",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,In person"
          ]
        }
      },
      {
        "Question": "Q001a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q003a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q006a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "1,No"
          ]
The sink/destination is a SQL database. Any help will be greatly appreciated.
Upvotes: 1
Views: 1170
Reputation: 7126
I reproduced the incremental load of data from MongoDB to a SQL database using ADF. As @Nick.Mc.Dermaid suggested, the maximum timestamp value should be used as the watermark for the incremental load. The approach followed here is to copy all data with a date greater than the watermark value and then update the watermark with the maximum date, so that the next pipeline run copies only the delta. The detailed steps are below.
In the MongoDB API source, two documents are inserted into a container.
Then, in the SQL database, a watermark table is created with an initial watermark value of 1000000000000.
The watermark is set to this value so that the first run loads all data from the source into the sink.
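To see why this seed value works, it may help to read both numbers as milliseconds since the Unix epoch, which is how MongoDB extended JSON encodes `$date`. The following is a minimal Python sketch; the helper name `millis_to_utc` is hypothetical and only used for illustration.

```python
from datetime import datetime, timezone

def millis_to_utc(ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    seconds, millis = divmod(ms, 1000)
    # Build the datetime from whole seconds, then restore the millisecond part.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=millis * 1000)

initial_watermark = 1000000000000   # seed value stored in the watermark table
sample_last_update = 1565603495299  # "$date" from the sample document in the question

print(millis_to_utc(initial_watermark).isoformat())
print(millis_to_utc(sample_last_update).isoformat())
```

The seed corresponds to a date in September 2001, well before the August 2019 timestamp in the sample document, so every source document passes the "greater than watermark" check on the first run.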
A stored procedure is created in the SQL database to update the watermark table with the latest date value:
create proc usp_update_watermark_table as
begin
    -- Move the watermark forward to the newest LastUpdate already loaded into the sink table
    update watermark
    set watermarkvalue = (select max(LastUpdate) from tgt_table)
end
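A Lookup activity named LookupLastWaterMark reads the current watermark with this query:
select WatermarkValue from watermark
In the copy activity source, the filter references the Lookup output so that only documents newer than the watermark are copied:
{"LastUpdate":{$gt:@{activity('LookupLastWaterMark').output.firstRow.WatermarkValue}}}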
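As a rough illustration of what the copy activity receives once ADF has evaluated the `@{...}` expression, the Python sketch below mimics the string interpolation for an integer watermark. The function `render_filter` is hypothetical and is not part of ADF; it only reproduces the substitution.

```python
def render_filter(watermark_value: int) -> str:
    """Mimic ADF's @{...} interpolation: splice the watermark into the filter text."""
    return '{"LastUpdate":{$gt:' + str(watermark_value) + '}}'

# On the first run the Lookup activity returns the seed value.
first_run_filter = render_filter(1000000000000)
print(first_run_filter)
```

On later runs the same template is filled with whatever value the stored procedure last wrote to the watermark table.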
A Stored Procedure activity is added after the copy activity so that the new value is written to the watermark table.
After the pipeline run, the output table and the watermark table are updated.
A new document is then added to the source.
When the pipeline is triggered again, only the delta record is loaded to the sink.
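The two runs described above can be sketched in plain Python with in-memory lists standing in for the MongoDB source and the SQL sink. This is only a simulation of the watermark logic, not ADF code; `run_pipeline`, the `id` field, and all `LastUpdate` values except the first are made up for illustration.

```python
def run_pipeline(source, sink, watermark):
    """One simulated run: copy documents newer than the watermark, then advance it."""
    # Copy activity: keep only documents whose LastUpdate exceeds the watermark.
    delta = [doc for doc in source if doc["LastUpdate"] > watermark]
    sink.extend(delta)
    # Stored procedure step: watermark becomes max(LastUpdate) in the sink.
    new_watermark = max((doc["LastUpdate"] for doc in sink), default=watermark)
    return len(delta), new_watermark

source = [
    {"id": 1, "LastUpdate": 1565603495299},  # value from the sample document
    {"id": 2, "LastUpdate": 1565603500000},  # hypothetical second document
]
sink = []
watermark = 1000000000000  # seed value

copied_first, watermark = run_pipeline(source, sink, watermark)

# A new document arrives in the source before the second run.
source.append({"id": 3, "LastUpdate": 1565603600000})
copied_second, watermark = run_pipeline(source, sink, watermark)

print(copied_first, copied_second, watermark)
```

The first run copies both initial documents, and the second copies only the newly added one, because the watermark has already moved past the earlier timestamps.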
Upvotes: 2