Alex

Reputation: 2030

RabbitMQ separate listeners by type

I have a POJO that represents a message sent to RabbitMQ. It has an integer field that identifies the type of the message (update, remove, add, and so on):

public class Message {
    private String field1;
    private String field2;

    private Integer type;
    ...
    <some other fields>
}

I have a consumer in my Spring Boot app that accepts such messages. To handle each type separately, I have to add a switch/case construct to my code.
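To make the problem concrete, such a consumer might look roughly like the sketch below. The class name SwitchConsumer, the constants, and the mapping of 1/2/3 to add/remove/update are assumptions for illustration only; the post does not say which integer means what.

```java
// Hypothetical sketch of switch-based handling; names and type codes are assumed.
class SwitchConsumer {

    static final int ADD = 1;
    static final int REMOVE = 2;
    static final int UPDATE = 3;

    // Returns a description of the action instead of performing it,
    // so the sketch stays self-contained.
    static String handle(int type, String payload) {
        switch (type) {
            case ADD:
                return "Adding " + payload;
            case REMOVE:
                return "Removing " + payload;
            case UPDATE:
                return "Updating " + payload;
            default:
                throw new IllegalArgumentException("Unknown message type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(ADD, "foo"));
        System.out.println(handle(UPDATE, "baz"));
    }
}
```

Every new message type means another case in this method, which is the part that becomes hard to maintain.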

Is there a cleaner solution for this case?

Upvotes: 0

Views: 406

Answers (1)

Gary Russell

Reputation: 174544

You can use Spring Integration with a router...

Rabbit inbound channel adapter -> router -> service activator

where the router routes to a different service activator (method) based on the type.

EDIT

Here's an example:

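import java.io.Serializable;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
// Package names below assume Spring Integration 5.x; the older standalone
// Java DSL (1.x) has Amqp in org.springframework.integration.dsl.amqp.
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@SpringBootApplication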
public class So47272336Application {

    public static void main(String[] args) {
        SpringApplication.run(So47272336Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
        return args -> {
            rabbitTemplate.convertAndSend("my.queue", new Domain(1, "foo"));
            rabbitTemplate.convertAndSend("my.queue", new Domain(2, "bar"));
            rabbitTemplate.convertAndSend("my.queue", new Domain(3, "baz"));
        };
    }

    @Bean
    public Queue queue() {
        return new Queue("my.queue");
    }

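    // Consume from "my.queue" and route on the payload's "type" property (a SpEL expression).
    // Each mapping key is the string form of the type; its sub-flow calls a method on the bean named "bean".
    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "my.queue"))
                .route("payload.type", r -> r
                        .subFlowMapping("1", f -> f.handle("bean", "add"))
                        .subFlowMapping("2", f -> f.handle("bean", "remove"))
                        .subFlowMapping("3", f -> f.handle("bean", "update")))
                .get();
    }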

    @Bean
    public MyBean bean() {
        return new MyBean();
    }

    public static class MyBean {

        public void add(Domain object) {
            System.out.println("Adding " + object);
        }

        public void remove(Domain object) {
            System.out.println("Removing " + object);
        }

        public void update(Domain object) {
            System.out.println("Updating " + object);
        }

    }

    public static class Domain implements Serializable {

        private final Integer type;

        private final String info;

        public Domain(Integer type, String info) {
            this.type = type;
            this.info = info;
        }

        public Integer getType() {
            return this.type;
        }

        public String getInfo() {
            return this.info;
        }

        @Override
        public String toString() {
            return "Domain [type=" + this.type + ", info=" + this.info + "]";
        }

    }

}
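For comparison, and only as a sketch: the routing idea itself (look up a handler by type, then invoke it) can be approximated in plain Java with a map from type to handler. TypeDispatcher, register and dispatch are hypothetical names, not part of Spring or Spring Integration, and the handlers return strings rather than acting on a real message so that the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java stand-in for a router: maps a type code to a handler.
class TypeDispatcher {

    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();

    // Register the handler to call for a given type code.
    void register(int type, Function<String, String> handler) {
        handlers.put(type, handler);
    }

    // Look up the handler for the type and apply it to the payload.
    String dispatch(int type, String info) {
        Function<String, String> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("No handler for type " + type);
        }
        return handler.apply(info);
    }

    public static void main(String[] args) {
        TypeDispatcher dispatcher = new TypeDispatcher();
        dispatcher.register(1, info -> "Adding " + info);
        dispatcher.register(2, info -> "Removing " + info);
        dispatcher.register(3, info -> "Updating " + info);

        System.out.println(dispatcher.dispatch(1, "foo"));
        System.out.println(dispatcher.dispatch(3, "baz"));
    }
}
```

Adding a type then means registering one more handler rather than editing a switch, which mirrors what each subFlowMapping entry does in the flow above.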

Upvotes: 1
