Reputation: 1977
From within a Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. As my use case is imperative, I use an Emitter for sending messages to the topic (as suggested in the Quarkus blog). The code for non-tombstone messages (with payload) is:
    @Dependent
    public class Publisher {

        @Inject
        @Channel("theChannelName")
        Emitter<MyDataStructure> emitter;

        public CompletionStage<Void> publish(final MyDataStructure myData) {
            OutgoingKafkaRecordMetadata<String> metadata =
                    OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey(myData.getTopicKey())
                            .build();
            return CompletableFuture.runAsync(
                    () -> emitter.send(Message.of(myData).addMetadata(metadata)));
        }
    }
The Emitter also implements <M extends Message<? extends T>> void send(M msg), which I hoped would allow me to craft a Message with a payload of null as a tombstone message. Unfortunately, all overloads of the Message.of(..) factory method that accept metadata (which is needed to provide the message key) specify that the payload must not be {@code null}.
What is the proper way (following Quarkus / SmallRye Reactive Messaging concepts) to publish tombstone messages to a Kafka topic using an Emitter?
Upvotes: 3
Views: 2093
Reputation: 3192
I would recommend using the Record class (see documentation).
A Record is a key/value pair, which represents the key and value of the Kafka record to write. Both can be null, but in your case, only the value part should be null: Record.of(key, null).
So, you need to change the type of the Emitter to Record<Key, Value>, such as:
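    // Explicit import, so the name does not resolve to java.lang.Record
    import io.smallrye.reactive.messaging.kafka.Record;

    @Dependent
    public class Publisher {

        // Key stands for your key type (String in the question's code)
        @Inject
        @Channel("theChannelName")
        Emitter<Record<Key, MyDataStructure>> emitter;

        public CompletionStage<Void> publish(final MyDataStructure myData) {
            // A null value makes this record a tombstone for the given key
            return emitter.send(Record.of(myData.getTopicKey(), null));
        }
    }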
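For context on why the null value matters, here is a minimal plain-Java sketch of how a reader rebuilding state from a compacted topic typically treats a tombstone. It does not use Kafka at all; TombstoneReplay, rec and replay are hypothetical names made up for this illustration.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TombstoneReplay {

    // Hypothetical helper: a (key, value) pair standing in for a Kafka record.
    // SimpleEntry is used because it permits a null value.
    static Map.Entry<String, String> rec(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Replays records in log order and returns the latest state per key.
    static Map<String, String> replay(List<Map.Entry<String, String>> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            if (r.getValue() == null) {
                state.remove(r.getKey());            // tombstone: forget the key
            } else {
                state.put(r.getKey(), r.getValue()); // newer value replaces older one
            }
        }
        return state;
    }
}
```

Replaying ("a", "1"), ("b", "2"), ("a", null) leaves only key "b" in the resulting map, which is the effect the tombstone record is meant to have for its key.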
While runAsync is convenient, emitters are already async, so there is no need to use it. In addition, the behavior can be problematic in containers: if the common pool's level of parallelism is less than 2, CompletableFuture does not use the pool and creates a new thread for each async task.
The code above returns the result of the send method, which is a CompletionStage. That stage completes when the record is written to Kafka (and acked by the broker).
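To illustrate the difference, here is a small plain-Java sketch. FakeEmitter is a hypothetical stand-in (not the SmallRye Emitter) whose send returns a stage that completes only when ack() is called, mimicking a broker ack; publishWrapped and publishDirect are likewise invented names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AckDemo {

    // Hypothetical stand-in for an emitter: the stage returned by send()
    // completes only once ack() is called (simulating the broker ack).
    static class FakeEmitter {
        final CompletableFuture<Void> pending = new CompletableFuture<>();

        CompletionStage<Void> send(String key, String value) {
            return pending;
        }

        void ack() {
            pending.complete(null);
        }
    }

    // Wrapping in runAsync: the returned future completes as soon as send()
    // has been called, and the stage returned by send() is discarded.
    static CompletableFuture<Void> publishWrapped(FakeEmitter emitter, String key) {
        return CompletableFuture.runAsync(() -> emitter.send(key, null));
    }

    // Returning the stage directly: the caller observes the ack itself.
    static CompletionStage<Void> publishDirect(FakeEmitter emitter, String key) {
        return emitter.send(key, null);
    }
}
```

In this sketch the future from publishWrapped is done even though no ack has happened, while the stage from publishDirect stays pending until ack() is called. A caller can attach whenComplete or exceptionally to the direct stage to react to a write failure.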
Upvotes: 3