Reputation: 1038
I am building a web application with Spring Boot and now have a requirement to deliver real-time notifications. I plan to use Apache Kafka as the message broker.
Users have different roles and, depending on their role, should receive notifications about what other users are doing.
I have set up a single producer and a single consumer, and the consumer receives the messages published to a topic, say topic1.
Where I am stuck: several users may be listening to the same topic, and each of them should receive every message published to it. As I understand it, each KafkaListener then needs its own group.id so that every consumer gets the message.
But how do I create a KafkaListener with a different group id when a user logs in?
I hope someone can provide some guidance.
Thank you
Upvotes: 1
Views: 3888
Reputation: 174484
Simply create a new KafkaMessageListenerContainer each time and start/stop it as needed.
You can use Boot's auto-configured ConcurrentKafkaListenerContainerFactory to create containers. Just set the groupId container property to make them unique.
EDIT
Here's an example:
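import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    // Publishes one test record at startup.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    // Creates the topic if it does not exist yet.
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    // Each call creates and starts a new container that uses the given group id.
    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}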
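application.properties (so that a newly created group also reads records sent before it started):
spring.kafka.consumer.auto-offset-reset=earliest
Calling the endpoint with HTTPie:
$ http localhost:8080/foo/bar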
HTTP/1.1 200
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60
ok
2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]
ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)
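The example above only starts containers. For the "start/stop it as needed" part, one possible approach is to keep the running containers in a map keyed by group id, so that a login starts at most one listener per group and a logout stops it again. The following is only a sketch under assumptions: `StartStop`, `ListenerRegistry` and `ListenerRegistryDemo` are hypothetical names, not Spring Kafka types, and `StartStop` merely stands in for the container's own start() and stop() so that the sketch runs without Kafka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the start()/stop() part of a listener container.
interface StartStop {
    void start();
    void stop();
}

// Hypothetical helper: keeps at most one running listener per group id.
class ListenerRegistry {

    private final Map<String, StartStop> running = new ConcurrentHashMap<>();
    private final Function<String, StartStop> containerFactory;

    ListenerRegistry(Function<String, StartStop> containerFactory) {
        this.containerFactory = containerFactory;
    }

    // Call on login. Returns true if a new listener was created and started,
    // false if one is already running for this group id.
    boolean start(String groupId) {
        boolean[] created = {false};
        running.computeIfAbsent(groupId, id -> {
            StartStop container = containerFactory.apply(id);
            container.start();
            created[0] = true;
            return container;
        });
        return created[0];
    }

    // Call on logout. Returns true if a listener was found and stopped.
    boolean stop(String groupId) {
        StartStop container = running.remove(groupId);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    int size() {
        return running.size();
    }
}

class ListenerRegistryDemo {
    public static void main(String[] args) {
        // The factory here only prints; it does not talk to Kafka.
        ListenerRegistry registry = new ListenerRegistry(groupId -> new StartStop() {
            @Override public void start() { System.out.println("started " + groupId); }
            @Override public void stop() { System.out.println("stopped " + groupId); }
        });
        registry.start("alice"); // creates and starts a listener
        registry.start("alice"); // already running, nothing happens
        registry.stop("alice");  // stops and forgets the listener
    }
}
```

In the controller, the function passed to the registry would presumably wrap the code from foo(): call factory.createContainer(...), set the group id and message listener, and return a small adapter that delegates to the container's start() and stop().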
Upvotes: 2