I want to append lines to a text file using Structured Streaming. This code results in `SparkException: Task not serializable`. I think `toDF` is not allowed. How could I get this code to work?
```scala
df.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    override def process(row: Row): Unit = {
      val df = Seq(row.getString(0)).toDF
      df.write.format("text").mode("append").save(output)
    }
    override def close(errorOrNull: Throwable): Unit = {
    }
  }).start
```
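You cannot call `df.write.format("text").mode("append").save(output)` inside the `process` method. The `ForeachWriter` is serialized and shipped to the executors, and `process` runs there, where the driver's `SparkSession` is not available, so you can neither create a DataFrame with `toDF` nor start a batch write from it. Use the file sink instead, such as
`df.writeStream.format("text")....`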
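A minimal sketch of the file-sink approach, assuming the line to write is in the first column of `df` (as with `row.getString(0)` in the question). The wrapper object `TextFileSink`, the method `appendAsText`, and the `checkpointDir` parameter are hypothetical names chosen for illustration. Note that the text sink does not append to one physical file: each micro-batch adds new part files under the output directory, and the file sink requires a checkpoint location.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: streams the first column of `df` as lines of text.
object TextFileSink {
  def appendAsText(df: DataFrame, outputDir: String, checkpointDir: String): StreamingQuery = {
    // The text sink accepts exactly one string column, so keep only the
    // first column (assumed to hold the line) and cast it to string.
    df.select(df.col(df.columns.head).cast("string").as("value"))
      .writeStream
      .format("text")
      .outputMode("append")                         // the file sink supports append mode only
      .option("checkpointLocation", checkpointDir)  // required by the file sink
      .start(outputDir)                             // runs on the driver; executors write part files
  }
}
```

Because the sink is declared on the streaming query itself, no `SparkSession` or `DataFrame` is touched inside executor-side code. If you really need the batch `DataFrameWriter` API, Spark 2.4 and later also provide `foreachBatch`, whose function is invoked on the driver for each micro-batch.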