Reputation: 53
tldr. How do I efficiently drain BinaryMessages in Akka HTTP to create a Flow of ByteStrings where each ByteString matches one WS Object.
I want to build a Akka WebSocket server that streams complete WebSocket objects as ByteString i.e. assembles WebSocket frames until I have a full WS object and emits that downstream. Or more generally I have a stream of Sources and want to merge every Source into one element before forwarding downstream
E1(S1(a,b,c)), E2(S2(d,e,f,g)), E3(S3(h,i)) -> E1(abc), E2(defg), E3(hi)
// E = one element in the parent stream
// S a inner source, not all child elements might be available directly
// a-i the actual data elements
However I struggle a bit with the API / the best way to do it efficiently. I came up with the following code, that uses a Sink.fold to drain the sources:
def flattenSink[Mat](sink: Sink[ByteString, Mat], materializer: Materializer): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.map(d => {
val graph = d.dataStream.toMat(Sink.fold(ByteString.empty)((a, b) => a ++ b))(Keep.right)
val future = graph.run()(materializer)
Source.fromFuture(future)
})
.flatMapConcat(identity)
.toMat(sink)(Keep.right)
}
// or similar with the WS API
Flow[BinaryMessage]
.map(d => d.toStrict(timeout, materializer))
...
but the added materializer looks to me as if this might become inefficient, there could be context switches to a different thread ...
is there a better way to do it? Preferred in a way that obviously runs as part of the main flow, without unnecessary context switches to another thread?
(I'm not concerned about the size that the WS objects might have, the time it might take to assemble them, both will be tiny in my case, I'm not going to stream Gigabyte sized objects)
thanks!
Upvotes: 0
Views: 134
Reputation: 53
I found a solution using the build in functionality of flatMapConcat. Since flatMapConcat materializes a Source internally, it also allows to transform my source of WebSocket frames into a Source of a single ByteString without an external materializer
def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] = {
Flow[BinaryMessage]
.flatMapConcat(msg => if (msg.isStrict) {
Source.single(msg.getStrictData)
} else {
msg.dataStream
.fold(new ByteStringBuilder())((b, e) => b.append(e))
.map(x => x.result())
})
.toMat(sink)(Keep.right)
}
Upvotes: 1