Reputation: 1332
I am trying to achieve something with a spring-boot REST service and kafka. I want to dynamically create a listener to a kafka topic when I receive a REST request, listen to that topic for a period of time, say 10 seconds, filter out messages based on the some value in the message (RecordFilterStrategy I guess?) and stream the messages back to the REST consumer. Is this possible with Kafka?
I was able to dynamically create a listener to a topic but don't see how to add a record filter to it or how I can stream these back to the caller.
This is how I was dynamically creating a listener to a topic:
public ConcurrentMessageListenerContainer getConsumer(String topic, MessageListener<String, String> listener) {
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfig);
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(listener);
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
return container;
}
Can someone point me to any documentation on how to achieve this? I created a GET mapping that can return a Flux of messages but I have no idea how to return the messages on the topic. I know the below is not correct but the message listener code does get hit.
@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> getMessages(@RequestParam String id) {
MessageListener<String, String> listener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> arg0) {
logger.debug("We got a message:" + arg0);
}
};
ConcurrentMessageListenerContainer consumer = consumerFactory.getConsumer("mytopic", listener);
consumer.start();
//return messages here
}
Upvotes: 0
Views: 1194
Reputation: 111
Since you are returning Flux, maybe it is a good idea to take a look at https://projectreactor.io/docs/kafka/release/reference/
and use reactive receiver and which returns flux that you can map
Flux<ReceiverRecord<Integer, String>> inboundFlux =
Receiver.create(receiverOptions)
.receive();
Or looking at your code you can just pull them all, map and build flux, but that would be blocking.
Upvotes: 1
Reputation: 174484
When using your own MessageListener
implementation, you have to do the filtering yourself. The RecordFilterStrategy
is used by the FilteringMessageListenerAdapter
when using Spring-created listeners for @RabbitListener
methods.
Upvotes: 0