Reputation: 17
I´m currently trying to implement alpakka amqp queues, and specifically I´m trying to use rabbitmq as an RPC mechanism. So what I´m trying to do is to send messages from amqprpc flow, and then the consumer will do some calculation and then the result should be sent back to the producer, but I´m not sure what´s not happening correctly, because I´m not getting the result back in the producer.
Here is the producer and consumer code:
this is the producer code:
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp.scaladsl._
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
object producer extends App {
implicit val system = ActorSystem("AmqpRpcExample")
implicit val materializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher
val amqpConnectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)
val queueName = "RPC-QUEUE"
val queueDeclaration = QueueDeclaration(queueName)
// RPC CLIENT
val clientFlow: Flow[WriteMessage, ReadResult, Future[String]] = AmqpRpcFlow.atMostOnceFlow(
AmqpWriteSettings(amqpConnectionProvider)
.withRoutingKey(queueName)
.withDeclaration(queueDeclaration)
.withBufferSize(10)
.withConfirmationTimeout(200.millis),
10
)
val inputMessages: Source[WriteMessage, NotUsed] = Source(List(
WriteMessage(ByteString("message 1")),
WriteMessage(ByteString("message 2")),
WriteMessage(ByteString("message 3")),
WriteMessage(ByteString("message 4")),
WriteMessage(ByteString("message 5")),
))
val responseFutures: Future[Done] = inputMessages
.via(clientFlow)
.map(response => {
println(response.bytes.utf8String)
response.bytes.utf8String
})
.runWith(Sink.foreach(println))
println("test")
}
And this is the consumer code:
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp.scaladsl._
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl._
import akka.util.ByteString
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.AMQP.BasicProperties
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
object consumer extends App {
implicit val system = ActorSystem("AmqpRpcExample")
implicit val materializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher
val amqpConnectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)
val queueName = "RPC-QUEUE"
val queueDeclaration = QueueDeclaration(queueName)
// RPC SERVER
val rpcFlow: Flow[(Envelope, BasicProperties), WriteMessage, NotUsed] = Flow[(Envelope, BasicProperties)].map { case (envelope, properties) =>
val input = envelope
println(input)
val output = s"Processed: $input"
val replyTo = Option(properties.getReplyTo).getOrElse("")
WriteMessage(ByteString(output)).withRoutingKey(replyTo)
}
val amqpSink: Sink[WriteMessage, Future[Done]] = AmqpSink.replyTo(
AmqpReplyToSinkSettings (amqpConnectionProvider)
)
val amqpSource: Source[(Envelope, BasicProperties), NotUsed] = AmqpSource.atMostOnceSource(
NamedQueueSourceSettings(amqpConnectionProvider, queueName)
.withDeclaration(queueDeclaration),
bufferSize = 10
).map(msg => (msg.envelope, msg.properties))
val rpcServer: Future[Done] = amqpSource
.via(rpcFlow)
.runWith(amqpSink)
println("RPC server started")
}
Upvotes: 0
Views: 84
Reputation: 558
Thx for your example code. It inspired me to have a go at the rpc scenario.
I think the cause of the hang is in the rpc server flow part, where the replyTo
queue name is not wired properly.
val rpcFlow: Flow[(Envelope, BasicProperties), WriteMessage, NotUsed] = Flow[(Envelope, BasicProperties)]
.map { case (envelope, properties) =>
val input = envelope
println(input)
val output = s"Processed: $input"
val replyTo = Option(properties.getReplyTo).getOrElse("")
WriteMessage(ByteString(output)).withRoutingKey(replyTo)
}
I noticed that if I pass the properties
of the recieved readResult
to the reply msg the roundtrip works and I see replies in the sending client flow.
val rpcFlow: Flow[ReadResult, WriteMessage, NotUsed] = Flow[ReadResult]
.map { readResult: ReadResult =>
logger.info(s"RECEIVED on server envelope: ${readResult.envelope}")
val output = s"Processed: ${readResult.bytes.utf8String}"
// The on-the-fly created replyTo queue name is in the properties
WriteMessage(ByteString(output)).withProperties(readResult.properties)
}
Note that instead of the tuple (Envelope, BasicProperties)
, I use ReadResult
, which contains envelope
and properties
. The full working example is here:
https://github.com/pbernet/akka_streams_tutorial/blob/1f4ccdfad12530565ef557b8b852c9cec86bb078/src/main/scala/alpakka/amqp/AmqpEcho.scala#L96
Upvotes: 0