lloydmeta
lloydmeta

Reputation: 1339

Idiomatic way to turn an Akka Source into a Spark InputDStream

I'm essentially trying to do the opposite of what is being asked in this question; that is to say, use a Source[A] to push elements into a InputDStream[A].

So far, I've managed to clobber together an implementation that uses a Feeder actor and a Receiver actor similar to the ActorWordCount example, but this seems a bit complex so I'm curious if there is a simpler way.

Upvotes: 2

Views: 587

Answers (2)

lloydmeta
lloydmeta

Reputation: 1339

EDIT: Self-accepting after 5 days since there have been no good answers.

I've extracted the Actor-based implementation into a lib, Sparkka-streams, and it's been working for me thus far. When a solution to this question that is better shows up, I'll either update or deprecate the lib.

Its usage is as follows:

// InputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>

  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bCast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)
  source ~> bCast ~> add1 ~> merge
            bCast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)

Upvotes: 1

PH88
PH88

Reputation: 1806

Ref: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

You can do it like:

class StreamStopped extends RuntimeException("Stream stopped")

// Serializable factory class
case class SourceFactory(start: Int, end: Int) {
  def source = Source(start to end).map(_.toString)
}

class CustomReceiver(sourceFactory: SourceFactory)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  implicit val materializer = ....

  def onStart() {
    sourceFactory.source.runForEach { e =>
      if (isStopped) {
        // Stop the source
        throw new StreamStopped
      } else {
        store(e)
      }
    } onFailure {
      case _: StreamStopped => // ignore
      case ex: Throwable => reportError("Source exception", ex)
    }
  }

  def onStop() {}
}

val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1,100))

Upvotes: 0

Related Questions