user21043025
user21043025

Reputation: 1

Thread safety count problem,Why is there a problem with counting in the code?

Why is the following code thread unsafe?

In ConcurrentHashMap, the key is the current time, the value is MutableInteger, and the mutableInteger is used for counting, and the set method and get method have been locked. Why is the final value incorrect?

public class MutableInteger {
    private volatile int value;

    public MutableInteger() {
    }

    public synchronized int getValue() {
        return value;
    }

    public synchronized void setValue(int value) {
        this.value = value;
    }
}
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.concurrent.*;

public class LinkedQueue {

    static ConcurrentHashMap<String,MutableInteger> map = new ConcurrentHashMap();

    public static class CountQueue{
        static BlockingQueue<Calendar> queue = new LinkedBlockingQueue<Calendar>();

        public void produce() throws InterruptedException{
            queue.put(Calendar.getInstance());
        }
        public Calendar consume() throws InterruptedException{
            return queue.take();
        }
    }

    /**
     */
    public static void recordFlow(){
        final CountQueue queue = new CountQueue();
        class Producer implements Runnable {
            public void run() {
                try {
                    queue.produce();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
        class Consumer implements Runnable {
            public void run() {
                try {
                    getdata(queue.consume());
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        service.submit(producer);
        service.submit(consumer);
        service.shutdown();
    }

    /**
     *
     *
     * @param c 队列中取出的时间
     */
    public static void getdata(Calendar c){
        try{
            saveFlow(c);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    // 接口流量记录
    public static void saveFlow(Calendar c){
        System.out.println("Thread Name:"+Thread.currentThread().getName());
        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat sdfHour = new SimpleDateFormat("yyyy-MM-dd HH");
        SimpleDateFormat sdfMinute = new SimpleDateFormat("yyyy-MM-dd HH:mm");

        String dayAcceptKey = sdfDate.format(c.getTime());
        String hourAcceptKey = sdfHour.format(c.getTime());
        String minuteAcceptKey = sdfMinute.format(c.getTime());
        
        MutableInteger newMinuteAcceptKey = new MutableInteger();
        newMinuteAcceptKey.setValue(1);
        MutableInteger oldMinuteAcceptKey = map.put(minuteAcceptKey, newMinuteAcceptKey);
        if(null != oldMinuteAcceptKey) {
            newMinuteAcceptKey.setValue(oldMinuteAcceptKey.getValue() + 1);
        }
        System.out.println("minute in map:"+ minuteAcceptKey + ","+ newMinuteAcceptKey.getValue());
        MutableInteger newHourAcceptKey = new MutableInteger();
        newHourAcceptKey.setValue(1);
        MutableInteger oldHourAcceptKey = map.put(hourAcceptKey, newHourAcceptKey);
        if(null != oldHourAcceptKey) {
            newHourAcceptKey.setValue(oldHourAcceptKey.getValue() + 1);
        }
        System.out.println("hour in map:" + hourAcceptKey + "," + newHourAcceptKey.getValue());
        MutableInteger newDayAcceptKey = new MutableInteger();
        newDayAcceptKey.setValue(1);
        MutableInteger oldDayAcceptKey = map.put(dayAcceptKey, newDayAcceptKey);
        if(null != oldDayAcceptKey) {
            newDayAcceptKey.setValue(oldDayAcceptKey.getValue() + 1);
        }
        System.out.println("day in map:" + dayAcceptKey + "," + newDayAcceptKey.getValue());
    }

    public static void main(String[] args){
        for(int i=0;i<1000;i++) {
            recordFlow();
        }
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("==========================");
        for(String key:map.keySet()){
            System.out.println("key and value:" +key + " " + map.get(key).getValue());
        }
    }
}

The get method and set method in MutableInteger have been locked and are used ConcurrentHashMap, but the result data in the main method is incorrect. Please help me see what the problem is,thank you

Upvotes: 0

Views: 120

Answers (1)

Joachim Sauer
Joachim Sauer

Reputation: 308131

You've got the same issue repeatedly, which is that you are interacting with a shared data structure in a non-synchronized way. The fact that map is a ConcurrentHashMap means that individual .get and .put calls will not break thread safety, you use multiple of those calls with the assumption that the map doesn't change between them.

    MutableInteger newMinuteAcceptKey = new MutableInteger();
    newMinuteAcceptKey.setValue(1);
    MutableInteger oldMinuteAcceptKey = map.put(minuteAcceptKey, newMinuteAcceptKey);
    // let's assume here something happens
    if(null != oldMinuteAcceptKey) {
        newMinuteAcceptKey.setValue(oldMinuteAcceptKey.getValue() + 1);
    }

If at the point marked with a comment above some other thread also puts a new MutableInteger into the map, then your later change to newMinuteAcceptKey (which should really be newMinuteAcceptValue, since it's not the key!) will be irrelevant, as that value is no longer in the map.

In order to fix this, replace this whole thing with a single compute or merge call. Then you also don't really need MutableInteger, but can switch to a boring old Integer, if you want:

map.merge(minuteAcceptKey, 1, Integer::sum);

Those methods will do the whole operation in an atomic way, i.e. make sure that no other calls manipulate the data until they are complete.

A short summary: If you need a ConcurrentHashMap, then you almost never want to have plain .put() calls (except maybe when populating the initial values in a single-threaded way), as those method have rather weak concurrency guarantees (i.e. they only guarantee to not break the map, but any other operation can easily overwrite their values at any time).

Upvotes: 3

Related Questions