Greg Clinton

Reputation: 375

Spark Structured Streaming: appending to a text file using foreach

I want to append lines to a text file using Structured Streaming. This code fails with SparkException: Task not serializable. I suspect that calling toDF inside process is not allowed. How can I get this code to work?

df.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    override def process(row: Row): Unit = {
      val df = Seq(row.getString(0)).toDF

      df.write.format("text").mode("append").save(output)
    }

    override def close(errorOrNull: Throwable): Unit = {
    }
  }).start

Upvotes: 0

Views: 2548

Answers (1)

zsxwing

Reputation: 20836

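You cannot call df.write.format("text").mode("append").save(output) inside the process method. process runs on the executors, and the DataFrame API (creating a DataFrame with toDF, writing it with df.write) can only be used on the driver. Use the file sink instead, for example: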

df.writeStream.format("text")....
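A fuller version of the file-sink approach might look like the sketch below. It is a minimal sketch under stated assumptions: the built-in rate source stands in for the question's df, and the object name and the paths /tmp/text-output and /tmp/text-checkpoint are hypothetical placeholders. Two requirements matter here: the text format expects exactly one string column, and a streaming file sink needs a checkpoint location.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical example object; not from the original question.
object TextFileSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextFileSinkExample")
      .master("local[*]")
      .getOrCreate()

    // Assumed input: the built-in "rate" source emits (timestamp, value) rows.
    // The text sink needs a single string column, so keep only "value" as a string.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(col("value").cast("string").as("value"))

    val output = "/tmp/text-output"            // hypothetical output directory
    val checkpointDir = "/tmp/text-checkpoint" // hypothetical checkpoint directory

    val query = df.writeStream
      .format("text")
      .outputMode("append") // file sinks only support append mode
      .option("path", output)
      .option("checkpointLocation", checkpointDir)
      .start()

    // Let the query run for about 10 seconds, then shut everything down.
    query.awaitTermination(10000)
    query.stop()
    spark.stop()
  }
}
```

Each micro-batch adds new part-* files to the output directory instead of appending to a single existing file; reading the directory back (for example with spark.read.text) returns all the lines written so far.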

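If you want to keep foreach, the code in process has to stick to ordinary JVM I/O, because it runs inside executor tasks where the DataFrame API is unavailable. The sketch below is a minimal illustration, assuming local mode and a hypothetical directory /tmp/foreach-output; the class and object names are also hypothetical. Each partition opens its own file in open, writes one line per row in process, and closes the file in close. In a real cluster every executor would write to its own local disk, and unlike the file sink this gives no exactly-once guarantee, so a replayed batch can produce duplicate lines.

```scala
import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical writer: appends the first column of each row to a per-partition local file.
// Only a String is captured, so the instance serializes cleanly to the executors.
class LocalTextAppender(dir: String) extends ForeachWriter[Row] {
  // Created on the executor in open(); @transient keeps it out of serialization.
  @transient private var writer: BufferedWriter = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // One file per partition so concurrent tasks never share a file handle;
    // `true` opens it in append mode, so later epochs add to the same file.
    writer = new BufferedWriter(new FileWriter(s"$dir/part-$partitionId.txt", true))
    true
  }

  override def process(row: Row): Unit = {
    writer.write(row.getString(0))
    writer.newLine()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (writer != null) writer.close()
  }
}

object ForeachAppendExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachAppendExample")
      .master("local[*]")
      .getOrCreate()

    val outDir = "/tmp/foreach-output" // hypothetical directory
    new File(outDir).mkdirs()          // created on the driver; fine in local mode

    // Assumed input: the built-in "rate" source, with "value" cast to a string.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(col("value").cast("string"))

    val query = df.writeStream
      .foreach(new LocalTextAppender(outDir))
      .start()

    query.awaitTermination(10000)
    query.stop()
    spark.stop()
  }
}
```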
Upvotes: 2
