Clifford M. Roche
Clifford M. Roche

Reputation: 51

How do I send message to ActorRef at start of Akka-Stream 2.0 flow?

The goal is to send the WSConnectEvent once a client is connected and the stream starts. With akka-streams 1.0 I was able to accomplish this with the following:

Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
  implicit builder =>
    sdpSource =>

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].collect {
        case TextMessage.Strict(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
      val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))

      fromWebsocket ~> merge.in(0)
      actorAsSource ~> merge.in(1)

      merge ~> callActorSink
      sdpSource ~> toWebsocket
      (fromWebsocket.inlet, toWebsocket.outlet)
}

In trying to upgrade this to work with Akka-Streams 2.0.1 I changed to the following code, but I am no loner receiving the WSConnectEvent message. I'm not sure if this is because my source is setup incorrectly, or I am not materializing the ActorRef properly.

val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

Flow.fromGraph(
  GraphDSL.create() { implicit builder =>

    // Incoming SDP offer flow
    val fromWebsocket = builder.add(Flow[Message].collect {
      case TextMessage.Strict(txt) => {
        val event = txt.parseJson.convertTo[WSResponseEvent]
        WSMessageEvent(callUUID, userUUID, event.id, event.data)
      }
    })

    // Outgoing SDP answer flow
    val toWebsocket = builder.add(Flow[WSResponseEvent].map {
      case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
    })

    val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
    val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
    val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

    // Join events, also sends actor for sending stuff
    val merge = builder.add(Merge[CallControlEvent](2))
    val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))

    fromWebsocket ~> merge.in(0)
    actorAsSource ~> merge.in(1)

    merge ~> callActorSink
    sdpSource ~> toWebsocket
    FlowShape(fromWebsocket.in, toWebsocket.out)
  }
)

Upvotes: 4

Views: 1612

Answers (2)

Clifford M. Roche
Clifford M. Roche

Reputation: 51

Thanks to the help of johanandren, mapMaterializedValue was not the correct approach, instead I needed to construct a flow to send the WSConnectEvent and connect the output of builder.materializeValue through it through to the 'merge' in port, like so:

// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))

builder.materializedValue ~> actorConnected ~> merge.in(1)

The full working example:

val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

Flow.fromGraph(GraphDSL.create(sdpSource) {
  implicit builder =>
    { (responseSource) =>
      import GraphDSL.Implicits._

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
        case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }).collect {
        case Some(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
      val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))

      fromWebsocket ~> merge.in(0)
      builder.materializedValue ~> actorConnected ~> merge.in(1)

      merge ~> toCallActor
      responseSource ~> toWebsocket

      FlowShape.of(fromWebsocket.in, toWebsocket.out)
    }
})

Upvotes: 1

johanandren
johanandren

Reputation: 11479

The call to sdpSource.mapMaterializedValue(...) only transforms the materialized value from one type (ActorRef to WSConnectEvent), it does not in any way emit it as an element from the Source.

Source.materializedValue however, provides a source that will emit the materialized value once the graph is materialized.

So, what you want to do is:

fromWebsocket                   ~> merge.in(0)
actorAsSource.materializedValue ~> merge.in(1)

Upvotes: 0

Related Questions