Tianliang Bian

Reputation: 31

Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

At the line if not df.head(1).isEmpty: I got the exception above. I do not know how to use an if condition on streaming data. When I run the code line by line in Jupyter it works and I get my result, but when I run it as a .py script it fails.

My goal is this: I want to use streaming to get data from Kafka every second, transform each batch of streaming data (one batch being the data received in one second) into a pandas DataFrame, use pandas functions to process the data, and finally send the result to another Kafka topic.

Please help me, and forgive my poor English. Thanks a lot.

import json

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession.builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data") \
  .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

if not df.head(1).isEmpty:
  alertQuery = ds \
          .writeStream \
          .queryName("qalerts")\
          .format("memory")\
          .start()

  alerts = spark.sql("select * from qalerts")
  pdAlerts = alerts.toPandas()
  a = pdAlerts['value'].tolist()

  d = []
  for i in a:
      x = json.loads(i)
      d.append(x)

  df = pd.DataFrame(d)
  print(df)
  ds = df['jobID'].unique().tolist()


  dics = {}
  for source in ds:
      ids = df.loc[df['jobID'] == source, 'id'].tolist()
      dics[source]=ids

  print(dics)  
query = ds \
  .writeStream \
  .queryName("tableName") \
  .format("console") \
  .start()

query.awaitTermination()

Upvotes: 2

Views: 2776

Answers (1)

Jacek Laskowski

Reputation: 74619

Remove if not df.head(1).isEmpty: and you should be fine.

The reason for the exception is simple: a streaming query is a structured query that never ends and is executed continuously. It is simply not possible to look at a single element, since there is no "single element" but (possibly) thousands of them, and it would be hard to tell exactly when you would want to look under the covers and inspect just one.
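Since Spark 2.4, the usual way to run per-batch pandas logic like the question describes is writeStream.foreachBatch, where each micro-batch arrives as a regular (non-streaming) DataFrame, so toPandas() is allowed there. As a sketch, the question's per-batch transformation can be isolated as a plain-pandas function (batch_to_job_ids and the sample data are illustrative, not from the question):

```python
import json

import pandas as pd


def batch_to_job_ids(pd_alerts: pd.DataFrame) -> dict:
    """Replicate the question's logic for one micro-batch:
    parse the JSON 'value' column and group the 'id's by 'jobID'."""
    if pd_alerts.empty:
        return {}
    records = [json.loads(v) for v in pd_alerts["value"]]
    pdf = pd.DataFrame(records)
    return {job: pdf.loc[pdf["jobID"] == job, "id"].tolist()
            for job in pdf["jobID"].unique()}


# Rows shaped like CAST(value AS STRING) from the Kafka source
sample = pd.DataFrame({"value": [
    '{"jobID": "j1", "id": 1}',
    '{"jobID": "j1", "id": 2}',
    '{"jobID": "j2", "id": 3}',
]})
print(batch_to_job_ids(sample))  # → {'j1': [1, 2], 'j2': [3]}
```

Inside ds.writeStream.foreachBatch(lambda batch_df, batch_id: ...) you would call this on batch_df.toPandas(), and a .trigger(processingTime="1 second") on the writeStream gives roughly the one-batch-per-second behaviour the question asks for; the per-batch result can then be written back out with the kafka sink format.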

Upvotes: 2
