Reputation: 1118
I have created an Azure Stream Analytics job that reads input data from Event Hub and writes it to Cosmos DB and Blob storage.
Sometimes the data coming from Event Hub is duplicated, and as a result duplicate data is written to Cosmos DB and Blob storage.
A sample of the input to Stream Analytics from Event Hub is shown below.
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"04XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00026XXX03",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
In the sample above, the event with idnum 00086XXX02 is duplicated three times.
I am running the query below and the output contains the duplicates.
WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1, 5) AS SIG1,
        flatArrayElement AS SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF to process the signals in the input
    FROM [input01] AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >= 1
),
SIGNALS AS (
    SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)

-- Insert SIG2 into the Cosmos DB container
SELECT
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS AS t PARTITION BY PartitionId
The output is shown below; duplicate events are present for "idnum":"00086XXX02".
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"000000",
"id":61
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
The expected output is the same events without duplicates (for the sample provided, there should be no duplicate events for "idnum":"00086XXX02").
I want to remove the duplicate events before writing the data to storage. Is this possible from Stream Analytics?
Creating the Cosmos DB collection with a unique key is a solution on the Cosmos DB side, but the collection already exists here. Can anything be done on the Stream Analytics side?
Upvotes: 1
Views: 1842
Reputation: 118
You can remove duplicate events within a window by combining COUNT(DISTINCT ...) with GROUP BY. There is documentation for this pattern: https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window
Example:
WITH Temp AS (
    SELECT
        COUNT(DISTINCT Time) AS CountTime,
        Value,
        DeviceId
    FROM Input TIMESTAMP BY Time
    GROUP BY Value, DeviceId, SYSTEM.TIMESTAMP()
)
SELECT
    AVG(Value) AS AverageValue,
    DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId, TumblingWindow(minute, 5)
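Applied to the event shape in the question, the same idea is to keep a single copy of each event per window before the CROSS APPLY step. Below is a minimal sketch, not a drop-in replacement: it assumes duplicates are exact copies sharing idnum and time, that they arrive within one minute of each other, and it reuses the [input01] and [CosmosTbl] names from the question. TopOne() keeps one whole event record per group, including the sig2 array.
WITH Deduped AS (
    -- Keep exactly one event per (idnum, time) pair within each 1-minute window.
    SELECT
        TopOne() OVER (ORDER BY time ASC) AS evt
    FROM [input01]
    GROUP BY idnum, time, TumblingWindow(minute, 1)
)
SELECT
    d.evt.idnum AS IDNUM,
    d.evt.basetime AS BASETIME,
    d.evt.time AS TIME,
    flatArrayElement.ArrayValue.id AS ID,
    flatArrayElement.ArrayValue.sig3 AS SIG3
INTO [CosmosTbl]
FROM Deduped AS d
CROSS APPLY GetArrayElements(d.evt.sig2) AS flatArrayElement
The JOIN to master and the udf.sgnlArrayMap call from the original query would be layered on top of Deduped in the same way. The trade-off is latency: results are only emitted when the window closes, so a longer window removes duplicates that arrive further apart but delays the output accordingly.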
Upvotes: 0
Reputation: 23792
I simplified your test SQL as below:
WITH t AS (
    SELECT
        flatArrayElement AS SIG2
    FROM fromblob AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >= 1
)
SELECT
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId
It produces duplicate rows because of the GetArrayElements() call, which I think is expected: the array is split into one row per element, so the parent fields are naturally repeated for each element.
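For illustration, here is a minimal standalone sketch of that fan-out, reusing the fromblob input name from the query above; the sample event in the comments is hypothetical:
-- Hypothetical input event:
-- { "idnum": "00086XXX02", "sig2": [ { "sig3": "03XXX", "id": 1 }, { "sig3": "04XXX", "id": 1 } ] }
SELECT
    input.idnum,                          -- repeated once per array element
    arrayElement.ArrayValue.sig3 AS SIG3, -- one output row per element of sig2
    arrayElement.ArrayValue.id AS ID
FROM fromblob AS input
CROSS APPLY GetArrayElements(input.sig2) AS arrayElement
-- Produces two rows: ("00086XXX02", "03XXX", 1) and ("00086XXX02", "04XXX", 1)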
Based on my experience and findings (plus this feedback), there is no DISTINCT in ASA. I think the reason is that ASA processes a real-time stream rather than static data such as a SQL table, so it cannot decide whether an event is a duplicate except within a bounded time window.
Combined with the earlier Cosmos DB case (How to find Duplicate documents in Cosmos DB), I think the key point of the solution is finding the root cause: why Event Hub is delivering duplicate source data. You could certainly set up a Cosmos DB trigger to stop the duplicate data from being written to the database, but I don't think that is an effective approach.
Upvotes: 0