Reputation: 520
In our application we obtain the field values as columns using Spark sql. Im' trying to figure out how to put the columns values to nested json object and push to Elasticsearch. Also is there a way to parameterise values in selectExpr
to pass to the regex?
We are currently using the Spark Java API.
Dataset<Row> data = rowExtracted.selectExpr("split(value,\"[|]\")[0] as channelId",
"split(value,\"[|]\")[1] as country",
"split(value,\"[|]\")[2] as product",
"split(value,\"[|]\")[3] as sourceId",
"split(value,\"[|]\")[4] as systemId",
"split(value,\"[|]\")[5] as destinationId",
"split(value,\"[|]\")[6] as batchId",
"split(value,\"[|]\")[7] as orgId",
"split(value,\"[|]\")[8] as businessId",
"split(value,\"[|]\")[9] as orgAccountId",
"split(value,\"[|]\")[10] as orgBankCode",
"split(value,\"[|]\")[11] as beneAccountId",
"split(value,\"[|]\")[12] as beneBankId",
"split(value,\"[|]\")[13] as currencyCode",
"split(value,\"[|]\")[14] as amount",
"split(value,\"[|]\")[15] as processingDate",
"split(value,\"[|]\")[16] as status",
"split(value,\"[|]\")[17] as rejectCode",
"split(value,\"[|]\")[18] as stageId",
"split(value,\"[|]\")[19] as stageStatus",
"split(value,\"[|]\")[20] as stageUpdatedTime",
"split(value,\"[|]\")[21] as receivedTime",
"split(value,\"[|]\")[22] as sendTime"
);
StreamingQuery query = data.writeStream()
.outputMode(OutputMode.Append()).format("es").option("checkpointLocation", "C:\\checkpoint")
.start("spark_index/doc")
Actual output:
{
"_index": "spark_index",
"_type": "doc",
"_id": "test123",
"_version": 1,
"_score": 1,
"_source": {
"channelId": "test",
"country": "SG",
"product": "test",
"sourceId": "",
"systemId": "test123",
"destinationId": "",
"batchId": "",
"orgId": "test",
"businessId": "test",
"orgAccountId": "test",
"orgBankCode": "",
"beneAccountId": "test",
"beneBankId": "test",
"currencyCode": "SGD",
"amount": "53.0000",
"processingDate": "",
"status": "Pending",
"rejectCode": "test",
"stageId": "123",
"stageStatus": "Comment",
"stageUpdatedTime": "2019-08-05 18:11:05.999000",
"receivedTime": "2019-08-05 18:10:12.701000",
"sendTime": "2019-08-05 18:11:06.003000"
}
}
We need the above columns under a node "txn_summary" such as the below json:
Expected output:
{
"_index": "spark_index",
"_type": "doc",
"_id": "test123",
"_version": 1,
"_score": 1,
"_source": {
"txn_summary": {
"channelId": "test",
"country": "SG",
"product": "test",
"sourceId": "",
"systemId": "test123",
"destinationId": "",
"batchId": "",
"orgId": "test",
"businessId": "test",
"orgAccountId": "test",
"orgBankCode": "",
"beneAccountId": "test",
"beneBankId": "test",
"currencyCode": "SGD",
"amount": "53.0000",
"processingDate": "",
"status": "Pending",
"rejectCode": "test",
"stageId": "123",
"stageStatus": "Comment",
"stageUpdatedTime": "2019-08-05 18:11:05.999000",
"receivedTime": "2019-08-05 18:10:12.701000",
"sendTime": "2019-08-05 18:11:06.003000"
}
}
}
Upvotes: 2
Views: 644
Reputation: 28392
Adding all columns to a top level struct should give the expected output. In Scala:
import org.apache.spark.sql.functions._
data.select(struct(data.columns:_*).as("txn_summary"))
In Java I would suspect it it would be:
import org.apache.spark.sql.functions.struct;
data.select(struct(data.columns()).as("txn_summary"));
Upvotes: 1