kupi

Reputation: 3

Play 2.6 WebSocket proxy using Akka - How to handle Streamed Text vs Strict Text

While writing a WebSocket proxy in Play 2.6 (based on Websocket Proxy using Play 2.6 and akka streams), I am facing a problem handling streamed text.

Concerned code:

def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
  Flow[String].map(s => TextMessage(s))
  .via(websocketFlow)
  .map(_.asTextMessage.getStrictText)
}

This works when proxying to a local WebSocket server, but when proxying to a remote server it fails with the following error:

java.lang.IllegalStateException: Cannot get strict text for streamed message.

I can get the streamed message via _.asTextMessage.getStreamedText, but I can't figure out how to convert it into a String.

Upvotes: 0

Views: 342

Answers (1)

Jeffrey Chung

Reputation: 19497

As the documentation for Akka HTTP (the underlying engine in Play) states, one cannot expect the message to always be Strict:

When receiving data from the network connection the WebSocket implementation tries to create a Strict message whenever possible, i.e. when the complete data was received in one chunk. However, the actual chunking of messages over a network connection and through the various streaming abstraction layers is not deterministic from the perspective of the application. Therefore, application code must be able to handle both streamed and strict messages and not expect certain messages to be strict. (Particularly, note that tests against localhost will behave differently than tests against remote peers where data is received over a physical network connection.)

To handle both Strict and Streamed messages, you could do something like the following (which is inspired by this answer):

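import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.scaladsl.Flow
import play.api.mvc.WebSocket
import scala.concurrent.Future
import scala.concurrent.duration._

// Requires an implicit Materializer in scope (used by runFold).
def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
  Flow[String]
    .map(TextMessage(_))
    .via(websocketFlow)
    .collect {
      // Strict: the complete text arrived in a single chunk.
      case TextMessage.Strict(text) =>
        Future.successful(text)
      // Streamed: concatenate the chunks into one String.
      case TextMessage.Streamed(textStream) =>
        textStream
          .limit(100)                    // fail if there are more than 100 chunks
          .completionTimeout(10.seconds) // fail if not complete within 10 seconds
          .runFold("")(_ + _)
      // Messages that are not text do not match and are dropped by collect.
    }
    .mapAsync(parallelism = 3)(identity) // unwrap each Future[String], keeping order
}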
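
The Strict/Streamed handling above can also be tried in isolation, without a Play application or a remote server. The following is only a minimal sketch under some assumptions: `StreamedTextDemo` and `toText` are hypothetical names made up for this illustration, the Akka 2.5-style `ActorMaterializer` setup is assumed because that is the Akka line Play 2.6 builds on, and the chunk limit and timeout are simply the values from the snippet above.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object StreamedTextDemo {
  // Akka 2.5-style setup (assumption: same Akka line as Play 2.6).
  implicit val system: ActorSystem = ActorSystem("streamed-text-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical helper: turns either kind of TextMessage into a Future[String].
  def toText(message: TextMessage): Future[String] = message match {
    case TextMessage.Strict(text) =>
      // Already complete, nothing to collect.
      Future.successful(text)
    case TextMessage.Streamed(chunks) =>
      chunks
        .limit(100)                    // fail if there are more than 100 chunks
        .completionTimeout(10.seconds) // fail if the stream does not finish in time
        .runFold("")(_ + _)            // concatenate the chunks in order
  }

  def main(args: Array[String]): Unit = {
    // A strict message, and a streamed one that simulates network chunking.
    val strict   = TextMessage("hello")
    val streamed = TextMessage(Source(List("hel", "lo")))
    try {
      println(Await.result(toText(strict), 5.seconds))
      println(Await.result(toText(streamed), 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

Both calls should produce the same string, which is the point: downstream code no longer needs to care how the peer or the network chunked the message. `Await` is used here only to keep the demo short; inside a flow, `mapAsync` as shown above is the non-blocking equivalent.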

Upvotes: 1
