john
john

Reputation: 11679

Avoid using synchronized lock on an object

I have my below method in a SocketManager class which is called by background thread every 60 seconds. It will ping a socket and check whether it is live or not and put everything in a liveSocketsByDatacenter map.

  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();

  // runs every 60 seconds to ping all the socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }

Also I have below methods in the same SocketManager class. getNextSocket() method will be called by multiple reader threads (let's say 10 threads max) concurrently to get the next live socket.

  // this method will be called by multiple threads concurrently to get the next live socket
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

Problem Statement:

I want to make sure all those 10 threads + timer thread should never use same socket after calling getNextSocket() method.

What is the best way to solve this problem? I can have synchronize on a socket from those 10 reader threads plus in timer thread as well which will guarantee that only thread is working on that socket but I don't want to use synchronization here. There must be a better way to ensure that every thread uses a different single live socket at the same time instead of synchronization on a particular socket. I have around 60 socket and around 10 reader threads plus 1 timer threads. Do I need to use ThreadLocal concept here?

Upvotes: 0

Views: 251

Answers (3)

CodesInTheDark
CodesInTheDark

Reputation: 191

The best solution for your problem is to use ConcurrentQueue You don't need to use ThreadLocal. ConcurrentQueue is non-blocking and very efficient for multi-threading environment. For example this is how would you remove sockets that are not alive and keep the ones that are alive.

    private final Map<Datacenters, ConcurrentLinkedQueue<SocketHolder>> liveSocketsByDatacenter =
          new ConcurrentHashMap<>();


// runs every 60 seconds to ping 70 sockets the socket to make sure whether they are alive or not (it does not matter if you ping more sockets than there are in the list because you are rotating the que)
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
    Queue<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      for (int i = 0; i<70; i++) {
            SocketHolder s = liveSockets.poll();
        Socket socket = s.getSocket();
        String endpoint = s.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;

        SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive);
        liveSockets.add(zmq);
      }
    }
  }

Upvotes: 1

tsolakp
tsolakp

Reputation: 5948

Seems like you need to implement some type of Socket pool and have the Threads, once they are done, to release the socket back to the pool. The synchronization would only happen only at retrieval and release of the socket from/to the pool. Once the socket is retrieved all the access to it can be done without synchronization. IMHO, I dont see how ThreadLocal's can help you here.

Here is code showing the approach:

  public synchrnozed Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if ( liveSocket.isPresent() && !liveSocket.isUsed() ) {
        liveSocket.setUsed(true);
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  public synchrnozed release(SocketHolder s){
     s.setUsed(false);
  }

and each thread will look like this:

  new Thread( new Runnable(){
     public void run(){
       SocketHolder sh = null;
       try{
           sh = socketManager.getNextSocket();
           //do some work on socker
       }finally{
           if (sh != null) socketManager.release(sh);
       }
     }
  }; ) ).start();

Upvotes: 0

Andr&#233; Alexandre
Andr&#233; Alexandre

Reputation: 26

Just from you first requirement: "I want to make sure all those 10 threads + timer thread should never use same socket after calling getNextSocket() method.". My answer is that you can't do this without a ThreadLocal.

As for maximising the usage of those sockets, you definitely need some sort of socket pool to manage them, so that you can request and release them. Every time you request a socket, you "mark it" as used by the current thread, that's here the ThreadLocal comes in where you move the socket reference into the ThreadLocal, so that the next call to the getNextSocket() just goes in there and checks if exists and returns, if not go to the socket poll and pull a new one and set it on the ThreadLocal. You will always need to release the socket back into the pool, don't forget that. Maybe create some sort of abstraction to the socket itself so that you can use the AutoCloseable interface to make the release to the socket pool, much more easy.

Upvotes: 0

Related Questions