Michael Zinsmaier

Reputation: 53

Akka Streams efficiently fold/merge substreams (WebSocket Frames -> Messages)

tl;dr: How do I efficiently drain BinaryMessages in Akka HTTP to create a Flow of ByteStrings in which each ByteString corresponds to exactly one WS object?

I want to build an Akka WebSocket server that streams complete WebSocket objects as ByteStrings, i.e. it assembles WebSocket frames until it has a full WS object and then emits that downstream. More generally, I have a stream of Sources and want to merge every Source into one element before forwarding it downstream:

E1(S1(a,b,c)), E2(S2(d,e,f,g)), E3(S3(h,i)) -> E1(abc), E2(defg), E3(hi)
// E = one element in the parent stream
// S = an inner source; not all of its child elements may be available immediately
// a-i = the actual data elements
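
To pin down the intended semantics, here is a minimal sketch that models the same transformation with plain Scala collections instead of streams. The names `FoldModel`, `Element` and `collapse` are hypothetical and exist only for this illustration; a `List[String]` stands in for an inner Source, so the "not yet available" aspect of a real stream is not modelled.

```scala
object FoldModel {
  // One outer element carrying an inner sequence of chunks
  // (a stand-in for an inner Source; hypothetical type for illustration).
  final case class Element(id: String, chunks: List[String])

  // Fold every inner sequence into a single value while keeping the outer order.
  def collapse(elements: List[Element]): List[(String, String)] =
    elements.map(e => e.id -> e.chunks.foldLeft("")((acc, chunk) => acc + chunk))
}

object FoldModelDemo extends App {
  val input = List(
    FoldModel.Element("E1", List("a", "b", "c")),
    FoldModel.Element("E2", List("d", "e", "f", "g")),
    FoldModel.Element("E3", List("h", "i"))
  )
  // prints List((E1,abc), (E2,defg), (E3,hi))
  println(FoldModel.collapse(input))
}
```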

However, I am struggling a bit with the API and with finding the best way to do this efficiently. I came up with the following code, which uses a Sink.fold to drain the sources:

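import akka.http.scaladsl.model.ws.BinaryMessage
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

def flattenSink[Mat](sink: Sink[ByteString, Mat], materializer: Materializer): Sink[BinaryMessage, Mat] = {
  Flow[BinaryMessage]
    .map(d => {
      // Drain the frames of one message into a single ByteString
      val graph = d.dataStream.toMat(Sink.fold(ByteString.empty)((a, b) => a ++ b))(Keep.right)
      // Runs the inner graph separately, using the explicitly passed materializer
      val future = graph.run()(materializer)
      Source.fromFuture(future)
    })
    .flatMapConcat(identity)
    .toMat(sink)(Keep.right)
}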

// or similar with the WS API 

Flow[BinaryMessage]
  .map(d => d.toStrict(timeout, materializer))
  ...

But the extra materializer makes me worry that this could become inefficient, since it may cause context switches to a different thread.

Is there a better way to do it, preferably one that obviously runs as part of the main flow, without unnecessary context switches to another thread?

(I'm not concerned about the size of the WS objects or the time it takes to assemble them. Both will be tiny in my case; I'm not going to stream gigabyte-sized objects.)

Thanks!

Upvotes: 0

Views: 134

Answers (1)

Michael Zinsmaier

Reputation: 53

I found a solution using the built-in functionality of flatMapConcat. Since flatMapConcat materializes each inner Source internally, it also lets me transform my Source of WebSocket frames into a Source of a single ByteString without an external materializer:

  def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] = {
    Flow[BinaryMessage]
      .flatMapConcat(msg => if (msg.isStrict) {
        Source.single(msg.getStrictData)
      } else {
        msg.dataStream
          .fold(new ByteStringBuilder())((b, e) => b.append(e))
          .map(x => x.result())
      })
      .toMat(sink)(Keep.right)
  }

  • materializer: it should be the same one that runs the Flow
  • ByteString concatenation: the builder should be about as efficient as it gets
  • strict messages: wrapping them in a Source.single seems unnecessary, but I couldn't find a way around it.
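
For anyone who wants to try this out, here is a small self-contained sketch that feeds a mix of streamed and strict messages through the sink and collects the results. It assumes Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer) together with Akka HTTP on the classpath; the object name `FlattenSinkDemo`, the system name and the 5 second timeout are made up for this demo.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.BinaryMessage
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.{ByteString, ByteStringBuilder}

import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenSinkDemo {
  // Same approach as above: each message is collapsed inside flatMapConcat.
  def flattenSink[Mat](sink: Sink[ByteString, Mat]): Sink[BinaryMessage, Mat] =
    Flow[BinaryMessage]
      .flatMapConcat { msg =>
        if (msg.isStrict) {
          Source.single(msg.getStrictData)
        } else {
          msg.dataStream
            .fold(new ByteStringBuilder())((b, e) => b.append(e))
            .map(_.result())
        }
      }
      .toMat(sink)(Keep.right)

  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("flatten-demo")
    try {
      val messages = List[BinaryMessage](
        // streamed message made of three frames
        BinaryMessage(Source(List(ByteString("a"), ByteString("b"), ByteString("c")))),
        // strict message, already complete
        BinaryMessage(ByteString("defg")),
        // streamed message made of two frames
        BinaryMessage(Source(List(ByteString("h"), ByteString("i"))))
      )
      val collected = Source(messages).runWith(flattenSink(Sink.seq[ByteString]))
      // Blocking is only acceptable here because this is a throwaway demo.
      Await.result(collected, 5.seconds).map(_.utf8String)
    } finally {
      system.terminate()
    }
  }
}
```

Calling `FlattenSinkDemo.run()` should yield one string per message, in the original order: "abc", "defg", "hi".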

Upvotes: 1
