user1950349

Reputation: 5146

Return an object from an array list in a thread safe way?

I have a class that populates a map, liveSocketsByDatacenter, from a single background thread every 30 seconds inside the updateLiveSockets() method. A second method, getNextSocket(), is called by multiple reader threads to get an available live socket, and it reads the same map.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocketX(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it and return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the modifier synchronized to prevent concurrent modification
  // it is needed because to build the new map we first need to get the
  // old one so both must be done atomically to prevent consistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = status;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

As you can see in my class:

My code works fine without any issues, and I wanted to see whether there is a better or more efficient way to write it. I would also like an opinion on thread safety issues or race conditions. So far I haven't seen any, but I could be wrong.

I am mostly worried about the updateLiveSockets() and getLiveSocketX() methods. At LINE A I iterate over liveSockets, which is a List of SocketHolder, create a new SocketHolder object for each element, and add it to another new list. Is this OK?

Note: SocketHolder is an immutable class.

Upvotes: 2

Views: 920

Answers (1)

Alexandre Dupriez

Reputation: 3036

Neither code B nor code C is thread-safe.

Code B

While you iterate over the endpoints list to copy it, nothing prevents another thread from modifying it, i.e. adding or removing elements.
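The hazard can be reproduced deterministically in a single thread. The following is only a sketch: the class name IterationHazard and the "late-arrival" element are made up for illustration, and the write inside the loop stands in for a write coming from another thread. With real threads the ConcurrentModificationException is best-effort and not guaranteed, so the failure may instead show up as silently inconsistent data.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class, not part of the code in the question.
final class IterationHazard {

    // Returns true if copying the list failed because it changed mid-iteration.
    static boolean copyWhileModified(List<String> endpoints) {
        List<String> copy = new ArrayList<>();
        try {
            for (String endpoint : endpoints) {
                copy.add(endpoint);
                // Stands in for another thread adding an element during the copy.
                endpoints.add("late-arrival");
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            // ArrayList's fail-fast iterator detected the structural change.
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> endpoints = new ArrayList<>();
        endpoints.add("tcp://host-a:5555");
        endpoints.add("tcp://host-b:5555");
        System.out.println("copy failed: " + copyWhileModified(endpoints));
    }
}
```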

Code C

Assuming endpoints is not null, you are doing three calls to the list object: isEmpty, size, and get. There are several problems from a concurrency perspective:

  1. The argument type List<SocketHolder> gives no guarantee that changes made to the list by one thread are propagated to other threads (memory visibility), let alone protection against race conditions (the list being modified while your thread executes one of these methods).

  2. Suppose the endpoints list does provide those guarantees, e.g. because it has been wrapped with Collections.synchronizedList(). Thread safety is still missing, because the list can be modified between the calls to isEmpty, size, and get while your thread executes getLiveSocketX. Your code could then act on an outdated state of the list. For instance, you could use a size returned by endpoints.size() that is no longer valid because an element has been added to or removed from the list.
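One way to avoid the check-then-act problem in point 2 is to copy the list once while holding its lock and then work only on that private copy. The sketch below assumes the shared list was created with Collections.synchronizedList(), which documents that callers may synchronize on the returned list. The class SnapshotRead and the method firstOf are hypothetical names, and java.util.Optional is used here in place of the Guava Optional from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the code in the question.
final class SnapshotRead {

    // Copies the shared list under its monitor so that the emptiness check
    // and the element access below both observe the same state.
    static <T> Optional<T> firstOf(List<T> shared) {
        List<T> snapshot;
        synchronized (shared) { // assumes a Collections.synchronizedList wrapper
            snapshot = new ArrayList<>(shared);
        }
        // From here on, only the thread-private snapshot is touched.
        return snapshot.isEmpty() ? Optional.empty() : Optional.of(snapshot.get(0));
    }

    public static void main(String[] args) {
        List<String> shared = Collections.synchronizedList(new ArrayList<>());
        shared.add("tcp://host-a:5555");
        System.out.println(firstOf(shared).isPresent());
    }
}
```

The snapshot can still be stale by the time it is used, but it can no longer change between the size check and the element access.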

Edit - after code update

In the code you provided, it seems at first sight that:

  1. You are indeed not concurrently modifying the endpoints list discussed above in getLiveSocketX, because updateLiveSockets creates a new list, liveUpdatedSockets, which you populate from the existing liveSockets.

  2. You use an AtomicReference to hold a map from Datacenters to the lists of sockets of interest. A consequence of this AtomicReference is that it forces memory visibility from the map down to all the lists and their elements. As a side effect, you are protected from memory inconsistencies between your "producer" and "consumer" threads (executing updateLiveSockets and getLiveSocket respectively). You are still exposed to race conditions, though. Imagine updateLiveSockets and getLiveSocket running at the same time, and consider a socket S whose status has just switched from alive to closed. updateLiveSockets will see S as non-alive and create a new SocketHolder accordingly. However, getLiveSocket, running at the exact same time, will see an outdated state of S, since it is still using the list of sockets that updateLiveSockets is in the process of re-creating.

  3. The synchronized keyword on updateLiveSockets does not give you any guarantee here, because no other part of the code synchronizes on the same lock.
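The publish-by-reference pattern described in these points can be reduced to a small sketch. It is an illustration under assumptions, not the code from the question: SnapshotRegistry is a hypothetical class, and plain String keys and values stand in for Datacenters and SocketHolder. It shows both the benefit (readers always get a complete, immutable map) and the remaining race (a reader holding an older snapshot keeps seeing the old state).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class; String stands in for Datacenters and SocketHolder.
final class SnapshotRegistry {
    private final AtomicReference<Map<String, List<String>>> live =
            new AtomicReference<>(Collections.<String, List<String>>emptyMap());

    // Writer side: build a fresh immutable copy, then publish it in one atomic set.
    void publish(Map<String, List<String>> update) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> e : update.entrySet()) {
            copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
        }
        live.set(Collections.unmodifiableMap(copy));
    }

    // Reader side: one get() yields a map that never changes afterwards.
    Map<String, List<String>> snapshot() {
        return live.get();
    }

    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry();
        Map<String, List<String>> update = new HashMap<>();
        update.put("dc1", Arrays.asList("s1", "s2"));
        registry.publish(update);

        Map<String, List<String>> readerView = registry.snapshot(); // reader starts
        update.put("dc1", Arrays.asList("s1"));                     // s2 went down
        registry.publish(update);                                   // writer publishes

        // The reader still holds the older, now outdated snapshot.
        System.out.println("reader sees " + readerView.get("dc1").size() + " sockets");
        System.out.println("latest has " + registry.snapshot().get("dc1").size() + " sockets");
    }
}
```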

To summarize, I would say:

  1. The code of getLiveSocketX, as written, is not inherently thread-safe;
  2. However, the way you copy the lists prevents concurrent modifications, and you benefit from a side effect of the AtomicReference: it gives you the minimal memory-visibility guarantee needed to get a consistent list of sockets in getNextSocket after the lists have been generated by another thread;
  3. You are still exposed to race conditions as described in (2), but this may be fine depending on the contract you want for the getLiveSocket and getNextSocket methods. You may accept that a socket returned by getLiveSocket can be unavailable, and have a retry mechanism.
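A retry mechanism like the one mentioned in point 3 could look like the sketch below. Everything in it is hypothetical: RetryingSender, sendWithRetry, and the two functional parameters are illustrative names, with the supplier standing in for a call such as getNextSocket() and the predicate standing in for the actual send. java.util.Optional is used here in place of the Guava Optional from the question.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of the code in the question.
final class RetryingSender {

    // Asks for a socket and tries to send; on failure, asks again up to maxAttempts times.
    static <T> boolean sendWithRetry(Supplier<Optional<T>> nextSocket,
                                     Predicate<T> trySend,
                                     int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> candidate = nextSocket.get();
            if (!candidate.isPresent()) {
                return false; // nothing is currently reported as live
            }
            if (trySend.test(candidate.get())) {
                return true;
            }
            // The send failed: the socket may have died after the last
            // health check, so loop and pick another candidate.
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails on the first call and succeeds on the second.
        boolean sent = sendWithRetry(() -> Optional.of("socket"), s -> ++calls[0] > 1, 3);
        System.out.println("sent=" + sent + " after " + calls[0] + " attempts");
    }
}
```

Because the random selection may return the same dead socket again until the next health check, a bounded number of attempts keeps the caller from spinning.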

All of that being said, I would thoroughly review and refactor the code so that it exhibits a more readable and explicit thread-safe producer/consumer pattern. Take extra care with the AtomicReference and the single synchronized method, which seem to me to be improperly used, although in the end the AtomicReference does help you, as discussed above.

Upvotes: 2
