Reputation: 1054
Assuming I have an actor UserActor
that know how to work with incoming messages and how to send new ones, I want to handle web sockets inside Akka-Http, so I creating Flow[Message, Message, NotUsed]
.
Here we getting new messages as JSON and sending them to UserActor
. When source completed I receive SourceDied
message:
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(msg) => msg.runFold("")(_ + _)
}
.map(decode[IncomingMessage])
.collect { case Right(msg) => msg }
.map(_.toMessage)
.to(Sink.actorRef[ChatMessage](userActor, SourceDied))
Here I register out
for my UserActor
at which It'll send messages:
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(outActor)
NotUsed
}
.map((x: ChatMessage) => OutgoingMessage.fromMessage(x))
.map((outMsg: OutgoingMessage) => TextMessage(outMsg.asJson.toString))
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
However, UserActor is one per user, and each user can have multiple sockets open simultaneously. So I just collecting outs
to set inside UserActor
and sending info to each of them. It works nice.
But when source sending me terminating message (SourceDied
in my case), I don't know to which exactly out
this source
was assigned — and I can't decide which out
I should inform about completion and then delete from my outs
set.
Upvotes: 1
Views: 218
Reputation: 1054
So, as I understood from @chunjef answer, there is no direct way to do so. And personally I decided not to use random ID generation, but to use one more Actor between socket and UserActor
.
Basically, now I have SocketHandlerActor(userActor: ActorRef)
to replace UserActor
in socket creation part. It just connects to UserActor
and sending all messages between socket and UserActor
.
But when SocketHandlerActor
receives SourceDied
message, it just do stuff and then kills out
and self with PoisionPill
. UserActor
removes it from own outs
list, as soon as receives Termination
message.
Upvotes: 0
Reputation: 19527
One idea is to change your Flow
to take a unique identifier for each connection:
def websocketFlow(connectionId: String): Flow[Message, Message, NotUsed] = {
val incomingMessages: Sink[Message, NotUsed] =
...
.to(Sink.actorRef[ChatMessage](userActor, SourceDied(connectionId)))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[ChatMessage](20, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
userActor ! Connect(connectionId, outActor)
NotUsed
}
...
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
Obviously you would need to adjust the SourceDied
and Connect
messages to include a connection ID (which in this case could be generated with something like java.util.UUID.randomUUID.toString
, for example). Then in UserActor
, replace the Set
with a Map
, the keys for which are the connection IDs. Using a Map
will enable you to look up connection actors and remove them as needed.
Upvotes: 1