Tomtom

Reputation: 91

Spark Structured Streaming - compare two streams

I am using Kafka and Spark 2.1 Structured Streaming. I have two topics with data in json format eg:

topic 1:

{"id":"1","name":"tom"}
{"id":"2","name":"mark"}

topic 2:

{"name":"tom","age":"25"}
{"name":"mark","age":"35"}

I need to compare these two streams in Spark based on the name field, and when the values are equal, execute some additional function.

How can I do this with Spark Structured Streaming?

Thanks

Upvotes: 1

Views: 1209

Answers (4)

Ander

Reputation: 513

I faced a similar requirement some time ago: I had 2 streams which had to be "joined" together based on some criteria. What I used was a function called mapGroupsWithState.

What this function does (in a few words; more details in the reference below) is take a stream in the form of (K, V) pairs and accumulate its elements into a common state, based on the key of each pair. You then have ways to tell Spark when the state is complete (according to your application), or even to set a timeout for incomplete states.

Example based on your question:

  1. Read Kafka topics into a Spark Stream:

    val rawDataStream: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", "topic1,topic2") // Both topics on same stream!
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "true")
    .load()
    .selectExpr("CAST(value AS STRING) as jsonData") // Kafka sends bytes
    
  2. Do some operations on your data (I prefer SQL, but you can use the DataFrame API) to transform each element into a key-value pair:

    spark.sqlContext.udf.register("getKey", getKey) // You define this function; I'm assuming you will be using the name as the key in your example.
    
    rawDataStream.createOrReplaceTempView("rawData")
    
    val keyPairsStream = spark
    .sql("select getKey(jsonData) as ID, jsonData from rawData")
    .as[(String, String)]
    .groupByKey(_._1) // groupByKey (not groupBy) gives you a KeyValueGroupedDataset, which supports mapGroupsWithState
    
  3. Use the mapGroupsWithState function (I will show you the basic idea; you will have to define myGrpFunct according to your needs):

    keyPairsStream
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(myGrpFunct)
    
  4. That's it! If you implement myGrpFunct correctly, you will have one stream of merged data, which you can further transform, like the following:

["tom",{"id":"1","name":"tom"},{"name":"tom","age":"25"}]

["mark",{"id":"2","name":"mark"},{"name":"mark","age":"35"}]
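For illustration, here is a minimal sketch of what a state function like myGrpFunct could look like. The state type (a list of JSON strings per key) and the "complete when both topics contributed" rule are my assumptions; your real logic may differ:

```scala
import org.apache.spark.sql.streaming.GroupState

// Sketch: accumulate the JSON payloads seen for a key; once records from
// both topics have arrived, emit the merged result and clear the state.
def myGrpFunct(key: String,
               values: Iterator[(String, String)], // (ID, jsonData) pairs
               state: GroupState[List[String]]): Option[(String, List[String])] = {
  val previous = state.getOption.getOrElse(Nil)
  val updated  = previous ++ values.map(_._2)
  if (updated.size >= 2) {
    state.remove()                          // both sides arrived: emit and clear
    Some((key, updated))
  } else {
    state.update(updated)                   // still waiting for the other topic
    state.setTimeoutDuration("10 minutes")  // eventually drop incomplete states
    None
  }
}
```

Downstream you would filter out the None results, keeping only the completed pairs.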

Hope this helps!

An excellent explanation with some code snippets: http://asyncified.io/2017/07/30/exploring-stateful-streaming-with-spark-structured-streaming/

Upvotes: 0

Naman Agarwal

Reputation: 654

I hope you got your solution. In case not, you can try creating two KStreams from the two topics, joining those KStreams, and writing the joined data back to a single topic. You can then read the joined data as one DataFrame using Spark Structured Streaming and apply any transformations you want. Since Structured Streaming doesn't support joining two streaming DataFrames, you can follow this approach to get the task done.
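A rough sketch of the Kafka Streams side of this approach (the topic names, window size, and the extractName JSON helper are assumptions, and the serde setup may differ across Kafka Streams versions):

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topics")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new StreamsBuilder()

// Re-key both topics by the "name" field (extractName is a helper you'd
// write with your JSON library of choice) so the join can match on it.
val left  = builder.stream[String, String]("topic1").selectKey((_, v) => extractName(v))
val right = builder.stream[String, String]("topic2").selectKey((_, v) => extractName(v))

// Join records sharing the same name within a 5-minute window,
// concatenating the two JSON payloads into a single value.
val joined = left.join(right)(
  (v1, v2) => s"""[$v1,$v2]""",
  JoinWindows.of(Duration.ofMinutes(5))
)
joined.to("joined-topic") // read this topic back with Structured Streaming

new KafkaStreams(builder.build(), props).start()
```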

Upvotes: 0

maasg

Reputation: 37435

According to the current documentation (Spark 2.1.1):

Any kind of joins between two streaming Datasets are not yet supported.

ref: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

At this moment, I think you need to rely on Spark Streaming as proposed by @igodfried's answer.

Upvotes: 0

igodfried

Reputation: 918

One method would be to transform both streams into (K, V) format. In your case this would probably take the form of (name, otherJSONData). See the Spark documentation for more information on joining streams and an example located here. Then do a join on both streams and perform whatever function you need on the newly joined stream. If needed, you can use map to transform the joined (K, (V, W)) pairs back into (K, V).
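For reference, a rough sketch of this with the DStream API from Spark 2.1 (the brokers, group id, and the parseName JSON helper are assumptions):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("join-example")
val ssc  = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "join-example"
)

// Read one topic and key each record by its name field.
def keyed(topic: String) =
  KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq(topic), kafkaParams))
    .map(r => (parseName(r.value), r.value)) // (name, jsonData)

// Inner join on the name key: yields (name, (topic1Json, topic2Json))
val joinedStream = keyed("topic1").join(keyed("topic2"))

joinedStream.foreachRDD { rdd =>
  rdd.foreach { case (name, (v1, v2)) => /* your function here */ }
}

ssc.start()
ssc.awaitTermination()
```

Note that this join only matches records arriving in the same batch interval; for looser matching you would need windowing or explicit state.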

Upvotes: -1
