Toxic_Drop

Reputation: 1

Azure Stream Analytics and JSON data

I have an event hub that I'm ingesting data from and writing to a Synapse table. The event data has multiple layers of nested JSON, and I'm trying to manipulate some of it, e.g.:

{
    "eventName": "testEvent",
    "eventSentTimestamp": "2023-07-04T17:15:00.277Z",
    "eventCorrelationID": "428776db-ae8c-41cb-8b3d-1edcb2c5a25b",
    "payload": [
      {
        "eventAttributeName": "currentLocalTime",
        "value": "2023-07-01T12:34:57Z"
      },
      {
        "eventAttributeName": "softwareVersion",
        "value": "3.2.4"
      },
      {
        "eventAttributeName": "deviceGuid",
        "value": "AA-BB-CC-DD-11-22-33-44"
      },
      {
        "eventAttributeName": "connectionID",
        "value": 40
      },
      {
        "eventAttributeName": "connectionState",
        "value": "43"
      },
      {
        "eventAttributeName": "status",
        "value": "ONLINE"
      },
      {
        "eventAttributeName": "statusCode",
        "value": 3
      },
      {
        "eventAttributeName": "previousTime",
        "value": "2021-06-30T12:34:50Z"
      }
    ],
    "EventProcessedUtcTime": "2023-07-05T09:23:50.8441073Z",
    "PartitionId": 10,
    "EventEnqueuedUtcTime": "2023-07-04T17:14:03.3520000Z"
  }

The payload section has some system-defined key/value pairs, but I want to transform it so that each key is the value of eventAttributeName and each value is the value of the corresponding value field, if that makes sense. It would look something like:

snippet.....

"payload": [
      {
        "currentLocalTime": "2023-07-01T12:34:57Z"
      },
      {
        "softwareVersion": "3.2.4"
      },
      {
        "deviceGuid": "AA-BB-CC-DD-11-22-33-44"
      }

I have written a JS UDF which works OK in testing, but when I use it in a Stream Analytics query I get blank values in the output.

function extractKeyValuePairs(arr) {
  return arr.map((obj) => {
    const payload = obj.payload;
    const keyValuePairs = {};

    for (let key in payload) {
      if (payload.hasOwnProperty(key)) {
        const { eventAttributeName, value } = payload[key];
        keyValuePairs[eventAttributeName] = value;
      }
    }

    return keyValuePairs;
  });
}

Stream Query:

SELECT udf.extractKeyValuePairs(payload) FROM [eventHubAlias]

Output (redacted):

[{"extractKeyValuePairs":[{},{},{},{},{},{},{},{},{},{}]},{"extractKeyValuePairs":[{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}]}]

Is anyone able to see why it can't read the data? I'm able to refer to the values when I CROSS APPLY, but I can't see how to aggregate them into the array I need.

I tried a UDF and also inline queries. I expected non-blank key/value pairs in the result.

Upvotes: 0

Views: 296

Answers (1)

Aswin

Reputation: 7126

To aggregate the CROSS APPLY results into an array, you can use the Collect() aggregate function.

with stg as (
    select concat('{', arrayElement.arrayvalue.eventAttributeName, ':', arrayElement.arrayvalue.value, '}')
    from InputStream as event
    cross apply GetArrayElements(event.payload) as arrayElement
)
select collect(concat) as Payload
from stg
group by TumblingWindow(second, 10)

This query uses GetArrayElements with CROSS APPLY to expand the payload array into one row per element, then concatenates each element's eventAttributeName and value into a string of the form {name:value}. The Collect() aggregate then gathers those strings into an array for each 10-second tumbling window. Note that the array elements are strings rather than JSON objects, because concat produces text and the names and values are not quoted.

I tested this with the sample input provided; the output is below.

Output:

[
  {
    "Payload": [
      "{currentLocalTime:2023-07-01T12:34:57Z}",
      "{softwareVersion:3.2.4}",
      "{deviceGuid:AA-BB-CC-DD-11-22-33-44}",
      "{connectionID:40}",
      "{connectionState:43}",
      "{status:ONLINE}",
      "{statusCode:3}",
      "{previousTime:2021-06-30T12:34:50Z}"
    ]
  }
]
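
As for why the UDF in the question returns empty objects: the query passes the payload array itself to the function, so each element that map() visits is already an { eventAttributeName, value } record. obj.payload is therefore undefined, the for...in loop never runs, and an empty object comes back for every element. Below is a minimal sketch of a corrected function, assuming the UDF receives the array as a plain JavaScript array of objects. The Array.isArray guard is my own addition, and I have only checked this locally, not inside a Stream Analytics job.

```javascript
// Sketch of a corrected UDF: the argument is the payload array itself,
// so each element already carries eventAttributeName and value.
function extractKeyValuePairs(payload) {
  // Guard (an assumption, not in the original): return an empty array
  // when the event has no payload array.
  if (!Array.isArray(payload)) {
    return [];
  }
  return payload.map(function (item) {
    // Build a single-key object: { <eventAttributeName>: <value> }
    const pair = {};
    pair[item.eventAttributeName] = item.value;
    return pair;
  });
}

// Local check with part of the sample payload from the question.
const sample = [
  { eventAttributeName: "softwareVersion", value: "3.2.4" },
  { eventAttributeName: "connectionID", value: 40 }
];
console.log(JSON.stringify(extractKeyValuePairs(sample)));
// prints [{"softwareVersion":"3.2.4"},{"connectionID":40}]
```

With this change the query from the question, SELECT udf.extractKeyValuePairs(payload) FROM [eventHubAlias], should not need to change, and the values keep their original types (for example, connectionID stays a number) instead of being flattened into strings.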


Upvotes: 0
