Foreign
Foreign

Reputation: 405

Issue with reactive streams

ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.add("1");
q.add("2");
q.add("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();

webSocketClient.execute(new URI("wss://echo.websocket.org"), session -> session
        .send(Flux.just("INIT").map(session::textMessage))
        .thenMany(session
                .send(Flux.<String>generate(sink ->
                {
                    if (q.peek() != null)
                        sink.next(q.poll());
                    else
                        sink.complete();
                }).map(session::textMessage))
        )
        .thenMany(session
                .receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(s -> "Received: " + s)
                .log()
        )
        .then())
        .subscribe();

int i = 0;
while (true)
{
    String msg = "MSG #" + i++;
    q.add(msg);
    Thread.sleep(1000);
}

Output:

INFO reactor.Flux.Map.1 - onNext(Received: INIT)
INFO reactor.Flux.Map.1 - onNext(Received: 1)
INFO reactor.Flux.Map.1 - onNext(Received: 2)
INFO reactor.Flux.Map.1 - onNext(Received: 3)
INFO reactor.Flux.Map.1 - onNext(Received: MSG #0)

And then it stops. The while (true) is always populating the queue. To my understanding, the way I used thenMany was supposed to generate a new Flux with the ConcurrentLinkedQueue content every time the previous was marked as complete(). But that doesn't seems to be working.

Edit: Basically what I want is to send data to the websocket from outside the lambda scope. Thats why I created a queue and used .thenMany(session.send(Flux.<String>generate..... I expected that it would keep reading from the queue while other threads add data to it.

Upvotes: 2

Views: 854

Answers (2)

Eugene Khyst
Eugene Khyst

Reputation: 10315

The problem is that you combine sending and receiving with thenMany method on Mono and Flux. thenMany method makes Flux ignore element from this flux and react only on completion signal.

So, nothing happens unless you call sink.complete(). But after calling complete method no further events will be sent even if requested.

Sending and receiving should be made independent.

Also, instead of ConcurrentLinkedQueue a FluxProcessor and FluxSink can be used. EmitterProcessor can emit to several subscribers while honoring backpressure for each of its subscribers. When it has no subscriber, it can still accept a few data pushes up to a configurable bufferSize.

int bufferSize = 10;
FluxProcessor<String, String> processor =
    EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

sink.next("1");
sink.next("2");
sink.next("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
    session -> {
      Flux<WebSocketMessage> out = Flux.just("INIT")
          .concatWith(processor)
          .map(session::textMessage);

      session.send(out)
          .subscribe(); //instead of thenMany

      return session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .map(s -> "Received: " + s)
          .log()
          .then();
    })
    .subscribe();

for (int i = 1; i <= 10; i++) {
  sink.next("MSG #" + i);
  TimeUnit.SECONDS.sleep(1);
}
sink.complete();

Logs:

17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)

Upvotes: 1

Ricard Kollcaku
Ricard Kollcaku

Reputation: 1702

As i see you are searching for a queue that always emit the data that come to. so im suggesting you to use WebFlux Processor and to separate send and receive in 2 different streams ;

    EmitterProcessor<String> e = EmitterProcessor.create();
    e.onNext("1");

    WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();



    webSocketClient.execute(new java.net.URI("wss://echo.websocket.org"), webSocketSession -> {
        return    Mono.zip( listenData(webSocketSession),
                sendSms(webSocketSession,e),(aVoid, aVoid2) -> aVoid);

    }).subscribe();



    int i = 0;
    while (true)
    {
        String msg = "MSG #" + i++;

        e.onNext(msg);
        Thread.sleep(1000);
    }


 private Mono<Void> sendSms(WebSocketSession webSocketSession,EmitterProcessor<String> e) {
   return webSocketSession.send(e.map(webSocketSession::textMessage).log());
}

private Mono<Void> listenData(WebSocketSession webSocketSession) {
    return webSocketSession
            .receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(s -> "Received: " + s)
            .log()
            .then();
}

Upvotes: 0

Related Questions