Martin Brandl

Reputation: 59031

Azure Stream Analytics Query to consolidate two events

I am trying to consolidate two different events (EventB and EventC) that come from the same EventHub input. What I want to achieve is to output a consolidated event (EventB + EventC) to an Azure Function whenever an EventC is received.

This is what the events look like:

{
    "EventB": {
        "Claim": {
            "EventAUri": "A/123",
            "Uri": "B/456"
        },
        "Metainfo": {
            "Color": "Green"
        }
    }   
}

and

{
    "EventC" : {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        }
    }
}

EventB is sent only once, whereas EventC is sent a few times per minute. The desired output for the example above would be:

    {
        "Claim": {
            "EventBUri": "B/456"
        },
        "Target": {
            "City": "Berlin",
            "Position": {
                "Latitude": 50.325096,
                "Longitude": 72.19710
            }
        },
        "BMetainfo": {
            "Color": "Green"
        }
    }

This is what I have tried so far:

WITH AllEvents AS (
    SELECT
        *
    FROM
        ehubinput
),
EventB AS (
    SELECT
        EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC
    FROM AllEvents
    WHERE EventC IS NOT NULL
)

SELECT * FROM EventB
INNER JOIN EventC
    ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
    AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri

Unfortunately, my query outputs EventB joined with every matching EventC (one row per EventC) instead of EventB combined with only the last EventC.
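To see why the join behaves this way, here is a minimal Python model of its semantics. It is not ASA code. Each EventC is paired with every EventB whose `Uri` matches its `EventBUri` and which arrived 0 to 5 days earlier, so a single EventB produces one output row per matching EventC. The `(arrival_time, message)` tuples, the made-up arrival times, and the helper names `join_events` and `consolidate` are hypothetical. The window check uses a plain time difference as an approximation of `DATEDIFF(day, ...)`. `consolidate` also shows the desired output shape, with EventB's `Metainfo` renamed to `BMetainfo`.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def consolidate(event_b: dict, event_c: dict) -> dict:
    """Desired output shape: EventC's Claim and Target plus EventB's Metainfo as BMetainfo."""
    return {
        "Claim": event_c["Claim"],
        "Target": event_c["Target"],
        "BMetainfo": event_b["Metainfo"],
    }


def join_events(events: list[tuple[datetime, dict]], max_days: int = 5) -> list[dict]:
    """Model of the INNER JOIN: every EventB/EventC pair with matching URIs, where
    EventC arrives 0..max_days after EventB, produces one output row."""
    bs = [(t, msg["EventB"]) for t, msg in events if "EventB" in msg]
    cs = [(t, msg["EventC"]) for t, msg in events if "EventC" in msg]
    rows = []
    for tc, c in cs:
        for tb, b in bs:
            # Plain time difference, approximating DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
            in_window = timedelta(0) <= tc - tb <= timedelta(days=max_days)
            if in_window and b["Claim"]["Uri"] == c["Claim"]["EventBUri"]:
                rows.append(consolidate(b, c))
    return rows


# Sample messages from the question; the arrival times are made up for illustration.
event_b = {"EventB": {"Claim": {"EventAUri": "A/123", "Uri": "B/456"},
                      "Metainfo": {"Color": "Green"}}}
event_c = {"EventC": {"Claim": {"EventBUri": "B/456"},
                      "Target": {"City": "Berlin",
                                 "Position": {"Latitude": 50.325096, "Longitude": 72.19710}}}}
t0 = datetime(2020, 1, 1, 12, 0, 0)
events = [(t0, event_b)] + [(t0 + timedelta(seconds=20 * i), event_c) for i in range(1, 4)]

rows = join_events(events)
print(len(rows))  # 3: one row per EventC
print(rows[-1])
```

In this model, three EventC messages for one EventB yield three consolidated rows. That matches the "one row per EventC" behavior described above. Restricting the output to the latest EventC needs an extra step on top of the join.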

Can anyone help me with this?

Update:

This is my input.

This is my current output. (I only want the latest EventC combined with EventB, not every EventC in the stream, as my query does right now.)

Upvotes: 0

Views: 441

Answers (1)

kgalic

Reputation: 2664

I reproduced your case, and came up with the following query:

WITH AllEvents AS (
    SELECT
        *
    FROM
        Input
),
EventB AS (
    SELECT
        EventB
    FROM AllEvents
    WHERE EventB IS NOT NULL
),
EventC AS (
    SELECT EventC, EventC.Time
    FROM AllEvents
    WHERE EventC IS NOT NULL
),
test AS (
    SELECT *, EventC.* FROM EventB
    INNER JOIN EventC
        ON DATEDIFF(day, EventB, EventC) BETWEEN 0 AND 5
        AND EventB.EventB.Claim.Uri = EventC.EventC.Claim.EventBUri
)

SELECT topone() OVER (ORDER BY Time) FROM test GROUP BY TumblingWindow(second, 10)

For a series of events, it always returns the last (EventC, EventB) pair that matches. If this is not the output you expect, could you please write the expected output for the input above?

I used VS2019 and the Stream Analytics extension. I specified local inputs based on your description above.

Update

The query has been updated. I noticed that only the last EventC in your sample payload contains the property 'Time'. If every EventC has that property, the query above returns 'Wasserburg' as the result.

Of course, the output still has to be formatted, but the result in that case is correct.
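The final step of this query keeps one joined row per 10-second tumbling window, chosen by `Time`. That step can be modeled in Python as shown below. It is an illustrative sketch, not ASA code. The row dictionaries and the helper `latest_per_tumbling_window` are hypothetical, and each row is assumed to carry a numeric `Time` in seconds. The sketch keeps the row with the greatest `Time` in each window, which is the "last combination" described above. The window boundaries exclude their start and include their end; this is assumed to mirror ASA tumbling windows. Also note that ASA's `TopOne()` returns the first row according to its `ORDER BY` clause, so getting the latest row may require `ORDER BY Time DESC`. Check this against the TopOne documentation.

```python
import math


def latest_per_tumbling_window(rows: list, size: int = 10) -> dict:
    """Group joined rows into tumbling windows of `size` seconds and keep the row
    with the largest Time in each window. Windows are (end - size, end], i.e. they
    exclude their start and include their end (assumed to mirror ASA)."""
    latest = {}
    for row in rows:
        window_end = math.ceil(row["Time"] / size) * size
        if window_end not in latest or row["Time"] > latest[window_end]["Time"]:
            latest[window_end] = row
    return latest


# Hypothetical joined rows with a numeric Time in seconds.
rows = [
    {"Time": 1, "City": "Berlin"},
    {"Time": 7, "City": "Berlin"},
    {"Time": 10, "City": "Wasserburg"},
    {"Time": 12, "City": "Berlin"},
]
result = latest_per_tumbling_window(rows)
print({end: row["City"] for end, row in result.items()})  # {10: 'Wasserburg', 20: 'Berlin'}
```

If an event lacks `Time`, this model has no way to rank it. That is why the property has to be present on every EventC.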

Further update: I played with this a bit more, because I found it really interesting. I came up with the following query, which is conceptually different from the previous one and, I would say, even more precise:

-- Timestamp events by the time they were enqueued
WITH AllEvents AS (
    SELECT
        Input
    FROM
        Input TIMESTAMP BY input.EventEnqueuedUtcTime
),

-- Get the last EventB, because only the last EventB is relevant
EventB AS (
    SELECT last(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventB IS NOT NULL) AS EventB
    FROM AllEvents
),
LastB AS (
    SELECT topone() OVER (ORDER BY EventB.Time) FROM EventB GROUP BY slidingwindow(second, 60)
),

-- Get the last EventC
EventC AS (
    SELECT last(AllEvents.Input) OVER (LIMIT DURATION(minute, 1) WHEN AllEvents.input.EventC IS NOT NULL) AS EventC
    FROM AllEvents
),
LastC AS (
    SELECT topone() OVER (ORDER BY EventC.Time) FROM EventC GROUP BY slidingwindow(second, 60)
),

-- Create the result if the last EventB and the last EventC can be joined
ResultJoin AS (
    SELECT LastB.topone.*, LastC.topone.* FROM LastB
    INNER JOIN LastC
        ON DATEDIFF(second, LastB, LastC) BETWEEN 0 AND 60
        AND LastB.topone.EventB.EventB.Claim.Uri = LastC.topone.EventC.EventC.Claim.EventBUri
)

-- Get the last event that is an (EventB, EventC) pair
SELECT topone() OVER (ORDER BY EventB.Time) INTO Output FROM ResultJoin GROUP BY slidingwindow(second, 60)

-- Cross-check: output the last EventB
SELECT * INTO Output1 FROM LastB

I used time window functions because you mentioned that events arrive within a one-minute time frame. Essentially, the idea is to extract the last EventB and the last EventC, and then propagate the matching pair to the output.

I tested it on a real event hub, using an event hub message publisher to simulate an event flow similar to your example.

Then I watched the output locally to check that I got the correct result after the last event.

Also, I added the Time property to every event (B and C), as you can see in the message simulator, because that property is used to order the events in the query. Of course, you can replace it with another property, such as EventEnqueuedUtcTime.
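The core idea of this second query can be sketched in Python as below. It takes the last EventB and the last EventC seen within the past 60 seconds, ordered by `Time`, and emits the pair only if their URIs match. This is a simplified, hypothetical model rather than ASA code. It evaluates a single point in time `now` and assumes every message carries `Time` as a top-level numeric field in seconds. It also ignores the finer details of `LIMIT DURATION` and sliding-window semantics. The function name `last_matching_pair` is illustrative only.

```python
from __future__ import annotations


def last_matching_pair(events: list[dict], now: float, window: float = 60) -> dict | None:
    """Simplified model: take the last EventB and the last EventC whose Time lies in
    [now - window, now] and consolidate them only if their URIs match."""
    recent = [e for e in events if now - window <= e["Time"] <= now]
    bs = [e for e in recent if "EventB" in e]
    cs = [e for e in recent if "EventC" in e]
    if not bs or not cs:
        return None  # nothing to pair in this window
    last_b = max(bs, key=lambda e: e["Time"])["EventB"]
    last_c = max(cs, key=lambda e: e["Time"])["EventC"]
    if last_b["Claim"]["Uri"] != last_c["Claim"]["EventBUri"]:
        return None  # the latest events do not belong together
    # Desired output shape: EventC's fields plus EventB's Metainfo as BMetainfo
    return {**last_c, "BMetainfo": last_b["Metainfo"]}


# Hypothetical messages with a top-level Time in seconds.
events = [
    {"Time": 0, "EventB": {"Claim": {"EventAUri": "A/123", "Uri": "B/456"},
                           "Metainfo": {"Color": "Green"}}},
    {"Time": 20, "EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Berlin"}}},
    {"Time": 40, "EventC": {"Claim": {"EventBUri": "B/456"}, "Target": {"City": "Wasserburg"}}},
]
print(last_matching_pair(events, now=50))
print(last_matching_pair(events, now=70))  # None: the only EventB is older than 60 seconds
```

In this model, nothing is emitted once the single EventB is older than the window. That is worth keeping in mind, since the question states that EventB is sent only once.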

I hope you find one of these two approaches useful for your final solution.

Upvotes: 1
