Reputation: 107
I'm trying to iterate over a DataFrame with foreachPartition to insert values into a database. Inside foreachPartition I group the rows into batches and use foreach to iterate over each row. Please find my code below:
import java.sql.DriverManager
import org.apache.spark.sql.Row

val endDF = spark.read.parquet(path).select("pc").filter(col("pc").isNotNull)

endDF.foreachPartition((partition: Iterator[Row]) => {
  Class.forName(driver)
  val con = DriverManager.getConnection(jdbcurl, user, pwd)
  partition.grouped(100).foreach(batch => {
    val st = con.createStatement()
    batch.foreach(row => {
      val pc = row.get(0).toString
      val in = s"insert into tshdim (pc) values('$pc')"
      st.addBatch(in)
    })
    st.executeLargeBatch()
  })
  con.close()
})
When I try to get the pc value from the row (val pc = row.get(0).toString), it throws the following exception. I'm running this in spark-shell.
org.apache.spark.SparkException: Task not serializable
...
Caused by:
java.io.NotSerializableException: org.apache.spark.sql.Dataset$RDDQueryExecution$
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Dataset$RDDQueryExecution$, value: org.apache.spark.sql.Dataset$RDDQueryExecution$@jfaf)
- field (class: org.apache.spark.sql.Dataset, name: RDDQueryExecutionModule, type: org.apache.spark.sql.Dataset$RDDQueryExecution$)
- object (class: org.apache.spark.sql.Dataset, [pc: String])
Upvotes: 0
Views: 467
Reputation: 21
The function passed to foreachPartition needs to be serialized and shipped to the executors. So, in your case, Spark is trying to serialize the DriverManager class and everything related to your JDBC connection, and some of that is not serializable.

foreachPartition works without the DriverManager part:
endDF.foreachPartition((partition: Iterator[Row]) => {
partition.grouped(100).foreach(batch => {
batch.foreach(row => {
val pc=row.get(0)
println(pc)
})
})
})
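Here the closure only references the partition iterator itself, so there is nothing non-serializable left for Spark to capture.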
To save it in your DB, first do a .collect and run the inserts from the driver.
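For example, a minimal sketch of that collect-based approach (assuming jdbcurl, user, pwd and the tshdim table from the question, and that the result is small enough to fit in driver memory; a PreparedStatement is used here to avoid the string-quoting issue in the original insert):

import java.sql.DriverManager

// Bring the pc values back to the driver (only safe for small results)
val pcs = endDF.collect().map(_.getString(0))

val con = DriverManager.getConnection(jdbcurl, user, pwd)
try {
  val st = con.prepareStatement("insert into tshdim (pc) values (?)")
  pcs.grouped(100).foreach { batch =>
    batch.foreach { pc =>
      st.setString(1, pc) // bind the value instead of interpolating it
      st.addBatch()
    }
    st.executeBatch() // one round trip per batch of 100
  }
} finally {
  con.close()
}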
Upvotes: 0