nonoDa

Reputation: 453

foreach() method with Spark Streaming errors

I'm trying to write data pulled from Kafka to a BigQuery table every 120 seconds. I would like to do some additional operations which, according to the documentation, should be possible inside the .foreach() or foreachBatch() method.

As a test I wanted to print a simple message every time data gets pulled from Kafka and written to BigQuery.

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreachBatch(print("do i get printed every batch?")) \
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()

I would expect this message to be printed every 120 seconds in the JupyterLab output cell; instead it gets printed only once, while the query just keeps writing to BigQuery.

If I try to use .foreach() instead of foreachBatch()

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreach(print("do i get printed every batch?")) \
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()

it prints the message once and immediately afterwards raises the following error, which I could not debug/understand:

/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
   1335 
   1336             if not hasattr(f, 'process'):
-> 1337                 raise Exception("Provided object does not have a 'process' method")
   1338 
   1339             if not callable(getattr(f, 'process')):

Exception: Provided object does not have a 'process' method

Am I doing something wrong? How can I simply perform some operations every 120 seconds, other than those done directly on the evaluated dataframe df_alarmsFromKafka?

Upvotes: 1

Views: 1288

Answers (1)

Enes Uğuroğlu

Reputation: 407

Additional operations are allowed, but only on the output data of the streaming query. Here you are trying to print a string which is not related to the output data itself: the print(...) call is evaluated once, when the query is defined, so it can only be printed once.

For example, if you write a foreachBatch function like the one below:

def write_to_cassandra(target_df, batch_id):
    target_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "tweet_db") \
        .option("table", "tweet2") \
        .mode("append") \
        .save()
    target_df.show()

It will print target_df on every batch, since the .show() call operates on the output data itself.
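Applied to the question's setup, a minimal sketch of how the BigQuery write could move inside a foreachBatch function so that the print runs on every trigger. The names path1, path2, table_kafka and df_alarmsFromKafka come from the question (shown here as placeholders); the start_stream wrapper is hypothetical:

```python
# Sketch: with foreachBatch, the sink write moves inside the batch function,
# so any side effect (like a print) runs once per trigger.
# Placeholder values standing in for the question's variables:
path1 = "gs://my-temp-bucket"       # temporaryGcsBucket (placeholder)
path2 = "gs://my-checkpoint-path"   # checkpointLocation (placeholder)
table_kafka = "dataset.alarms"      # target table (placeholder)

def write_to_bigquery(batch_df, batch_id):
    # write this micro-batch to BigQuery
    batch_df.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", path1) \
        .option("table", table_kafka) \
        .mode("append") \
        .save()
    # this now executes on every trigger, not just once
    print("batch {} written".format(batch_id))

def start_stream(df_alarmsFromKafka):
    # note: format()/outputMode() on the writer are not needed here,
    # because foreachBatch takes over the sink logic
    return (df_alarmsFromKafka.writeStream
            .trigger(processingTime='120 seconds')
            .foreachBatch(write_to_bigquery)
            .option("checkpointLocation", path2)
            .start())
```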

For your second question:

The foreach() sink expects you to provide an object implementing the ForeachWriter interface, i.e. open, process and close methods, which you did not do here.
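A minimal sketch of such a writer object. The class name RowPrinter is hypothetical; pyspark's foreach accepts any object exposing open, process and close, so the class itself needs no pyspark import:

```python
# Hypothetical ForeachWriter-style object for .foreach():
# Spark calls open() once per partition/epoch, process() once per row,
# and close() when the partition is finished.
class RowPrinter:
    def open(self, partition_id, epoch_id):
        return True          # True = go ahead and process this partition

    def process(self, row):
        print(row)           # runs once per row, on the executors

    def close(self, error):
        pass                 # release any resources here

# usage against the question's stream:
# df_alarmsFromKafka.writeStream.foreach(RowPrinter()).start()
```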

Upvotes: 0

Related Questions