Reputation: 453
I'm trying to write data pulled from Kafka to a BigQuery table every 120 seconds.
I would like to do some additional operations which, per the documentation, should be possible inside the .foreach() or foreachBatch() method.
As a test, I wanted to print a simple message every time data gets pulled from Kafka and written to BigQuery.
batch_job = df_alarmsFromKafka.writeStream \
    .trigger(processingTime='120 seconds') \
    .foreachBatch(print("do i get printed every batch?")) \
    .format("bigquery").outputMode("append") \
    .option("temporaryGcsBucket", path1) \
    .option("checkpointLocation", path2) \
    .option("table", table_kafka) \
    .start()
batch_job.awaitTermination()
I would expect this message to be printed every 120 seconds in the JupyterLab output cell; instead it gets printed only once, while the query just keeps writing to BigQuery.
If I try to use .foreach() instead of foreachBatch():
batch_job = df_alarmsFromKafka.writeStream \
    .trigger(processingTime='120 seconds') \
    .foreach(print("do i get printed every batch?")) \
    .format("bigquery").outputMode("append") \
    .option("temporaryGcsBucket", path1) \
    .option("checkpointLocation", path2) \
    .option("table", table_kafka) \
    .start()
batch_job.awaitTermination()
it prints the message once and immediately afterwards gives the following error, which I could not debug/understand:
/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
1335
1336 if not hasattr(f, 'process'):
-> 1337 raise Exception("Provided object does not have a 'process' method")
1338
1339 if not callable(getattr(f, 'process')):
Exception: Provided object does not have a 'process' method
Am I doing something wrong? How can I simply perform some operations every 120 seconds, other than those applied directly to the evaluated dataframe df_alarmsFromKafka?
Upvotes: 1
Views: 1288
Reputation: 407
Additional operations are allowed, but only on the output data of the streaming query. Here, what you pass to foreachBatch is not related to the output data at all: print("...") is evaluated once, when the query is defined, so the message gets printed only once.
For example, if you write a foreachBatch function like the one below:
def write_to_cassandra(target_df, batch_id):
    target_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "tweet_db") \
        .option("table", "tweet2") \
        .mode("append") \
        .save()
    target_df.show()
it will print target_df on every batch, since the .show() call operates on the output data itself.
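Applied to your case, a minimal sketch (assuming the same df_alarmsFromKafka, path1, path2 and table_kafka variables from your question) could look like the following. Note that foreachBatch takes the function itself, not the result of calling it, and that it replaces the sink, so the BigQuery write moves inside the batch function:

def write_to_bigquery(batch_df, batch_id):
    # Write this micro-batch to BigQuery (indirect write via a GCS bucket)
    batch_df.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", path1) \
        .option("table", table_kafka) \
        .mode("append") \
        .save()
    # Runs on the driver once per micro-batch, i.e. every 120 seconds
    print("batch {} written".format(batch_id))

batch_job = df_alarmsFromKafka.writeStream \
    .trigger(processingTime='120 seconds') \
    .foreachBatch(write_to_bigquery) \
    .option("checkpointLocation", path2) \
    .start()
batch_job.awaitTermination()

Since the foreachBatch function runs on the driver, the print should show up in your JupyterLab output cell on every trigger.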
For your second question:
foreach expects you to provide either a function that processes one row at a time, or an object implementing open, process and close methods (the ForeachWriter contract), which you did not do there.
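A minimal sketch of the second form, assuming you only want to see the rows:

class RowPrinter:
    def open(self, partition_id, epoch_id):
        # Return True to process this partition
        return True

    def process(self, row):
        # Called once per row
        print(row)

    def close(self, error):
        pass

query = df_alarmsFromKafka.writeStream \
    .trigger(processingTime='120 seconds') \
    .foreach(RowPrinter()) \
    .start()

Keep in mind that process runs on the executors, so on a cluster the printed rows end up in the executor logs rather than in the notebook cell.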
Upvotes: 0