Ahmed

Reputation: 131

Trying to understand Spark Streaming flow

I have this piece of code:

val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

lines.foreachRDD { rdd =>
  val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
  sparkStreamingService.run(df)
}

ssc.start()
ssc.awaitTermination()

The way I understand it, foreachRDD happens at the driver level? So basically this whole block of code:

lines.foreachRDD { rdd =>
  val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
  sparkStreamingService.run(df)
}

is happening at the driver level? The sparkStreamingService.run(df) method does some transformations on the current dataframe to yield a new dataframe, and then calls another method (in another jar) which stores the dataframe to Cassandra. If all of this is happening at the driver level, we are not utilizing the Spark executors. How can I make it so that the executors are used to process each partition of the RDD in parallel?

My Spark streaming service's run method:

var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
metadataDataframe.foreach(rowD => {
  metaData = populateMetaDataService.populateSiteMetaData(rowD)
  val headers = (rowD.getString(2).split(recordDelimiter)(0))

  val fields = headers.split("\u0001").map(
    fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
  val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)

  val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
//  val rawData = dataWithoutHeaders.split(recordDelimiter)
  val rowRDD = rawData
    .map(_.split("\u0001"))
    .map(attributes => Row(attributes: _*))

  val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
  dataFrameFilterService.processBasedOnOpType(metaData, newDF)
})

Upvotes: 0

Views: 98

Answers (2)

Justin Pihony

Reputation: 67135

The foreachRDD closure may run locally, but that only means the setup code does. The RDD itself is a distributed collection, so the actual work on it is distributed.

To comment directly on the code from the docs:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

Notice that the part of the code that is NOT based around the RDD is executed at the driver. It's the code built up using the RDD that is distributed to the workers.
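For reference, the docs continue the same example by pushing the connection handling down to the workers with foreachPartition, so one connection is created per partition instead of per record (createNewConnection is the same placeholder as above):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // executed at the worker, once per partition
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}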

Your code specifically is commented below:

//df.select will be distributed, but collect will pull it all back in
var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
//Since collect created a local collection, this foreach is done on the driver
metadataDataframe.foreach(rowD => {
  metaData = populateMetaDataService.populateSiteMetaData(rowD)
  val headers = (rowD.getString(2).split(recordDelimiter)(0))

  val fields = headers.split("\u0001").map(
    fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
  val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)

  //This will run locally, creating a distributed collection from the local data
  val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
//  val rawData = dataWithoutHeaders.split(recordDelimiter)
  //These transformations are declared here but will run distributed on the workers
  val rowRDD = rawData
    .map(_.split("\u0001"))
    .map(attributes => Row(attributes: _*))
  //Again, set up locally, to be run distributed
  val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
  dataFrameFilterService.processBasedOnOpType(metaData, newDF)
})

Ultimately, you can probably rewrite this so it doesn't need the collect and keeps everything distributed, but that is for you to do, not StackOverflow.
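As a purely illustrative sketch of that direction (not a drop-in replacement: it assumes the downstream Cassandra write can consume parsed records rather than a per-row DataFrame, and it reuses recordDelimiter from the question; everything else is an assumption):

val parsedRecords = df
  .select("customer", "tableName", "messageContent", "initialLoadRunning")
  .rdd                                  // stay an RDD[Row]; no collect, so parsing stays distributed
  .flatMap { rowD =>
    val content = rowD.getString(2)
    val headers = content.split(recordDelimiter)(0).split("\u0001")
    content
      .substring(content.indexOf(recordDelimiter) + 1)
      .split(recordDelimiter)
      .map(record => (rowD.getString(0), rowD.getString(1), headers.zip(record.split("\u0001")).toMap))
  }
// (customer, tableName, fieldName -> value) triples, parsed on the executors;
// whether the Cassandra write can be expressed over this depends on what processBasedOnOpType needs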

Upvotes: 1

Yuval Itzchakov

Reputation: 149618

The invocation of foreachRDD does happen on the driver node. But, since we're operating at the RDD level, any transformation on it will be distributed. In your example, rdd.map will cause each partition to be sent to a particular worker node for computation.
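Applied to the snippet in the question, the split is roughly as follows (the annotations are the only change):

lines.foreachRDD { rdd =>
  // this closure is invoked on the driver, once per batch
  val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
  // rdd.map is only declared here; it executes on the workers,
  // one task per partition, when the JSON is actually read
  sparkStreamingService.run(df) // called on the driver; what ends up distributed depends on what it does with df
}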

Since we don't know what your sparkStreamingService.run method is doing, we can't tell you about the locality of its execution.

Upvotes: 2
