user7615505

Reputation: 81

Process binary data in Spark Structured Streaming

I am using Kafka and Spark Structured Streaming. I am receiving Kafka messages in the following format:

{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}

I am reading them as follows:

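StructType schema = new StructType()
      .add("deviceId", DataTypes.StringType)
      .add("sNo", DataTypes.IntegerType)
      .add("data", DataTypes.StringType);
Dataset<DeviceData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      // parse the JSON value into the columns deviceId, sNo, data (functions.from_json / functions.col)
      .select(from_json(col("value").cast("string"), schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(DeviceData.class));
// orderBy("deviceId", "sNo") is rejected here: a streaming Dataset can only be sorted after an aggregation in Complete mode
ds.writeStream()
      .foreach(new ForeachWriter<DeviceData>() {
          @Override public boolean open(long partitionId, long version) { return true; }
          @Override public void process(DeviceData event) {
              processData(event.getDeviceId(), event.getSNo(), event.getData().getBytes());
          }
          @Override public void close(Throwable errorOrNull) { }
      })
      .start();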

private void processData(String deviceId, int sNo, byte[] data)
{
  // How do I check what was already processed for this deviceId?
}

In my JSON message, "data" is the String form of a byte[]. I need to process the binary "data" for a given "deviceId" in "sNo" order: for "deviceId"="001", I have to process the binary data for "sNo"=1, then "sNo"=2, and so on. How can I check the state of previously processed data in Structured Streaming?

Upvotes: 1

Views: 1541

Answers (1)

abaghel

Reputation: 15297

If you are looking for state management like DStream.mapWithState, it is not supported yet in Structured Streaming; work on it is in progress. Please check https://issues.apache.org/jira/browse/SPARK-19067.
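SPARK-19067 was resolved in Spark 2.2.0, which added mapGroupsWithState and flatMapGroupsWithState for arbitrary per-key state. One possible approach (a sketch, not the answerer's code) is to group by deviceId and keep a small reorder buffer per device in that state. Within a trigger the values for a key are not guaranteed to arrive sorted by sNo, so chunks that come early are held until the gap before them is filled. The class below is plain Java so the ordering logic can be checked without Spark; the name DeviceReorderBuffer, the assumption that sNo starts at 1, and the choice to silently drop an sNo that was already processed are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical per-device state: buffers out-of-order chunks and releases them
// strictly in sNo order. Assumes sequence numbers start at 1 and have no gaps.
class DeviceReorderBuffer implements java.io.Serializable {
    private int nextSNo = 1;                                          // next sNo allowed to be processed
    private final TreeMap<Integer, byte[]> pending = new TreeMap<>(); // chunks that arrived early

    // Adds one chunk and returns every chunk that can now be processed, in sNo order.
    List<byte[]> offer(int sNo, byte[] data) {
        List<byte[]> ready = new ArrayList<>();
        if (sNo < nextSNo) {
            return ready; // already processed: treated as a duplicate and ignored (assumption)
        }
        pending.put(sNo, data);
        // Drain the contiguous run that starts at nextSNo.
        while (pending.containsKey(nextSNo)) {
            ready.add(pending.remove(nextSNo));
            nextSNo++;
        }
        return ready;
    }

    int nextExpected() { return nextSNo; }

    int pendingCount() { return pending.size(); }
}
```

In a Spark 2.2+ job, the idea would be to keep one DeviceReorderBuffer per deviceId as the GroupState of groupByKey(...).flatMapGroupsWithState(...), call offer(event.getSNo(), event.getData().getBytes()) for each incoming row, and pass each returned chunk to processData. Because the class is Serializable, a Java-serialization or Kryo encoder could be used for the state; that choice is also an assumption.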

Upvotes: 1
