Exorcismus

Reputation: 2482

How to insert a processed Spark stream into Kafka

I am trying to write a Spark stream into Kafka after processing it, using the snippet below:

query = ds1 \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .foreachBatch(do_something) \
    .format("kafka") \
    .option("topic", "topic-name") \
    .option("kafka.bootstrap.servers", "brokers-IPs") \
    .option("checkpointLocation", "/home/location") \
    .start()

But it seems to be inserting the original stream, not the processed one.

Upvotes: 0

Views: 56

Answers (1)

Ged

Reputation: 18013

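Using foreachBatch has no effect here, as you can see. The later .format("kafka") call replaces foreachBatch as the sink, so do_something is never called and the unprocessed stream goes straight to Kafka. Spark does not raise an error; the function is silently ignored.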
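One way to get the processed data into Kafka is to drop foreachBatch and express the processing as ordinary DataFrame transformations, so the built-in Kafka sink receives the transformed rows. This is a minimal sketch, assuming the processing can be written in Spark SQL; the upper(value) step and the function name start_kafka_query are hypothetical stand-ins for whatever do_something actually does:

```python
def start_kafka_query(ds1):
    """Sketch: transform the stream first, then hand the result to the
    built-in Kafka sink. No foreachBatch is used, so nothing overrides it."""
    processed = (
        ds1.selectExpr("CAST(value AS STRING) AS value")
        # Hypothetical processing step standing in for do_something.
        # The Kafka sink expects a column named "value" (string or binary).
        .selectExpr("upper(value) AS value")
    )
    return (
        processed.writeStream
        .format("kafka")
        .option("topic", "topic-name")
        .option("kafka.bootstrap.servers", "brokers-IPs")
        .option("checkpointLocation", "/home/location")
        .start()
    )
```

Usage would be query = start_kafka_query(ds1) followed by query.awaitTermination().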

Quoting the documentation:

Structured Streaming APIs provide two ways to write the output of a streaming query to data sources that do not have an existing streaming sink: foreachBatch() and foreach().
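If the processing really does need foreachBatch (for example, arbitrary per-batch logic), the Kafka write has to happen inside the batch function itself, using the batch writer (write rather than writeStream), and .format("kafka") must be removed from the streaming query. A sketch under the same assumptions: upper(value) stands in for the real processing, and TOPIC, BOOTSTRAP and start_query are hypothetical names:

```python
TOPIC = "topic-name"        # hypothetical placeholder
BOOTSTRAP = "brokers-IPs"   # hypothetical placeholder


def do_something(batch_df, batch_id):
    """Called once per micro-batch with a static (non-streaming) DataFrame."""
    # Hypothetical processing standing in for the original logic.
    processed = batch_df.selectExpr("upper(value) AS value")
    # foreachBatch replaces the streaming sink, so each batch must be
    # written to Kafka explicitly with the batch writer.
    (
        processed.write
        .format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP)
        .option("topic", TOPIC)
        .save()
    )


def start_query(ds1):
    return (
        ds1.selectExpr("CAST(value AS STRING) AS value")
        .writeStream
        .foreachBatch(do_something)  # no .format("kafka") on the stream
        .option("checkpointLocation", "/home/location")
        .start()
    )
```

Note that foreachBatch gives only at-least-once guarantees by default, so a batch may be written to Kafka again after a failure; batch_id can be used to deduplicate if that matters.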

This excellent article covers exactly what you are looking for:

https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html

Upvotes: 1
