Reputation: 1577
I'm trying to pass a partial function to the union of all the RDDs captured in a DStream batch over a sliding window. Let's say I construct a window operation over 10 seconds on a stream discretized into 1-second batches:
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))
My window will have K many RDDs. I want to use collect(f: PartialFunction[T, U]) on the union of all K of these RDDs. I could call the union operator ++ using foreachRDD, but I want to return an RDD, not a Unit, and avoid side effects.
What I'm looking for is a reducer on DStream like
def reduce(f: (RDD[T], RDD[T]) => RDD[T]): RDD[T]
that I can use like so:
window.reduce(_ ++ _).transform(_.collect(myPartialFunc))
But this is not available in the Spark Streaming API.
Does anyone have any good ideas for combining RDDs captured in a stream into a single RDD so I can pass in a partial function? Or for implementing my own RDD reducer? Perhaps this feature is coming in a subsequent Spark release?
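For comparison, the foreachRDD route I'd rather avoid looks roughly like this; it can apply the partial function, but its return type is Unit, so the result can only escape through a side effect (myPartialFunc and outputPath are placeholders here):

```scala
window.foreachRDD { rdd =>
  // rdd.collect(myPartialFunc) yields an RDD[U], but foreachRDD returns Unit,
  // so the only way to get the result out is an output action or mutable state
  rdd.collect(myPartialFunc).saveAsTextFile(outputPath)
}
```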
Upvotes: 3
Views: 1370
Reputation: 37435
Partial functions are not directly supported by a DStream operation, but it's not difficult to achieve the same functionality.
For example, let's take a trivial partial function that parses a String into an Int when the String represents a number:
import scala.util.Try

// Defined only for strings that successfully parse as integers
val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }
And we have a dstream of Strings:
val stringDStream:DStream[String] = ??? // use your stream source here
We can then apply the partial function to the DStream like this:
val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
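The same filter-then-map pattern can be checked on a plain Scala collection, independent of any Spark cluster (the sample data here is made up for illustration):

```scala
import scala.util.Try

// Same partial function as above: defined only for numeric strings
val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

val data = Seq("1", "two", "3", "", "42")

// filter + map: the form that works on a DStream
val viaFilterMap = data.filter(pf.isDefinedAt).map(pf)

// collect(pf): the form available on plain collections and on RDDs
val viaCollect = data.collect(pf)

// both yield Seq(1, 3, 42)
```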
Upvotes: 2