user3840810

Reputation: 9

SPARK - Joining two data streams - maintenance of cache

It is evident that the out-of-the-box join capability in Spark Streaming does not cover a lot of real-life use cases, the reason being that it joins only the data contained in the micro-batch RDDs.

The use case is to join data from two Kafka streams, enrich each object in stream1 with its corresponding object in stream2 in Spark, and save the result to HBase.

Implementation would: (1) maintain a cache of the latest objects from stream2, keyed so they can be looked up, and (2) join each incoming stream1 micro-batch against that cache, enrich the objects, and save them to HBase.

This question explores Spark Streaming and its API to find a way to implement the above.
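For context, a minimal sketch of the two input streams, assuming the spark-streaming-kafka direct (0.8) connector and string-serialized messages; the application name, batch interval, broker address and topic names are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical setup: app name, batch interval, broker and topics are placeholders.
val ssc = new StreamingContext(new SparkConf().setAppName("stream-enrichment"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Two direct streams of (key, value) string pairs, one per Kafka topic.
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1"))
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic2"))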

Upvotes: 0

Views: 1426

Answers (2)

sgvd

Reputation: 3939

A good start would be to look into mapWithState. This is a more efficient replacement for updateStateByKey. These are defined on PairDStreamFunctions, so assuming your objects of type V in stream2 are identified by some key of type K, your first point would go like this:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

def stream2: DStream[(K, V)] = ???

// Store the latest value seen for each key in Spark's managed state and
// emit the (key, latest value) pair for every incoming record.
def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
  value.foreach(state.update(_))
  (key, state.get())
}

val spec = StateSpec.function(maintainStream2Objects _)

val stream2State = stream2.mapWithState(spec)

stream2State is now a stream where each batch contains the (K, V) pairs with the latest value seen for each key. You can join this stream with stream1 to perform the further logic for your second point.
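As a minimal sketch of that second point, assuming stream1 is also keyed as a DStream[(K, V1)] and leaving the HBase write as a placeholder:

// Join each stream1 micro-batch with the latest stream2 state, then write out the enriched pairs.
val enriched = stream1.join(stream2State)  // DStream[(K, (V1, V))]

enriched.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // open one HBase connection per partition, then put each enriched record
    records.foreach { case (key, (obj1, obj2)) => /* enrich obj1 with obj2 and save */ }
  }
}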

Upvotes: 0

David Griffin

Reputation: 13927

You can join the incoming RDDs to other RDDs -- not just the ones in that micro-batch. Basically you keep a "running total" RDD that you fill in something like this:

var globalRDD1: RDD[...] = sc.emptyRDD
var globalRDD2: RDD[...] = sc.emptyRDD

// Accumulate every micro-batch from dstream1 into the running RDD.
dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd))

// Do the same for dstream2, then join the two running RDDs on each batch.
dstream2.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    globalRDD2 = globalRDD2.union(rdd)
    globalRDD1.join(globalRDD2).foreach(...) // etc, etc
  }
}
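A concrete usage sketch of this pattern, assuming both DStreams carry (Key, Record) pairs, with hypothetical Key/Record types standing in for the real record classes and the HBase write left as a placeholder; the union is collapsed to one record per key so the running RDD does not keep duplicates:

import org.apache.spark.rdd.RDD

// Hypothetical key/value types; replace them with the real record classes.
type Key = String
type Record = String

var cacheRDD: RDD[(Key, Record)] = sc.emptyRDD

dstream2.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // union the new micro-batch into the cache, then keep one record per key
    // (reduceByKey's pick is arbitrary here; reduce by a timestamp if the record carries one)
    cacheRDD = cacheRDD.union(rdd).reduceByKey((_, b) => b).cache()
  }
}

dstream1.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // enrich each stream1 record with its cached stream2 counterpart
    rdd.join(cacheRDD).foreach { case (key, (left, right)) => /* save enriched record to HBase */ }
  }
}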

Upvotes: 1
