Reputation: 1625
I need to listen for messages on a queue named Metadata and then, based on each message, read from another queue, let's call it dataQ (the name of that queue is in the metadata message). To read Metadata I can use a Rabbit listener, but after that I have to read the messages from dataQ. One way is a manual pull, but I would like something cleaner, like a Rabbit listener, so that I don't have to manage the channel, acks, etc. Since the queue name is not known until the message from the Metadata queue is read, I am exploring other solutions. dataQ can be any of 1000 different queue names, so we have to listen to it dynamically.
Acks should also work like this: read a message from the Metadata queue, process the given dataQ, send acks for the messages in dataQ (dataQ may have more than one message), and then send the ack for the Metadata message.
(If this works well for a single consumer, I can then add a container model and process more than one message from the Metadata queue, meaning I will be able to process more than one data queue at the same time.)
Update, as suggested: I am confused about how to get the event in the main listener, and how that flag will work with concurrency (sorry, I have not used application events extensively so far).
package com.example;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Configuration
public class MyListener {

    @Autowired
    ConnectionFactory connectionFactory;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @RabbitListener(queues = "Metadata")
    public void messageProcessing(String c) {
        System.out.println(c);
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(c);
        container.setMessageListener(new MessageListenerAdapter(new DataHandler()));
        container.setApplicationEventPublisher(applicationEventPublisher);
        container.setIdleEventInterval(5000);
        container.start();
        // how to get container idle event here
        // so we can call container.stop();
    }

    public class DataHandler {
        public void handleMessage(byte[] text) {
            // decode the payload; concatenating the byte[] directly would print the array reference
            System.out.println("Data Received: " + new String(text));
        }
    }

    @EventListener
    public void onApplicationEvent(ApplicationEvent event) {
        // I am getting idle event here
        System.out.println(event.getSource());
    }
}
Reputation: 174739
It would be pretty easy to fire up a new SimpleMessageListenerContainer in the metadata listener to process the data, but you can't ack the original message from a different listener.
You would have to hold up the metadata thread until the secondary listener completes, then release the metadata thread so it acks the original message. You could use a container idle event to detect that the work is complete (unless you have some other mechanism to know that everything is finished).
Set the concurrency on the metadata listener container to control how many metadata messages are processed this way concurrently.
@RabbitListener(queues = "meta")
public void handle(SomeObject message) {
    // extract dataQ
    // create a new SimpleMessageListenerContainer
    // Inject an ApplicationEventPublisher instance
    // start the container
    // block here, waiting for a container idle event
    // stop the container
    return;
}
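The blocking step in the outline above can be illustrated without any RabbitMQ classes. The following is only a minimal sketch using plain JDK concurrency: `BlockUntilIdleSketch`, `startWorker` and `handleMeta` are hypothetical names, the worker thread stands in for the secondary listener container, and the latch count-down stands in for the container idle event.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockUntilIdleSketch {

    // Hypothetical stand-in for the secondary listener container: it handles
    // the given messages on its own thread, then signals "idle" exactly once.
    static void startWorker(List<String> dataMessages, List<String> log, CountDownLatch idle) {
        Thread worker = new Thread(() -> {
            for (String m : dataMessages) {
                log.add("data:" + m);
            }
            idle.countDown(); // plays the role of the container idle event
        });
        worker.start();
    }

    // Plays the role of the metadata listener method. It returns (which is
    // when the metadata message would be acked) only after the worker has
    // reported idle or the timeout has elapsed.
    static boolean handleMeta(String queue, List<String> dataMessages, List<String> log) {
        log.add("start:" + queue);
        CountDownLatch idle = new CountDownLatch(1);
        startWorker(dataMessages, log, idle);
        boolean wentIdle;
        try {
            // await returns false if the timeout elapses before countDown is called
            wentIdle = idle.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            wentIdle = false;
        }
        log.add("finish:" + queue);
        return wentIdle;
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        boolean ok = handleMeta("foo", List.of("baz", "baz"), log);
        System.out.println(ok + " " + log);
    }
}
```

Because the calling thread does not return until the latch is released, the number of threads running the metadata listener is what bounds how many data queues are drained at once.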
Bear in mind, though, that if the server crashes, the metadata message will be redelivered (by default) and you might have already processed some of the data messages.
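One way to tolerate that kind of repeat is to make the data processing idempotent. The sketch below is an assumption-laden illustration, not part of the approach above: it assumes every data message carries a unique identifier (for example the AMQP message-id property, if the publisher sets it), and `RedeliveryGuardSketch` and `processOnce` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RedeliveryGuardSketch {

    // Hypothetical in-memory record of message ids that were already handled.
    // To survive a server crash this would have to live in a durable store.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Runs the work only the first time a given id is seen.
    // Returns true if the work ran, false if the id was a repeat.
    // Note: the id is recorded before the work runs, so a failure inside
    // work.run() still marks the id as seen in this simple version.
    public boolean processOnce(String messageId, Runnable work) {
        if (!processedIds.add(messageId)) {
            return false; // already seen, skip the side effects
        }
        work.run();
        return true;
    }

    public static void main(String[] args) {
        RedeliveryGuardSketch guard = new RedeliveryGuardSketch();
        AtomicInteger runs = new AtomicInteger();
        guard.processOnce("msg-1", runs::incrementAndGet);
        guard.processOnce("msg-1", runs::incrementAndGet); // simulated redelivery
        System.out.println("work ran " + runs.get() + " time(s)");
    }
}
```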
EDIT
Regarding your comment below: I meant that you should use your own publisher, so you don't have to figure out which container the event came from...
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ListenerContainerIdleEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So42459257Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42459257Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend("meta", "foo");
        template.convertAndSend("foo", "baz");
        template.convertAndSend("foo", "baz");
        template.convertAndSend("meta", "bar");
        template.convertAndSend("bar", "qux");
        template.convertAndSend("bar", "qux");
        context.getBean(So42459257Application.class).testCompleteLatch.await(10, TimeUnit.SECONDS);
        context.close();
    }

    private final CountDownLatch testCompleteLatch = new CountDownLatch(2);

    @Autowired
    private ConnectionFactory connectionFactory;

    @RabbitListener(queues = "meta")
    public void handleMeta(final String queue) throws Exception {
        System.out.println("Started processing " + queue);
        final CountDownLatch startedLatch = new CountDownLatch(1);
        final CountDownLatch finishedLatch = new CountDownLatch(1);
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
        container.setQueueNames(queue);
        container.setMessageListener(new MessageListenerAdapter(new Object() {

            @SuppressWarnings("unused")
            public void handleMessage(String in) {
                startedLatch.countDown();
                System.out.println("Received " + in + " from " + queue);
            }

        }));
        container.setIdleEventInterval(5000);
        // private publisher: idle events from this container only reach this latch
        container.setApplicationEventPublisher(new ApplicationEventPublisher() {

            @Override
            public void publishEvent(Object event) {
            }

            @Override
            public void publishEvent(ApplicationEvent event) {
                if (event instanceof ListenerContainerIdleEvent) {
                    finishedLatch.countDown();
                }
            }

        });
        container.afterPropertiesSet();
        container.start();
        // await returns false when the timeout elapses before the latch is released
        if (!startedLatch.await(60, TimeUnit.SECONDS)) {
            // handle container didn't receive any messages
        }
        if (!finishedLatch.await(60, TimeUnit.SECONDS)) {
            // handle container didn't go idle
        }
        System.out.println("Finished processing " + queue);
        container.stop();
        this.testCompleteLatch.countDown();
    }

    @Bean
    public Queue meta() {
        return new Queue("meta", false, false, true);
    }

    @Bean
    public Queue foo() {
        return new Queue("foo", false, false, true);
    }

    @Bean
    public Queue bar() {
        return new Queue("bar", false, false, true);
    }

}
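With the listener container concurrency set to 2 (with Spring Boot, simply add
spring.rabbitmq.listener.concurrency=2
to the application properties; newer Spring Boot versions name this property spring.rabbitmq.listener.simple.concurrency). If you are not using Boot, configure the listener container factory yourself.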
Result:
Started processing bar
Started processing foo
Received baz from foo
Received qux from bar
Received baz from foo
Received qux from bar
Finished processing bar
Finished processing foo