jarvis
jarvis

Reputation: 25

RabbitMQ Super Streams | Spring Cloud Stream Consumer Groups using RabbitMQ

I have 2 services (service A and Service B) which need to consume same data. I can achieve this by using queue type as stream. But at the same time, during auto-scaling I dont want service A1 and service A2 (2 instances) to consume the same data.

I found that by implementing super stream I can achieve this but I dont find any good example to work around and using spring cloud stream is also possible but again not able to find correct resource.

Can anyone please provide me a good resource or an example to achieve this using rabbitmq?

Upvotes: -2

Views: 231

Answers (1)

Gary Russell
Gary Russell

Reputation: 174729

I am not aware of any spring-cloud-stream examples but, generally, SCSt apps are agnostic about the underlying transport (that's one of its goals).

However, the Spring AMQP project documentation has examples of how to use the spring-rabbit-stream library (both for normal and super streams).

https://docs.spring.io/spring-amqp/docs/current/reference/html/#stream-support

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}

(You would need to read the rest of the Spring AMQP docs to understand what a listener container is).

Upvotes: 2

Related Questions