Armen Arzumanyan

Reputation: 2043

Vert.x how to pass/get messages from REST to message bus?

I want to pass messages to the event bus via REST and read them back. But I can't set up the event bus receiver correctly: it throws java.lang.IllegalStateException: Response has already been written. In real life, the message bus should receive messages from different sources and pass them on to another target, so we just need to publish each message to the bus. But how do I correctly read the messages and handle all of them, for example from a REST interface? My simple app starts like this:

 public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new RESTVerticle());
        vertx.deployVerticle(new Receiver());
        EventBus eventBus = vertx.eventBus();
        eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec());

    }

REST part

public class RESTVerticle extends AbstractVerticle {

    private EventBus eventBus = null;

    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);
        eventBus = vertx.eventBus();
        router.route().handler(BodyHandler.create());
        router.route().handler(CorsHandler.create("*")
                .allowedMethod(HttpMethod.GET)
                .allowedHeader("Content-Type"));

        router.post("/api/message").handler(this::publishToEventBus);
       // router.get("/api/messagelist").handler(this::getMessagesFromBus);

        router.route("/*").handler(StaticHandler.create());
        vertx.createHttpServer().requestHandler(router::accept).listen(9999);
        System.out.println("Service running at 0.0.0.0:9999");

    }

    private void publishToEventBus(RoutingContext routingContext) {
        System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString());
        final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(),
                MessageDTO.class);

        HttpServerResponse response = routingContext.response();
        response.setStatusCode(201)
                .putHeader("content-type", "application/json; charset=utf-8")
                .end(Json.encodePrettily(message));

        eventBus.publish("messagesBus", message);

    }
}

And the receiver. I moved it to a separate class, but that did not help:

public class Receiver extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        Router router = Router.router(vertx);

        router.route().handler(BodyHandler.create());
        router.route().handler(CorsHandler.create("*")
                .allowedMethod(HttpMethod.GET)
                .allowedHeader("Content-Type"));

        router.get("/api/messagelist").handler(this::getMessagesFromBus);
        router.route("/*").handler(StaticHandler.create());

        vertx.createHttpServer().requestHandler(router::accept).listen(9998);
        System.out.println("Service Receiver running at 0.0.0.0:9998");
    }

    private void getMessagesFromBus(RoutingContext routingContext) {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("messagesBus", message -> {
            MessageDTO customMessage = (MessageDTO) message.body();
            HttpServerResponse response = routingContext.response();
            System.out.println("Receiver ->>>>>>>> " + customMessage);
            if (customMessage != null) {
                response.putHeader("content-type", "application/json; charset=utf-8")
                        .end(Json.encodePrettily(customMessage));
            }
            response.closed();

        });
    }
}

So if I post a message to REST and the handler publishes it to the bus, then when I GET http://localhost:9998/api/messagelist at runtime it returns JSON, but the second time it throws an exception:

java.lang.IllegalStateException: Response has already been written
    at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561)
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154)
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52)
    at com.project.backend.Receiver.lambda$getMessagesFromBus$0(Receiver.java:55)
    at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207)
    at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201)
    at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$127(EventBusImpl.java:498)
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$18(ContextImpl.java:335)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)

Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2}

How do I correctly get all messages from the receiver? Or, once the bus has received messages, should I immediately store them in the DB? Can a message bus keep messages and not lose them?

Thanks

Upvotes: 1

Views: 2418

Answers (2)

Luiz H. Rapatão

Reputation: 84

Each hit on the entry point "/api/messagelist" creates a new consumer bound to that request's routing context.

The first request creates a consumer, which replies to that request when a message arrives. When a second message is published, that same consumer receives it and tries to reply to the earlier request (the same response instance), which has already been ended.

I think you have misunderstood the purpose of the event bus, and I really recommend reading the documentation: http://vertx.io/docs/vertx-core/java/#event_bus
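One way to avoid the per-request consumer described above is to register the consumer once (for example in the verticle's start()) and let each HTTP request only read what has been collected so far. Below is a minimal sketch of that pattern in plain Java with no Vert.x dependency, so it can be run on its own. MessageBuffer, onMessage and snapshot are hypothetical names, not part of Vert.x, and String stands in for the question's MessageDTO. The comments mark where the Vert.x calls from the question would presumably go.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper (not a Vert.x class): collects bus messages so that
// the bus consumer never touches an HTTP response object.
class MessageBuffer {

    // Thread-safe list, in case the consumer and the HTTP handler
    // end up running on different threads.
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Assumption: called from the single consumer registered once, e.g. in
    // start(): eventBus.consumer("messagesBus", msg -> buffer.onMessage(...));
    void onMessage(String body) {
        received.add(body);
    }

    // Assumption: called once per GET /api/messagelist request; the result
    // would be encoded and written to that request's own response.
    List<String> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(received));
    }

    public static void main(String[] args) {
        MessageBuffer buffer = new MessageBuffer();

        buffer.onMessage("first");              // first message published
        System.out.println(buffer.snapshot());  // first GET request

        buffer.onMessage("second");             // second message published
        System.out.println(buffer.snapshot());  // second GET request
    }
}
```

Running main prints [first] and then [first, second]. Each request gets its own snapshot and ends only its own response, so a later message can never try to write to a response that has already been ended. Note that this buffer lives in memory only; it does not answer whether messages should also be stored in a database.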

Upvotes: 2

Paulo Lopes

Reputation: 5801

I have not had the chance to test your code, but it seems that the publish operation is throwing an exception, and Vert.x will try to send back an error message. However, you have already replied and ended the connection.

Now, the error might come from your codec, but due to the asynchronous nature of Vert.x you only see it at a later stage, mixed up with the internal error handler.

Upvotes: 1
