David

Structured Streaming to RDD is not working

I'm trying to do some manipulation on a Structured Streaming DataFrame, and for that I need the .rdd function, but I get this error:

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Why does this happen?

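import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import spark.implicits._ // assumes a SparkSession named spark; needed for .as[String] and .toDF()

// df is a streaming DataFrame (from spark.readStream) and schema is the JSON schema of "value"
df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url")).as[String]
    .rdd.toDF() // <-- the AnalysisException is thrown here: .rdd on a streaming Dataset
    .writeStream
    .format("console") // <-- use ConsoleSink
    .option("truncate", false)
    .option("numRows", 10)
    .trigger(Trigger.ProcessingTime(5.seconds))
    .queryName("rate-console")
    .start
    .awaitTermination()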

Answers (1)

user10664295

TL;DR: It isn't supposed to work. Converting Structured Streaming Datasets to RDDs or DStreams is not supported.
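A minimal sketch that reproduces the failure in isolation, assuming a local Spark session and the built-in "rate" source as a stand-in for the question's (unshown) source. The streaming Dataset reports isStreaming as true, and merely requesting .rdd makes Spark plan it as a batch query, which is rejected with the same AnalysisException before any data is read. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Sketch: the "rate" source stands in for the question's streaming source.
object RddOnStreamDemo {
  /** Returns true if calling .rdd on a streaming Dataset throws AnalysisException. */
  def rddOnStreamFails(): Boolean = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("rdd-on-stream")
      .getOrCreate()
    try {
      val stream = spark.readStream.format("rate").load()
      require(stream.isStreaming) // a streaming plan, not a batch one
      try {
        // .rdd asks Spark to plan the query as a batch job; the
        // unsupported-operation check rejects the streaming source here.
        stream.rdd.count()
        false
      } catch {
        case _: AnalysisException => true
      }
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit =
    println(rddOnStreamFails())
}
```

The exception comes from query planning, not from the sink or trigger settings, which is why changing the writeStream options in the question does not help.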

Structured Streaming and RDDs are two different, and ultimately incompatible, abstractions. While there is some pending discussion about adding an option for arbitrary transformations, nothing like that exists at the moment.
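Spark 2.4 and later (which may postdate this answer) added DataStreamWriter.foreachBatch: each micro-batch is passed to a function as an ordinary, non-streaming DataFrame together with its batch ID, so RDD operations are allowed inside that function even though the stream itself still cannot be converted. A minimal sketch, assuming Spark 2.4+, the "rate" source as a stand-in, and a hypothetical sumValues computation in place of the question's actual manipulation:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object ForeachBatchDemo {
  // Hypothetical per-batch work: `batch` is a static DataFrame here, so .rdd is allowed.
  def sumValues(batch: DataFrame): Long =
    batch.select("value").rdd.map(_.getLong(0)).fold(0L)(_ + _)

  // A typed function value (not a lambda literal) so the Scala and Java
  // overloads of foreachBatch cannot be ambiguous on Scala 2.12.
  val processBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
    println(s"batch $batchId: sum of value = ${sumValues(batch)}")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreach-batch")
      .getOrCreate()
    val query = spark.readStream
      .format("rate") // stand-in for the question's source
      .option("rowsPerSecond", 5L)
      .load()
      .writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .foreachBatch(processBatch)
      .start()
    query.awaitTermination(20000L) // let a few micro-batches run in this sketch
    query.stop()
    spark.stop()
  }
}
```

Note that writes performed inside foreachBatch are at-least-once by default; the batch ID can be used to make them idempotent if a batch is replayed.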
