john
john

Reputation: 11669

Do not share same socket between two threads at the same time

I have around 60 sockets and 20 threads and I want to make sure each thread works on different socket everytime so I don't want to share same socket between two threads at all.

In my SocketManager class, I have a background thread which runs every 60 seconds and calls updateLiveSockets() method. In the updateLiveSockets() method, I iterate all the sockets I have and then start pinging them one by one by calling send method of SendToQueue class and basis on the response I mark them as live or dead. In the updateLiveSockets() method, I always need to iterate all the sockets and ping them to check whether they are live or dead.

Now all the reader threads will call getNextSocket() method of SocketManager class concurrently to get the next live available socket to send the business message on that socket. So I have two types of messages which I am sending on a socket:

So if pinger thread is pinging a socket to check whether they are live or not then no other business thread should use that socket. Similarly if business thread is using a socket to send data on it, then pinger thread should not ping that socket. And this applies to all the socket. But I need to make sure that in updateLiveSockets method, we are pinging all the available sockets whenever my background thread starts so that we can figure out which socket is live or dead.

Below is my SocketManager class:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

And here is my SendToQueue class:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // log error
      return false;
    }       
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

Problem Statement

Now as you can see above that I am sharing same socket between two threads. It seems getNextSocket() in SocketManager class could return a 0MQ socket to Thread A. Concurrently, the timer thread may access the same 0MQ socket to ping it. In this case Thread A and the timer thread are mutating the same 0MQ socket, which can lead to problems. So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.

One solution I can think of is using synchronization on a socket while sending the data but if many threads uses the same socket, resources aren't well utilized. Moreover If msg.send(socket); is blocked (technically it shouldn't) all threads waiting for this socket are blocked. So I guess there might be a better way to ensure that every thread uses a different single live socket at the same time instead of synchronization on a particular socket.

Upvotes: 13

Views: 1902

Answers (7)

Kevin Wang
Kevin Wang

Reputation: 182

For the problem of threads (Thread A and timer thread) accessing the same socket, I would keep 2 socket list for each datacenter:

  • list A: The sockets that are not in use
  • list B: The sockets that are in use

i.e.,

  • call synchronisation getNextSocket() to find an not-in-use socket from list A, remove it from list A and add it to list B;
  • call synchronisation returnSocket(Socket) upon receiving the reponse/ACK for a sent message (either business or ping), to move the socket from list B back to list A. Put a try {} finally {} block around "sending message" to make sure that the socket will be put back to list A even if there is an exception.

Upvotes: 0

GIA
GIA

Reputation: 1706

I have a simple solution maybe help you. I don't know if in Java you can add a custom attribute to each socket. In Socket.io you can. So I wanna considerate this (I will delete this answer if not).

You will add a boolean attribute called locked to each socket. So, when your thread check the first socket, locked attribute will be True. Any other thread, when ping THIS socket, will check if locked attribute is False. If not, getNextSocket.

So, in this stretch below...

...
for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
...

You will check if socket is locked or not. If yes, kill this thread or interrupt this or go to next socket. (I don't know how you call it in Java).

So the process is...

  • Thread get an unlocked socket
  • Thread set this socket.locked to True.
  • Thread ping socket and do any stuff you want
  • Thread set this socket.locked to False.
  • Thread go to next.

Sorry my bad english :)

Upvotes: -2

Claudio Corsi
Claudio Corsi

Reputation: 359

