I am trying to read data from a file using Structured Streaming and eventually write it to Cassandra. However, I am getting the error below (well before the Cassandra write):
"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"
Here is the code snippet I am using:
val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine
val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code
Here is what the RowToColumn class looks like:
class RowToColumn(var table: Table) extends java.io.Serializable {
  val decomposer = new EventDecomposer(table)
  val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
    // val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
    println(" in FlatMapData") // This is printed
    override def call(x: Row) = {
      println(" in Call method") // This is not printed
      // Other code ...
    }
  }
}
This job works fine without streaming. I also looked at link1 and link2, but they did not solve the problem.
You can approach the write part in the following way, since I don't know whether Cassandra has a streaming sink connector for Structured Streaming in Spark:
ip15M
  .writeStream
  .foreachBatch { (df, batchId) =>
    // apply all of your logic to the batch DataFrame here
  }
  .start()
Keep in mind that inside foreachBatch you are dealing with a plain DataFrame, not a stream, so you can most likely save it to Cassandra directly.
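
For completeness, here is a minimal sketch of what the batch write could look like end to end. It assumes the DataStax spark-cassandra-connector is on the classpath; my_keyspace, my_table, and CHECKPOINT_DIRECTORY are placeholders, not names from the original post:

import org.apache.spark.sql.DataFrame

val query = ip15M
  .writeStream
  .foreachBatch { (df: DataFrame, batchId: Long) =>
    // df is a plain, non-streaming DataFrame here, so RDD conversions such as
    // the toJavaRDD.flatMapToPair(...) from the question are legal at this point
    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table")) // placeholders
      .mode("append")
      .save()
  }
  .option("checkpointLocation", CHECKPOINT_DIRECTORY) // placeholder path
  .start()

query.awaitTermination()

The explicit (df: DataFrame, batchId: Long) parameter types are there because some Spark/Scala version combinations report an overload ambiguity on foreachBatch without them. Also note that newer releases of the connector ship a native Structured Streaming sink, so depending on your version you may be able to skip foreachBatch and write with the Cassandra format directly.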