Reputation: 539
Does anyone know if it's possible to call writeStream
to a Kafka topic, such that the topic that gets created is a compacted topic? The code below creates a Kafka topic, but the topic-level options passed are ignored.
StreamingQuery query = ds
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "myTopic")
.option("cleanup.policy", "compact")
.option("min.insync.replicas", 1)
.option("segment.bytes", 4096)
.option("delete.retention.ms", 100)
.option("min.compaction.lag.ms", 0)
.option("min.cleanable.dirty.ratio", 0.01)
.start();
Upvotes: 2
Views: 520
Reputation: 20840
As described in the documentation, use the kafka.
prefix with producer or consumer properties:
Kafka's own configurations can be set via DataStreamReader.option with the kafka. prefix.
Example:
stream.option("kafka.bootstrap.servers", "host:port")
Also, you can only pass producer- or consumer-level properties in .option
. Topic-level configuration can't be set through the Spark streaming code; when the broker auto-creates the topic, it is created from the broker's default configuration, so topic-level settings have to be applied on the broker/topic side.
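Since topic-level settings can't be applied from the writeStream options, one workaround is to create the topic yourself before starting the streaming query. Below is a minimal sketch using Kafka's AdminClient; the topic name, bootstrap server, and config values come from the question, while the partition count and replication factor are assumptions (it needs a running broker, so it can't be executed standalone):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Topic-level configs belong here, not in the Spark writeStream options
            Map<String, String> configs = Map.of(
                "cleanup.policy", "compact",
                "min.insync.replicas", "1",
                "segment.bytes", "4096",
                "delete.retention.ms", "100",
                "min.compaction.lag.ms", "0",
                "min.cleanable.dirty.ratio", "0.01");

            // 1 partition, replication factor 1 (assumed values for a local broker)
            NewTopic topic = new NewTopic("myTopic", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```

Once the topic exists, the writeStream query publishes to it as-is and broker-side auto-creation never kicks in.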
Upvotes: 0
Reputation: 6593
Spark Structured Streaming, when writeStream
is called with the kafka format, uses KafkaProducer under the hood.
If on the broker side auto.create.topics.enable
is set to true
(which is the default),
the broker creates the new topic when KafkaProducer fetches metadata.
KafkaProducer doesn't pass any topic properties.
The broker uses its defaults (e.g. default.replication.factor
, num.partitions
, log.cleanup.policy
) to create the topic.
You can't pass topic creation properties through KafkaProducer.
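If the topic has already been auto-created with broker defaults, its configuration can still be changed afterwards through the admin API. A hedged sketch using AdminClient.incrementalAlterConfigs (topic name and bootstrap server taken from the question; requires a running Kafka 2.3+ broker, so it can't be executed standalone):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CompactExistingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "myTopic");
            // Switch the already-created topic from its default cleanup policy to compaction
            AlterConfigOp setCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
        }
    }
}
```

This only changes the topic's configuration going forward; partition count and replication factor set at auto-creation time are not affected.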
Upvotes: 2