Amit

Reputation: 656

Missing updates with locks and ConcurrentHashMap

I have a scenario where I have to maintain a Map that can be populated by multiple threads, each modifying its own List (the unique key being the thread name). When the list size for a thread exceeds a fixed batch size, we have to persist the records to the database.

Aggregator class

private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private final ReentrantLock lock = new ReentrantLock();

public void addAll(List<T> entityList, String threadName) {
    lock.lock();
    try {
        // Fetch the calling thread's list, creating it on first use.
        List<T> instrumentList = instrumentMap.get(threadName);
        if (instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if (instrumentList.size() >= batchSize - 1) {
            // Batch is full: persist it and start over.
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList);
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);
        }
    } finally {
        lock.unlock();
    }
}

There is one more separate thread that runs every 2 minutes (using the same lock) and persists all the records in the Map, to make sure something is persisted at least every 2 minutes and the map does not get too big:

if (/* some condition */) {
    Thread.sleep(120_000L); // 2 minutes
    aggregator.getLock().lock();
    try {
        List<T> instrumentList = instrumentMap.values().stream().flatMap(x -> x.stream()).collect(Collectors.toList());
        if (instrumentList.size() > 0) {
            saver.persist(instrumentList);
            instrumentMap.values().parallelStream().forEach(x -> x.clear());
        }
    } finally {
        aggregator.getLock().unlock();
    }
}

This solution works fine in almost every scenario we tested, except that sometimes we see some records go missing, i.e. they are never persisted at all, although they were added to the Map just fine.

My questions are:

  1. What is the problem with this code?
  2. Is ConcurrentHashMap not the best solution here?
  3. Does the List that is used with the ConcurrentHashMap have an issue?
  4. Should I use the compute method of ConcurrentHashMap here (no need, I think, as the ReentrantLock is already doing the same job; see the sketch below)?
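
For reference, the compute-based variant that question 4 refers to would look roughly like this. It is a sketch only: it makes the get-or-create-and-add step atomic per key, but persisting and clearing a full batch would still need its own coordination.

    // Sketch: compute() runs the remapping function atomically for the key,
    // replacing the get/null-check/put sequence without an external lock.
    instrumentMap.compute(threadName, (key, list) -> {
        if (list == null) {
            list = new ArrayList<T>(batchSize);
        }
        list.addAll(entityList);
        return list;
    });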

Upvotes: 8

Views: 515

Answers (3)

Amit

Reputation: 656

The answer provided by @Slaw in the comments did the trick. We were letting the instrumentList instance escape in a non-synchronized way, i.e. the list was being accessed and modified without any synchronization. Passing a copy to the downstream method fixed it.

The following lines are where the issue was happening:

recordSaver.persist(instrumentList);
instrumentList.clear();

Here we are allowing the instrumentList instance to escape in a non-synchronized way: it is passed to another class (recordSaver.persist) where it is acted upon, but we also clear the list on the very next line (in the Aggregator class), and all of this happens without synchronization. If persist hands the list off to another thread, its state there cannot be predicted... a really stupid mistake.

We fixed the issue by passing a cloned copy of instrumentList to the recordSaver.persist(...) method. That way instrumentList.clear() has no effect on the list that recordSaver is still working with.
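
A minimal sketch of the fix, assuming recordSaver.persist may hand the list off to another thread:

    // Give persist() its own copy; the clear() on the next line can then
    // no longer mutate the list that recordSaver is still working with.
    recordSaver.persist(new ArrayList<T>(instrumentList));
    instrumentList.clear();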

Upvotes: 2

vanOekel

Reputation: 6548

It looks like this was an attempt at optimization where it was not needed. In that case, less is more and simpler is better. In the code below, only two concurrency concepts are used: synchronized, to ensure the shared list is updated atomically, and final, to ensure all threads see the same value.

import java.util.ArrayList;
import java.util.List;

public class Aggregator<T> implements Runnable {

    private final List<T> instruments = new ArrayList<>();

    private final RecordSaver recordSaver;
    private final int batchSize;


    public Aggregator(RecordSaver recordSaver, int batchSize) {
        super();
        this.recordSaver = recordSaver;
        this.batchSize = batchSize;
    }

    public synchronized void addAll(List<T> moreInstruments) {

        instruments.addAll(moreInstruments);
        if (instruments.size() >= batchSize) {
            storeInstruments();
        }
    }

    public synchronized void storeInstruments() {

        if (instruments.size() > 0) {
            // in case recordSaver works async
            // recordSaver.persist(new ArrayList<T>(instruments));
            // else just:
            recordSaver.persist(instruments);
            instruments.clear();
        }
    }


    @Override
    public void run() {

        while (true) {
            // The sleep sets the flush interval (1 ms here as a placeholder;
            // the question flushes every 2 minutes). Interrupting the
            // thread breaks the loop.
            try { Thread.sleep(1L); } catch (Exception ignored) {
                break;
            }
            storeInstruments();
        }
    }


    // Stub so the example compiles; the real implementation writes to the database.
    static class RecordSaver {
        void persist(List<?> l) {}
    }

}
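
For completeness, a usage sketch (the wiring below is illustrative, not from the original answer): producer threads share one Aggregator, and a single background thread runs it as the periodic flusher.

    // Illustrative wiring: one shared Aggregator, many producers, one flusher.
    Aggregator.RecordSaver saver = new Aggregator.RecordSaver();
    Aggregator<String> aggregator = new Aggregator<>(saver, 1000);

    // The Runnable loop in Aggregator does the periodic flushing.
    Thread flusher = new Thread(aggregator);
    flusher.setDaemon(true);
    flusher.start();

    // Any producer thread simply calls:
    aggregator.addAll(Arrays.asList("a", "b", "c"));

    // On shutdown: stop the flusher and drain whatever is left.
    flusher.interrupt();
    aggregator.storeInstruments();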

Upvotes: 0

user1643723

Reputation: 4222

I see that you are using ConcurrentHashMap's parallelStream within a lock. I am not knowledgeable about Java 8+ stream support, but quick searching shows that:

  1. ConcurrentHashMap is a complex data structure that used to have concurrency bugs in the past
  2. Parallel streams must abide by complex and poorly documented usage restrictions
  3. You are modifying your data within a parallel stream

Based on that information (and my gut-driven concurrency bugs detector™), I wager a guess that removing the call to parallelStream might improve the robustness of your code. In addition, as mentioned by @Slaw, you should use an ordinary HashMap in place of ConcurrentHashMap if all instrumentMap usage is already guarded by the lock.
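
A sketch of the flush without parallelStream, assuming all instrumentMap access already happens under the lock (in which case a plain HashMap would do):

    aggregator.getLock().lock();
    try {
        // Drain every per-thread list sequentially; no parallel mutation.
        List<T> all = new ArrayList<>();
        for (List<T> perThread : instrumentMap.values()) {
            all.addAll(perThread);
            perThread.clear();
        }
        if (!all.isEmpty()) {
            saver.persist(all);
        }
    } finally {
        aggregator.getLock().unlock();
    }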

Of course, since you don't post the code of recordSaver, it is possible that it too has bugs (and not necessarily concurrency-related ones). In particular, you should make sure that the code that reads records from persistent storage, the one you are using to detect the loss of records, is safe, correct, and properly synchronized with the rest of your system (preferably by using a robust, industry-standard SQL database).

Upvotes: 0
