Nitin Kumar

Reputation: 249

Is foreachRDD executed on the Driver?

I am trying to process some XML data received on a JMS queue (QPID) using Spark Streaming. After getting the XML as a DStream, I convert it to DataFrames so I can join it with my static data, which is already loaded as DataFrames. But per the API documentation for the foreachRDD method on DStream, it is executed on the driver. Does that mean all processing logic will run only on the driver and not be distributed to the workers/executors?
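In case it helps, the pattern I'm describing looks roughly like this (a minimal sketch; xmlDStream, parseXml, the Record schema, and the paths are placeholders, not my actual code):

import org.apache.spark.sql.SparkSession

case class Record(id: String, payload: String)

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// static reference data, loaded once up front (placeholder path)
val staticDF = spark.read.parquet("/data/static")

xmlDStream.foreachRDD { rdd =>
  // this closure runs on the driver for each batch...
  val batchDF = rdd.map(parseXml).toDF()      // parseXml: placeholder XML-to-Record parser
  val joined  = batchDF.join(staticDF, "id")  // ...but is this join distributed?
  joined.write.mode("append").parquet("/data/out")
}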

API Documentation

foreachRDD(func)

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

Upvotes: 14

Views: 5413

Answers (2)

user6253571

Reputation:

To make this clear, if you run the following, you will see "monkey" on the driver's stdout:

myDStream.foreachRDD { rdd =>
  println("monkey")  // printed on the driver, once per batch interval
}

If you run the following, you will see "monkey" on the driver's stdout, and the filter work will be done on whatever executors the rdd is distributed across:

myDStream.foreachRDD { rdd =>
  println("monkey")
  // filter is lazy; count() is an action that forces it to run on the executors
  rdd.filter(element => element == "Save me!").count()
}

Let's add the simplification that myDStream only ever receives one RDD, and that this RDD is spread across a set of partitions we'll call PartitionSetA, which live on MachineSetB, where ExecutorSetC is running. If you run the following, you will see "monkey" on the driver's stdout; you will see "turtle" on the stdout of every executor in ExecutorSetC ("turtle" appears once per partition, and several partitions can live on the machine where a given executor runs); and the work of both the filter and the addition will be done across ExecutorSetC:

myDStream.foreachRDD { rdd =>
  println("monkey")
  // count() forces the lazy filter to run on the executors
  rdd.filter(element => element == "Save me!").count()
  rdd.foreachPartition { partition =>
    println("turtle")  // printed on the executor, once per partition
    val x = 1 + 1
  }
}

One more thing to note: in the following code, y is captured in the closure, so it ends up being serialized on the driver and sent across the network to ExecutorSetC with the tasks of every rdd:

val y = 2
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!").count()  // count() forces the lazy filter
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + y  // y is captured in the closure and shipped with every task
  }
}

To avoid this overhead, you can use broadcast variables, which send the value from the driver to each executor just once, rather than with every task. For example:

val y = 2
val broadcastY = sc.broadcast(y)  // sc is the SparkContext; the value is cached per executor
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!").count()  // count() forces the lazy filter
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + broadcastY.value  // reads the executor-local broadcast copy
  }
}

For sending more complex things as broadcast variables, such as objects that aren't easily serializable once instantiated, see the following blog post: https://allegro.tech/2015/08/spark-kafka-integration.html
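The gist of that pattern, as a hedged sketch (the class and method names here are illustrative, not taken from the post): broadcast a factory rather than the non-serializable object itself, and let each executor build its own instance lazily on first use:

class LazilyCreated[T](createFn: () => T) extends Serializable {
  // evaluated at most once per JVM that dereferences it, i.e. once per executor
  lazy val instance: T = createFn()
}

// driver side: only the (serializable) factory function crosses the network
val lazyClient = sc.broadcast(new LazilyCreated(() => new SomeHeavyClient()))  // SomeHeavyClient is hypothetical

myDStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val client = lazyClient.value.instance           // created on the executor on first access
    partition.foreach(element => client.send(element))  // send is a hypothetical method
  }
}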

Upvotes: 10

Yuval Itzchakov

Reputation: 149538

Does that mean all processing logic will run only on the driver and not be distributed to the workers/executors?

No. The function itself runs on the driver, but don't forget that it operates on an RDD. The inner functions you use on that RDD, such as foreachPartition, map, and filter, will still run on the worker nodes. This won't cause all the data to be sent back over the network to the driver, unless you call methods like collect, which do.
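A minimal sketch to illustrate the distinction (assuming myDStream is a DStream[String]):

myDStream.foreachRDD { rdd =>
  // runs on the executors, distributed across the rdd's partitions
  val survivors = rdd.filter(element => element == "Save me!")

  // collect is an action that ships the filtered results back to the driver;
  // only safe here because the filtered set is assumed to be small
  val onDriver: Array[String] = survivors.collect()
  println(s"Got ${onDriver.length} elements on the driver")
}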

Upvotes: 14
