Reputation: 51
The goal is to send the WSConnectEvent once a client is connected and the stream starts. With akka-streams 1.0 I was able to accomplish this with the following:
Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
  implicit builder =>
    sdpSource =>
      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].collect {
        case TextMessage.Strict(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })
      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })
      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
      val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))
      fromWebsocket ~> merge.in(0)
      actorAsSource ~> merge.in(1)
      merge ~> callActorSink
      sdpSource ~> toWebsocket
      (fromWebsocket.inlet, toWebsocket.outlet)
}
In trying to upgrade this to work with Akka Streams 2.0.1, I changed to the following code, but I am no longer receiving the WSConnectEvent message. I'm not sure whether this is because my source is set up incorrectly or because I am not materializing the ActorRef properly.
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(
  GraphDSL.create() { implicit builder =>
    // Incoming SDP offer flow
    val fromWebsocket = builder.add(Flow[Message].collect {
      case TextMessage.Strict(txt) => {
        val event = txt.parseJson.convertTo[WSResponseEvent]
        WSMessageEvent(callUUID, userUUID, event.id, event.data)
      }
    })
    // Outgoing SDP answer flow
    val toWebsocket = builder.add(Flow[WSResponseEvent].map {
      case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
    })
    val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
    val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
    val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
    // Join events, also sends actor for sending stuff
    val merge = builder.add(Merge[CallControlEvent](2))
    val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))
    fromWebsocket ~> merge.in(0)
    actorAsSource ~> merge.in(1)
    merge ~> callActorSink
    sdpSource ~> toWebsocket
    FlowShape(fromWebsocket.in, toWebsocket.out)
  }
)
Upvotes: 4
Views: 1612
Reputation: 51
Thanks to the help of johanandren, I learned that mapMaterializedValue was not the correct approach. Instead, I needed to construct a flow that builds the WSConnectEvent and connect the output of builder.materializedValue through it to the merge in port, like so:
// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
builder.materializedValue ~> actorConnected ~> merge.in(1)
The full working example:
val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(GraphDSL.create(sdpSource) {
  implicit builder =>
    { (responseSource) =>
      import GraphDSL.Implicits._
      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
        case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }).collect {
        case Some(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })
      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })
      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
      val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))
      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
      fromWebsocket ~> merge.in(0)
      builder.materializedValue ~> actorConnected ~> merge.in(1)
      merge ~> toCallActor
      responseSource ~> toWebsocket
      FlowShape.of(fromWebsocket.in, toWebsocket.out)
    }
})
Upvotes: 1
Reputation: 11479
The call to sdpSource.mapMaterializedValue(...) only transforms the materialized value from one type to another (ActorRef to WSConnectEvent); it does not in any way emit it as an element from the Source.
builder.materializedValue, however, provides an outlet that emits the materialized value once the graph is materialized. For that value to be the ActorRef, sdpSource has to be passed to GraphDSL.create so that its materialized value becomes the graph's.
So, what you want to do is:
fromWebsocket ~> merge.in(0)
builder.materializedValue ~> Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _)) ~> merge.in(1)
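The difference between transforming a materialized value and emitting it as a stream element can be seen in isolation with a small sketch. It is not the code from the question: the Event, Connected and Incoming types and the firstEvent helper are hypothetical stand-ins for WSConnectEvent and friends, and it assumes a recent Akka 2.6.x, where an implicit ActorSystem is enough to run a graph (on 2.0.x you would create an ActorMaterializer explicitly).

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{OverflowStrategy, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializedValueSketch {
  // Hypothetical stand-ins for WSConnectEvent / WSMessageEvent
  sealed trait Event
  final case class Connected(ref: ActorRef) extends Event
  final case class Incoming(text: String) extends Event

  // Passing the actor-backed source to GraphDSL.create makes its ActorRef
  // the materialized value of the whole graph.
  val events: Source[Event, ActorRef] = Source.fromGraph(
    GraphDSL.create(Source.actorRef[String](16, OverflowStrategy.fail)) {
      implicit builder => actorSource =>
        import GraphDSL.Implicits._
        val merge = builder.add(Merge[Event](2))
        // Emits the ActorRef as a stream element once the graph is materialized
        builder.materializedValue ~> Flow[ActorRef].map[Event](Connected(_)) ~> merge.in(0)
        // Messages sent to the ActorRef arrive on the second merge input
        actorSource ~> Flow[String].map[Event](Incoming(_)) ~> merge.in(1)
        SourceShape(merge.out)
    })

  // Runs the graph once and returns the materialized ActorRef together with
  // the first element the stream emitted.
  def firstEvent(): (ActorRef, Event) = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val (ref, first) = events.toMat(Sink.head)(Keep.both).run()
      (ref, Await.result(first, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (ref, first) = firstEvent()
    println(first == Connected(ref))
  }
}
```

Since nothing is sent to the ActorRef before the stream starts, the first element seen by Sink.head should be the Connected event carrying the very same ActorRef that run() returned. By contrast, Source.actorRef[String](...).mapMaterializedValue(Connected(_)) would only change the type of the value returned by run() and would never push a Connected element downstream.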
Upvotes: 0