Reputation: 2043
I want to pass messages to the event bus via REST and read them back. But I can't set up the message bus receiver correctly: it throws java.lang.IllegalStateException: Response has already been written.
In real life the message bus should receive messages from different sources and pass them on to another target, so we just need to publish each message to the bus. But how do I correctly read the messages and handle all of them, for example from a REST interface?
My simple app starts like this:
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new RESTVerticle());
vertx.deployVerticle(new Receiver());
EventBus eventBus = vertx.eventBus();
eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec());
}
REST part
public class RESTVerticle extends AbstractVerticle {
private EventBus eventBus = null;
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
eventBus = vertx.eventBus();
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.post("/api/message").handler(this::publishToEventBus);
// router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9999);
System.out.println("Service running at 0.0.0.0:9999");
}
private void publishToEventBus(RoutingContext routingContext) {
System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString());
final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(),
MessageDTO.class);
HttpServerResponse response = routingContext.response();
response.setStatusCode(201)
.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(message));
eventBus.publish("messagesBus", message);
}
}
And the Receiver. I moved it to a separate class, but that did not help:
public class Receiver extends AbstractVerticle {
@Override
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.route().handler(CorsHandler.create("*")
.allowedMethod(HttpMethod.GET)
.allowedHeader("Content-Type"));
router.get("/api/messagelist").handler(this::getMessagesFromBus);
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router::accept).listen(9998);
System.out.println("Service Receiver running at 0.0.0.0:9998");
}
private void getMessagesFromBus(RoutingContext routingContext) {
EventBus eventBus = vertx.eventBus();
eventBus.consumer("messagesBus", message -> {
MessageDTO customMessage = (MessageDTO) message.body();
HttpServerResponse response = routingContext.response();
System.out.println("Receiver ->>>>>>>> " + customMessage);
if (customMessage != null) {
response.putHeader("content-type", "application/json; charset=utf-8")
.end(Json.encodePrettily(customMessage));
}
response.closed();
});
}
}
So if I post a message to the REST endpoint and the handler publishes it to the bus, then when I GET http://localhost:9998/api/messagelist at runtime it returns JSON, but the second time it throws an exception:
java.lang.IllegalStateException: Response has already been written
at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154)
at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52)
at com.project.backend.Receiver.lambda$getMessagesFromBus$0(Receiver.java:55)
at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$127(EventBusImpl.java:498)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$18(ContextImpl.java:335)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2}
How do I correctly get all messages from the receiver? Or, once the bus has received messages, should I immediately store them in the db? Can a message bus keep messages and not lose them?
Thanks
Upvotes: 1
Views: 2418
Reputation: 84
Each hit on the "/api/messagelist" endpoint creates a new consumer that captures that request's routing context.
The first request creates a consumer, which replies to that request when a message arrives. When a second message is published, that same consumer receives it and tries to reply again on the first request's response, which has already been ended, hence the exception.
I think you have misunderstood the purpose of the event bus, and I really recommend reading the documentation: http://vertx.io/docs/vertx-core/java/#event_bus
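To make that concrete, here is a minimal sketch of one way to restructure the Receiver, assuming it is acceptable to keep messages in memory: register a single consumer in start() and have the GET handler return whatever has been collected so far. MessageStore is a hypothetical helper, not a Vert.x class; the Vert.x wiring is shown only in comments and reuses the calls from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Vert.x): buffers every message seen on the bus.
final class MessageStore<T> {
    private final List<T> messages = new ArrayList<>();

    // Intended to be called from the single event-bus consumer registered in start().
    synchronized void add(T message) {
        messages.add(message);
    }

    // Intended to be called from the HTTP handler; returns a read-only copy,
    // so later messages do not change a list that was already handed out.
    synchronized List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(messages));
    }
}

// Assumed wiring inside Receiver, using the same Vert.x calls as the question:
//
//   private final MessageStore<MessageDTO> store = new MessageStore<>();
//
//   @Override
//   public void start() {
//       // Register the consumer ONCE here, not once per HTTP request.
//       vertx.eventBus().consumer("messagesBus",
//           message -> store.add((MessageDTO) message.body()));
//       // ... router and HTTP server setup as before ...
//   }
//
//   private void getMessagesFromBus(RoutingContext routingContext) {
//       // Each request ends its own response exactly once.
//       routingContext.response()
//           .putHeader("content-type", "application/json; charset=utf-8")
//           .end(Json.encodePrettily(store.snapshot()));
//   }
```

As far as I know, the event bus delivers on a best-effort basis and does not persist anything itself, so a message published before the consumer is registered, or held only in this in-memory list when the process stops, is lost. If that matters, writing each message to a database from inside the consumer is the usual way to go.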
Upvotes: 2
Reputation: 5801
I did not have the chance to test your code, but it seems that the publish operation is throwing an exception and Vert.x then tries to send back an error response. However, you have already replied and ended the connection.
The error might come from your codec, but because of the asynchronous nature of Vert.x you only see it at a later stage, mixed up with the internal error handler.
Upvotes: 1