Reputation: 26
val ref = Source.actorRef[String](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
100000,
OverflowStrategy.dropNew
).to(Sink.foreachAsync(1){ elem =>
// how to reply to sender
Future.successful()
})
Above is example that does nearly what I need, with the exception that the underlying message does not know the sender. So it's impossible to reply. Is there a way or pattern which would allow me to reply to the sender, so that it can be used with ask pattern like:
import akka.pattern.ask
(ref ? "request").onComplete {
case Failure(exception) => logger.error(s"Couldn't receive response", exception)
case Success(value) => logger.info(s"Received response ${value}")
}
Upvotes: 0
Views: 87
Reputation: 20541
This isn't possible with the Classic actor-based API and Source.actorRef
, which as you note discards the sender.
However, if using Akka 2.6+, one could use the Typed ask pattern in conjunction with akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
to include the sending actor (in this case the synthetic actor for the ask) in the message being sent to Source.actorRef
.
For your example, you would rewrite the stream as something like:
import akka.actor.typed.{ ActorRef => TypedActorRef }
val ref = Source.actorRef[(String, TypedActorRef[Any])](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
100000,
OverflowStrategy.dropNew
).to(Sink.foreach { // Sink.foreachAsync(1) isn't doing anything here...
case (elem, sender) =>
// do stuff with elem?
sender ! "response"
}
And then to ask
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Scheduler
import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, ClassicActorSystemOps }
// actorSystem is the classic ActorSystem in use
implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler
ref.toTyped.ask[Any]("request" -> _).onComplete {
case Failure(exception) => logger.error(s"Couldn't receive response", exception)
case Success(value) => logger.info(s"Received response ${value}")
}
Upvotes: 2