Rea Sand
Rea Sand

Reputation: 173

Http Websocket as Akka Stream Source

I'd like to listen on a websocket using akka streams. That is, I'd like to treat it as nothing but a Source.

However, all official examples treat the websocket connection as a Flow.

My current approach is using the websocketClientFlow in combination with a Source.maybe. This eventually results in the upstream failing due to a TcpIdleTimeoutException, when there are no new Messages being sent down the stream.

Therefore, my question is twofold:

  1. Is there a way – which I obviously missed – to treat a websocket as just a Source?
  2. If using the Flow is the only option, how does one handle the TcpIdleTimeoutException properly? The exception can not be handled by providing a stream supervision strategy. Restarting the source by using a RestartSource doesn't help either, because the source is not the problem.

Update

So I tried two different approaches, setting the idle timeout to 1 second for convenience

application.conf

akka.http.client.idle-timeout = 1s

Using keepAlive (as suggested by Stefano)

Source.<Message>maybe()
    .keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
    .viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
    { ... }

When doing this, the Upstream still fails with a TcpIdleTimeoutException.

Using RestartFlow

However, I found out about this approach, using a RestartFlow:

final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
    Duration.apply(3, TimeUnit.SECONDS),
    Duration.apply(30, TimeUnit.SECONDS),
    0.2,
    () -> createWebsocketFlow(system, websocketUri)
);

Source.<Message>maybe()
    .viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
    { ... }

(...)

private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
    return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}

This works in that I can treat the websocket as a Source (although artifically, as explained by Stefano) and keep the tcp connection alive by restarting the websocketClientFlow whenever an Exception occurs.

This doesn't feel like the optimal solution though.

Upvotes: 7

Views: 1218

Answers (2)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

  1. No. WebSocket is a bidirectional channel, and Akka-HTTP therefore models it as a Flow. If in your specific case you care only about one side of the channel, it's up to you to form a Flow with a "muted" side, by using either Flow.fromSinkAndSource(Sink.ignore, mySource) or Flow.fromSinkAndSource(mySink, Source.maybe), depending on the case.

  2. as per the documentation:

    Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

    There is an ad-hoc combinator to inject keep-alive messages, see the example below and this Akka cookbook recipe. NB: this should happen on the client side.

    src.keepAlive(1.second, () => TextMessage.Strict("ping"))

Upvotes: 5

Sebastian
Sebastian

Reputation: 17443

I hope I understand your question correctly. Are you looking for asSourceOf?

path("measurements") {
  entity(asSourceOf[Measurement]) { measurements =>
    // measurement has type Source[Measurement, NotUsed]
    ...
  }
}

Upvotes: 0

Related Questions