John Watson

Reputation: 103

How to consolidate Spark streaming rows into an array before writing to Kafka

Currently, I have the following DataFrame:

+-------+--------------------+-----+
|    key|          created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...|   12|
|Bearish|[2017-08-06 08:00...|    1|
+-------+--------------------+-----+

I use the following to stream the data to Kafka

df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
  .outputMode("complete")
  .start()

The problem here is that each row in the DataFrame is written to Kafka as a separate message, so my consumer receives the messages one by one.

Is there any way to consolidate all rows into an array and stream that to Kafka, so that my consumer can get the whole data in one go?

Thanks for any advice.

Upvotes: 0

Views: 109

Answers (1)

T. Gawęda

Reputation: 16086

"My consumer will get the message one by one."

Not exactly. It depends on the Kafka producer configuration. You can specify your own producer properties, for example:

props.put("batch.size", 16384);

Under the hood, Spark uses a regular cached KafkaProducer. It picks up the producer properties that you provide as options (prefixed with kafka.) when submitting the query.
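
For the query in the question, a minimal sketch of how that could look (the kafka.-prefixed options are passed through to the underlying producer; batch.size and linger.ms here are illustrative values, not tested settings):

df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.batch.size", "16384")  // producer-side batching, in bytes
  .option("kafka.linger.ms", "50")      // wait up to 50 ms to fill a batch
  .option("topic", "chart3")
  .option("checkpointLocation", "/tmp/checkpoints2")
  .outputMode("complete")
  .start()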

See also the KafkaProducer Javadoc. Be aware that this may not scale well.

Upvotes: 3
