I'm trying to do some manipulation on a Structured Streaming DataFrame, and for that I need to use the .rdd function, but I get "AnalysisException: Queries with streaming sources must be executed with writeStream.start();;". What am I doing wrong?
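import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import spark.implicits._ // needed for .as[String] and .toDF()

df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url")).as[String]
  .rdd.toDF()
  .writeStream
  .format("console") // <-- use ConsoleSink
  .option("truncate", false)
  .option("numRows", 10)
  .trigger(Trigger.ProcessingTime(5.seconds))
  .queryName("rate-console")
  .start()
  .awaitTermination()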
TL;DR It isn't supposed to work. Converting Structured Streaming Datasets to RDDs or DStreams is not supported.
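A minimal sketch of staying inside the Dataset API instead, assuming the manipulation can be expressed with typed operations such as map plus Column filters (both of which are valid on a streaming Dataset). The temp-directory text source, the sample URLs, the object name DatasetOnlySketch, and the in-memory table name urls_demo are hypothetical stand-ins for the question's real source, schema, and console sink. Trigger.Once() is used only so the demo stops after one batch; newer releases deprecate it in favor of Trigger.AvailableNow(), and a long-running job would keep the question's ProcessingTime trigger.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object DatasetOnlySketch {
  def run(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dataset-only-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: a temp directory holding one text file of URLs.
    val dir = Files.createTempDirectory("urls")
    Files.write(dir.resolve("part-0.txt"),
      "  http://a.example  \nftp://c.example\nhttp://b.example\n".getBytes("UTF-8"))

    // Typed map and Column-based filter both work on a streaming Dataset,
    // so no .rdd conversion is needed.
    val urls = spark.readStream
      .text(dir.toString)                 // single string column named "value"
      .as[String]
      .map(_.trim)
      .filter($"value".startsWith("http"))

    val query = urls.writeStream
      .format("memory")                   // in-memory table, for the demo only
      .queryName("urls_demo")
      .trigger(Trigger.Once())            // process what is there, then stop
      .start()

    query.awaitTermination()
    val result = spark.table("urls_demo").as[String].collect().toSeq
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```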
Structured streams and RDDs are two different, and ultimately incompatible, abstractions. While there is some pending discussion about an option for arbitrary transformations, no such option exists at the moment.
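Later Spark releases (2.4 and up) added DataStreamWriter.foreachBatch, which hands each micro-batch to a function as an ordinary, non-streaming DataFrame; inside that function .rdd is allowed. A minimal sketch, assuming the RDD logic can run once per micro-batch: the temp-directory input, sample URLs, the object name ForeachBatchSketch, and the driver-side results buffer are hypothetical and exist only to make the demo observable. The batch function is bound to an explicitly typed Scala value so the call resolves to the Scala overload of foreachBatch rather than the Java VoidFunction2 one.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import scala.collection.mutable.ArrayBuffer

object ForeachBatchSketch {
  def run(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreachBatch-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: a temp directory holding one text file of URLs.
    val dir = Files.createTempDirectory("urls")
    Files.write(dir.resolve("part-0.txt"),
      "  http://a.example  \nhttp://b.example\n".getBytes("UTF-8"))

    // Demo-only buffer; the foreachBatch function itself runs on the driver.
    val results = ArrayBuffer.empty[String]

    // `batch` is a regular batch DataFrame here, so .rdd does not throw.
    val processBatch: (DataFrame, Long) => Unit = (batch, _) => {
      val urls = batch.as[String].rdd.map(_.trim).collect()
      results ++= urls
    }

    val query = spark.readStream
      .text(dir.toString)
      .writeStream
      .foreachBatch(processBatch)
      .option("checkpointLocation", Files.createTempDirectory("ckpt").toString)
      .trigger(Trigger.Once())            // one batch, then stop (demo only)
      .start()

    query.awaitTermination()
    spark.stop()
    results.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

By default foreachBatch gives at-least-once guarantees, so a real sink written from inside the function may need to use the batchId argument to deduplicate replayed batches.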