Reputation: 15529
(I have a little knowledge of batch Spark, but none of Spark Streaming.)
I have a Kafka topic Kafka[(A,B)->X] where (A,B) is the key (A and B are simple numeric types) and X is the message type, relatively big (a couple of MB). Putting aside the problem of failures in the input, the data forms a grid: for each a in A, there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for a given a will arrive following the b order (what I know is that the topic is filled in this order).
Then I need to process the messages as follows:

1. Process each message (a,b)->x, outputting (a,b)->y.
2. Merge all the outputs for one a into aB->Seq[y], where aB = {(a,b) for all b in B}.

(And later there is a pass where messages need to be "transposed" to be processed across all a's, but that's not the question here.)
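For concreteness, here is a rough, non-Spark sketch of the two steps as Scala signatures; the type names X and Y and the numeric key types are placeholders, not my actual types:

// Placeholder types: keys are simple numeric values, payloads are large blobs.
type A = Long
type B = Long
case class X(payload: Array[Byte]) // incoming message, a couple of MB
case class Y(result: Array[Byte])  // per-cell output of step 1

// Step 1: process one message keyed by (a, b).
def step1(key: (A, B), x: X): Y = ???

// Step 2: once every b in B has arrived for a given a, merge the per-cell
// outputs into a single, b-ordered sequence keyed by a alone.
def step2(a: A, ysInBOrder: Seq[Y]): (A, Seq[Y]) = (a, ysInBOrder)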
How can I achieve such a merge of messages, from step 1 to step 2? It looks like a groupBy over the sub-key a, but to my understanding groupBy would be applied per micro-batch. What I need is, for each a, to wait until all the b's have been received (I assume a simple counting system would work), once again putting aside missing b's and errors in the input data.
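To make that completeness condition concrete, here is a minimal counting sketch in plain Scala (not Spark), assuming the full set of b's is known in advance; the class and method names are just illustrative:

import scala.collection.mutable

// Buffer the per-cell outputs and emit a row only once every expected b has arrived.
class GridAccumulator[A, B: Ordering, Y](expectedBs: Set[B]) {
  private val rows = mutable.Map.empty[A, mutable.Map[B, Y]]

  // Returns Some(b-ordered outputs for a) the moment the row for a becomes complete.
  def add(a: A, b: B, y: Y): Option[Seq[Y]] = {
    val row = rows.getOrElseUpdate(a, mutable.Map.empty)
    row(b) = y
    if (row.keySet == expectedBs) {
      rows.remove(a)
      Some(row.toSeq.sortBy(_._1).map(_._2))
    } else None
  }
}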
Without much knowledge, I would try to see whether such a merge could be achieved by appending to an HDFS file, one per a, and triggering a second stream process on those files once they are full, i.e. moving a file to an input directory for step 2 once it contains all the b's. But:
Upvotes: 2
Views: 1368
Reputation: 13927
You can create a master RDD and merge the micro-RDDs generated by the stream into it with RDD.union. Something like:
var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD // guessing on the RDD type
myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    masterRDD = masterRDD.union(rdd) // union returns a new RDD, so reassign
    masterRDD.groupBy { case ((a, _), _) => a } // then group over the sub-key a, ...
  }
}
You should take some time and read up on checkpointing as well, specifically:
Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
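A minimal sketch of what that could look like here, assuming you keep reassigning masterRDD as above (the checkpoint directory and interval are placeholders):

// Assumption: checkpoint the accumulated RDD every few batches so the union
// lineage does not grow without bound.
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // placeholder path

var batches = 0
myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    masterRDD = masterRDD.union(rdd)
    batches += 1
    if (batches % 10 == 0) {    // arbitrary interval
      masterRDD.checkpoint()    // truncates the lineage...
      masterRDD.count()         // ...once an action materializes it
    }
  }
}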
Upvotes: 1