manuel mourato
manuel mourato

Reputation: 799

RabbitMQ - How to extract message body from consumer tag with DefaultConsumer?

I am trying to do a simple application with a RabbitMQ Publisher and Consumer, the producer is written in Java and the consumer is in Scala.

This is the Publisher I am using, which successfully inserts data into the "queue1" queue in the rabbitMQ server:

String queue="queue1";


String exchange="queue1";
String routing_key="queue1";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queue, true, false, false, null);
channel.exchangeDeclare(exchange, "direct");
channel.queueBind(queue,exchange,routing_key);
channel.basicQos(1);

String msg = "Hello";

channel.basicPublish(exchange,routing_key_one, new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(1).priority(0).
build(),msg.getBytes());
channel.close();
connection.close();

The issue now, is that I have a Consumer on a separate script, but I can't make it return the "msg" string that I sent from the publisher; I can only get the consumer-tag as a return value. This is my Consumer:

import com.rabbitmq.client.{ConnectionFactory,Connection,Channel,Consumer,DefaultConsumer,Envelope,BasicProperties}

val rabbit_host="localhost"
val queue="queue1"
val exchange="queue1"
val routing_key="queue1"

val factory: ConnectionFactory = new ConnectionFactory()
factory.setHost (rabbit_host)

val connection: Connection = factory.newConnection()
val channel: Channel = connection.createChannel()

channel.queueBind(queue_name, exchange, routing_key, null)
channel.basicQos(1)

val consumer:Consumer=new DefaultConsumer(channel){
  def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]):String ={
    new String(body,"UTF-8")
  }
}


val msg: String = channel.basicConsume(queue_name,true, consumer)


channel.close()
connection.close()

If I then print "msg", this is the value I get: "amq.ctag-miRiLnyTcsq9MwzHyVshZw"

So my question is, is there a way for me to get "hello" (the original value) as the return for the basicConsume function?

Thank you for your time.

Upvotes: 0

Views: 2121

Answers (1)

CucumisSativus
CucumisSativus

Reputation: 320

According to documentation

val msg: String = channel.basicConsume(queue_name,true, consumer)

Returns

the consumerTag associated with the new consumer

Due to asynchronous nature of rabbitMq it is not easy to wait for a message. You can try using Promise and block for a result

val p = Promise[String]()
val f = p.future

val consumer:Consumer=new DefaultConsumer(channel){
  def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]):String ={
    val parsedBody = new String(body,"UTF-8")
    p.success(parsedBody)
  }
}
val msg: String = Await.result(f, 5 seconds)

Upvotes: 0

Related Questions