Gokulraj

Reputation: 520

Column data to nested json object in Spark structured streaming

In our application we obtain the field values as columns using Spark SQL. I'm trying to figure out how to put the column values into a nested JSON object and push it to Elasticsearch. Also, is there a way to parameterise the values in selectExpr that are passed to the regex? (One possible approach is sketched after the code below.)

We are currently using the Spark Java API.

Dataset<Row> data = rowExtracted.selectExpr("split(value,\"[|]\")[0] as channelId",
                "split(value,\"[|]\")[1] as country",
                "split(value,\"[|]\")[2] as product",
                "split(value,\"[|]\")[3] as sourceId",
                "split(value,\"[|]\")[4] as systemId",
                "split(value,\"[|]\")[5] as destinationId",
                "split(value,\"[|]\")[6] as batchId",
                "split(value,\"[|]\")[7] as orgId",
                "split(value,\"[|]\")[8] as businessId",
                "split(value,\"[|]\")[9] as orgAccountId",
                "split(value,\"[|]\")[10] as orgBankCode",
                "split(value,\"[|]\")[11] as beneAccountId",
                "split(value,\"[|]\")[12] as beneBankId",
                "split(value,\"[|]\")[13] as currencyCode",
                "split(value,\"[|]\")[14] as amount",
                "split(value,\"[|]\")[15] as processingDate",
                "split(value,\"[|]\")[16] as status",
                "split(value,\"[|]\")[17] as rejectCode",
                "split(value,\"[|]\")[18] as stageId",
                "split(value,\"[|]\")[19] as stageStatus",
                "split(value,\"[|]\")[20] as stageUpdatedTime",
                "split(value,\"[|]\")[21] as receivedTime",
                "split(value,\"[|]\")[22] as sendTime"
        );
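On the parameterisation part of the question: the repeated split expressions (and the delimiter regex itself) could be built in a loop instead of being written out by hand. A minimal sketch, assuming the same field order as above; the fields array and delimiter variable are illustrative, not existing names:

String delimiter = "[|]";  // the regex passed to split(), now a parameter
String[] fields = {"channelId", "country", "product", "sourceId", "systemId",
        "destinationId", "batchId", "orgId", "businessId", "orgAccountId",
        "orgBankCode", "beneAccountId", "beneBankId", "currencyCode", "amount",
        "processingDate", "status", "rejectCode", "stageId", "stageStatus",
        "stageUpdatedTime", "receivedTime", "sendTime"};

String[] exprs = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
    // Builds e.g. "split(value,'[|]')[0] as channelId" for each position.
    exprs[i] = String.format("split(value,'%s')[%d] as %s", delimiter, i, fields[i]);
}
Dataset<Row> data = rowExtracted.selectExpr(exprs);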
StreamingQuery query = data.writeStream()
                .outputMode(OutputMode.Append()).format("es").option("checkpointLocation", "C:\\checkpoint")
                .start("spark_index/doc");

Actual output:

{
  "_index": "spark_index",
  "_type": "doc",
  "_id": "test123",
  "_version": 1,
  "_score": 1,
  "_source": {
    "channelId": "test",
    "country": "SG",
    "product": "test",
    "sourceId": "",
    "systemId": "test123",
    "destinationId": "",
    "batchId": "",
    "orgId": "test",
    "businessId": "test",
    "orgAccountId": "test",
    "orgBankCode": "",
    "beneAccountId": "test",
    "beneBankId": "test",
    "currencyCode": "SGD",
    "amount": "53.0000",
    "processingDate": "",
    "status": "Pending",
    "rejectCode": "test",
    "stageId": "123",
    "stageStatus": "Comment",
    "stageUpdatedTime": "2019-08-05 18:11:05.999000",
    "receivedTime": "2019-08-05 18:10:12.701000",
    "sendTime": "2019-08-05 18:11:06.003000"
  }
}

We need the above columns nested under a node "txn_summary", as in the JSON below:

Expected output:

{
  "_index": "spark_index",
  "_type": "doc",
  "_id": "test123",
  "_version": 1,
  "_score": 1,
  "_source": {
    "txn_summary": {
      "channelId": "test",
      "country": "SG",
      "product": "test",
      "sourceId": "",
      "systemId": "test123",
      "destinationId": "",
      "batchId": "",
      "orgId": "test",
      "businessId": "test",
      "orgAccountId": "test",
      "orgBankCode": "",
      "beneAccountId": "test",
      "beneBankId": "test",
      "currencyCode": "SGD",
      "amount": "53.0000",
      "processingDate": "",
      "status": "Pending",
      "rejectCode": "test",
      "stageId": "123",
      "stageStatus": "Comment",
      "stageUpdatedTime": "2019-08-05 18:11:05.999000",
      "receivedTime": "2019-08-05 18:10:12.701000",
      "sendTime": "2019-08-05 18:11:06.003000"
    }
  }
}

Upvotes: 2

Views: 644

Answers (1)

Shaido

Reputation: 28392

Adding all columns to a top-level struct should give the expected output. In Scala:

import org.apache.spark.sql.functions._

data.select(struct(data.columns.map(col): _*).as("txn_summary"))

In Java I would suspect it would be similar, except that the String column names need to be mapped to Columns first:

import java.util.Arrays;
import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.*;

Column[] cols = Arrays.stream(data.columns()).map(c -> col(c)).toArray(Column[]::new);
Dataset<Row> nested = data.select(struct(cols).as("txn_summary"));
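Hooking the nested Dataset back into the streaming write would then look like the following minimal sketch, reusing the sink options from the question:

StreamingQuery query = nested.writeStream()
        .outputMode(OutputMode.Append())
        .format("es")
        .option("checkpointLocation", "C:\\checkpoint")
        .start("spark_index/doc");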

Upvotes: 1
