ashley
ashley

Reputation: 1038

spring kafka - multiple consumer reading from a single topic

I am building a web application using spring boot and now I have this requirement of receiving real-time notifications. I am planning to use apache kafka as a message broker for this. The requirement is such that there are users with different roles and based on the role, they should receive notifications of what other users are doing.
I did set up a single producer and consumer and as a consumer, I could receive the information published to a topic let's say topic1.
The part where I am stuck is that I could have several users listening to the same topic and each user should get the message published to that topic. I understand that for this requirement, we need to set different group.id for each kafkalistener so that each consumer can get the message.
But how I am going to create a kafkalistener with a different group id when a user is logged in? Hope someone can provide some guidance on that? Thank you

Upvotes: 1

Views: 3888

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

Simply create a new KafkaMessageListenerContainer each time and start/stop it as needed.

You can use Boot's auto-configured ConcurrentKafkaListenerContainerFactory to create containers. Just set the groupId container property to make them unique.

EDIT

Here's an example:

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200 
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60

ok

2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]

ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)

Upvotes: 2

Related Questions