ca9163d9
ca9163d9

Reputation: 29159

Parallelism: rdd.parallelize(....) vs dataSet.map(...)?

I have implemented a Spark application using both DataFrame/DataSet and RDD. I submitted the application to my local development environment of Spark 2.1.1. My PC has eight CPU cores.

DateFrame/DataSet

val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._ 
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  val (a, b) = runJob.run(d, date) // returns a tuple of (int, java.sql.Date), which are the passed parameters.
  s"$a, $b"
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}
processed.write.text("c:\\temp\\mpa")

RDD

val itemList = itemListJob.run(rc, priority).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()

val processed = sc.parallelize(itemList).map(d => {
  runJob.run(d, rc) // returns a tuple of (int, LocalDate), which are the passed parameters.
})
processed.saveAsTextFile("c:\\temp\\mpa")

The RDD application split and generated eight text files while the Dataframe/DataSet one generated only one file. Does it mean the RDD ran eight runJob.run() in parallel while the DataFrame/DataSet approach only ran one a time without concurrency?

I want the runJob.run(), which does the main workload and will also make a jdbc call, to be run distributed and in parallel.

Upvotes: 0

Views: 2159

Answers (2)

Joe K
Joe K

Reputation: 18424

Yes, the number of files produced is a good indicator of the parallelism in the last step. (I can think of a few corner cases where this might not be the case, but that's irrelevant here)

sc.parallelize when running locally should split according to the number of cores.

However, in both cases, you would be using only 1 core to read over the jdbc connection, and in the RDD case, you additionally collect() the data back to the driver, then parallelize back to the task.

The preferred approach is to use repartition rather than collect and then parallelize. And even better would be to always be doing things in parallel. In the case of loading the data frame over jdbc, take a look at whether using the parameters partitionColumn, lowerBound, upperBound, numPartition (link) might be applicable in order to run in parallel from the very start.

Upvotes: 2

Related Questions