Klun
Klun

Reputation: 54

Spark : how to parallelize subsequent specific work on each dataframe partitions

My Spark application is as follow :

1) execute large query with Spark SQL into the dataframe "dataDF"

2) foreach partition involved in "dataDF" :

2.1) get the associated "filtered" dataframe, in order to have only the partition associated data

2.2) do specific work with that "filtered" dataframe and write output

The code is as follow :

val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")

for {
  row <- dataDF.dropDuplicates("partition").collect
} yield {

   val partition_str : String = row.getAs[String](0)
   val filtered = dataDF.filter($"partition" .equalTo( lit( partition_str ) ) )

   // ... on each partition, do work depending on the partition, and write result on HDFS

   // Example :

   if( partition_str == "category_A" ){

       // do group by, do pivot, do mean, ...
       val x = filtered
         .groupBy("column1","column2")
         ...

       // write final DF
       x.write.parquet("some/path")

   } else if( partition_str == "category_B" ) {

       // select specific field and apply calculation on it
       val y = filtered.select(...)

       // write final DF
       x.write.parquet("some/path")

   } else if ( ... ) {

      // other kind of calculation
      // write results

   } else {

      // other kind of calculation
      // write results

   }

}

Such algorithm works successfully. The Spark SQL query is fully distributed. However the particular work done on each resulting partition is done sequentially, and the result is inneficient especially because each write related to a partition is done sequentially.

In such case, what are the ways to replace the "for yield" by something in parallel/async ?

Thanks

Upvotes: 0

Views: 2539

Answers (1)

Ged
Ged

Reputation: 18098

  1. You could use foreachPartition if writing to data stores outside Hadoop scope with specific logic needed for that particular env.

  2. Else map, etc.

  3. .par parallel collections (Scala) - but that is used with caution. For reading files and pre-processing them, otherwise possibly considered risky.

  4. Threads.

  5. You need to check what you are doing and if the operations can be referenced, usewd within a foreachPartition block, etc. You need to try as some aspects can only be written for the driver and then get distributed to the executors via SPARK to the workers. But you cannot write, for example, spark.sql for the worker as per below - at the end due to some formatting aspect errors I just got here in the block of text. See end of post.

  6. Likewise df.write or df.read cannot be used in the below either. What you can do is write individual execute/mutate statements to, say, ORACLE, mySQL.

Hope this helps.

rdd.foreachPartition(iter => {
       while(iter.hasNext) {
         val item = iter.next()
         // do something
         spark.sql("INSERT INTO tableX VALUES(2,7, 'CORN', 100, item)")
         // do some other stuff
  })

or

RDD.foreachPartition (records => {       
  val JDBCDriver = "com.mysql.jdbc.Driver" ...
  ...
  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")
 val connection = DriverManager.getConnection(ConnectionURL, jdbcUsername, jdbcPassword)
  ...
  val mutateStatement = connection.createStatement()
  val queryStatement = connection.createStatement()
  ...
      records.foreach (record => { 
              val val1 = record._1
              val val2 = record._2
              ...
              mutateStatement.execute (s"insert into sample (k,v) values(${val1}, ${nIterVal})")      
            })
  }            
)   

Upvotes: 1

Related Questions