firas_frikha
firas_frikha

Reputation: 17

Amqprpc alpakka producer not receiving a response back

I´m currently trying to implement alpakka amqp queues, and specifically I´m trying to use rabbitmq as an RPC mechanism. So what I´m trying to do is to send messages from amqprpc flow, and then the consumer will do some calculation and then the result should be sent back to the producer, but I´m not sure what´s not happening correctly, because I´m not getting the result back in the producer.

Here is the producer and consumer code:

this is the producer code:


import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp.scaladsl._
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
object producer extends App {

  implicit val system = ActorSystem("AmqpRpcExample")
  implicit val materializer = ActorMaterializer()

  implicit val ec: ExecutionContext = system.dispatcher

  val amqpConnectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)

  val queueName = "RPC-QUEUE"

  val queueDeclaration = QueueDeclaration(queueName)

  // RPC CLIENT

  val clientFlow: Flow[WriteMessage, ReadResult, Future[String]] = AmqpRpcFlow.atMostOnceFlow(
    AmqpWriteSettings(amqpConnectionProvider)
      .withRoutingKey(queueName)
      .withDeclaration(queueDeclaration)
      .withBufferSize(10)
      .withConfirmationTimeout(200.millis),
    10
  )

  val inputMessages: Source[WriteMessage, NotUsed] = Source(List(
    WriteMessage(ByteString("message 1")),
    WriteMessage(ByteString("message 2")),
    WriteMessage(ByteString("message 3")),
    WriteMessage(ByteString("message 4")),
    WriteMessage(ByteString("message 5")),
  ))


  val responseFutures: Future[Done] = inputMessages
    .via(clientFlow)
    .map(response => {
      println(response.bytes.utf8String)
      response.bytes.utf8String
    })
    .runWith(Sink.foreach(println))

  println("test")
}

And this is the consumer code:

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp.scaladsl._
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl._
import akka.util.ByteString
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.AMQP.BasicProperties

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object consumer extends App {
  implicit val system = ActorSystem("AmqpRpcExample")
  implicit val materializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  val amqpConnectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)

  val queueName = "RPC-QUEUE"

  val queueDeclaration = QueueDeclaration(queueName)

  // RPC SERVER

  val rpcFlow: Flow[(Envelope, BasicProperties), WriteMessage, NotUsed] = Flow[(Envelope, BasicProperties)].map { case (envelope, properties) =>
    val input = envelope
    println(input)
    val output = s"Processed: $input"
    val replyTo = Option(properties.getReplyTo).getOrElse("")
    WriteMessage(ByteString(output)).withRoutingKey(replyTo)
  }




  val amqpSink: Sink[WriteMessage, Future[Done]] = AmqpSink.replyTo(
    AmqpReplyToSinkSettings (amqpConnectionProvider)
  )

  val amqpSource: Source[(Envelope, BasicProperties), NotUsed] = AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(amqpConnectionProvider, queueName)
      .withDeclaration(queueDeclaration),
    bufferSize = 10
  ).map(msg => (msg.envelope, msg.properties))

  val rpcServer: Future[Done] = amqpSource
    .via(rpcFlow)
    .runWith(amqpSink)

  println("RPC server started")
}

Upvotes: 0

Views: 84

Answers (1)

earthling paul
earthling paul

Reputation: 558

Thx for your example code. It inspired me to have a go at the rpc scenario. I think the cause of the hang is in the rpc server flow part, where the replyTo queue name is not wired properly.

  val rpcFlow: Flow[(Envelope, BasicProperties), WriteMessage, NotUsed] = Flow[(Envelope, BasicProperties)]
    .map { case (envelope, properties) =>
    val input = envelope
    println(input)
    val output = s"Processed: $input"
    val replyTo = Option(properties.getReplyTo).getOrElse("")
    WriteMessage(ByteString(output)).withRoutingKey(replyTo)
  }

I noticed that if I pass the properties of the recieved readResult to the reply msg the roundtrip works and I see replies in the sending client flow.

val rpcFlow: Flow[ReadResult, WriteMessage, NotUsed] = Flow[ReadResult]
  .map { readResult: ReadResult =>
    logger.info(s"RECEIVED on server envelope: ${readResult.envelope}")
    val output = s"Processed: ${readResult.bytes.utf8String}"
    // The on-the-fly created replyTo queue name is in the properties
    WriteMessage(ByteString(output)).withProperties(readResult.properties)
  }

Note that instead of the tuple (Envelope, BasicProperties), I use ReadResult, which contains envelope and properties. The full working example is here: https://github.com/pbernet/akka_streams_tutorial/blob/1f4ccdfad12530565ef557b8b852c9cec86bb078/src/main/scala/alpakka/amqp/AmqpEcho.scala#L96

Upvotes: 0

Related Questions