nmurthy

Reputation: 1577

Spark: How do I pass a PartialFunction to a DStream?

I'm trying to pass a partial function to the union of all the RDDs captured in a DStream batch over a sliding window. Let's say I construct a window operation over 10 seconds on a stream discretized into 1-second batches:

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

My window will contain K RDDs. I want to use collect(f: PartialFunction[T, U]) on the union of all K of these RDDs. I could call the union operator ++ inside foreachRDD, but I want to return an RDD, not a Unit, and to avoid side effects.

What I'm looking for is a reducer like

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]

on a DStream that I can use like so:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

But this is not available in the Spark Streaming API.

Does anyone have any good ideas for combining RDDs captured in a stream into a single RDD so I can pass in a partial function? Or for implementing my own RDD reducer? Perhaps this feature is coming in a subsequent Spark release?

Upvotes: 3

Views: 1370

Answers (1)

maasg

Reputation: 37435

Partial functions are not directly supported by a DStream operation, but it's not difficult to achieve the same functionality.

For example, let's take a trivial partial function that takes a String and produces an Int whenever the String is a number:

import scala.util.Try

val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }
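As a quick sanity check, the same partial function behaves the way you'd expect with collect on a plain Scala collection (this snippet uses ordinary collections, not Spark, purely to illustrate the semantics):

```scala
import scala.util.Try

// Same partial function as above: defined only for strings that parse as Int.
val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

// collect applies pf only where it is defined and drops the rest.
val parsed = List("1", "a", "2").collect(pf)
// parsed == List(1, 2)
```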

And we have a DStream of Strings:

val stringDStream:DStream[String] = ??? // use your stream source here

We can then apply the partial function to the DStream like this:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
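Alternatively, since RDD itself exposes collect(f: PartialFunction[T, U]): RDD[U], you can push the partial function down through transform, which is close to the window.transform(_.collect(myPartialFunc)) expression from the question. A sketch, assuming the same pf and stringDStream as above (this needs a running StreamingContext, so it is illustrative rather than standalone-runnable):

```scala
import org.apache.spark.streaming.dstream.DStream

// Each batch RDD produced by a windowed DStream already covers the RDDs in
// that window, so transform sees the windowed data as a single RDD and
// RDD.collect(pf) keeps only the elements where pf is defined.
val intDStream2: DStream[Int] = stringDStream
  .window(Seconds(10))
  .transform(rdd => rdd.collect(pf))
```

Both approaches avoid foreachRDD and its Unit return type, keeping the computation as a DStream transformation.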

Upvotes: 2
