rayman
rayman

Reputation: 21596

How to offload blocking operation to a worker Verticle using websockets and async request

I implementing websockets using Vert.x 3.

The scenario is simple: opening socket from client doing some 'blocking' work at the vertex verticle worker and when finish response with the answer to the client(via the open socket)

Please tell me if I am doing it right:

Created VertxWebsocketServerVerticle. as soon as the websocket is opening and request coming from the client I am using eventBus and passing the message to

EventBusReceiverVerticle. there I am doing blocking operation.

how I am actually sending back the response back to VertxWebsocketServerVerticle and sending it back to the client?

code:

Main class:

 public static void main(String[] args) throws InterruptedException {
        Vertx vertx = Vertx.vertx();
       vertx.deployVerticle(new EventBusReceiverVerticle("R1"),new DeploymentOptions().setWorker(true));
        vertx.deployVerticle(new VertxWebsocketServerVerticle());
}

VertxWebsocketServerVerticle:

public class VertxWebsocketServerVerticle extends AbstractVerticle {


    public void start() {
        vertx.createHttpServer().websocketHandler(webSocketHandler -> {

            System.out.println("Connected!");
            Buffer buff = Buffer.buffer().appendInt(12).appendString("foo");
            webSocketHandler.writeFinalBinaryFrame(buff);
            webSocketHandler.handler(buffer -> {
                String inputString = buffer.getString(0, buffer.length());
                System.out.println("inputString=" + inputString);
                vertx.executeBlocking(future -> {
                    vertx.eventBus().send("anAddress", inputString, event -> System.out.printf("got back from reply"));
                    future.complete();
                }, res -> {
                    if (res.succeeded()) {
                        webSocketHandler.writeFinalTextFrame("output=" + inputString + "_result");
                    }
                });

            });
        }).listen(8080);
    }


    @Override
    public void stop() throws Exception {
        super.stop();
    }
}

EventBusReceiverVerticle :

public class EventBusReceiverVerticle extends AbstractVerticle {

        private String name = null;

        public EventBusReceiverVerticle(String name) {
            this.name = name;
        }

        public void start(Future<Void> startFuture) {
            vertx.eventBus().consumer("anAddress", message -> {
                System.out.println(this.name +
                        " received message: " +
                        message.body());
                try {
                    //doing some looong work..
                    Thread.sleep(10000);
                    System.out.printf("finished waiting\n");
                    startFuture.complete();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

I always get:

WARNING: Message reply handler timed out as no reply was received - it will be removed

github project at: https://github.com/IdanFridman/VertxAndWebSockets thank you, ray.

Upvotes: 1

Views: 2582

Answers (1)

tmarwen
tmarwen

Reputation: 16354

Since you are blocking your websocket handler until it receives a reply for the sent message to the EventBus, which will not, in fact, be received until the set up delay of 10s laps, you certainly will get warning since the reply handler of the event bus will timeout -> Message sent but no response received before the timeout delay.

Actually I don't know if you are just experimenting the Vert.x toolkit or you are trying to fulfill some requirement, but certainly you have to adapt your code to match in the Vert.x spirit:

  • First you should better not block until a message is received in your websocket handler, keep in mind that everything is asynchrounous when it comes to Vert.x.
  • In order to sleep for some time, use the Vert.x way and not the Thread.sleep(delay), i.e. vertx.setTimer(...).

Upvotes: 0

Related Questions