Juh_

Reputation: 15529

Merging micro batches in Spark Streaming

(I have a little knowledge of batch Spark, but none of Spark Streaming)

Problem

I have a Kafka topic Kafka[(A,B)->X] where (A,B) is the key (A and B are simple numeric types) and X is the message type, relatively big (a couple of MB). Putting aside the problem of failures in the input, the data forms a grid: for each a in A, there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for a given a will arrive in b-order (what I know is that the topic is filled in this order).

Then I need to process the messages as follows:

  1. a function (or a couple of functions) is applied to each message (a,b)->x, outputting (a,b)->y
  2. a function should be applied to the message group aB->Seq[y], where aB = {(a,b) for all b in B}

(and later there is a pass where messages need to be "transposed" to be processed across all a's, but that's not the question here)
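For concreteness, the two steps could be typed roughly as below (a sketch only; X, Y and the function names are placeholders for my actual types):

```scala
// Hypothetical types: X is the large input payload, Y the per-message result.
case class X(payload: Array[Byte])
case class Y(value: Double)

// Step 1: applied independently to each message (a,b)->x.
def perMessage(key: (Long, Long), x: X): Y = ???

// Step 2: applied once per a, to all of that a's y's, in b-order.
def perRow(a: Long, ys: Seq[Y]): Unit = ???
```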

Question

How can I achieve such a merge of messages, from step 1 to step 2?

It looks like a groupBy over the sub-key a, but to my understanding groupBy would be applied per micro-batch. What I need is, for each a, to wait until all b's have been received (I assume a simple counting system would work), once again putting aside missing b's and errors in the input data.
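To illustrate the counting idea, a per-a completeness check might look like this (a sketch; totalB is the assumed, known number of b's, and the buffer type is hypothetical):

```scala
// Sketch: buffer holds, for each a, the (b, y) pairs received so far.
val totalB = 100  // assumed known size of B

def isComplete(buffer: Map[Long, Seq[(Long, String)]], a: Long): Boolean =
  buffer.getOrElse(a, Seq.empty).size == totalB
```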

Some idea

Knowing little, I would try to see whether such merging could be achieved by appending to an HDFS file, one per a, and then trigger a second stream process on those files once full, i.e. when a file contains all b's, move it to an input directory for step 2. But:

  1. I don't know whether such appending can be implemented on HDFS
  2. two StreamingContexts would need to run in parallel, one for each step, and that seems to be a problem (?)
  3. I understand that going through HDFS would break the "exactly once" property of Spark (Streaming)

Upvotes: 2

Views: 1368

Answers (1)

David Griffin

Reputation: 13927

You can create a master RDD, and merge the micro RDDs generated by the stream into the master with RDD.union. Something like:

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD  // guessing on the RDD type

myStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    // union returns a new RDD -- reassign it, it does not modify masterRDD in place
    masterRDD = masterRDD.union(rdd)

    masterRDD.groupBy(...).....
  }
})

You should take some time and read up on checkpointing as well, specifically:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
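The stateful route those docs describe could also be sketched with updateStateByKey instead of a manual master RDD: key the stream by a, buffer the (b, y) pairs across batches, and release a row once all b's have arrived. This is only a sketch; bCount (the number of b's per a), the Y type, and keyedStream are assumptions:

```scala
// Assumes ssc.checkpoint(dir) has been called (required by updateStateByKey),
// and keyedStream: DStream[(Long, (Long, Y))] is keyed by a with (b, y) values.
val bCount = 100  // assumed known number of b's per a

val buffered = keyedStream.updateStateByKey[Seq[(Long, Y)]] {
  (newValues: Seq[(Long, Y)], state: Option[Seq[(Long, Y)]]) =>
    Some(state.getOrElse(Seq.empty) ++ newValues)
}

// Release only complete rows, with the y's sorted into b-order for step 2.
val completeRows = buffered
  .filter { case (_, ys) => ys.size == bCount }
  .mapValues(ys => ys.sortBy(_._1).map(_._2))
```

One caveat: as written the state keeps growing; once a row has been released you would want to drop that a's state (return None from the update function) to bound memory.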

Upvotes: 1
