user4446237

Reputation: 646

Azure stream analytics array_agg equivalent?

Is there a way to do the equivalent of Postgres's array_agg or string_agg in Azure Stream Analytics? I have data that arrives every few seconds, and I would like to get counts of the values within a time frame.

Data:

{time:12:01:01,name:A,location:X,value:10}
{time:12:01:01,name:B,location:X,value:9}
{time:12:01:02,name:C,location:Y,value:5}
{time:12:01:02,name:B,location:Y,value:4}
{time:12:01:03,name:B,location:Z,value:2}
{time:12:01:03,name:A,location:Z,value:3}
{time:12:01:06,name:B,location:Z,value:4}
{time:12:01:06,name:C,location:Z,value:7}
{time:12:01:08,name:B,location:Y,value:1}
{time:12:01:13,name:B,location:X,value:8}

With a sliding window of 2 seconds, I want to group the data to see the following:

12:01:01, 2 events, 9.5 avg, 2 distinct names, 1 distinct location, nameA:1, nameB:1, locationX:1
12:01:02, 4 events, 7 avg, 3 distinct names, 2 distinct location, nameA:1, nameB:2,nameC:1,locationX:1,locationY:1
12:01:03...
12:01:06...
...

I can get the number of events, average, and distinct counts without issue. I use a window together with a WITH clause, joining on the timestamp, to get the aggregated counts for each timestamp. I am having trouble figuring out how to get the total counts by name and by location, mostly because I do not know how to aggregate strings in Azure Stream Analytics.

with agg1 as (
select system.timestamp as start,
avg(value) as avg,
count(1) as events,
count(distinct name) as distinct_names,
count(distinct location) as distinct_locations
from input timestamp by created
group by slidingwindow(second,2)
),
agg2 as (
select agg2_inner.start,
array_agg(name,'|',ct_name) as countbyname (????)
from (
    select system.timestamp as start,
    name, count(1) as ct_name
    from input timestamp by created
    group by slidingwindow(second,2), name
) as agg2_inner
group by agg2_inner.start, slidingwindow(second,2)
)

select * from agg1 join agg2 on (datediff(second,agg1,agg2) between 0 and 2 
and agg1.start = agg2.start)

There is no fixed list of names or locations, so the query needs to be somewhat dynamic. It is OK if the counts come back as an object within a single query result; a later process can parse it to get the individual counts.

Upvotes: 0

Views: 355

Answers (1)

Brando Zhang

Reputation: 28397

As far as I know, Azure Stream Analytics doesn't provide an array_agg function. It does provide the Collect aggregate function, which returns an array of all record values from the window.

I suggest first using Collect to return the array of records, grouped by time and window.

Then you can use an Azure Stream Analytics JavaScript user-defined function (UDF) to implement your own logic that converts the array into the result you need.

For more detail, see the sample below.

The query looks like this:

SELECT
    time, udf.yourudfname(COLLECT()) AS Result
INTO
    [YourOutputAlias]
FROM
    [YourInputAlias]
GROUP BY time, TumblingWindow(minute, 10)

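The UDF looks like this. In this sample it just returns the average and the number of events.

function main(InputJSON) {
    // InputJSON is the array of records returned by COLLECT() for the window.
    var sum = 0;
    // Declare i with var so it does not leak as an implicit global.
    for (var i = 0; i < InputJSON.length; i++) {
        sum += InputJSON[i].value;
    }
    var result = { events: InputJSON.length, avg: sum / InputJSON.length };
    return result;
}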

Data:

{"name": "A", "time":"12:01:01","value":10}

{"name": "B", "time":"12:01:01","value":9}

{"name": "C", "time":"12:01:02","value":10}

Result: [screenshot of the query output; image not available]
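
To get the per-name and per-location counts asked about in the question, the same Collect-plus-UDF approach can be extended. The following is only a sketch under some assumptions: the field names name, location, and value come from the question's sample data, while the output property names (distinctNames, countByName, and so on) are made up for illustration, and I have not run this inside a Stream Analytics job.

```javascript
// Sketch of a UDF body. Assumes each record in the collected array
// has name, location and value fields, as in the question's data.
// The output property names are hypothetical, chosen for illustration.
function main(InputJSON) {
    var countByName = {};
    var countByLocation = {};
    var sum = 0;

    for (var i = 0; i < InputJSON.length; i++) {
        var record = InputJSON[i];
        sum += record.value;
        // Tally occurrences per name and per location, starting from 0.
        countByName[record.name] = (countByName[record.name] || 0) + 1;
        countByLocation[record.location] = (countByLocation[record.location] || 0) + 1;
    }

    return {
        events: InputJSON.length,
        // Guard against division by zero on an empty array.
        avg: InputJSON.length > 0 ? sum / InputJSON.length : null,
        distinctNames: Object.keys(countByName).length,
        distinctLocations: Object.keys(countByLocation).length,
        countByName: countByName,
        countByLocation: countByLocation
    };
}
```

It would be called the same way as in the query above, as udf.yourudfname(COLLECT()), with the grouping switched to SlidingWindow(second, 2) if you want the 2-second sliding window from the question. Because the counts are built as plain objects keyed by whatever names and locations appear, no fixed list of names or locations is needed, and a later process can read the individual counts from countByName and countByLocation.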

Upvotes: 1