It looks like you should consider using the try-with-resource feature here. You have the SocketHolder or Option class implement the AutoCloseable interface. For instance, let us assume that Option implements this interface. The Option close method will then add back the instance to the container. I created a simple example that shows what I mean. It is not complete but it gives you an idea on how to implement this in your code.

    public class ObjectManager implements AutoCloseable {

    private static class ObjectManagerFactory {
        private static ObjectManager objMgr = new ObjectManager();
    }

    private ObjectManager() {}

    public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; }

    private static final int SIZE = 10;

    private static BlockingQueue<AutoCloseable> objects = new LinkedBlockingQueue<AutoCloseable>();

    private static ScheduledExecutorService sch;
    static {
        for(int cnt = 0 ; cnt < SIZE ; cnt++) {
            objects.add(new AutoCloseable() {

                @Override
                public void close() throws Exception {
                    System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size());
                    objects.put(this);
                    System.out.println(Thread.currentThread() + " - Added object back to pool:" + this);
                }

            });
        }
        sch = Executors.newSingleThreadScheduledExecutor();
        sch.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                updateObjects();
            }

        }, 10, 10, TimeUnit.MICROSECONDS);
    }

    static void updateObjects() {
        for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) {
            try(AutoCloseable object = objects.take()) {
                System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size());
            } catch (Throwable t) {
                // TODO Auto-generated catch block
                t.printStackTrace();
            }
        }
    }

    public AutoCloseable getNext() {
        try {
            return objects.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        try (ObjectManager mgr = ObjectManager.getInstance()) {

            for (int cnt = 0; cnt < 5; cnt++) {
                try (AutoCloseable o = mgr.getNext()) {
                    System.out.println(Thread.currentThread() + " - Working with " + o);
                    Thread.sleep(1000);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Throwable tt) {
            tt.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // TODO Auto-generated method stub
        ObjectManager.sch.shutdownNow();
    }
}

Upvotes: 3

Gray
Gray

Reputation: 116878

So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.

There are certainly a number of different ways to do this. For me this seems like a BlockingQueue is the right thing to use. The business threads would take a socket from the queue and would be guaranteed that no one else would be using that socket.

private final BlockingQueue<SocketHolder> socketHolderQueue = new LinkedBlockingQueue<>();
...
public Optional<SocketHolder> getNextSocket() {
   SocketHolder holder = socketHolderQueue.poll();
   return holder;
}
...
public void finishedWithSocket(SocketHolder holder) {
   socketHolderQueue.put(holder);
}

I think that synchronizing on the socket is not a good idea for the reasons that you mention – the ping thread will be blocking the business thread.

There are a number of ways of handling the ping thread logic. I would store your Socket with a last use time and then your ping thread could every so often take each of the sockets from the same BlockingQueue, test it, and put each back onto the end of the queue after testing.

public void testSockets() {
   // one run this for as many sockets as are in the queue
   int numTests = socketHolderQueue.size();
   for (int i = 0; i < numTests; i++) {
      SocketHolder holder = socketHolderQueue.poll();
      if (holder == null) {
         break;
      }
      if (socketIsOk(socketHolder)) {
          socketHolderQueue.put(socketHolder);
      } else {
          // close it here or something
      }
   }
}

You could also have the getNextSocket() code that dequeues the threads from the queue check the timer and put them on a test queue for the ping thread to use and then take the next one from the queue. The business threads would never be using the same socket at the same time as the ping thread.

Depending on when you want to test the sockets, you can also reset the timer after the business thread returns it to the queue so the ping thread would test the socket after X seconds of no use.

Upvotes: 3

ddarellis
ddarellis

Reputation: 3960

I will make some points here. In the getNextSocket method getOrderedDatacenters method will always return the same ordered list, so you will always pick from the same datacenters from start to end (it's not a problem).

How do you guarantee that two threads wont get the same liveSocket from getNextSocket?

What you are saying here it is true:

Concurrently, the timer thread may access the same 0MQ socket to ping it.

I think the main problem here is that you don't distinguish between free sockets and reserved sockets.

One option as you said is to synchronize up on each socket. An other option is to keep a list of reserved sockets and when you want to get a next socket or to update sockets, to pick only from the free sockets. You don't want to update a socket which is already reserved.

Also you can take a look at here if it fits your needs.

Upvotes: 2

Max Vynohradov
Max Vynohradov

Reputation: 1548

Mouhammed Elshaaer is right, but in additional you can also use any concurrent collection, for example ConcurrentHashMap where you can track that each thread works on different socket (for example ConcurrentHashMap key: socket hash code, value: thread hash code or smth else). As for me it's a little bit stupid solution, but it can be used to.

Upvotes: 1

Shaaer
Shaaer

Reputation: 557

There's a concept in operating systems software engineering called the critical section. A critical section occurs when 2 or more processes have shared data and they are concurrently executed, in this case, no process should modify or even read this shared data if there's another process accessing these data. So as a process enters the critical section it should notify all other concurrently executed processes that it's currently modifying the critical section, so all other processes should be blocked-waiting-to enter this critical section. you would ask who organize what process enters, this is another problem called process scheduling that controls what process should enter this critical section and the operating system do that for you.

so the best solution to you is using a semaphore where the value of the semaphore is the number of sockets, in your case, I think you have one socket so you will use a semaphore-Binary Semaphore- initialized with a semaphore value = 1, then your code should be divided into four main sections: critical section entry, the critical section, critical section exiting and remainder section.

  • Critical section entry: where a process enters the critical section and block all other processes. The semaphore will allow one Process-Thread-to enter the critical section-use a socket- and the value of the semaphore will be decremented-equal to zero-.
  • The critical section: the critical section code that the process should do.
  • Critical section exiting: the process releasing the critical section for another process to enter. The semaphore value will be incremented-equal to 1-allowing another process to enter
  • Remainder section: the rest of all your code excluding the previous 3 sections.

Now all you need is to open any Java tutorials about semaphores to know how to apply a semaphore in Java, it's really easy.

Upvotes: 1

Related Questions