Reputation: 173
I'd like to listen on a websocket using akka streams. That is, I'd like to treat it as nothing but a Source
.
However, all official examples treat the websocket connection as a Flow
.
My current approach is using the websocketClientFlow
in combination with a Source.maybe
. This eventually results in the upstream failing due to a TcpIdleTimeoutException
, when there are no new Message
s being sent down the stream.
Therefore, my question is twofold:
Source
?Flow
is the only option, how does one handle the TcpIdleTimeoutException
properly? The exception can not be handled by providing a stream supervision strategy. Restarting the source by using a RestartSource
doesn't help either, because the source is not the problem.So I tried two different approaches, setting the idle timeout to 1 second for convenience
application.conf
akka.http.client.idle-timeout = 1s
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
When doing this, the Upstream still fails with a TcpIdleTimeoutException
.
However, I found out about this approach, using a RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
This works in that I can treat the websocket as a Source (although artifically, as explained by Stefano) and keep the tcp connection alive by restarting the websocketClientFlow
whenever an Exception
occurs.
This doesn't feel like the optimal solution though.
Upvotes: 7
Views: 1218
Reputation: 9023
No. WebSocket is a bidirectional channel, and Akka-HTTP therefore models it as a Flow
. If in your specific case you care only about one side of the channel, it's up to you to form a Flow
with a "muted" side, by using either Flow.fromSinkAndSource(Sink.ignore, mySource)
or Flow.fromSinkAndSource(mySink, Source.maybe)
, depending on the case.
as per the documentation:
Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
There is an ad-hoc combinator to inject keep-alive messages, see the example below and this Akka cookbook recipe. NB: this should happen on the client side.
src.keepAlive(1.second, () => TextMessage.Strict("ping"))
Upvotes: 5
Reputation: 17443
I hope I understand your question correctly. Are you looking for asSourceOf
?
path("measurements") {
entity(asSourceOf[Measurement]) { measurements =>
// measurement has type Source[Measurement, NotUsed]
...
}
}
Upvotes: 0