nonoDa

Reputation: 453

How to execute operations every batch?

I'm trying to read from a Kafka topic, do some operations, and then write the DataFrame to disk. For example:

df_alarmsFromKafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", ip) \
    .option("subscribe", topic) \
    .option("request.timeout.ms", 80000) \
    .option("includeHeaders", "true") \
    .load()

df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")

batch_job = df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path", path) \
    .option("checkpointLocation", cp) \
    .start()

batch_job.awaitTermination()

The issue I have with this is that only operations on df_alarmsFromKafka are run every batch.
For example, if I want a simple print to be evaluated every batch, that appears not to be possible: it is obviously printed and evaluated only the first time.

Is there a different way to run other operations on every batch, and not only the ones strictly related to the DataFrame evaluated by writeStream?

Upvotes: 2

Views: 427

Answers (1)

Jacek Laskowski

Reputation: 74619

Ah, I think I understand what you meant by "if I want a simple print to be evaluated every batch". You seem to be asking for the foreach operator:

Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems.

Pseudo-code could look as follows:

df_alarmsFromKafka = df_alarmsFromKafka.drop("test")

# foreach runs the given function against every row of every micro-batch
print_job = df_alarmsFromKafka.writeStream \
    .foreach(lambda row: print("runs every batch, in the stream")) \
    .start()

batch_job = df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path", path) \
    .option("checkpointLocation", cp) \
    .start()

The above would actually start 2 streaming queries (for one input).
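If what you really want is to run arbitrary driver-side code once per micro-batch (rather than once per row), foreachBatch may be a better fit. A minimal sketch, reusing the question's df_alarmsFromKafka, path and cp variables; the process_batch name is just for illustration:

def process_batch(batch_df, batch_id):
    # runs on the driver once per micro-batch
    print(f"batch {batch_id}: {batch_df.count()} rows")
    # with foreachBatch the function owns the sink, so write the parquet here
    batch_df.write.mode("append").parquet(path)

batch_job = df_alarmsFromKafka.writeStream \
    .option("checkpointLocation", cp) \
    .foreachBatch(process_batch) \
    .start()

batch_job.awaitTermination()

Unlike foreach, foreachBatch hands you the whole micro-batch as a regular DataFrame, so a single query can both print per batch and write to storage.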

Upvotes: 1
