Reputation: 2482
I am trying to write a Spark stream to Kafka after processing it, using the snippet below:
query = ds1 \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .foreachBatch(do_something) \
    .format("kafka") \
    .option("topic", "topic-name") \
    .option("kafka.bootstrap.servers", "brokers-IPs") \
    .option("checkpointLocation", "/home/location") \
    .start()
However, it seems to write the original stream to Kafka, not the processed one.
Upvotes: 0
Views: 56
Reputation: 18013
As you have seen, foreachBatch has no effect here. Spark does not raise an error; the call is silently ignored, because the later .format("kafka") call replaces it as the sink.
Quoting the documentation:
Structured Streaming APIs provide two ways to write the output of a streaming query to data sources that do not have an existing streaming sink: foreachBatch() and foreach().
This excellent read covers what you are looking for:
https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html
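One way to apply this, as a minimal sketch rather than a definitive fix: make foreachBatch the only sink and write each processed micro-batch to Kafka from inside the function, using the batch writer. The upper-casing step is a hypothetical stand-in for your real processing, the connection values are placeholders, and start_query is an illustrative helper name, not something from the question. It assumes the spark-sql-kafka connector is on the classpath.

```python
# Placeholder settings standing in for the values used in the question.
KAFKA_BOOTSTRAP_SERVERS = "brokers-IPs"
KAFKA_TOPIC = "topic-name"
CHECKPOINT_LOCATION = "/home/location"


def do_something(batch_df, batch_id):
    """Process one micro-batch and write the result to Kafka."""
    # Hypothetical processing step: upper-case the message value.
    processed = batch_df.selectExpr("upper(CAST(value AS STRING)) AS value")
    # Inside foreachBatch the DataFrame is a static one, so the batch
    # writer (.write ... .save()) is used instead of .writeStream.
    (processed.write
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("topic", KAFKA_TOPIC)
        .save())


def start_query(ds1):
    """Start the stream with foreachBatch as the only configured sink."""
    # No .format("kafka") here: the Kafka write happens in do_something.
    return (ds1.writeStream
        .foreachBatch(do_something)
        .option("checkpointLocation", CHECKPOINT_LOCATION)
        .start())
```

Call it as query = start_query(ds1). If the processing can be expressed as ordinary DataFrame transformations, the alternative is to drop foreachBatch, apply those transformations to ds1 before .writeStream, and keep .format("kafka") as the sink.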
Upvotes: 1