Max Tet

Reputation: 755

Aggregate over batches of records

I would like to aggregate stream analytics records in batches of n records each with the following conditions:

I have not found a way to achieve this with windowing functions, since they are time-based, not count-based. Functions like CollectTop would not work either, since they are evaluated for each record, not for each batch.

Upvotes: 2

Views: 152

Answers (1)

Florian Eiden

Reputation: 842

As mentioned in the comments, Azure Stream Analytics doesn't have a concept of event count.

To regroup events into batches of a certain size, the first step is to rank them (CollectTop). ASA, like any stream processor, needs a time window to define that rank.

This doesn't match the requirement, also clarified in the comments. Sorry but it looks like ASA is not a solution here.

If you could tolerate a time window, meaning you accept the delay (records are output at the end of the window) and incomplete batches (most time windows won't hold a multiple of n events), then this could be partially achieved via a multi-step query.

With this input file as input and this query:

WITH Ranking AS (
    -- Step 1: per 5-minute window, collect up to 1,000,000 events ranked by EntryTime, then TollId
    SELECT
        System.Timestamp() AS ts,
        COLLECTTOP(1000000) OVER (ORDER BY EntryTime, TollId) AS c
    FROM [entry] AS e TIMESTAMP BY EntryTime
    GROUP BY TumblingWindow(minute, 5)
),
Unfolding AS (
    -- Step 2: unfold the ranked array into one row per event and derive a batch number (N = 3)
    SELECT
        System.Timestamp() AS windowEnd,
        FLOOR((d.ArrayValue.rank - 1) / 3.0) AS batch3,
        d.ArrayValue.rank,
        d.ArrayValue.value.*
    FROM Ranking AS r
    CROSS APPLY GetElements(r.c) AS d
),
Batching AS (
    -- Step 3: regroup the rows of each window by batch number
    SELECT
        System.Timestamp() AS ts,
        Collect() AS batch
    FROM Unfolding
    GROUP BY System.Timestamp(), batch3
)
SELECT * FROM Batching

We will get:

{"ts":"2014-09-10T12:05:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:01:00.0000000Z","TollId":1,"LicensePlate":"JNB 7001","State":"NY","Make":"Honda","Model":"CRV","VehicleType":1,"VehicleWeight":0,"Toll":7,"Tag":null},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:02:00.0000000Z","TollId":1,"LicensePlate":"YXZ 1001","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":123456789},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:02:00.0000000Z","TollId":3,"LicensePlate":"ABC 1004","State":"CT","Make":"Ford","Model":"Taurus","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":456789123}]}
{"ts":"2014-09-10T12:05:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:03:00.0000000Z","TollId":1,"LicensePlate":"BNJ 1007","State":"NY","Make":"Honda","Model":"CRV","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":789123456},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:03:00.0000000Z","TollId":2,"LicensePlate":"XYZ 1003","State":"CT","Make":"Toyota","Model":"Corolla","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null},{"windowEnd":"2014-09-10T12:05:00.0000000Z","batch3":1.0,"rank":6,"EntryTime":"2014-09-10T12:05:00.0000000Z","TollId":2,"LicensePlate":"CDE 1007","State":"NJ","Make":"Toyota","Model":"4x4","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":321987654}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:06:00.0000000Z","TollId":2,"LicensePlate":"BAC 1005","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":567891234},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:07:00.0000000Z","TollId":1,"LicensePlate":"ZYX 1002","State":"NY","Make":"Honda","Model":"Accord","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":234567891},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:07:00.0000000Z","TollId":2,"LicensePlate":"ZXY 1001","State":"PA","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":987654321}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:08:00.0000000Z","TollId":3,"LicensePlate":"CBA 1008","State":"PA","Make":"Ford","Model":"Mustang","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":891234567},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":2,"LicensePlate":"CDB 1003","State":"PA","Make":"Volvo","Model":"C30","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":765432198},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":1.0,"rank":5,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":2,"LicensePlate":"DCB 1004","State":"NY","Make":"Volvo","Model":"S80","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":654321987}]}
{"ts":"2014-09-10T12:10:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":7,"EntryTime":"2014-09-10T12:09:00.0000000Z","TollId":3,"LicensePlate":"YZX 1009","State":"NY","Make":"Volvo","Model":"V70","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":912345678},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":8,"EntryTime":"2014-09-10T12:10:00.0000000Z","TollId":1,"LicensePlate":"CBD 1005","State":"NY","Make":"Toyota","Model":"Camry","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":543219876},{"windowEnd":"2014-09-10T12:10:00.0000000Z","batch3":2.0,"rank":9,"EntryTime":"2014-09-10T12:10:00.0000000Z","TollId":3,"LicensePlate":"BCD 1002","State":"NY","Make":"Toyota","Model":"Rav4","VehicleType":1,"VehicleWeight":0,"Toll":5.5,"Tag":876543219}]}
{"ts":"2014-09-10T12:15:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:11:00.0000000Z","TollId":1,"LicensePlate":"NJB 1006","State":"CT","Make":"Ford","Model":"Focus","VehicleType":1,"VehicleWeight":0,"Toll":4.5,"Tag":678912345},{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:12:00.0000000Z","TollId":3,"LicensePlate":"PAC 1209","State":"NJ","Make":"Chevy","Model":"Malibu","VehicleType":1,"VehicleWeight":0,"Toll":6,"Tag":219876543},{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:15:00.0000000Z","TollId":2,"LicensePlate":"BAC 1005","State":"PA","Make":"Peterbilt","Model":"389","VehicleType":2,"VehicleWeight":2.675,"Toll":15.5,"Tag":567891234}]}
{"ts":"2014-09-10T12:15:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:15:00.0000000Z","batch3":1.0,"rank":4,"EntryTime":"2014-09-10T12:15:00.0000000Z","TollId":3,"LicensePlate":"EDC 3109","State":"NJ","Make":"Ford","Model":"Focus","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":198765432}]}
{"ts":"2014-09-10T12:20:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:18:00.0000000Z","TollId":2,"LicensePlate":"DEC 1008","State":"NY","Make":"Toyota","Model":"Corolla","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null},{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":2,"EntryTime":"2014-09-10T12:20:00.0000000Z","TollId":1,"LicensePlate":"DBC 1006","State":"NY","Make":"Honda","Model":"Civic","VehicleType":1,"VehicleWeight":0,"Toll":5,"Tag":432198765},{"windowEnd":"2014-09-10T12:20:00.0000000Z","batch3":0.0,"rank":3,"EntryTime":"2014-09-10T12:20:00.0000000Z","TollId":2,"LicensePlate":"APC 2019","State":"NJ","Make":"Honda","Model":"Civic","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":345678912}]}
{"ts":"2014-09-10T12:25:00.0000000Z","batch":[{"windowEnd":"2014-09-10T12:25:00.0000000Z","batch3":0.0,"rank":1,"EntryTime":"2014-09-10T12:22:00.0000000Z","TollId":1,"LicensePlate":"EDC 1019","State":"NJ","Make":"Honda","Model":"Accord","VehicleType":1,"VehicleWeight":0,"Toll":4,"Tag":null}]}

This guarantees that no batch holds more than N = 3 events here, but the guarantee is enforced within each 5-minute window. When the number of records in a window is not a multiple of N, the last batch of that window holds fewer than N records.
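
To make the arithmetic easier to check outside ASA, here is a minimal plain-Python sketch of the same three steps (window, rank, batch number). It is only a simulation under assumptions: `batch_events` and `EVENTS` are hypothetical names, events are reduced to (minutes past 12:00, TollId) pairs taken from the output above, and the rank is a simple 1-based position within the window, whereas the output above shows CollectTop giving tied events the same rank.

```python
# Events from the sample output, reduced to (minutes past 12:00, TollId).
EVENTS = [
    (1, 1), (2, 1), (2, 3), (3, 1), (3, 2), (5, 2),
    (6, 2), (7, 1), (7, 2), (8, 3), (9, 2), (9, 2), (9, 3), (10, 1), (10, 3),
    (11, 1), (12, 3), (15, 2), (15, 3),
    (18, 2), (20, 1), (20, 2),
    (22, 1),
]


def batch_events(events, n=3, window_minutes=5):
    """Group events into batches of at most n per tumbling window.

    Returns a dict keyed by (window_end, batch_no), in output order.
    """
    batches = {}
    ranks = {}  # running 1-based position of the latest event in each window
    for minute, toll_id in sorted(events):  # ORDER BY EntryTime, TollId
        # Window end, rounding up so an event exactly on a boundary
        # falls in the window that ends there (as in the output above).
        end = -(-minute // window_minutes) * window_minutes
        ranks[end] = ranks.get(end, 0) + 1
        # Same formula as the query: FLOOR((rank - 1) / N)
        batch_no = (ranks[end] - 1) // n
        batches.setdefault((end, batch_no), []).append((minute, toll_id))
    return batches


if __name__ == "__main__":
    sizes = [len(b) for b in batch_events(EVENTS).values()]
    print(sizes)  # [3, 3, 3, 3, 3, 3, 1, 3, 1]
```

The batch sizes match the nine output lines above: full batches of 3 wherever the window holds enough events, and a short batch at the end of the 12:15 and 12:25 windows.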

Upvotes: 2
