Kirill Stoianov
Kirill Stoianov

Reputation: 121

Session Management Jetty Websocket Server

The problem when receiving a message in one websocket, forward this message to others connected websocket's. I use jetty-9.2.20.v20161216.

So I initialize the server..

JettyWSServer websocketServer = new JettyWSServer("localhost", 8000, new MySocketHandler(), new QueuedThreadPool(128));

public <T extends WebSocketHandler> JettyWSServer(String hostName, int port, T webscoketHandler, QueuedThreadPool threadPool) {
    this.hostName = hostName;
    this.port = port;
    this.handler = webscoketHandler;
    this.threadPool = threadPool;
    this.socket = null;

    //create server
    this.server = new Server(this.threadPool);

    //set connector
    ServerConnector connector = new ServerConnector(server);
    connector.setHost(this.hostName);
    connector.setPort(this.port);
    this.server.addConnector(connector);

    //set handler
    this.server.setHandler(this.handler);

    //set listener
    setLifecycleListener();
}

MySocketHandler.java

public class MySocketHandler extends WebSocketHandler {

    private final String TAG = MySocketHandler.class.getSimpleName();
    private MySocketCreator creator;

    @Override
    public void configure(WebSocketServletFactory webSocketServletFactory) {
        this.creator = new MySocketCreator();
        webSocketServletFactory.setCreator(this.creator);
    }


    public Set<ServerSocket> getSockets(){
        return this.creator.getSockets();
    }
}

MySocketCreator.java

public class MySocketCreator implements WebSocketCreator {

    private static final String TAG = MySocketCreator.class.getSimpleName();
    private static Log log = new Log(TAG, true);

    private Set<ServerSocket> sockets = new HashSet<>();
    private Set<Session> guests = new HashSet<>();
    private ConcurrentHashMap<ServiceUser, ArrayList<WSDeviceSessionWrapper>> users = new ConcurrentHashMap<>();


    @Override
    public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
        ServerSocket socket = new ServerSocket(statusCallback);
        sockets.add(socket);
        return socket;
    }


    private OnSessionStatusListener statusCallback = new OnSessionStatusListener() {
        @Override
        public void onGuestIn(Session session) {


            synchronized (this) {
                guests.add(session);
                Integer totalAgeReduce = users.values()
                        .stream()
                        .map(wsDeviceSessionWrappers -> {
                            return 1;
                        })
                        .reduce(
                                0,
                                (a, b) -> a + b);
                log.debug("onGuestIn() " + "Guests: " + guests.size() + " Registered: " + totalAgeReduce);
            }
        }

        @Override
        public void onUserIn(Session session, ServiceUser user, Device device) {


            synchronized (this) {

                if (guests.contains(session)) guests.remove(session);

                if (!users.containsKey(user)) {
                    users.put(user, new ArrayList<WSDeviceSessionWrapper>());
                }
                users.get(user).add(new WSDeviceSessionWrapper(session, device));

                log.debug("onUserIn() " + "Guests: " + guests.size() + " Registered: " + users.size());
            }

        }

        @Override
        public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
        log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
         MySocketCreator.this.users.keySet().forEach(user -> {
               users.forEach(u -> {
                   if (user.equals(u)) {

                       ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                       new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                               wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                           }
                       });

                   }
               });
           });


        }

        @Override
        public void sendResponse(ServiceUser user, WSResponse response, Device excludeDevice) {
                MySocketCreator.this.users.get(user).forEach(wrapper -> {
                    wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                });
        }

        @Override
        public void onExit(Session session, ServiceUser user, Device device) {

            synchronized (this) {

                //remove from guest sessions
                if (session != null && guests.contains(session)) guests.remove(session);


                if (user != null && device != null && users.containsKey(user)) {
                    ArrayList<WSDeviceSessionWrapper> wrappers = users.get(user);
                    Iterator<WSDeviceSessionWrapper> iterator = wrappers.iterator();
                    while (iterator.hasNext()) {
                        WSDeviceSessionWrapper wrapper = iterator.next();
                        if (wrapper.getSession() == session || wrapper.getSession().equals(session) && wrapper.getDevice() == device || wrapper.getDevice().equals(device)) {
                            //remove session for current device
                            iterator.remove();

                            //if user does not have session on server
                            //remove him from current server users
                            if (wrappers.size() == 0) {
                                users.remove(user);
                            }
                        }
                    }
                }

                Integer totalRegisteredDevices = users.values()
                        .stream()
                        .map(wsDeviceSessionWrappers -> {
                            return 1;
                        })
                        .reduce(
                                0,
                                (a, b) -> a + b);
                log.debug("onExit() " + "Guests: " + guests.size() + " Registered: " + totalRegisteredDevices);
            }


        }
    };

    public Set<ServerSocket> getSockets() {
        return sockets;
    }

}

The logic of this code:

In the class of MySocketCreator, when creating a new socket I pass to callback. Next in the socket in the event onOpen I call the callback and pass it the session, and the session is stored in MySocketCreator class, after this session, I associate with the user and the device.

The problem is that when I try to send a message to all users, from any websocket through callback method

    @Override
    public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
    log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
       MySocketCreator.this.users.keySet().forEach(user -> {
           users.forEach(u -> {
               if (user.equals(u)) {
                   ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                   new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                           wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                       }
                   });

               }
           });
       });
    }

string wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());

lock thread and server does not work again. Trying to replace it

wrapper.getSession().getRemote().sendString(response.toJSON());

throw exceptions

java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING

org.eclipse.jetty.websocket.api.WebSocketException: RemoteEndpoint unavailable, outgoing connection not open

These two options do not work for 300 connections/

Question: How can I send a message to all users?

Upvotes: 0

Views: 2948

Answers (2)

iawom
iawom

Reputation: 1

i send messages on a same Remote with multiple threads , then i got the same exception.

try

wrapper.getSession().getRemote().sendString(msg, new WriteCallback() {
                        @Override
                        public void writeFailed(Throwable x) {

                        }

                        @Override
                        public void writeSuccess() {

                        }
                    });

Upvotes: 0

Joakim Erdfelt
Joakim Erdfelt

Reputation: 49462

You cannot blindly send a message to all users.

What if some users are congested and not reading as fast as other users?

What if one user is generating messages at a rate faster then your slowest users internet connection?

In your scenario, you have to handle the Future you get back on sendStringByFuture() (or alternatively the sendString(String,WriteCallback)) and if that connection is slow, or fails, you have to remove it from your list of users. If you get the event that means the message got send, then you know that particular user/client is uncongested and free to send another message to.

Your loop that sends has to queue messages for EACH user/client, not as a whole, but each user/client has its own queue.

Your loop has to only send if that user is known to be uncongested at the time of the send.

You might even have a need to drop messages to really slow clients, or disconnect them entirely if their queue gets too large.

Yes, this is complicated, and that's why there are so many libraries built on top of websocket that do just this.

Consider using cometd and its websocket transport.

Upvotes: 1

Related Questions