Vistritium
Vistritium

Reputation: 26

How to create akka source that is materialized to ActorRef in which the incoming messages know the sender

  val ref = Source.actorRef[String](
    completionMatcher =  PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    100000,
    OverflowStrategy.dropNew
  ).to(Sink.foreachAsync(1){ elem =>
    // how to reply to sender
    Future.successful()
  })

Above is example that does nearly what I need, with the exception that the underlying message does not know the sender. So it's impossible to reply. Is there a way or pattern which would allow me to reply to the sender, so that it can be used with ask pattern like:

  import akka.pattern.ask
  (ref ? "request").onComplete {
    case Failure(exception) => logger.error(s"Couldn't receive response", exception)
    case Success(value) => logger.info(s"Received response ${value}")
  }

Upvotes: 0

Views: 87

Answers (1)

Levi Ramsey
Levi Ramsey

Reputation: 20541

This isn't possible with the Classic actor-based API and Source.actorRef, which as you note discards the sender.

However, if using Akka 2.6+, one could use the Typed ask pattern in conjunction with akka.actor.typed.scaladsl.adapter.ClassicActorRefOps to include the sending actor (in this case the synthetic actor for the ask) in the message being sent to Source.actorRef.

For your example, you would rewrite the stream as something like:

import akka.actor.typed.{ ActorRef => TypedActorRef }

val ref = Source.actorRef[(String, TypedActorRef[Any])](
  completionMatcher =  PartialFunction.empty,
  failureMatcher = PartialFunction.empty,
  100000,
  OverflowStrategy.dropNew
).to(Sink.foreach { // Sink.foreachAsync(1) isn't doing anything here...
  case (elem, sender) =>
    // do stuff with elem?
    sender ! "response"
}

And then to ask

import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Scheduler
import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, ClassicActorSystemOps }

// actorSystem is the classic ActorSystem in use
implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler

ref.toTyped.ask[Any]("request" -> _).onComplete {
  case Failure(exception) => logger.error(s"Couldn't receive response", exception)
  case Success(value) => logger.info(s"Received response ${value}")
}

Upvotes: 2

Related Questions