Edward Chen

Reputation: 63

Java concurrency counter is not properly cleaned up

This is a Java concurrency question. Ten jobs need to be done, and each of them has 32 worker threads. Each worker thread increments a counter. Once the counter reaches 32, the job is done and its entry is removed from the counter map. From the console output, I expect "done" to be printed 10 times, with a pool size of 0 and a countThreadMap size of 0.

The issues are:

  1. Most of the time, "pool size: 0 and countThreadMap size:3" is printed. Even though all threads are gone, 3 jobs are not finished yet.

  2. Sometimes I see a NullPointerException at line 27. I have used ConcurrentHashMap and AtomicLong, so why is there still a concurrency exception?

Thanks

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class Test {
    final ConcurrentHashMap<Long, AtomicLong[]> countThreadMap = new ConcurrentHashMap<Long, AtomicLong[]>();
    final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    final ThreadPoolExecutor tPoolExecutor = ((ThreadPoolExecutor) cachedThreadPool);

    public void doJob(final Long batchIterationTime) {
        for (int i = 0; i < 32; i++) {
            Thread workerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (countThreadMap.get(batchIterationTime) == null) {
                        AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                        atomicThreadCountArr[0] = new AtomicLong(1);
                        atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis()); //start up time
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    } else {
                        AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                        atomicThreadCountArr[0].getAndAdd(1);
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    }

                    if (countThreadMap.get(batchIterationTime)[0].get() == 32) {
                        System.out.println("done");
                        countThreadMap.remove(batchIterationTime);
                    }
                }
            });
            tPoolExecutor.execute(workerThread);
        }
    }

    public void report(){
        while(tPoolExecutor.getActiveCount() != 0){
            //
        }
        System.out.println("pool size: "+ tPoolExecutor.getActiveCount() + " and countThreadMap size:"+countThreadMap.size());
    }

    public static void main(String[] args) throws Exception {
        Test test = new Test();
        for (int i = 0; i < 10; i++) {
            Long batchIterationTime = System.currentTimeMillis();
            test.doJob(batchIterationTime);
        }

        test.report();
        System.out.println("All Jobs are done");

    }
}

Upvotes: 0

Views: 259

Answers (2)

Holger

Reputation: 298389

Let’s dig through all the thread-related programming mistakes one can make:

Thread workerThread = new Thread(new Runnable() {
…
tPoolExecutor.execute(workerThread);

You create a Thread, but instead of starting it you submit it to an executor. It’s a historical mistake of the Java API that Thread implements Runnable for no good reason. Every developer should be aware that there is no reason to treat a Thread as a Runnable. If you don’t want to start a thread manually, don’t create a Thread. Just create the Runnable and pass it to execute or submit.

I want to emphasize submit, as it returns a Future, which gives you for free what you are attempting to implement: the information about when a task has finished. It’s even easier with invokeAll, which submits a bunch of Callables and returns when all are done. Since you didn’t tell us anything about your actual task, it’s not clear whether you can let your tasks simply implement Callable (which may return null) instead of Runnable.
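As a minimal sketch of the invokeAll variant, assuming each worker can be expressed as a Callable that returns null: the class name InvokeAllDemo and the helpers runJob and runAllJobs are hypothetical names made up for this illustration, and the empty call body stands in for the real work, which the question does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {
    // Hypothetical helper: runs one job of `workers` tasks and returns how many completed.
    static int runJob(ExecutorService pool, int workers) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>(workers);
        for (int i = 0; i < workers; i++) {
            tasks.add(new Callable<Void>() {
                @Override
                public Void call() {
                    // the real work of one worker would go here (assumption: nothing to return)
                    return null;
                }
            });
        }
        // invokeAll blocks until every submitted task has completed
        List<Future<Void>> futures = pool.invokeAll(tasks);
        int done = 0;
        for (Future<Void> f : futures) {
            if (f.isDone()) {
                done++;
            }
        }
        return done;
    }

    // Hypothetical helper: runs `jobs` jobs one after another and returns the total of completed tasks.
    static int runAllJobs(int jobs, int workers) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            int total = 0;
            for (int i = 0; i < jobs; i++) {
                total += runJob(pool, workers);
                System.out.println("done");
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAllJobs(10, 32) + " tasks completed");
    }
}
```

Note that in this sketch the jobs run one after another, because invokeAll does not return until all 32 tasks of a job are done. If the jobs have to overlap, collecting the Futures from submit is the better fit.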

If you can’t use Callables or don’t want to wait immediately on submission, you have to remember the returned Futures and query them at a later time:

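import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// the members below belong inside class Test
static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();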

public static List<Future<?>> doJob(final Long batchIterationTime) {
    final Random r=new Random();
    List<Future<?>> list=new ArrayList<>(32);
    for (int i = 0; i < 32; i++) {
        Runnable job=new Runnable() {
            public void run() {
                // pretend to do something
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(r.nextInt(10)));
            }
        };
        list.add(cachedThreadPool.submit(job));
    }
    return list;
}

public static void main(String[] args) throws Exception {
    Test test = new Test();
    Map<Long,List<Future<?>>> map=new HashMap<>();
    for (int i = 0; i < 10; i++) {
        Long batchIterationTime = System.currentTimeMillis();
        while(map.containsKey(batchIterationTime))
            batchIterationTime++;
        map.put(batchIterationTime,doJob(batchIterationTime));
    }
    // print some statistics, if you really need
    int overAllDone=0, overallPending=0;
    for(Map.Entry<Long,List<Future<?>>> e: map.entrySet()) {
        int done=0, pending=0;
        for(Future<?> f: e.getValue()) {
            if(f.isDone()) done++;
            else  pending++;
        }
        System.out.println(e.getKey()+"\t"+done+" done, "+pending+" pending");
        overAllDone+=done;
        overallPending+=pending;
    }
    System.out.println("Total\t"+overAllDone+" done, "+overallPending+" pending");
    // wait for the completion of all jobs
    for(List<Future<?>> l: map.values())
        for(Future<?> f: l)
            f.get();
    System.out.println("All Jobs are done");
}

But note that if you don’t need the ExecutorService for subsequent tasks, it’s much easier to wait for all jobs to complete:

cachedThreadPool.shutdown();
cachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("All Jobs are done");
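A self-contained sketch of this shutdown-and-wait approach, passing plain Runnables to the executor instead of Thread objects, could look as follows. The class name RunnableDemo, the helper runTasks and the one-minute timeout are assumptions chosen for the illustration, and the counter increment stands in for the real work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RunnableDemo {
    // Hypothetical helper: executes `tasks` plain Runnables and returns how many actually ran.
    static int runTasks(int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        final AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            // a Runnable, not a Thread: the pool supplies the threads
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    ran.incrementAndGet(); // placeholder for the real work
                }
            });
        }
        // no new tasks are accepted; already submitted tasks still run
        pool.shutdown();
        try {
            // blocks until all tasks have finished or the (assumed) timeout elapses
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(32) + " tasks ran");
        System.out.println("All Jobs are done");
    }
}
```

Because awaitTermination blocks, no busy-wait loop on getActiveCount is needed.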

But regardless of how unnecessary the manual tracking of the job status is, let’s delve into your attempt, so you may avoid the mistakes in the future:

if (countThreadMap.get(batchIterationTime) == null) {

The ConcurrentMap is thread safe, but this does not turn your concurrent code into sequential code (that would render multi-threading useless). The above line might be executed by up to all 32 threads at the same time, all finding that the key does not exist yet, so possibly more than one thread will go on to put an initial value into the map.

                    AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                    atomicThreadCountArr[0] = new AtomicLong(1);
                    atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis());
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);

That’s why this is called the “check-then-act” anti-pattern. If more than one thread executes that code, each will put its own new value, confident that this is the right thing to do because it checked the condition before acting. But for all but one thread, the condition has changed by the time they act, and they overwrite the value of a previous put operation.
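One way to avoid the check-then-act race, sketched here under the assumption that Java 8 or later is available, is to let the map perform the check and the insertion as a single atomic step with computeIfAbsent (on older versions, putIfAbsent can serve the same purpose). The class name AtomicInitDemo and the helpers countersFor and countAfter are hypothetical names for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicInitDemo {
    static final ConcurrentHashMap<Long, AtomicLong[]> countThreadMap = new ConcurrentHashMap<>();

    // Returns the single counter array for the key; only one thread ever creates it.
    static AtomicLong[] countersFor(Long batchIterationTime) {
        return countThreadMap.computeIfAbsent(batchIterationTime, key -> new AtomicLong[] {
            new AtomicLong(0),                         // worker counter, starts at 0
            new AtomicLong(System.currentTimeMillis()) // start-up time
        });
    }

    // Hypothetical helper: lets `workers` tasks increment the counter and returns its final value.
    static long countAfter(final Long batchIterationTime, int workers) {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> countersFor(batchIterationTime)[0].incrementAndGet());
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return countersFor(batchIterationTime)[0].get();
    }

    public static void main(String[] args) {
        System.out.println("counter: " + countAfter(System.currentTimeMillis(), 32));
    }
}
```

Since every worker increments the same AtomicLong, no increment is lost to an overwritten array, and no second put is required.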

                } else {
                    AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                    atomicThreadCountArr[0].getAndAdd(1);
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);

Since you are modifying the AtomicLong that is already stored in the map, the put operation is useless: it puts back the very array that was retrieved just before. Were it not for the mistake described above, where multiple initial values can exist, the put operation would have no effect.

                }

                if (countThreadMap.get(batchIterationTime)[0].get() == 32) {

Again, the use of a ConcurrentMap doesn’t turn the multi-threaded code into sequential code. While it is clear that only the last thread will update the AtomicLong to 32 (provided the initial race condition doesn’t materialize), it is not guaranteed that all other threads have already passed this if statement. Therefore more than one thread, and up to all of them, can still be at this point of execution and see the value 32. Or…

                    System.out.println("done");
                    countThreadMap.remove(batchIterationTime);

One of the threads that has seen the value 32 might execute this remove operation. At that point, there might still be threads that have not yet executed the above if statement. They do not see the value 32; instead they get a NullPointerException, because the array supposed to contain the AtomicLong is no longer in the map. This is what happens, occasionally…
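If the manual tracking is kept anyway, a possible repair (a sketch, not the only option) is to hold on to the array reference obtained once and to test the value returned by incrementAndGet, so that exactly one worker observes 32 and no worker reads the map again after the entry may have been removed. The class name CompletionDemo and the helpers workerFinished and runJob are hypothetical; Java 8 or later is assumed for computeIfAbsent and the lambdas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class CompletionDemo {
    static final int WORKERS = 32;
    static final ConcurrentHashMap<Long, AtomicLong[]> countThreadMap = new ConcurrentHashMap<>();

    // Called once per worker; returns true only for the one worker that completes the job.
    static boolean workerFinished(Long batchIterationTime) {
        // atomic initialisation; the local reference stays valid even after remove()
        AtomicLong[] counters = countThreadMap.computeIfAbsent(batchIterationTime, key -> new AtomicLong[] {
            new AtomicLong(0),
            new AtomicLong(System.currentTimeMillis())
        });
        // incrementAndGet hands each worker a distinct value, so only one sees WORKERS
        if (counters[0].incrementAndGet() == WORKERS) {
            System.out.println("done");
            countThreadMap.remove(batchIterationTime);
            return true;
        }
        return false;
    }

    // Hypothetical helper: runs one job and returns how many workers reported completion.
    static int runJob(final Long batchIterationTime) {
        ExecutorService pool = Executors.newCachedThreadPool();
        List<Callable<Boolean>> tasks = new ArrayList<>(WORKERS);
        for (int i = 0; i < WORKERS; i++) {
            tasks.add(() -> workerFinished(batchIterationTime));
        }
        try {
            int completions = 0;
            for (Future<Boolean> f : pool.invokeAll(tasks)) {
                if (f.get()) {
                    completions++;
                }
            }
            return completions;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completions: " + runJob(1L) + ", map size: " + countThreadMap.size());
    }
}
```

The removal can only happen after all 32 workers have fetched the array, because the counter reaches 32 only once every worker has passed computeIfAbsent. This sketch still assumes that each job uses a distinct key.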

Upvotes: 1

DoubleDouble

Reputation: 1493

After creating your 10 jobs, your main thread is still running: it doesn't wait for your jobs to complete before it calls report on the test. You try to overcome this with the while loop, but tPoolExecutor.getActiveCount() can return 0 before a workerThread has been executed, and countThreadMap.size() is then evaluated after entries were added to your ConcurrentHashMap.

There are a number of ways to fix this - but I will let another answer-er do that because I have to leave at the moment.

Upvotes: 0
