J.T.
J.T.

Reputation: 129

Need help using Rabbit annotations to consume data in Spring Boot with AMQP

I'm new to Spring and RabbitMQ and have figured out how to consume messages using something like this:

CONFIGURATION FILE:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfiguration {

    private String queueName;

    @Autowired
    private ConnectionFactory connectionFactory;

    public void setQueueName(final String queueName) {
        this.queueName = queueName;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(this.connectionFactory);
    }

    @Bean
    public MessageConverter messageConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(this.connectionFactory);
        container.setQueueNames(this.queueName);
        container.setMessageListener(messageListenerAdapter());

        return container;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(this, messageConverter());
    }

}

CONSUMER FILE:

import org.springframework.stereotype.Component;

import com.vts.cped.basic.TopicNames;
import com.vts.cped.correlator.data.CorrelatedData;
import com.vts.cped.main.config.BaseReceiver;

@Component
public class Consumer {

    public void handleMessage(final SomeData data) {
        System.out.println("Consumer received SomeData with data " + data.getData());
    }
}

I'm trying to accomplish the same thing with RabbitMQ annotations in hopes that it will make the code cleaner. I've attempted to do this with the following, but I cannot get it to work:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    @RabbitListener(queues = "tut.hello")
    public void process(final SomeData in) {
    System.out.println(" [x] Received '" + in + "'");
    }

}

I had originally tried something like this, but it also didn't work:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@RabbitListener(queues = "tut.hello")
@Service
public class Receiver {

    @RabbitHandler
    public void process(final SomeData in) {
    System.out.println(" [x] Received '" + in + "'");
    }

}

If you have the secret sauce, please send me some! Thank you for any help you can provide.

Upvotes: 2

Views: 8085

Answers (2)

J.T.
J.T.

Reputation: 129

I found the fix, I was missing this:

@Bean
public RabbitTemplate rabbitTemplate() {
    final RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
    template.setMessageConverter(messageConverter());
    return template;
}

Overall, this is my configuration file:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class CommonConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(this.connectionFactory);
    }

    @Bean
    public MessageConverter messageConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(this.connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
        template.setMessageConverter(messageConverter());
        return template;
    }

    @Bean
    public Queue helloQueue() {
        return new Queue("tut.hello");
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

Upvotes: 0

Mathias Dpunkt
Mathias Dpunkt

Reputation: 12184

There is a statement in the documentation:

To enable support for @RabbitListener annotations add @EnableRabbit to one of your @Configuration classes.

http://docs.spring.io/spring-amqp/docs/1.6.1.RELEASE/reference/html/_reference.html#async-annotation-driven-enable

You may also still need a SimpleRabbitListenerContainerFactory bean to register the Jackson2JsonMessageConverter.

http://docs.spring.io/spring-amqp/docs/1.6.1.RELEASE/reference/html/_reference.html#async-annotation-conversion

Upvotes: 1

Related Questions