devo

Reputation: 1332

Stream Kafka messages in REST endpoint

I am trying to achieve something with a Spring Boot REST service and Kafka. I want to dynamically create a listener on a Kafka topic when I receive a REST request, listen to that topic for a period of time (say 10 seconds), filter the messages based on some value in the message (RecordFilterStrategy, I guess?), and stream the matching messages back to the REST consumer. Is this possible with Kafka?

I was able to dynamically create a listener to a topic but don't see how to add a record filter to it or how I can stream these back to the caller.

This is how I was dynamically creating a listener to a topic:

public ConcurrentMessageListenerContainer<String, String> getConsumer(String topic, MessageListener<String, String> listener) {
    DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfig);

    // The listener is attached to the container through its properties.
    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(listener);

    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
    return container;
}

Can someone point me to any documentation on how to achieve this? I created a GET mapping that can return a Flux of messages but I have no idea how to return the messages on the topic. I know the below is not correct but the message listener code does get hit.

@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> getMessages(@RequestParam String id) {

    MessageListener<String, String> listener = new MessageListener<String, String>() {

        @Override
        public void onMessage(ConsumerRecord<String, String> arg0) {
            logger.debug("We got a message:" + arg0);
        }
    };

    ConcurrentMessageListenerContainer<String, String> consumer = consumerFactory.getConsumer("mytopic", listener);
    consumer.start();

    //return messages here

}

Upvotes: 0

Views: 1194

Answers (2)

Dawid D

Reputation: 111

Since you are returning a Flux, it may be a good idea to take a look at https://projectreactor.io/docs/kafka/release/reference/

and use the reactive receiver, which returns a Flux that you can map:

Flux<ReceiverRecord<Integer, String>> inboundFlux =
    KafkaReceiver.create(receiverOptions)
                 .receive();

Or, looking at your code, you could collect all the messages first, map them, and build a Flux from the result, but that would be blocking.
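
A minimal sketch of that blocking variant, written against plain JDK types so it runs without Kafka on the classpath. `TimedCollector` and its methods are hypothetical names, not part of spring-kafka: the idea is that the listener's `onMessage` hands each record value to the collector, and the endpoint drains it for a fixed window (10 seconds in the question).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: buffers what a listener callback receives,
// then lets the caller drain it for a fixed time window.
final class TimedCollector {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Assumption: called from the listener's onMessage with the record value.
    void onMessage(String value) {
        queue.offer(value);
    }

    // Blocks the calling thread for up to windowMillis and returns
    // everything that arrived before the deadline.
    List<String> collectFor(long windowMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        List<String> collected = new ArrayList<>();
        long remaining;
        try {
            while ((remaining = deadline - System.nanoTime()) > 0) {
                String next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next != null) {
                    collected.add(next);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt and return what was gathered so far.
            Thread.currentThread().interrupt();
        }
        return collected;
    }
}
```

The returned list could then be passed to `Flux.fromIterable(...)`, but the caller gets nothing until the whole window has elapsed, which is why the reactive receiver is the better fit for streaming.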

Upvotes: 1

Gary Russell

Reputation: 174484

When using your own MessageListener implementation, you have to do the filtering yourself. The RecordFilterStrategy is used by the FilteringMessageListenerAdapter when using Spring-created listeners for @KafkaListener methods.
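
A minimal sketch of doing the filtering yourself, using plain `java.util.function` types as stand-ins so it runs without spring-kafka. `FilteringListener`, `discard`, `delegate` and `demo` are hypothetical names; in the real service the same check would sit inside `onMessage(ConsumerRecord<String, String>)`. The predicate follows the convention of `RecordFilterStrategy.filter`, where returning true means the record is discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical wrapper: drops records matching `discard`, forwards the rest.
final class FilteringListener<T> implements Consumer<T> {
    private final Predicate<T> discard;
    private final Consumer<T> delegate;

    FilteringListener(Predicate<T> discard, Consumer<T> delegate) {
        this.discard = discard;
        this.delegate = delegate;
    }

    @Override
    public void accept(T record) {
        // true from the predicate means "discard this record"
        if (!discard.test(record)) {
            delegate.accept(record);
        }
    }

    // Demonstration: keep only the values that mention the requested id.
    static List<String> demo(String id, List<String> incoming) {
        List<String> delivered = new ArrayList<>();
        FilteringListener<String> listener =
                new FilteringListener<>(value -> !value.contains(id), delivered::add);
        incoming.forEach(listener);
        return delivered;
    }
}
```

Calling `FilteringListener.demo("42", ...)` with a mix of values returns only those containing `42`, in arrival order.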

Upvotes: 0
