Reputation: 71
This code works in Spark 2.4 with Scala 2.11.7. It is not working in Spark 3.4.2 with Scala 2.12.15.
I have tried Running change the foreachPartition(parti => { })
to
.foreachPartition((parti: Iterator[Row]) => {
it didn't worked. I am struck here
Can anyone help me on this one.
The following code:
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
val targetDbDriver : String = "org.postgresql.Driver"
val targetDbUrl : String = "jdbc:postgresql://server:portnumber/Dbname"
val targetDbUser : String = "*****"
val targetDbPassword : String= "******"
val targetDbschema : String = "abc"
val connection: Connection = null
val df = Seq((1, "a1", "aaaa"),
(2, "b1", "bbbb")).toDF("id", "a_id", "first_nm").toDF()
df.coalesce(1).foreachPartition(parti => {
Class.forName(targetDbDriver)
connection = DriverManager.getConnection(targetDbUrl, targetDbUser, targetDbPassword)
val batchSize = 10
var updatePrepareStmt: PreparedStatement = null
parti.grouped(batchSize).foreach(batch => {
batch.foreach { row => {
val IdIndx = row.fieldIndex("id")
val id = row.getString(IdIndx)
val aIdIndx = row.fieldIndex("a_id")
val a_id = row.getString(aIdIndx)
val firstNmIndx = row.fieldIndex("first_nm")
val first_nm = row.getString(firstNmIndx)
val sql = "INSERT INTO " + targetDbschema + ".agency_leads VALUES (?,?,?) " +
"ON CONFLICT (lead_id) " +
"DO UPDATE SET " +
"agency_id = EXCLUDED.agency_id, " +
"first_nm = EXCLUDED.first_nm "
updatePrepareStmt = connection.prepareStatement(sql)
updatePrepareStmt.setString(1, lead_id)
updatePrepareStmt.setString(2, agency_id)
updatePrepareStmt.setString(3, first_nm)
updatePrepareStmt.addBatch()
}
updatePrepareStmt.executeBatch()
}
})
connection.close()
})
Upvotes: 0
Views: 42