Reputation: 9
It is evident that the out-of-the-box join capability in Spark Streaming does not cover many real-life use cases, because it joins only the data contained in the current micro-batch RDDs.
The use case is to join data from two Kafka streams in Spark, enrich each object in stream1 with its corresponding object in stream2, and save the result to HBase.
The implementation would:
1. maintain an in-memory dataset of the objects from stream2, adding or replacing objects as they are received;
2. for every element in stream1, look up the cache for a matching object from stream2, and save to HBase if a match is found or put the element back on the Kafka stream if not.
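The two steps above can be modeled outside Spark as a minimal single-process sketch. The case classes `Customer`, `Order` and `EnrichedOrder`, and the `Stream2Cache`/`CacheDemo` names, are hypothetical stand-ins for the real stream2 objects, stream1 objects and enriched result; no Spark, Kafka or HBase API is used, the "save to HBase" and "put back on Kafka" actions are represented only by which list an element ends up in.

```scala
import scala.collection.mutable

// Hypothetical record types: stream2 carries Customer objects, stream1 carries Orders.
final case class Customer(id: String, name: String)
final case class Order(orderId: Int, customerId: String)
final case class EnrichedOrder(order: Order, customer: Customer)

// Step 1: in-memory cache of stream2 objects, keyed by id.
final class Stream2Cache {
  private val byId = mutable.Map.empty[String, Customer]

  // Add or replace the object as soon as it is received.
  def onStream2(c: Customer): Unit = byId.update(c.id, c)

  // Step 2: enrich a stream1 element if a match exists; otherwise hand it
  // back (Left) so the caller can re-queue it.
  def onStream1(o: Order): Either[Order, EnrichedOrder] =
    byId.get(o.customerId) match {
      case Some(c) => Right(EnrichedOrder(o, c))
      case None    => Left(o)
    }
}

object CacheDemo {
  def run(): (Seq[EnrichedOrder], Seq[Order]) = {
    val cache = new Stream2Cache
    cache.onStream2(Customer("c1", "Alice"))
    cache.onStream2(Customer("c1", "Alice B.")) // replaces the earlier c1
    val results = Seq(Order(1, "c1"), Order(2, "c2")).map(o => cache.onStream1(o))
    val saved    = results.collect { case Right(e) => e } // would be written to HBase
    val requeued = results.collect { case Left(o) => o }  // would go back to Kafka
    (saved, requeued)
  }
}
```

In a real job the cache has to survive across micro-batches and be visible to the executors, which is exactly the part Spark Streaming's state APIs are meant to handle.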
This question is about exploring Spark Streaming and its API to find a way to implement the above.
Upvotes: 0
Views: 1426
Reputation: 3939
A good start would be to look into mapWithState, which is a more efficient replacement for updateStateByKey. Both are defined on PairDStreamFunctions, so, assuming your objects of type V in stream2 are identified by some key of type K, your first point would go like this:
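import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream
def stream2: DStream[(K, V)] = ???
// Called once per incoming (key, value) record; keeps the latest value per key.
def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
  value.foreach(state.update(_))
  (key, state.get())
}
val spec = StateSpec.function(maintainStream2Objects _)
// stateSnapshots() emits the full key -> latest value state in every batch.
val stream2State = stream2.mapWithState(spec).stateSnapshots()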
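Note that the stream returned by mapWithState itself only contains records for the keys updated in the current batch; its stateSnapshots() stream is the one where each batch contains the (K, V) pairs with the latest value seen for every key. Join that snapshot stream with stream1 to perform the further logic for your second point.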
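Why the snapshot matters can be illustrated with a plain-Scala model of keyed state across micro-batches. This is not Spark code: `StateModel`, `step`, `join`, `SnapshotDemo` and the sample data are hypothetical, and the model only mimics the semantics (per-batch mapped output versus full state), not Spark's implementation.

```scala
// Plain-Scala model of keyed state across micro-batches (not Spark code).
object StateModel {
  type State = Map[String, String]

  // One micro-batch of stream2: returns (per-batch mapped output, new full state).
  // The mapping function in the answer returns (key, state.get()) right after
  // state.update(value), so each output record is simply (key, newValue).
  def step(state: State, batch2: Seq[(String, String)]): (Seq[(String, String)], State) =
    batch2.foldLeft((Vector.empty[(String, String)], state)) {
      case ((out, s), (k, v)) => (out :+ ((k, v)), s + (k -> v))
    }

  // Inner join of a stream1 batch against a key -> value lookup.
  def join(batch1: Seq[(String, Int)], right: Map[String, String]): Seq[(String, (Int, String))] =
    batch1.flatMap { case (k, w) => right.get(k).map(v => (k, (w, v))) }
}

object SnapshotDemo {
  import StateModel._

  def run(): (Seq[(String, (Int, String))], Seq[(String, (Int, String))]) = {
    val (_, s1)       = step(Map.empty, Seq(("c1", "Alice"))) // stream2, batch 1
    val (mapped2, s2) = step(s1, Seq(("c2", "Bob")))          // stream2, batch 2
    val batch1 = Seq(("c1", 10), ("c2", 20))                   // stream1, batch 2
    val viaMapped   = join(batch1, mapped2.toMap) // misses c1: not updated in batch 2
    val viaSnapshot = join(batch1, s2)            // finds both keys
    (viaMapped, viaSnapshot)
  }
}
```

In Spark terms, the snapshot-based join corresponds to joining stream1 with the DStream returned by stateSnapshots(). Using leftOuterJoin instead of join would also surface the stream1 records that have no match (as `None` on the right side), which is the set the question wants to put back on Kafka.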
Upvotes: 0
Reputation: 13927
You can join the incoming RDDs to other RDDs, not just the ones in that micro-batch. Basically, you keep a "running total" RDD that you fill with something like:
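import org.apache.spark.rdd.RDD
// K, V1 and V2 are placeholders for your key and value types
var globalRDD1: RDD[(K, V1)] = sc.emptyRDD[(K, V1)]
var globalRDD2: RDD[(K, V2)] = sc.emptyRDD[(K, V2)]
dstream1.foreachRDD(rdd => if (!rdd.isEmpty()) globalRDD1 = globalRDD1.union(rdd).cache())
dstream2.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    globalRDD2 = globalRDD2.union(rdd).cache()
    // the totals keep growing, so checkpoint them periodically to cut the lineage
    globalRDD1.join(globalRDD2).foreach(...) // etc, etc
  }
}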
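How the running-total approach matches records across micro-batches can be seen in a plain-Scala model of the code above. It uses no Spark; `RunningTotals` and the sample batches are hypothetical. Each batch is appended to the totals and the totals are joined, so a stream1 record from an earlier batch still finds a stream2 record that arrives later.

```scala
// Plain-Scala model of the "running total" approach (not Spark code):
// every batch is appended to a global collection and the totals are joined.
object RunningTotals {
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for { (k1, a) <- left; (k2, b) <- right if k1 == k2 } yield (k1, (a, b))

  def run(): Seq[Seq[(String, (Int, String))]] = {
    val batches1 = Seq(Seq(("c1", 10)), Seq(("c2", 20)), Seq(("c3", 30)))
    val batches2 = Seq(Seq(("c2", "Bob")), Seq(("c1", "Alice")), Seq(("c3", "Carol")))
    var global1 = Seq.empty[(String, Int)]
    var global2 = Seq.empty[(String, String)]
    // For each micro-batch: union into the totals, then join the totals.
    batches1.zip(batches2).map { case (b1, b2) =>
      global1 = global1 ++ b1
      global2 = global2 ++ b2
      join(global1, global2)
    }
  }
}
```

Batch 2 matches c1 (from stream1's first batch) with Alice (from stream2's second batch), which a per-batch join would miss. The trade-off is that every join over the totals also returns the pairs produced by earlier batches (batch 3 repeats c1 and c2), so downstream writes see duplicates; in Spark the unioned RDDs also keep growing, which is why periodic checkpointing of the totals is usually needed.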
Upvotes: 1