Dmytro Chaban

Reputation: 1223

Quarkus Kafka: header propagation doesn't work for Multi

I have a simple Kafka processor built on Multi: it consumes a Multi<Bytes>, applies a flatMap, and produces multiple outgoing messages per incoming message:

    @Incoming("generic-in")
    @Outgoing("generic-out")
    public Multi<String> consume(@NotNull Multi<Bytes> messages) {
        return messages.flatMap((message) -> {
            return Multi.createFrom().items({parse message into multiple strings});
        });
    }

I added the propagate-headers attribute to the outgoing channel configuration:

      generic-out:
        merge: true
        bootstrap:
          servers: localhost:9092
        topic: 'test.test.out'
        key:
          serializer: org.apache.kafka.common.serialization.StringSerializer
        connector: smallrye-kafka
        value:
          serializer: org.apache.kafka.common.serialization.StringSerializer
        propagate-headers: "HEADER1,HEADER2"

Still, none of the headers are propagated except the OpenTelemetry ones.

Upvotes: 0

Views: 263

Answers (1)

Ozan Günalp

Reputation: 474

The Multi<O> process(Multi<I> messages) signature does not support automatic metadata propagation, and therefore does not support Kafka header propagation either.

If it suits your use case, I suggest using the signature Multi<String> consume(Bytes message) instead. The method is then called once for each incoming message, and you can still produce multiple messages per incoming message with your existing code, Multi.createFrom().items({parse message into multiple strings});. Kafka header propagation will then work as expected.
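As a minimal sketch of that signature change, something like the following should work. The class name GenericProcessor and the parse helper are hypothetical: parse simply decodes the payload as UTF-8 and splits it on newlines, standing in for whatever "parse message into multiple strings" means in your code. The jakarta import assumes Quarkus 3; older versions use javax.enterprise.context instead.

```java
import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.common.utils.Bytes;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import java.nio.charset.StandardCharsets;

@ApplicationScoped
public class GenericProcessor {

    // Invoked once per incoming record instead of once for the whole stream.
    // Every String emitted by the returned Multi becomes one outgoing record.
    @Incoming("generic-in")
    @Outgoing("generic-out")
    public Multi<String> consume(Bytes message) {
        return Multi.createFrom().items(parse(message));
    }

    // Hypothetical placeholder for the real parsing logic (an assumption,
    // not part of the original question): UTF-8 decode, split on newlines.
    private String[] parse(Bytes message) {
        String text = new String(message.get(), StandardCharsets.UTF_8);
        return text.split("\n");
    }
}
```

The channel configuration from the question, including propagate-headers: "HEADER1,HEADER2", stays unchanged; only the method signature differs.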

Upvotes: 2
