filip

Reputation: 1513

Stream analytics - How to handle json in reference input

I have an Azure Stream Analytics (ASA) job that processes device telemetry data from an Event Hub. The stream should be joined with reference data from a SQL table to enrich each message with additional device metadata. The merged entry should be stored in Cosmos DB.

The SQL table that serves the device metadata:

CREATE TABLE [dbo].[MyTable]
(
  [DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
  [MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

In ASA I have configured the reference data input with a simple query:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

And I have the main ASA query that performs the join:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)

SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

It all works and the merged data gets stored in Cosmos DB. However, the value of the MetaData column from SQL is treated as a string, and stored in Cosmos with quotes and escape characters. Example:

{ "DeviceId" : "abc1234", … , "MetaData" : "{ \"TestKey\": \"test value\" }" };

Is there a way to handle & store the JSON from MetaData as a proper JSON object, i.e.

{ "DeviceId" : "abc1234", … , "MetaData" : { "TestKey": "test value" } };

Upvotes: 2

Views: 1378

Answers (2)

filip

Reputation: 1513

I found a way to achieve it in ASA - you need to create a JavaScript user-defined function:

function parseJson(strjson) {
    return JSON.parse(strjson);
}

And call it in your query:

...
SELECT TD.*, udf.parseJson(MD.MetaData)
...
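One caveat worth noting: with the LEFT OUTER JOIN in the question, MD.MetaData can be NULL for devices missing from the reference table, and JSON.parse would then throw. A minimal defensive sketch of the UDF (the null guard is my addition, not part of the original answer):

```javascript
// UDF for an ASA job: parse a JSON string column into a real object.
// Returns null instead of throwing when the input is missing, which can
// happen for unmatched rows of a LEFT OUTER JOIN.
function parseJson(strjson) {
    if (strjson === null || strjson === undefined) {
        return null;
    }
    return JSON.parse(strjson);
}
```

In the ASA portal the function would be registered under the `udf` namespace, so the call shape `udf.parseJson(MD.MetaData)` stays the same.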

Upvotes: 3

Jay Gong

Reputation: 23792

As you mentioned in your question, the reference JSON data is treated as a JSON string, not a JSON object. Based on my research of the query syntax in ASA, there is no built-in function to convert it.

However, I'd suggest using an Azure Functions Cosmos DB trigger to process every document as it is created. Please refer to my function code:

using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

namespace ProcessJson
{
    public class Class1
    {
        [FunctionName("DocumentUpdates")]
        public static void Run(
        [CosmosDBTrigger(databaseName:"db",collectionName: "item", ConnectionStringSetting = "CosmosDBConnection",LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> documents,
        TraceWriter log)
        {
            log.Verbose("Start.........");
            String endpointUrl = "https://***.documents.azure.com:443/";
            String authorizationKey = "***";
            String databaseId = "db";
            String collectionId = "import";

            DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey);

            for (int i = 0; i < documents.Count; i++)
            {
                Document doc = documents[i];
                // Document has no dynamic members; read the flag explicitly
                // so the check compiles and handles a missing property.
                bool? alreadyFormat = doc.GetPropertyValue<bool?>("alreadyFormat");
                if (alreadyFormat != true)
                {
                    String MetaData = doc.GetPropertyValue<String>("MetaData");
                    JObject o = JObject.Parse(MetaData);

                    doc.SetPropertyValue("MetaData", o);
                    doc.SetPropertyValue("alreadyFormat", true);
                    // Block on the async call since Run is void here.
                    client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, doc.Id), doc).Wait();

                    log.Verbose("Update document Id " + doc.Id);
                }

            }
        }
    }
}

In addition, please refer to this related question: Azure Cosmos DB SQL - how to unescape inner json property

Upvotes: 2
