Reputation: 103
Currently, I have the following df
+-------+--------------------+-----+
| key| created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...| 12|
|Bearish|[2017-08-06 08:00...| 1|
+-------+--------------------+-----+
I use the following to stream the data to Kafka:
df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
.outputMode("complete")
.start()
The problem is that each row of the DataFrame is written to Kafka as a separate message, so my consumer receives the records one by one.
Is there any way to consolidate all rows into a single array and stream that to Kafka, so that my consumer gets the whole data in one go?
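For example, something along these lines is the shape I'm after (just an illustration; I'm not sure whether collect_list is supported in a streaming aggregation here):
import org.apache.spark.sql.functions._

// Collapse all rows into one array so each trigger emits a single Kafka message
val consolidated = df
  .agg(collect_list(struct(col("key"), col("created_at"), col("count"))).alias("rows"))
  .select(
    lit("all").alias("key"),                    // single constant key, just for illustration
    to_json(struct(col("rows"))).alias("value") // whole result as one JSON payload
  )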
Thanks for the advice.
Upvotes: 0
Views: 109
Reputation: 16086
"My consumer will get the message one by one."
Not exactly. It depends on the Kafka producer properties. You can specify your own properties, for example:
props.put("batch.size", 16384);
Under the hood, Spark uses a normal cached KafkaProducer. It will use the properties you provide as options when submitting the query.
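For instance, a sketch of how that could look on your query (options prefixed with kafka. are forwarded to the producer; the batch.size and linger.ms values below are only example numbers):
// Producer properties prefixed with "kafka." are passed through to the cached KafkaProducer
df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.batch.size", "16384")  // maximum bytes per producer batch
  .option("kafka.linger.ms", "50")      // wait up to 50 ms so batches can fill
  .option("topic", "chart3")
  .option("checkpointLocation", "/tmp/checkpoints2")
  .outputMode("complete")
  .start()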
See also the KafkaProducer Javadoc. Be aware that it may not scale correctly.
Upvotes: 3