The only way to join, union, or cogroup a DStream RDD with a batch RDD is via the "transform" method, which returns another DStream, so the result is discarded at the end of the micro-batch.
Is there any way to, e.g., union a DStream RDD with a batch RDD so that it produces a new batch RDD containing the elements of both?
And once such a batch RDD has been created this way, can other DStream RDDs use it, e.g. to join with it, with the result this time being another DStream RDD?
Effectively, this would periodically update (append elements to) a batch RDD, with the additional elements coming from the DStream RDDs that stream in with every micro-batch. Newly arriving DStream RDDs would also be able to join with the batch RDD as updated so far and produce a resulting DStream RDD.
Something close to this can be achieved with updateStateByKey, but is there a way to do it as described here?
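For comparison, here is a minimal sketch of the updateStateByKey route, assuming Spark 1.3 or later (the overload that takes an initial RDD was added then) and keyed data. The batch RDD seeds the state once, each micro-batch appends to it, and other keyed streams join against the current state. The object name GrowingStateSketch, the helper names, and the String/Int key and value types are hypothetical and chosen only for illustration. As with any stateful DStream operation, a checkpoint directory must be set on the StreamingContext.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; names and types are illustrative only.
object GrowingStateSketch {

  // Appends this micro-batch's values for a key to everything accumulated so far.
  def appendValues(newValues: Seq[String], state: Option[Seq[String]]): Option[Seq[String]] =
    Some(state.getOrElse(Seq.empty[String]) ++ newValues)

  // `batchPairs` seeds the state once; `updates` adds to it in every micro-batch.
  // Requires ssc.checkpoint(...) to be set, since updateStateByKey is stateful.
  def growingState(
      batchPairs: RDD[(String, Seq[String])],
      updates: DStream[(String, String)],
      numPartitions: Int): DStream[(String, Seq[String])] =
    updates.updateStateByKey[Seq[String]](
      appendValues _,
      new HashPartitioner(numPartitions),
      batchPairs)

  // Another keyed stream joined with the accumulated state, batch by batch.
  def joinWithState(
      other: DStream[(String, Int)],
      state: DStream[(String, Seq[String])]): DStream[(String, (Int, Seq[String]))] =
    other.join(state)
}
```

Note that the state is still a DStream, so this only approximates the "growing batch RDD" described above: every key's accumulated values are recomputed and carried forward each interval.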
Another approach is to turn the batch input into a DStream and union it with your streaming input. Then write the union out using foreachRDD; the saved output becomes the new batch input for your other jobs.
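import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Static batch input, loaded once
val batch = sc.textFile(...)

val ssc = new StreamingContext(sc, Seconds(30))
val stream = ssc.textFileStream(...)

// With an empty queue, queueStream emits `batch` (the defaultRDD) in every micro-batch
val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch)

// Each micro-batch now holds the streamed lines plus the batch lines
val union = ssc.union(Seq(stream, batchStream))
union.print()

union.foreachRDD { rdd =>
  // Delete the previous output first, or use SchemaRDD with .insertInto(..., overwrite = true)
  rdd.saveAsTextFile(...)
}

ssc.start()
ssc.awaitTermination()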