Reputation: 1223
I have a simple Kafka Multi processing, I'm listening to Multi<Bytes>
, doing a flatmap and producing multiple messages:
@Incoming("generic-in")
@Outgoing("generic-out")
public Multi<String> consume(@NotNull Multi<Bytes> messages) {
return messages.flatMap((message) -> {
return Multi.createFrom().items({parse message into multiple strings});
});
}
I added the propagate-headers key to the outgoing topic:
generic-out:
merge: true
bootstrap:
servers: localhost:9092
topic: 'test.test.out'
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
connector: smallrye-kafka
value:
serializer: org.apache.kafka.common.serialization.StringSerializer
propagate-headers: "HEADER1,HEADER2"
Still I'm not getting the headers propagated except the opentelemetry.
Upvotes: 0
Views: 263
Reputation: 474
The Multi<O> process(Multi<I> messages)
signature does not support automatic metadata propagation, hence the header propagation for Kafka.
I suggest you use, if it suits your use case, Multi<String> consume(Bytes message)
. The method will be called for each incoming message and you can produce multiple messages per incoming message using your code Multi.createFrom().items({parse message into multiple strings});
.
And the Kafka header propagation will work expectedly.
Upvotes: 2