Reputation: 405
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.add("1");
q.add("2");
q.add("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"), session -> session
        .send(Flux.just("INIT").map(session::textMessage))
        .thenMany(session
                .send(Flux.<String>generate(sink ->
                {
                    if (q.peek() != null)
                        sink.next(q.poll());
                    else
                        sink.complete();
                }).map(session::textMessage))
        )
        .thenMany(session
                .receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(s -> "Received: " + s)
                .log()
        )
        .then())
        .subscribe();

int i = 0;
while (true)
{
    String msg = "MSG #" + i++;
    q.add(msg);
    Thread.sleep(1000);
}
Output:
INFO reactor.Flux.Map.1 - onNext(Received: INIT)
INFO reactor.Flux.Map.1 - onNext(Received: 1)
INFO reactor.Flux.Map.1 - onNext(Received: 2)
INFO reactor.Flux.Map.1 - onNext(Received: 3)
INFO reactor.Flux.Map.1 - onNext(Received: MSG #0)
And then it stops, even though the while (true) loop keeps populating the queue. As I understand it, the way I used thenMany was supposed to generate a new Flux with the ConcurrentLinkedQueue content every time the previous one was marked as complete(). But that doesn't seem to be working.
Edit:
Basically, what I want is to send data to the websocket from outside the lambda scope. That's why I created a queue and used .thenMany(session.send(Flux.<String>generate.....
I expected that it would keep reading from the queue while other threads add data to it.
Upvotes: 2
Views: 854
Reputation: 10315
The problem is that you combine sending and receiving with the thenMany method on Mono and Flux.
thenMany ignores the elements of the flux it is called on and reacts only to its completion signal. So nothing further happens until you call sink.complete(). But once complete has been called, no more events will be sent, even if they are requested: the generated Flux is subscribed to once, drains whatever is in the queue at that moment, and terminates.
Sending and receiving should be made independent.
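To make the "complete is terminal" point concrete, here is a minimal plain-Java model of what the generator in the question does. It deliberately uses no Reactor types; the class name OneShotDrain and the method drainUntilEmpty are hypothetical names chosen for this sketch, and the loop only imitates the next/complete sequence rather than reproducing Reactor's scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class OneShotDrain {

    // Models the generator callback from the question: emit while the queue
    // has elements, then signal completion. Nothing runs after completion.
    static List<String> drainUntilEmpty(Queue<String> queue) {
        List<String> emitted = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            emitted.add(next);   // stands in for sink.next(...)
        }
        return emitted;          // stands in for sink.complete()
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("1");
        queue.add("2");
        queue.add("3");

        // The single subscription drains what is queued right now...
        List<String> sent = drainUntilEmpty(queue);

        // ...and anything added afterwards is never picked up, because
        // nothing subscribes to the generator a second time.
        queue.add("MSG #1");

        System.out.println("sent = " + sent);           // sent = [1, 2, 3]
        System.out.println("left in queue = " + queue); // left in queue = [MSG #1]
    }
}
```

The item added after the drain stays in the queue forever, which matches the symptom in the question: output stops as soon as the queue is found empty once.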
Also, instead of a ConcurrentLinkedQueue, a FluxProcessor and a FluxSink can be used. (In Reactor 3.4 and later these processor types are deprecated in favor of the Sinks API.)
EmitterProcessor can emit to several subscribers while honoring backpressure for each of them. When it has no subscriber, it can still accept a few data pushes, up to a configurable bufferSize.
int bufferSize = 10;
FluxProcessor<String, String> processor =
        EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

sink.next("1");
sink.next("2");
sink.next("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
        session -> {
            Flux<WebSocketMessage> out = Flux.just("INIT")
                    .concatWith(processor)
                    .map(session::textMessage);

            session.send(out)
                    .subscribe(); //instead of thenMany

            return session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map(s -> "Received: " + s)
                    .log()
                    .then();
        })
        .subscribe();

for (int i = 1; i <= 10; i++) {
    sink.next("MSG #" + i);
    TimeUnit.SECONDS.sleep(1);
}
sink.complete();
Logs:
17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)
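The pattern in this answer (a stream that stays open, is pushed to from outside the handler, and ends only when the producer says so) can also be tried without Reactor or a websocket. The sketch below uses the JDK's java.util.concurrent.SubmissionPublisher as a stand-in, which is an assumption of this example and not what the answer uses; unlike EmitterProcessor it does not buffer items when there is no subscriber. The class name PushFromOutside and the helper Recorder are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PushFromOutside {

    // Subscriber that records every item and releases a latch when the
    // stream terminates.
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Pushes the messages from the calling thread while the subscriber
    // consumes them on another thread, then completes the stream explicitly.
    static List<String> pushAll(List<String> messages) {
        Recorder recorder = new Recorder();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(recorder);
            for (String message : messages) {
                publisher.submit(message);   // plays the role of sink.next(...)
            }
        }                                    // close() plays the role of sink.complete()
        try {
            if (!recorder.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return recorder.received;
    }

    public static void main(String[] args) {
        System.out.println(pushAll(List.of("INIT", "1", "2", "3", "MSG #1")));
    }
}
```

The point of the design is that completion is decided by the producer, not by the stream noticing that there is momentarily nothing to send.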
Upvotes: 1
Reputation: 1702
As I see it, you are looking for a queue that always emits the data that comes into it. So I suggest using a Processor such as EmitterProcessor and separating send and receive into two different streams:
EmitterProcessor<String> e = EmitterProcessor.create();
e.onNext("1");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new java.net.URI("wss://echo.websocket.org"), webSocketSession -> {
    return Mono.zip(listenData(webSocketSession),
            sendSms(webSocketSession, e), (aVoid, aVoid2) -> aVoid);
}).subscribe();

int i = 0;
while (true)
{
    String msg = "MSG #" + i++;
    e.onNext(msg);
    Thread.sleep(1000);
}

private Mono<Void> sendSms(WebSocketSession webSocketSession, EmitterProcessor<String> e) {
    return webSocketSession.send(e.map(webSocketSession::textMessage).log());
}

private Mono<Void> listenData(WebSocketSession webSocketSession) {
    return webSocketSession
            .receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(s -> "Received: " + s)
            .log()
            .then();
}
Upvotes: 0