Rok Purkeljc
Rok Purkeljc

Reputation: 27

Set message priority Spring Integration DSL

I'm trying to set-up a Integration Workflow which publishes messages to RabbitMQ.

I have 2 questions regarding this: 1. Is my Queue Bean Working as I hope it is :) 2. How can I set a message's priority with the outbound-amqp-adapter using Integration DSL?

 @Configuration

public class RabbitConfig {

  @Autowired
  private ConnectionFactory rabbitConnectionFactory;

  @Bean
  TopicExchange worksExchange() {
    return new TopicExchange("work.exchange", true, false);
  }


  @Bean
  Queue queue() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    return new Queue("dms.document.upload.queue", true, false, false, args);
  }

  @Bean
  public RabbitTemplate worksRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setExchange("work.exchange");
    template.setRoutingKey("work");
    template.setConnectionFactory(rabbitConnectionFactory);
    return template;
  }


@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
                .get();
    }

}

UPDATE After beeing able to push the message with the appropriate "Priority header" I can pull the messages according to their priority using the Rabbit Management UI, but I am somehow unable to pull them correctly using spring-amqp consumer...

  @Bean
  public SimpleMessageListenerContainer workListenerContainer() {
    SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(rabbitConnectionFactory);
    container.setQueues(worksQueue());
    container.setConcurrentConsumers(2);
    container.setDefaultRequeueRejected(false);    
    return container;
  }

Upvotes: 0

Views: 813

Answers (1)

Gary Russell
Gary Russell

Reputation: 174769

  1. It looks ok.

  2. Before the .handle(), use an .enrichHeaders(...) with header name IntegrationMessageHeaderAccessor.PRIORITY and an integer value.

EDIT

@SpringBootApplication
public class So49692361Application {

    public static void main(String[] args) {
        SpringApplication.run(So49692361Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(SimpleMessageListenerContainer container, ApplicationContext ctx) {
        return args -> {
            Gate gate = ctx.getBean(Gate.class);
            gate.send(new GenericMessage<>("foo", Collections.singletonMap("foo", 1)));
            gate.send(new GenericMessage<>("bar", Collections.singletonMap("foo", 2)));
            container.start();
        };
    }

    @Bean
    public static IntegrationFlow flow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(Gate.class)
            .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.PRIORITY,
                    "headers.foo"))
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("so49692361"))
            .get();
    }

    @Bean
    public Queue queue() {
        return new Queue("so49692361", true, false, false, Collections.singletonMap("x-max-priority", 5));
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueues(queue());
        container.setMessageListener(m -> {
            System.out.println(m);
        });
        container.setAutoStartup(false);
        return container;
    }

    public interface Gate {

        public void send(Message<?> message);

    }

}

and

(Body:'bar' MessageProperties [headers={errorChannel=, foo=2, priority=2}, ...    
(Body:'foo' MessageProperties [headers={errorChannel=, foo=1, priority=1}, ...

Upvotes: 1

Related Questions