CSUNNY
CSUNNY

Reputation: 454

Structured streaming: Queries with streaming sources must be executed with writeStream.start()

I am trying to read some data from a file using Structured streaming and finally write it to Cassandra. However I am getting the below error (much before cassandra writing)

"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

Here is the code snippet I am using

val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine

val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code

Here is how RowToColumn class looks like

class RowToColumn (var table: Table) extends java.io.Serializable{
  val decomposer = new EventDecomposer(table)

  val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
  //val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
    println(" in FlatMapData") // This is printed
    override def call(x: Row) = {
      println(" in Call method") // This is not printed
      // Other code ...
  }

}

This job works fine without streaming. Also, I looked at other link1 and link2 , but is not solving the problem

Upvotes: 0

Views: 1590

Answers (1)

dumitru
dumitru

Reputation: 2108

You can approach the write part in following way since I don't know if Cassandra has a stream-to-stream connector for structured streaming in spark:

ip15M
      .writeStream
      .foreachBatch { (df, batchId) => {
        // here apply all of your logic on dataframe
      }
      }
      .start()

Keep in mind that in the foreach loop you're dealing with dataframe, not streams, and most probably you can save them directly in Cassandra.

Upvotes: 1

Related Questions