
Concurrently reading a Map while a single background thread regularly modifies it


You use the following synchronization techniques.

  1. The map with live socket data is behind an AtomicReference, which allows the map to be switched safely.
  2. The updateLiveSockets() method is synchronized (implicitly on this), which prevents two threads from switching the map simultaneously.
  3. You take a local reference to the map when using it, to avoid mix-ups if the switch happens during the getNextSocket() method.

Is it thread safe, as it is now?

Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.

Keeping the map in an AtomicReference and taking a local reference to it before use is enough synchronization on the map. Your methods take a version of the map and use that; switching versions is thread safe due to the nature of AtomicReference. This could also have been achieved by just making the member field for the map volatile, as all you do is update the reference (you don't do any check-then-act operations on it).
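To make the volatile alternative concrete, here is a minimal sketch. The class name SnapshotHolder and the use of String in place of Datacenters and SocketHolder are assumptions made for illustration only; they are not part of the original code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the question's class; String replaces Datacenters and SocketHolder.
class SnapshotHolder {
    // volatile makes every published map reference visible to all reader threads
    private volatile Map<String, List<String>> liveByDatacenter = Collections.emptyMap();

    // Called by the single writer thread: copy the fresh data, then publish it with one write.
    void publish(Map<String, List<String>> fresh) {
        liveByDatacenter = Collections.unmodifiableMap(new HashMap<>(fresh));
    }

    // Called by any reader thread: read the field once and work only with that snapshot.
    List<String> lookup(String datacenter) {
        Map<String, List<String>> snapshot = liveByDatacenter; // single volatile read
        List<String> sockets = snapshot.get(datacenter);
        return sockets == null ? Collections.<String>emptyList() : sockets;
    }
}
```

The reader never touches the field twice within one call, so a switch that happens mid-call cannot mix two versions of the map.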

As scheduleAtFixedRate() guarantees that the passed Runnable will not be run concurrently with itself, the synchronized on updateLiveSockets() is not needed. However, it also doesn't do any real harm.
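If you want to see that guarantee for yourself, a small probe like the following can help. It is only a sketch: OverlapProbe and its timings are made up for illustration. The task deliberately runs longer than its period, on a pool with several threads, and the probe records how many executions were ever in flight at once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical probe class, not part of the original code.
class OverlapProbe {
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxRunning = new AtomicInteger();

    // Takes 20 ms, which is longer than the 5 ms period used below.
    private void slowTask() {
        int now = running.incrementAndGet();
        maxRunning.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
    }

    // Returns the highest number of simultaneous executions observed.
    int measureMaxConcurrency() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        scheduler.scheduleAtFixedRate(this::slowTask, 0, 5, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxRunning.get();
    }
}
```

Late executions are delayed rather than overlapped, which is why the update method needs no lock of its own as long as it is only ever invoked through that one scheduled task.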

So yes, this class is thread safe, as it is.

However, it's not entirely clear if a SocketHolder can be used by multiple threads simultaneously. As it is, this class just tries to minimize concurrent use of SocketHolders by picking a random live one (no need to shuffle the entire array to pick one random index though). It does nothing to actually prevent concurrent use.

Can it be made more efficient?

I believe it can. When looking at the updateLiveSockets() method, it seems it builds the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And for changing entries in a map in a thread-safe manner, I can just use ConcurrentHashMap.

If I use a ConcurrentHashMap, and don't switch the map, but rather, the values in the map, I can get rid of the AtomicReference.

To change the mapping, I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner and create fewer objects, while my synchronization just builds on ready-made components, which benefits readability.

Here's my build (I omitted some parts that were less relevant, for brevity):

    public class SocketManager {
        private static final Random random = new Random();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // use ConcurrentHashMap
        private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
        private final ZContext ctx = new ZContext();

        // ...

        private SocketManager() {
            connectToZMQSockets();
            scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
        }

        // during startup, make the connections and populate the map once
        private void connectToZMQSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
                // we can put it straight into the map
                liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
            }
        }

        // ...

        // this method will be called by multiple threads to get the next live socket
        // is there any concurrency or thread safety issue or race condition here?
        public Optional<SocketHolder> getNextSocket() {
            for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
                // no more need for a local copy: ConcurrentHashMap makes sure
                // I get the latest mapped List<SocketHolder>
                Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
                if (liveSocket.isPresent()) {
                    return liveSocket;
                }
            }
            return Optional.absent();
        }

        // is there any concurrency or thread safety issue or race condition here?
        private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
            if (!CollectionUtils.isEmpty(listOfEndPoints)) {
                // The list of live sockets
                List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
                for (SocketHolder obj : listOfEndPoints) {
                    if (obj.isLive()) {
                        liveOnly.add(obj);
                    }
                }
                if (!liveOnly.isEmpty()) {
                    // The list is not empty, so just pick one element at random (no shuffle needed)
                    return Optional.of(liveOnly.get(random.nextInt(liveOnly.size())));
                }
            }
            return Optional.absent();
        }

        // no need to make this synchronized
        private void updateLiveSockets() {
            Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
            for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
                List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
                List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
                for (SocketHolder liveSocket : liveSockets) { // LINE A
                    Socket socket = liveSocket.getSocket();
                    String endpoint = liveSocket.getEndpoint();
                    Map<byte[], byte[]> holder = populateMap();
                    Message message = new Message(holder, Partition.COMMAND);
                    // the send status doubles as the liveness flag
                    boolean isLive = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
                    SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                    liveUpdatedSockets.add(zmq);
                }
                // just put it straight into the map, the mapping will be updated in a thread safe manner
                liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
            }
        }
    }


If SocketHolder and Datacenters are immutable, your program looks fine. Here is some minor feedback, though.

1. Usage of AtomicReference

AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter

This member variable does not need to be wrapped in an AtomicReference. You are not doing any atomic CAS operation with it. You could simply declare a volatile Map<Datacenters, List<SocketHolder>>, and when reading it, simply create a local reference to it. This is enough to guarantee an atomic swap of the reference to the new Map.

2. Synchronized method

private synchronized void updateLiveSockets()

This method is called from a single thread executor, so there is no need for it to be synchronized.

3. Some simplifications

  • From your current usage of this class, it seems like you could filter out sockets which are not alive in updateLiveSockets, avoiding the need to filter every time a client calls getNextSocket.

  • You can replace Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS with Set<Datacenters> datacenters = Utils.SERVERS.keySet() and work with the keys.
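As a rough sketch of the first simplification, the update step could publish only the endpoints that answered the ping. Everything here is hypothetical: LiveListBuilder, the String endpoints, and the ping predicate stand in for SocketHolder and the real send-and-check call. It also assumes the full endpoint list is kept somewhere else, so that dead endpoints are pinged again on the next round.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper for the update step; endpoints are plain strings here.
class LiveListBuilder {
    // Pings every known endpoint and returns an unmodifiable list of those that answered.
    // The caller keeps allEndpoints around so dead endpoints get re-checked next round.
    static List<String> liveOnly(List<String> allEndpoints, Predicate<String> ping) {
        List<String> live = new ArrayList<>(allEndpoints.size());
        for (String endpoint : allEndpoints) {
            if (ping.test(endpoint)) {
                live.add(endpoint);
            }
        }
        return Collections.unmodifiableList(live);
    }
}
```

With the published list already filtered, a reader only has to check whether it is empty and pick a random index.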

4. Java 8

If possible, switch to Java 8. Streams together with Java 8's Optional would remove a lot of boilerplate code and make your code much easier to read.
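For illustration, the socket selection could look roughly like this with streams and java.util.Optional. LivePicker and its nested Holder class are hypothetical stand-ins for your SocketManager and SocketHolder, and ThreadLocalRandom is swapped in for the shared Random as an optional extra.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

// Hypothetical stand-in for the selection logic of the question's class.
class LivePicker {
    // Minimal immutable stand-in for SocketHolder.
    static final class Holder {
        private final String endpoint;
        private final boolean live;

        Holder(String endpoint, boolean live) {
            this.endpoint = endpoint;
            this.live = live;
        }

        String getEndpoint() { return endpoint; }
        boolean isLive() { return live; }
    }

    // Picks one random live holder from a single datacenter's list, if there is any.
    static Optional<Holder> pickLive(List<Holder> holders) {
        if (holders == null) {
            return Optional.empty();
        }
        List<Holder> live = holders.stream()
                .filter(Holder::isLive)
                .collect(Collectors.toList());
        if (live.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(live.get(ThreadLocalRandom.current().nextInt(live.size())));
    }

    // Walks the datacenters in priority order and stops at the first one with a live holder.
    static Optional<Holder> firstLive(List<List<Holder>> byPriority) {
        return byPriority.stream()
                .map(LivePicker::pickLive)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }
}
```

Because streams are lazy, findFirst() stops evaluating further datacenters as soon as one yields a live holder, which matches the early return in the loop version.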