Yevhenii Popadiuk

Reputation: 1054

How to wire input and output when creating a flow for a web socket?

Assuming I have an actor UserActor that knows how to handle incoming messages and how to send new ones, I want to handle web sockets in Akka HTTP, so I am creating a Flow[Message, Message, NotUsed].

Here we receive new messages as JSON and send them to UserActor. When the source completes, the actor receives a SourceDied message:

    val incomingMessages: Sink[Message, NotUsed] =
      Flow[Message]
        .mapAsync(1) {
          case TextMessage.Strict(text)  => Future.successful(text)
          case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
        }
        .map(decode[IncomingMessage])
        .collect { case Right(msg) => msg }
        .map(_.toMessage)
        .to(Sink.actorRef[ChatMessage](userActor, SourceDied))

Here I register an out actor with my UserActor, to which it will send messages:

    val outgoingMessages: Source[Message, NotUsed] =
      Source
        .actorRef[ChatMessage](20, OverflowStrategy.fail)
        .mapMaterializedValue { outActor =>
          userActor ! Connect(outActor)
          NotUsed
        }
        .map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
        .map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

However, there is one UserActor per user, and each user can have multiple sockets open simultaneously. So I just collect the outs in a Set inside UserActor and send messages to each of them. This works nicely.

But when a source sends me the terminating message (SourceDied in my case), I don't know which out that source was paired with, so I can't decide which out to notify about completion and then remove from my set of outs.

Upvotes: 1

Views: 218

Answers (2)

Yevhenii Popadiuk

Reputation: 1054

So, as I understand from @chunjef's answer, there is no direct way to do this. Personally, I decided not to use random ID generation, but to put one more actor between the socket and UserActor.

Basically, I now have a SocketHandlerActor(userActor: ActorRef) that replaces UserActor in the socket-creation part. It just connects to UserActor and relays all messages between the socket and UserActor.

When SocketHandlerActor receives the SourceDied message, it does its cleanup and then kills out and itself with PoisonPill. UserActor removes it from its own outs list as soon as it receives the Terminated message.
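
The approach described above might look roughly like the sketch below, assuming classic Akka actors. The message types (Text, Connect, SourceDied) are hypothetical stand-ins for the ones in the question, and the rule for telling the two directions apart (checking sender()) is my own assumption, not something the original code shows. Note that the sketch completes the out actor with Status.Success instead of PoisonPill, since that is the documented way to complete a Source.actorRef stream.

```scala
import akka.actor.{Actor, ActorRef, Status, Terminated}

// Hypothetical message types standing in for the ones in the question.
sealed trait ChatMessage
final case class Text(body: String) extends ChatMessage
final case class Connect(out: ActorRef)
case object SourceDied

// One handler per socket. In the flow it would take UserActor's place:
//   Sink.actorRef[ChatMessage](handler, SourceDied)
//   Source.actorRef(...).mapMaterializedValue { out => handler ! Connect(out); NotUsed }
class SocketHandlerActor(userActor: ActorRef) extends Actor {
  def receive: Receive = waiting

  // Before the source has registered its out actor there is nowhere to write.
  private def waiting: Receive = {
    case Connect(out) =>
      userActor ! Connect(self) // UserActor treats the handler as its "out"
      context.become(connected(out))
    case SourceDied =>
      context.stop(self)
  }

  private def connected(out: ActorRef): Receive = {
    case SourceDied =>
      out ! Status.Success(()) // completes the Source.actorRef stream
      context.stop(self)       // watchers (UserActor) receive Terminated
    // Assumption: UserActor sends with itself as sender, while the
    // stream's Sink.actorRef sends without a sender.
    case msg: ChatMessage if sender() == userActor => out ! msg
    case msg: ChatMessage                          => userActor ! msg
  }
}

// Minimal UserActor side: watch each handler, drop it when it terminates.
class UserActor extends Actor {
  private var outs = Set.empty[ActorRef]

  def receive: Receive = {
    case Connect(handler) =>
      context.watch(handler)
      outs += handler
    case Terminated(handler) =>
      outs -= handler
    case msg: ChatMessage =>
      outs.foreach(_ ! msg) // placeholder for the real per-user logic
  }
}
```

Because each socket gets its own handler, SourceDied always arrives at the actor that already knows its out, so no connection ID is needed.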

Upvotes: 0

Jeffrey Chung

Reputation: 19527

One idea is to change your Flow to take a unique identifier for each connection:

    def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
      val incomingMessages: Sink[Message, NotUsed] =
        ...
        .to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))

      val outgoingMessages: Source[Message, NotUsed] =
        Source
          .actorRef[ChatMessage](20, OverflowStrategy.fail)
          .mapMaterializedValue { outActor =>
            userActor ! Connect(connectionId, outActor)
            NotUsed
          }
          ...

      Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
    }

Obviously you would need to adjust the SourceDied and Connect messages to include a connection ID (which in this case could be generated with something like java.util.UUID.randomUUID.toString, for example). Then in UserActor, replace the Set with a Map, the keys for which are the connection IDs. Using a Map will enable you to look up connection actors and remove them as needed.
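
As a rough illustration of the Map-based bookkeeping, here is a minimal sketch of what UserActor could look like, assuming classic Akka actors. The Text message and the broadcast behaviour are hypothetical placeholders; Connect and SourceDied are the adjusted messages carrying a connection ID. Completing the out actor with Status.Success is one documented way to end a Source.actorRef stream.

```scala
import akka.actor.{Actor, ActorRef, Status}

// Hypothetical message types; only the connection ID fields matter here.
sealed trait ChatMessage
final case class Text(body: String) extends ChatMessage
final case class Connect(connectionId: String, out: ActorRef)
final case class SourceDied(connectionId: String)

class UserActor extends Actor {
  // connection ID -> the actor materialized by Source.actorRef for that socket
  private var outs = Map.empty[String, ActorRef]

  def receive: Receive = {
    case Connect(id, out) =>
      outs += (id -> out)

    case SourceDied(id) =>
      // Complete only the outgoing stream that belongs to this connection.
      outs.get(id).foreach(_ ! Status.Success(()))
      outs -= id

    case msg: ChatMessage =>
      // Placeholder logic: fan the message out to every open socket.
      outs.values.foreach(_ ! msg)
  }
}
```

Each new socket would then call something like websocketFlow(java.util.UUID.randomUUID.toString), so the ID that arrives in SourceDied matches the one registered by Connect.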

Upvotes: 1
