cscan

Reputation: 3840

AnalysisException: Queries with streaming sources must be executed with writeStream.start()

I'm receiving an exception which indicates that I need to start a stream in order to use it. However, the stream is being started. What's wrong with this setup?

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "earliest")
  .load
  .selectExpr(deserializeKeyExpression, deserializeValueExpression)
  .select("value.*")
  .withColumn("date", to_timestamp(from_unixtime(col("date"))))
  .transform(model.transform)
  .select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
  .selectExpr(serializeKeyExpression, serializeValueExpression)
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("checkpointLocation", "checkpoint")
  .option("topic", "outputTopic")
  .start

Here's the exception:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    ...
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
    at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
    at com.company.project.Stream.transform(NewsRateJob.scala:65)
    at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
    ... 18 common frames omitted

I'm familiar with the issues between Spark 2.2 and the VectorAssembler; however, I am using Spark 2.3.1.

Upvotes: 6

Views: 3373

Answers (2)

cscan

Reputation: 3840

This exception occurred because the model was trying to access data from the stream before the stream was started. In this case, the VectorAssembler was calling first on the dataset to determine how wide the vector is, and first is a batch action that cannot be run against a streaming source, which is what triggers the AnalysisException.

Spark 2.3 does not automatically fix the problems between the VectorAssembler and Structured Streaming; it simply provides a class, VectorSizeHint, that can be used alongside the VectorAssembler so the vector size is known up front instead of being inferred from the data. Adding it to the stages of the pipeline fixed the problem.

// column is the assembler's input column; size is its known, fixed vector length
stages += new VectorSizeHint()
  .setInputCol(column)
  .setSize(size)
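
For context, here is a fuller sketch of where the hint sits relative to the assembler in a pipeline. The column names and the size here are hypothetical, not taken from the question:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Hypothetical columns and size, for illustration only.
val sizeHint = new VectorSizeHint()
  .setInputCol("existingVectorCol")
  .setSize(3)

val assembler = new VectorAssembler()
  .setInputCols(Array("existingVectorCol", "otherFeature"))
  .setOutputCol("features")

// The size hint goes before the assembler, so the vector width is known at
// analysis time and Spark never needs to call first() on the streaming Dataset.
val pipeline = new Pipeline()
  .setStages(Array[PipelineStage](sizeHint, assembler /*, model stages */))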

Here's some documentation which shows how it can be used: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html

Note: this is not required for the OneHotEncoderEstimator feature.

We experienced similar stack traces for a couple of other reasons. One was caused by using the old OneHotEncoder in our model (it needed to be updated to a OneHotEncoderEstimator); the other was caused by caching the pipeline (we removed the cache step). A sketch of the encoder replacement is below.
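
For reference, a minimal sketch of that replacement, with hypothetical column names; OneHotEncoderEstimator takes arrays of input and output columns rather than a single pair:

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// Replaces a deprecated OneHotEncoder().setInputCol(...).setOutputCol(...) stage.
// "categoryIndex" and "categoryVec" are placeholder column names.
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))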

Upvotes: 4

user10391850

Reputation:

The exception occurs because you are trying to use an ML Transformer with a streaming Dataset. As explained in Spark Structured Streaming and Spark-Ml Regression, as of today, Spark doesn't support ML on structured streams.

You'll have to rewrite your code to transform the data manually, without depending on RDDs or the ML library.
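
If you go that route, one option is to extract the trained model's parameters and apply them in a plain UDF on the streaming Dataset. This is a rough sketch only, assuming a simple logistic-regression-style model; streamingDF, the features column, and the coefficient values are all hypothetical:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Coefficients and intercept extracted offline from a trained model (placeholder values).
val coefficients: Array[Double] = Array(0.1, -0.3, 0.7)
val intercept: Double = 0.05

// Score each row without calling model.transform on the streaming Dataset.
val score = udf { (features: Vector) =>
  val margin = features.toArray.zip(coefficients).map { case (x, w) => x * w }.sum + intercept
  1.0 / (1.0 + math.exp(-margin))
}

val scored = streamingDF.withColumn("prediction", score(col("features")))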

Upvotes: 0
