Ga999
Ga999

Reputation: 71

Spark2 to Spark3 Upgrade -> DataFrame foreachPartition method does not work

This code works in Spark 2.4 with Scala 2.11.7. It is not working in Spark 3.4.2 with Scala 2.12.15.

I have tried Running change the foreachPartition(parti => { })

to

.foreachPartition((parti: Iterator[Row]) => {

it didn't worked. I am struck here

Can anyone help me on this one.

The following code:

 import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
    
    val targetDbDriver : String = "org.postgresql.Driver"
              val targetDbUrl : String = "jdbc:postgresql://server:portnumber/Dbname"
              val targetDbUser : String = "*****"
              val targetDbPassword : String= "******"
              val targetDbschema : String = "abc"
              val connection: Connection = null
    
    
    val df = Seq((1, "a1", "aaaa"),
          (2, "b1", "bbbb")).toDF("id", "a_id", "first_nm").toDF()
    
    
    df.coalesce(1).foreachPartition(parti => {
    
                Class.forName(targetDbDriver)
    
                connection = DriverManager.getConnection(targetDbUrl, targetDbUser, targetDbPassword)
    
                val batchSize = 10
    
                var updatePrepareStmt: PreparedStatement = null
    
                parti.grouped(batchSize).foreach(batch => {
                  batch.foreach { row => {
    
                    val IdIndx = row.fieldIndex("id")
                    val id = row.getString(IdIndx)
    
                    val aIdIndx = row.fieldIndex("a_id")
                    val a_id = row.getString(aIdIndx)            
                    
                    val firstNmIndx = row.fieldIndex("first_nm")
                    val first_nm = row.getString(firstNmIndx)  
                                    
                    
                    val sql = "INSERT INTO " + targetDbschema + ".agency_leads VALUES (?,?,?) " +
                              "ON CONFLICT (lead_id) " +
                              "DO UPDATE SET " +
                              "agency_id = EXCLUDED.agency_id, " + 
                              "first_nm = EXCLUDED.first_nm "
                              
                    updatePrepareStmt = connection.prepareStatement(sql)
                    updatePrepareStmt.setString(1, lead_id)
                    updatePrepareStmt.setString(2, agency_id)  
                    updatePrepareStmt.setString(3, first_nm)      
    
                    updatePrepareStmt.addBatch()
    
                  }
                    updatePrepareStmt.executeBatch()
                  }
    
                })
                connection.close()
              })

Upvotes: 0

Views: 42

Answers (0)

Related Questions