Reputation: 422
Background: using Vertx 3.3.3 Core and Web on Java side as a server, using vertx-web-3.3.3-client.js as the client with sockjsclient1.1.2.js
Issue: I can successfully connect to the eventbus from the client when on my local machine or LAN. When I go through a proxy, the wss eventbus connection is blocked (in Firefox I see "Firefox can't establish a connection to "wss://..."; in Chromium I see "WebSocket connection to wss://... failed: Error during WebSocket handshake: Unexpected response code: 400", then I see "https://.../eventbus/.../xhr_send?t=... Failed to load resource: the server responded with a status code of 500"). However, onopen fires and I receive some data (connection downgraded to an accepted protocol?). Immediately after this, onclose fires and I lose the connection. I know that I am successfully reaching the Java Vert.x server because my static web content and API calls are working.
Questions: I have looked through the Vertx and SockJS documentation extensively. Is there:
Thanks in advance for any advice/help!
EDIT 1: Adding the following code for both Java server and JavaScript web client sides. The web side is very basic (and what is failing). The Java side is using Spring for dependency injection and application config, has an Eventbus connection, one API call, and serves static web content.
The API call from client to server works, and the server serves the static web content correctly, so accessing the tool is working. However, the proxy is causing wss to fail (as expected), and the downgrade to xhr-streaming is failing as well (I think).
Javascript:
// Shared client state: the eventbus connection and the server base URL,
// both assigned in createConnection().
// NOTE(review): at global script scope the name URL likely collides with the
// browser's built-in URL constructor — confirm nothing on the page relies on it.
var EB, URL;

// Path (relative to the base URL) of the server's publish API; the target
// eventbus address is appended to it.
var APICall = "api/eventbus/publish/";

// Eventbus address this client listens on.
var IncomingAddress = "heartbeat-test";

// Eventbus address this client sends test messages to.
var OutgoingAddress = "client-test";
/**
 * Opens the eventbus connection to the server whose base URL is entered in
 * the #serveraddressinput text box, fires a test API call, and installs the
 * open/close callbacks on the connection.
 *
 * The base URL is stored in the global URL and is expected to end with a
 * slash, because "eventbus" is appended to it directly.
 */
function createConnection(){
    URL = $("#serveraddressinput").val(); // Getting url from html text box
    console.log("Creating Eventbus connection at " + URL + "eventbus"); // Eventbus address is '<link>/eventbus'
    EB = new EventBus(URL + "eventbus");
    testAPICall();

    EB.onopen = function(){
        console.log("Eventbus connection successfully made at " + URL + "eventbus");
        console.log("Registering Eventbus handler for messages at " + IncomingAddress);
        EB.registerHandler(IncomingAddress, function(error, message){
            // Surface delivery errors instead of silently ignoring them.
            if (error) {
                console.log("Eventbus handler error: " + JSON.stringify(error));
            }
            console.log("Received Eventbus message " + JSON.stringify(message));
        }); // closes the registerHandler(...) call, which was left open before
    };

    EB.onclose = function(){
        console.log("Eventbus connection at " + URL + " has been lost");
        URL = ""; // forget the server address once the connection is gone
    };
}
/**
 * POSTs a small JSON test payload to the server's publish API, targeting the
 * "heartbeat-test" eventbus address, and logs the outcome to the console.
 */
function testAPICall(){
    var endpoint = URL + APICall + "heartbeat-test";
    console.log("Testing API call to " + endpoint);

    var body = JSON.stringify({"testFromClient": "Test message sent from Client via API Call"});

    // Logs the parsed response returned by the server.
    function onSuccess(data, textStatus) {
        console.log("API Call Success: " + JSON.stringify(data));
    }

    // Logs the failed request object together with the error description.
    function onError(request, error) {
        console.log("API Call ERROR: " + JSON.stringify(request) + " " + error);
    }

    $.ajax({
        url: endpoint,
        type: 'POST',
        data: body,
        dataType: 'json',
        success: onSuccess,
        error: onError
    });
}
/**
 * Sends a fixed test string over the eventbus connection to OutgoingAddress.
 */
function sendTestMessage(){
    var payload = "Testing 1, 2, 3...";
    console.log("Sending test message to address " + OutgoingAddress);
    EB.send(OutgoingAddress, payload);
}
Java:
...
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeEvent;
import io.vertx.ext.web.handler.sockjs.BridgeEventType;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MyTestVerticle extends AbstractVerticle {
private final static Logger log = LoggerFactory.getLogger(MyTestVerticle.class);
final Level ACCESS = Level.forName("ACCESS", 450);
private boolean started;
private int port;
@Value("${webserver.testpath.enabled}")
private boolean testPathEnabled;
@Value("${webserver.urlpath.test}")
private String testUrlPath;
@Value("${webserver.filepath.test}")
private String testFilePath;
@Value("${webserver.caching.enabled}")
private boolean cachingEnabled;
@Value("${webserver.ssl.enabled}")
private boolean sslEnabled;
private BridgeOptions bridgeOptions;
private SockJSHandler sockJsHandler;
private Router router;
private JksOptions sslKeyStoreOptions;
private JksOptions sslTrustStoreOptions;
public MyTestVerticle() {
this.started = false;
}
@Override
public void start(Future<Void> fut) throws Exception {
log.info("start() -- starting Vertx Verticle with eventbus, API handler, and static file handler");
// grab the router
router = getRouter();
// enable CORS for the router
CorsHandler corsHandler = CorsHandler.create("*"); //Wildcard(*) not allowed if allowCredentials is true
corsHandler.allowedMethod(HttpMethod.OPTIONS);
corsHandler.allowedMethod(HttpMethod.GET);
corsHandler.allowedMethod(HttpMethod.POST);
corsHandler.allowedMethod(HttpMethod.PUT);
corsHandler.allowedMethod(HttpMethod.DELETE);
corsHandler.allowCredentials(false);
corsHandler.allowedHeader("Access-Control-Request-Method");
corsHandler.allowedHeader("Access-Control-Allow-Method");
corsHandler.allowedHeader("Access-Control-Allow-Credentials");
corsHandler.allowedHeader("Access-Control-Allow-Origin");
corsHandler.allowedHeader("Access-Control-Allow-Headers");
corsHandler.allowedHeader("Content-Type");
// enable handling of body
router.route().handler(BodyHandler.create());
router.route().handler(corsHandler);
router.route().handler(this::handleAccessLogging);
// publish a payload to provided eventbus destination
router.post("/api/eventbus/publish/:destination").handler(this::publish);
// open up all for outbound and inbound traffic
bridgeOptions = new BridgeOptions();
bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex(".*"));
bridgeOptions.addInboundPermitted(new PermittedOptions().setAddressRegex(".*"));
// sockJsHandler = SockJSHandler.create(vertx).bridge(bridgeOptions);
sockJsHandler = SockJSHandler.create(vertx);
sockJsHandler.bridge(bridgeOptions, be -> {
try {
if (be.type() == BridgeEventType.SOCKET_CREATED) {
handleSocketOpenEvent(be);
}
else if(be.type() ==BridgeEventType.REGISTER) {
handleRegisterEvent(be);
}
else if(be.type() ==BridgeEventType.UNREGISTER) {
handleUnregisterEvent(be);
}
else if(be.type() ==BridgeEventType.SOCKET_CLOSED) {
handleSocketCloseEvent(be);
}
} catch (Exception e) {
} finally {
be.complete(true);
}
});
router.route("/eventbus/*").handler(sockJsHandler);
if(testPathEnabled){
router.route("/" + testUrlPath + "/*").handler(StaticHandler.create(testFilePath).setCachingEnabled(cachingEnabled));
}
// create periodic task, pushing all current EventBusRegistrations
vertx.setPeriodic(1000, handler -> {
JsonObject obj =new JsonObject();
obj.put("testMessage", "Periodic test message from server...");
vertx.eventBus().publish("heartbeat-test", Json.encodePrettily(obj));
});
EventBus eb = vertx.eventBus();
eb.consumer("client-test", message -> {
log.info("Received message from client: " + Json.encodePrettily(message.body()) + " at " + System.currentTimeMillis());
});
HttpServerOptions httpOptions = new HttpServerOptions();
if(sslEnabled){
httpOptions.setSsl(true);
httpOptions.setKeyStoreOptions(sslKeyStoreOptions);
}
log.info("starting web server on port: " + port);
vertx
.createHttpServer(httpOptions)
.requestHandler(router::accept).listen(
port,
result -> {
if (result.succeeded()) {
setStarted(true);
log.info("Server started and ready to accept requests");
fut.complete();
} else {
setStarted(false);
fut.fail(result.cause());
}
}
);
}
private void handleSocketOpenEvent(BridgeEvent be){
String host =be.socket().remoteAddress().toString();
String localAddress = be.socket().localAddress().toString();
log.info("Socket connection opened! Host: " + host + " Local address: " + localAddress);
}
private void handleRegisterEvent(BridgeEvent be){
String host =be.socket().remoteAddress().toString();
String localAddress = be.socket().localAddress().toString();
String address = be.getRawMessage().getString("address").trim();
log.info("Eventbus register event! Address: " + address + " Host: " + host + " Local address: " + localAddress);
}
private void handleUnregisterEvent(BridgeEvent be){
String host =be.socket().remoteAddress().toString();
String localAddress = be.socket().localAddress().toString();
String address = be.getRawMessage().getString("address").trim();
log.info("Eventbus unregister event! Address: " + address + " Host: " + host + " Local address: " + localAddress);
}
private void handleSocketCloseEvent(BridgeEvent be){
String host =be.socket().remoteAddress().toString();
String localAddress = be.socket().localAddress().toString();
log.info("Socket connection closed! Host: " + host + " Local address: " + localAddress);
}
//Method handles logging at custom level for access logging to files
private void handleAccessLogging(RoutingContext routingContext){
Marker accessMarker = MarkerFactory.getMarker("ACCESS");
if(routingContext.normalisedPath().contains("/api")){
log.info(accessMarker, "Api access log: request= " + routingContext.normalisedPath() + " source=" + routingContext.request().remoteAddress());
}
else{
log.info(accessMarker, "Web access log: path= " + routingContext.normalisedPath() + " source= " + routingContext.request().remoteAddress());
}
routingContext.next();
}
/**
* Accept a payload (anything) and publish to the provided destination
*
* @param routingContext
*/
private void publish(RoutingContext routingContext) {
String destination = routingContext.request().getParam("destination");
String payload = routingContext.getBodyAsString();
if ((destination == null) || (payload == null)) {
Exception e = new Exception("Missing arguments");
routingContext.response().setStatusCode(406);
routingContext.fail(e);
} else {
log.info("API Call -> Publishing to destination: " + destination + " payload: " + payload);
vertx.eventBus().publish(destination, payload);
routingContext
.response()
.setStatusCode(200)
.putHeader("content-type", "application/json; charset=utf-8")
.end(payload);
}
}
public boolean isStarted() {
return started;
}
public void setStarted(boolean started) {
this.started = started;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Router getRouter(){
if(router == null){
router = Router.router(vertx);
}
return router;
}
public void setRouter(Router router){
this.router = router;
}
public void setSslOptions(JksOptions keyStoreOptions, JksOptions trustStoreOptions) {
this.sslKeyStoreOptions = keyStoreOptions;
this.sslTrustStoreOptions = trustStoreOptions;
}
}
Upvotes: 2
Views: 1969
Reputation: 422
This error can be resolved by doing the following:
In the Java verticle, register the eventbus (SockJS) handler first, before any other handlers. I believe the BodyHandler or CorsHandler was interfering with it and causing the 500 error.
...
router.route("/eventbus/*").handler(sockJsHandler);
...
// enable handling of body
router.route().handler(BodyHandler.create());
router.route().handler(corsHandler);
router.route().handler(this::handleAccessLogging);
// publish a payload to provided eventbus destination
router.post("/api/eventbus/publish/:destination").handler(this::publish);
Upvotes: 3