sarah w

Reputation: 3515

How to use ActorSink.actorRefWithBackpressure in Akka Streams with Akka Typed

I am trying to learn Akka Streams with Akka Typed, but the documentation is a bit abstract when it comes to Akka Typed.

The Sink.actorRefWithBackpressure example is straightforward and easy to understand, whereas the ActorSink.actorRefWithBackpressure example is abstract.

In the first example there is an AckingReceiver actor that does the required work. In the second example there is no actor implementation that handles the case classes the way AckingReceiver did; the docs only show:

val actor: ActorRef[Protocol] = targetActor()

I have seen this code somewhere, but I am unable to understand it either:

def targetActor(): ActorRef[Protocol] = ???

How can we provide an implementation of the target actor that handles the case classes? Any help would be appreciated.

Upvotes: 1

Views: 694

Answers (1)

Levi Ramsey

Reputation: 20591

The ActorRef[Protocol] is a typed actor like any other. Obtaining an ActorRef outside of the ActorSystem in typed is somewhat more involved than in classic, which is likely why the docs elide that (as it's not important for explaining how to use ActorSink.actorRefWithBackpressure).

Typically you would set up a typed ActorSystem and ask that ActorSystem for an ActorRef:

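import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Scheduler, Terminated }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Ack, Protocol, Init, Message, Complete and Fail are assumed to be the
// definitions from the ActorSink.actorRefWithBackpressure docs example.

object MainSystem {
  sealed trait Command
  case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs]) extends Command

  sealed trait Reply
  case class ProtocolActorIs(actor: ActorRef[Protocol]) extends Reply

  def apply(): Behavior[Command] =
    Behaviors.receive[Command] { (context, msg) =>
      msg match {
        case ObtainProtocolActor(replyTo) =>
          val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
            // Define the protocol actor
            Behaviors.receive[Protocol] { (context, msg) =>
              msg match {
                case Init(ackTo) =>
                  println(s"Actor ${context.self.path} initializing")
                  ackTo ! Ack
                  Behaviors.same
                case Message(ackTo, msg) =>
                  println(s"Actor ${context.self.path} received $msg")
                  ackTo ! Ack
                  Behaviors.same
                case Complete =>
                  // Complete carries no ackTo, so there is nothing to acknowledge;
                  // the stream is done, so stop this actor
                  Behaviors.stopped
                case Fail(ex) =>
                  println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
                  Behaviors.same
              }
            })
          context.watch(protocolActor)
          replyTo ! ProtocolActorIs(protocolActor)
          Behaviors.same
      }
    }.receiveSignal {
      case (_, Terminated(ref)) =>
        println(s"Actor ${ref.path} terminated")
        Behaviors.same
    }
}

val actorSystem = ActorSystem(MainSystem(), "main")

def targetActor(): ActorRef[Protocol] = {
  // The typed ask needs an implicit Timeout and Scheduler; map needs an ExecutionContext
  implicit val timeout: Timeout = Timeout(15.seconds)
  implicit val scheduler: Scheduler = actorSystem.scheduler
  import actorSystem.executionContext

  Await.result(
    actorSystem.ask[MainSystem.ProtocolActorIs](MainSystem.ObtainProtocolActor(_)).map(_.actor),
    15.seconds
  )
}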

This shows probably the two biggest practical, but perhaps non-obvious, differences between classic and typed:

  • the ActorSystem in typed is an actor (it's actually possible to have an ActorRef[Protocol] in this example be the ActorSystem, though it's unlikely you'd actually want to do this)
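  • the ask pattern is changed in a fairly dramatic way: the request message carries a replyTo ActorRef for the response, and ask takes a function that builds the request from that ref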
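If blocking on an ask is not needed, a rough alternative is to spawn the target actor inside the guardian and run the stream from there. The following is only a sketch under some assumptions: Akka 2.6 with akka-actor-typed and akka-stream-typed on the classpath, a protocol modelled on the docs example, and hypothetical names (ActorSinkSketch, target, guardian) that do not come from the docs.

```scala
import akka.NotUsed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

object ActorSinkSketch {
  // Protocol modelled on the docs example (assumed, not copied from the question)
  trait Ack
  object Ack extends Ack

  trait Protocol
  final case class Init(ackTo: ActorRef[Ack]) extends Protocol
  final case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
  case object Complete extends Protocol
  final case class Fail(ex: Throwable) extends Protocol

  // Target actor: acknowledge Init and every Message so the stream keeps sending
  def target(): Behavior[Protocol] =
    Behaviors.receiveMessage[Protocol] {
      case Init(ackTo) =>
        ackTo ! Ack
        Behaviors.same
      case Message(ackTo, msg) =>
        println(s"received $msg")
        ackTo ! Ack
        Behaviors.same
      case Complete =>
        Behaviors.stopped
      case Fail(ex) =>
        println(s"stream failed: ${ex.getMessage}")
        Behaviors.stopped
    }

  // Guardian: spawns the target, runs the stream into it, stops when the target stops
  def guardian(): Behavior[NotUsed] =
    Behaviors.setup[NotUsed] { context =>
      // The typed ActorSystem in implicit scope provides the stream materializer
      implicit val system: ActorSystem[_] = context.system

      val actor: ActorRef[Protocol] = context.spawn(target(), "target")
      context.watch(actor)

      val sink: Sink[String, NotUsed] =
        ActorSink.actorRefWithBackpressure[String, Protocol, Ack](
          ref = actor,
          messageAdapter = (ackTo, element) => Message(ackTo, element),
          onInitMessage = ackTo => Init(ackTo),
          ackMessage = Ack,
          onCompleteMessage = Complete,
          onFailureMessage = ex => Fail(ex))

      Source(List("a", "b", "c")).runWith(sink)

      Behaviors.receiveSignal[NotUsed] {
        case (_, Terminated(_)) => Behaviors.stopped
      }
    }

  def main(args: Array[String]): Unit =
    ActorSystem(guardian(), "sketch")
}
```

Because the guardian stops once the target actor terminates, the whole ActorSystem shuts down after the stream completes. Whether spawning from the guardian or asking for the ref is the better fit depends on where the stream is started.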

Upvotes: 2
