Reputation: 540
I've been messing around with RabbitMQ and ran into a particular issue regarding publishing messages to a channel from the callback of a consumer. While I'm able to acknowledge the message fine, attempting to publish a message to the channel does nothing.
Below is an example:
// Stuff here...
final Connection connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(16);
channel.basicConsume("transcoding", false, "someConsumerTag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        executorService.submit(() -> {
            TestClass testClass = new TestClass();
            testClass.addObserver((o, arg) -> {
                // And stuff here...
                channel.basicAck(envelope.getDeliveryTag(), true);
                channel.basicPublish("", "notification", null, "message");
            });
            testClass.run();
        });
    }
});
As shown, I'm handling messages on a separate thread using an ExecutorService. I have a class that implements Runnable and extends Observable. Once TestClass has completed its work, the observer is notified, and I manually acknowledge the message. This works fine.
I now want to publish a new message from the same callback; however, RabbitMQ never receives it. I've tried creating a new channel and using that for publishing, but that doesn't work either. Next, I thought that it might be a threading issue, but publishing on a separate thread also does not work.
I was going to try creating a separate RabbitMQ connection just for publishing, but this makes little sense to me as channels should be unidirectional. I've read through RabbitMQ's notes on concurrency, and don't see anything that sticks out.
What am I missing?
Upvotes: 0
Views: 620
Reputation: 540
So it turns out that the issue was the message payload (basicPublish takes the body as a byte[], not a String). As an aside, I've since created two separate channels: one for consumers and another for producers.
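To make the payload point concrete, here is a minimal sketch of encoding a String into the byte[] that basicPublish expects as its body. The Publisher interface and the PayloadSketch, toPayload, and notifyDone names are hypothetical stand-ins so the example runs without a broker. With the real client, the equivalent call would be channel.basicPublish("", "notification", null, payload) on the producer channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PayloadSketch {

    // Hypothetical stand-in for the publishing side of a RabbitMQ Channel;
    // it only mirrors the exchange / routing key / byte[] body arguments.
    interface Publisher {
        void publish(String exchange, String routingKey, byte[] body) throws IOException;
    }

    // Encode the text explicitly so the consumer can decode it with the same charset.
    static byte[] toPayload(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // An Observer callback cannot throw checked exceptions, so the
    // IOException is wrapped instead of being silently dropped.
    static void notifyDone(Publisher publisher, String message) {
        try {
            publisher.publish("", "notification", toPayload(message));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A lambda plays the role of the producer channel here.
        notifyDone((exchange, routingKey, body) ->
                System.out.println(routingKey + ": " + body.length + " bytes"), "message");
    }
}
```

Wrapping the IOException (or at least logging it) inside the observer also helps surface publish failures that would otherwise disappear on the executor thread.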
Upvotes: 0