Reputation: 453
I'm trying to read from a Kafka topic, do some operations, and then write the DataFrame to disk. For example:
df_alarmsFromKafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", ip) \
    .option("subscribe", topic) \
    .option("kafka.request.timeout.ms", 80000) \
    .option("includeHeaders", "true") \
    .load()
df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")

batch_job = df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path", path) \
    .option("checkpointLocation", cp) \
    .start()

batch_job.awaitTermination()
The issue I have with this is that only operations on df_alarmsFromKafka are run every batch.
For example, if I want a simple print to be evaluated every batch, that appears not to be possible: it is obviously printed and evaluated only once, when the query is set up.
Is there a different way to run other operations on every batch, and not only the ones strictly related to df_alarmsFromKafka.writeStream?
Upvotes: 2
Views: 427
Reputation: 74619
Ah, I think I understand what you meant by "if I want a simple print to be evaluated every batch". You seem to be asking for the foreach operator:
Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems.
Pseudo-code could look as follows:
df_alarmsFromKafka = df_alarmsFromKafka.drop("test")

# query 1: run a function over every row of every micro-batch
# (the function executes on the executors, so output goes to executor logs)
foreach_job = df_alarmsFromKafka.writeStream \
    .foreach(lambda row: print("runs every batch, in the stream")) \
    .start()

# query 2: append every micro-batch to parquet files
batch_job = df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path", path) \
    .option("checkpointLocation", cp) \
    .start()
The above would actually start 2 streaming queries (for one input).
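With two queries running you may also want to block on both of them rather than only on batch_job. A minimal sketch, assuming the same SparkSession spark and the two query handles started above:
# block until any active streaming query of this session terminates;
# alternatively, call awaitTermination() on each query handle separately
spark.streams.awaitAnyTermination()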
Upvotes: 1