Spark Scala UDP receive on listening port

The example in the Spark Streaming programming guide (http://spark.apache.org/docs/latest/streaming-programming-guide.html) lets me receive data from a TCP stream on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two worker threads and a batch interval of 1 second.
// The master requires 2 cores to avoid a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

I can send data over TCP by running a data server on my Linux system with $ nc -lk 9999

I need to receive a stream from an Android phone that sends its data over UDP, but the Scala/Spark example above only receives TCP streams.

How can I receive UDP streams in a similarly simple manner using Scala and Spark, and create a Spark DStream from them?

The problem with Yuval Itzchakov's solution is that the receiver receives one message and then restarts itself. Just replace the restart call with a call to receive, as shown below.

def receive(): Unit = {
    try {
      val buffer = new Array[Byte](200000)

      // Create a packet to receive data into the buffer
      val packet = new DatagramPacket(buffer, buffer.length)
      // Block until the next datagram arrives
      udpSocket.receive(packet)

      val iterator = bytesToLines(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      // Store every line decoded from this packet
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
//        restart("Udp socket data stream had no more data")
        // Wait for the next packet instead of restarting the receiver
        receive()
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
}
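
If recursion depth is a concern, a plain loop around DatagramSocket.receive can keep reading packet after packet as well. The following is a minimal sketch outside Spark that assumes only the JDK; UdpLoopSketch and receiveLines are hypothetical names, and in a real Receiver the loop condition would be !isStopped() and the callback would be store.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpLoopSketch {
  /** Receives `count` datagrams on `socket`, passing each payload to `handle` as UTF-8 text. */
  def receiveLines(socket: DatagramSocket, count: Int)(handle: String => Unit): Unit = {
    val buffer = new Array[Byte](2048)
    var received = 0
    while (received < count) {
      // A fresh packet per datagram, so its length always starts at the full buffer size.
      val packet = new DatagramPacket(buffer, buffer.length)
      socket.receive(packet) // blocks until a datagram arrives (or the socket timeout fires)
      handle(new String(packet.getData, packet.getOffset, packet.getLength, StandardCharsets.UTF_8))
      received += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0: let the OS pick a free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of hanging if a datagram is lost
      Seq("first", "second", "third").foreach { msg =>
        val bytes = msg.getBytes(StandardCharsets.UTF_8)
        sender.send(new DatagramPacket(bytes, bytes.length, loopback, receiver.getLocalPort))
      }
      receiveLines(receiver, 3)(println)
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```

Because the loop never leaves receiveLines between packets, the socket stays open and nothing has to be restarted.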

There isn't anything built in, but it's not too much work to do it yourself. Here is a simple solution I made based on a custom UdpSocketInputDStream[T]:

import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
                                          _ssc: StreamingContext,
                                          host: String,
                                          port: Int,
                                          bytesToObjects: InputStream => Iterator[T],
                                          storageLevel: StorageLevel
                                        ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

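  override def onStart(): Unit = {

    try {
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      // Binding a DatagramSocket fails with a SocketException (for example a BindException)
      case e: java.net.SocketException =>
        restart(s"Error connecting to $port", e)
        return
    }

    // Start the thread that receives data over a connection
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run(): Unit = {
        receive()
      }
    }.start()
  }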

  /** Create a socket connection and receive data until receiver is stopped */
  def receive(): Unit = {
    try {
      val buffer = new Array[Byte](2048)

      // Create a packet to receive data into the buffer
      val packet = new DatagramPacket(buffer, buffer.length)
      // Block until a datagram arrives
      udpSocket.receive(packet)

      val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      // Store every object decoded from this packet
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
        restart("Udp socket data stream had no more data")
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}

To make the new method available on StreamingContext itself, we enrich it with an implicit class:

object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}

And here is how you call it all:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

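    // Port 3003 matches the test command further down; the storage level is an
    // assumption (it is the default Spark uses for socketTextStream).
    val stream = ssc.udpSocketStream("localhost",
                                     3003,
                                     bytesToLines,
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }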


  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          // readLine returns null at end of stream
          finished = true
        }
        nextValue
      }

      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }

  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close(): Unit
  }
}

Most of this code is taken from the SocketInputDStream[T] provided by Spark; I simply reused it. I also took the code for NextIterator, which is used by bytesToLines. All bytesToLines does is consume each line from the packet and turn it into a String. If you need more complex logic, pass your own implementation as converter: InputStream => Iterator[T].
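
As an illustration of such a converter, here is a minimal sketch that turns a packet's bytes into comma-separated fields instead of lines. The names Converters and bytesToFields and the comma-separated payload format are assumptions made for this example, not part of the code above.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import scala.io.Source

object Converters {
  /** Reads the whole stream as UTF-8 and yields each trimmed, non-empty comma-separated field. */
  def bytesToFields(inputStream: InputStream): Iterator[String] =
    Source.fromInputStream(inputStream, StandardCharsets.UTF_8.name())
      .mkString
      .split(',')
      .iterator
      .map(_.trim)
      .filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Hypothetical payload, standing in for the bytes of one datagram
    val payload = "x=1, y=2,,z=3".getBytes(StandardCharsets.UTF_8)
    bytesToFields(new ByteArrayInputStream(payload)).foreach(println)
  }
}
```

A function with this shape can be passed wherever bytesToLines is passed, since both have the type InputStream => Iterator[String].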

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003


Time: 1482676728000 ms
hello hello hello!

Of course, this has to be tested further. It also has a hidden assumption that each buffer created for the DatagramPacket is 2048 bytes, which is perhaps something you'll want to change.
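
To see why the buffer size matters: a datagram that is longer than the receiving buffer is truncated to the buffer length. The following is a minimal sketch using only the JDK over the loopback interface; BufferSizeSketch and receivedLength are hypothetical names, and 65507 is the largest UDP payload over IPv4.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object BufferSizeSketch {
  // Largest UDP payload over IPv4: 65535 minus 20 bytes of IP header and 8 bytes of UDP header.
  val MaxUdpPayload = 65507

  /** Sends `payloadSize` bytes over loopback and returns how many arrive in a `bufferSize` buffer. */
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val loopback = InetAddress.getLoopbackAddress
    val receiver = new DatagramSocket(0, loopback) // port 0: let the OS pick a free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of hanging if the datagram is lost
      val payload = new Array[Byte](payloadSize)
      sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort))
      val packet = new DatagramPacket(new Array[Byte](bufferSize), bufferSize)
      receiver.receive(packet)
      packet.getLength
    } finally {
      sender.close()
      receiver.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Compare a buffer smaller than the payload with one sized for any IPv4 datagram
    println(receivedLength(payloadSize = 3000, bufferSize = 2048))
    println(receivedLength(payloadSize = 3000, bufferSize = MaxUdpPayload))
  }
}
```

If the phone may send packets larger than 2048 bytes, sizing the buffer to the largest expected datagram avoids silently losing the tail of each message.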


