Reputation: 59031
I am trying to consolidate two different events (EventB and EventC) that arrive on the same Event Hub input. I want to output (to an Azure Function) a consolidated event (EventB + EventC) whenever an EventC is received.
This is what the events look like:
{
  "EventB": {
    "Claim": {
      "EventAUri": "A/123",
      "Uri": "B/456"
    },
    "Metainfo": {
      "Color": "Green"
    }
  }
}
and
{
  "EventC": {
    "Claim": {
      "EventBUri": "B/456"
    },
    "Target": {
      "City": "Berlin",
      "Position": {
        "Latitude": 50.325096,
        "Longitude": 72.19710
      }
    }
  }
}
EventB is sent only once, whereas EventC is sent a few times per minute. The desired output for the example above would be:
{
  "Claim": {
    "EventBUri": "B/456"
  },
  "Target": {
    "City": "Berlin",
    "Position": {
      "Latitude": 50.325096,
      "Longitude": 72.19710
    }
  },
  "BMetainfo": {
    "Color": "Green"
  }
}
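To make the mapping explicit, here is a minimal sketch in plain Python (not Stream Analytics query language) of how one EventC and its parent EventB combine into that shape. The function name `consolidate` is hypothetical and only serves the illustration; the field names come from the sample events above.

```python
import json


def consolidate(event_b: dict, event_c: dict) -> dict:
    """Merge one EventC with its parent EventB into the desired output shape."""
    b = event_b["EventB"]
    c = event_c["EventC"]
    # Only merge events that actually reference each other.
    if b["Claim"]["Uri"] != c["Claim"]["EventBUri"]:
        raise ValueError("EventC does not reference this EventB")
    return {
        "Claim": c["Claim"],         # taken from EventC unchanged
        "Target": c["Target"],       # taken from EventC unchanged
        "BMetainfo": b["Metainfo"],  # EventB's Metainfo, renamed
    }


event_b = {
    "EventB": {
        "Claim": {"EventAUri": "A/123", "Uri": "B/456"},
        "Metainfo": {"Color": "Green"},
    }
}
event_c = {
    "EventC": {
        "Claim": {"EventBUri": "B/456"},
        "Target": {
            "City": "Berlin",
            "Position": {"Latitude": 50.325096, "Longitude": 72.19710},
        },
    }
}

merged = consolidate(event_b, event_c)
print(json.dumps(merged, indent=2))
```

The point of the sketch is that each output carries exactly one EventC; EventB only contributes its `Metainfo`.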
This is what I have tried so far:
WITH AllEvents AS (
    SELECT
        *
    FROM
        ehubinput
),
EventB AS (
    SELECT
        EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC
    FROM AllEvents
    WHERE EventC IS NOT NULL
)
SELECT * FROM EventB
INNER JOIN EventC
    ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
    AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
Unfortunately, my query outputs one joined row for every EventC (EventB + x * EventC, where x is the number of EventC events) instead of EventB + the last EventC.
Can anyone help me with this?
Update:
This is my current output. (I only want the latest EventC combined with EventB, not every event in the stream, which is what I get right now.)
Reputation: 2664
I reproduced your case, and came up with the following query:
WITH AllEvents AS (
    SELECT
        *
    FROM
        Input
),
EventB AS (
    SELECT
        EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC, EventC.Time
    FROM AllEvents
    WHERE EventC IS NOT NULL
),
test AS (
    SELECT *, EventC.* FROM EventB
    INNER JOIN EventC
        ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
        AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
)
SELECT TopOne() OVER (ORDER BY Time) FROM test GROUP BY TumblingWindow(second, 10)
For a series of events, it always returns the last (EventC, EventB) pair that matches. If this is not your expected output, could you please write out the expected output for the input specified above?
I used VS2019 with the Stream Analytics extension and specified local inputs per your description above.
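As a rough illustration of the intended result (one joined pair per 10-second window, namely the one with the latest Time), here is a small Python simulation. It is a sketch under assumptions, not how Stream Analytics evaluates the query: `latest_pair_per_window` is a hypothetical helper, times are plain seconds, window boundaries are simplified to start-inclusive, and the sample rows other than 'Berlin' and 'Wasserburg' are made up.

```python
WINDOW_SECONDS = 10  # same length as TumblingWindow(second, 10)


def latest_pair_per_window(pairs):
    """Keep, for each tumbling window, the joined row with the latest time.

    pairs: iterable of (time_in_seconds, joined_row).
    Returns a dict mapping window start (seconds) to the chosen row.
    """
    best = {}
    for t, row in pairs:
        # Simplified bucketing: window [start, start + WINDOW_SECONDS).
        window_start = (t // WINDOW_SECONDS) * WINDOW_SECONDS
        if window_start not in best or t >= best[window_start][0]:
            best[window_start] = (t, row)
    return {start: row for start, (_, row) in sorted(best.items())}


# Hypothetical joined (EventB, EventC) rows, reduced to the city for brevity.
pairs = [
    (1, {"City": "Berlin"}),
    (4, {"City": "Munich"}),
    (9, {"City": "Wasserburg"}),
    (12, {"City": "Hamburg"}),
]

result = latest_pair_per_window(pairs)
print(result)
```

With this sample data the first window keeps only the 'Wasserburg' row and the second window keeps the 'Hamburg' row, so the earlier rows in each window never reach the output.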
Update
The query has been updated. I noticed that only the last EventC in your sample payload contains the property 'Time'. If every EventC has that property, the query above returns 'Wasserburg' as the result.
Of course, the output still has to be formatted, but the result in that case is correct.
Further update: I played with this a bit more, as I found it really interesting, and came up with the following query. It is conceptually different from the previous one and, I would say, more precise:
-- timestamp by how events are enqueued
WITH AllEvents AS (
    SELECT
        Input
    FROM
        Input TIMESTAMP BY input.EventEnqueuedUtcTime
),
-- get the last eventB, because only last eventB is relevant
EventB AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventB IS NOT NULL) AS EventB
    FROM AllEvents
),
LastB AS (SELECT TopOne() OVER (ORDER BY EventB.Time) FROM EventB GROUP BY SlidingWindow(second, 60)),
-- get the last eventC
EventC AS (
    SELECT LAST(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventC IS NOT NULL) AS EventC
    FROM AllEvents
),
LastC AS (SELECT TopOne() OVER (ORDER BY EventC.Time) FROM EventC GROUP BY SlidingWindow(second, 60)),
-- create the result if the join between last EventB and last EventC exists
ResultJoin AS (
    SELECT LastB.topone.*, LastC.topone.* FROM LastB
    INNER JOIN LastC
        ON DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
        AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri
)
-- get the last event that is a pair of EventB,EventC
SELECT TopOne() OVER (ORDER BY EventB.Time) INTO Output FROM ResultJoin GROUP BY SlidingWindow(second, 60)
-- Just a cross-check what is the last event B
SELECT * INTO Output1 FROM LastB
I used time window functions because you mentioned that the events arrive within a one-minute time frame. Essentially, the idea is to extract the last EventB and the last EventC, and then propagate the pair to the output only if the two match.
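The idea can be sketched in Python as follows. This is only an illustration of the logic, not of how the query is executed: `last_matching_pair` is a hypothetical helper, times are plain seconds, and "last" means last in arrival order within a 60-second horizon measured back from the newest event.

```python
def last_matching_pair(events, horizon_seconds=60):
    """Return the last EventB and last EventC seen within the horizon, if they match.

    events: list of (time_in_seconds, event_dict) in arrival order.
    Returns {"EventB": ..., "EventC": ...} or None when there is no matching pair.
    """
    if not events:
        return None
    now = events[-1][0]
    last_b = last_c = None
    for t, ev in events:
        if now - t > horizon_seconds:
            continue  # older than the time window, ignored
        if "EventB" in ev:
            last_b = ev["EventB"]
        elif "EventC" in ev:
            last_c = ev["EventC"]
    if last_b is None or last_c is None:
        return None
    # Propagate the pair only if the last EventC points at the last EventB.
    if last_b["Claim"]["Uri"] != last_c["Claim"]["EventBUri"]:
        return None
    return {"EventB": last_b, "EventC": last_c}


b = {"EventB": {"Claim": {"EventAUri": "A/123", "Uri": "B/456"},
                "Metainfo": {"Color": "Green"}}}
c1 = {"EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}}}
c2 = {"EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Wasserburg"}}}

pair = last_matching_pair([(0, b), (10, c1), (20, c2)])
print(pair["EventC"]["Target"]["City"])

# An EventB older than the horizon no longer takes part in the match.
stale = last_matching_pair([(0, b), (70, c1)])
print(stale)
```

Note what the second call shows in this sketch: once the single EventB is older than the horizon, no pair is produced. Whether the actual job behaves the same way for your event timing is worth checking, since EventB is sent only once.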
I tested it on a real event hub, using an event hub message publisher to simulate an event flow similar to the one in your example.
After that I watched the output locally to check that I got the correct result after the last event.
I also added the Time property to every event (B and C), as the messages in the simulator show, because that property is used to order the events in the query. Of course, you can replace it with some other property, such as EventEnqueuedUtcTime.
I hope you find one of these two approaches useful for your final solution.