user1906450

Reputation: 547

Java concurrency: proper use of synchronizers and atomics to achieve a simple use case

I am working on a very simple use-case challenge (just something self-inflicted).
Use case: a collection that data is only ever written to.
The caveat: once it reaches a certain size (e.g. 25 elements), the data currently in it should be flushed out (say, written to disk). At no point should the collection grow beyond the specified limit, and at no point should the size of a flush exceed that limit.
This essentially also means that writes must not be allowed while a flush is in progress, since add and flush operate on the same data structure.

The algorithm works well when everything is single-threaded; it is essentially as simple as:

while (addToDataAndSizeBreached())
    flushData();
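For reference, a minimal single-threaded sketch of the behaviour described above. The class name `SingleThreadedBuffer`, the `flushed` list standing in for the disk, and the method names are all assumptions made for illustration; they do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded buffer; names and the in-memory "disk" are illustrative only.
final class SingleThreadedBuffer {
    private final int limit;
    private final List<String> data = new ArrayList<>();
    // Stands in for the disk: each element is one flushed batch.
    final List<List<String>> flushed = new ArrayList<>();

    SingleThreadedBuffer(int limit) {
        this.limit = limit;
    }

    // Adds one value and flushes as soon as the limit is reached,
    // so the buffer never holds more than `limit` elements.
    void add(String value) {
        data.add(value);
        if (data.size() == limit) {
            flushed.add(new ArrayList<>(data)); // copy the full batch out
            data.clear();
        }
    }

    int pending() {
        return data.size();
    }

    public static void main(String[] args) {
        SingleThreadedBuffer buffer = new SingleThreadedBuffer(25);
        for (int i = 0; i < 60; i++) {
            buffer.add("item-" + i);
        }
        System.out.println(buffer.flushed.size() + " batches flushed, "
                + buffer.pending() + " items pending");
    }
}
```

With a single thread the check-then-flush step cannot be interleaved with another add, which is why no synchronization is needed here.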

However, as expected, this fails badly in a multithreaded application. The use case still works, but the size limit is never respected (e.g. sometimes the collection grows beyond 25, sometimes a flush contains anything but 25 elements, and so on). I have reread my entire collection of Java multithreading dos and don'ts, but I am still unable to solve it.



The only thing that works well is taking a lock around the entire operation:

ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock().writeLock();
writeLock.lock(); // acquire before try, so finally only unlocks a lock that is actually held
try {
    while (addToDataAndSizeBreached())
        flushData();
} finally {
    writeLock.unlock();
}
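A minimal sketch of the fully locked approach, assuming the lock is a single instance shared by every writer (a lock created per call would protect nothing). `LockedBuffer`, `runDemo`, and the `flushSizes` list standing in for the disk are hypothetical names, not part of the original code, and a plain `ReentrantLock` is swapped in because no read lock is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: one shared lock guards both the add and the flush.
final class LockedBuffer {
    private final int limit;
    private final ReentrantLock lock = new ReentrantLock(); // shared by all callers
    private final List<String> data = new ArrayList<>();
    private final List<Integer> flushSizes = new ArrayList<>(); // stands in for the disk

    LockedBuffer(int limit) {
        this.limit = limit;
    }

    void add(String value) {
        lock.lock(); // acquire before try
        try {
            data.add(value);
            if (data.size() == limit) {
                flushSizes.add(data.size()); // "flush" while still holding the lock
                data.clear();
            }
        } finally {
            lock.unlock();
        }
    }

    List<Integer> flushSizes() {
        lock.lock();
        try {
            return new ArrayList<>(flushSizes);
        } finally {
            lock.unlock();
        }
    }

    int pending() {
        lock.lock();
        try {
            return data.size();
        } finally {
            lock.unlock();
        }
    }

    // Starts `threads` writers that each add `addsPerThread` values, then waits for them.
    static LockedBuffer runDemo(int threads, int addsPerThread, int limit) {
        LockedBuffer buffer = new LockedBuffer(limit);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < addsPerThread; i++) {
                    buffer.add("v");
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer;
    }

    public static void main(String[] args) {
        LockedBuffer buffer = runDemo(4, 250, 25);
        System.out.println(buffer.flushSizes().size() + " flushes, " + buffer.pending() + " pending");
    }
}
```

Because the size check and the flush happen under the same lock as the add, every flush sees exactly `limit` elements, at the cost of blocking all writers for the duration of the flush.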


However, I am not satisfied with the above approach and wanted to check with you all for a better solution. Kindly guide me; I am happy to research and attempt it on my own after that.

Things that I have tried:

  • Taking a complete lock around the entire operation: works well, but I am looking for a better, non-blocking solution.
  • Using a combination of read and write locks: since the write lock has to wait while multiple threads hold the read lock, the data structure ends up with more than 25 elements.
  • Synchronizers such as CountDownLatch (not sure how it would work, since it does not reset) and CyclicBarrier (I could not work out how to use it, since I do not know the thread count beforehand).
  • The solution below: the size limit is not respected, and it somehow also leads to a deadlock, and I cannot figure out why.

    package com.org.store.ecommerce;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class Runner {
    
        static ArrayList l = new ArrayList();
        static AtomicBoolean full = new AtomicBoolean(false);
        static AtomicBoolean flushOnGoing = new AtomicBoolean(false);
        static ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock().writeLock();
    
        static void addToData(){
    
            while(addToDataAndCheckSize()){ // while size==maxsize : flushComplete.wait()
                System.out.println(String.format("Add attempt %s thread %d time  %d array size %d number of locks held", Thread.currentThread().getName(), System.nanoTime(), l.size(), writeLock.getHoldCount()));
               try{
                   writeLock.lock();
                   flushOnGoing.getAndSet(true);
                   flushData();
               }
                finally {
                   System.out.println(String.format("Flush complete %s thread %d time flushed %d flush size", Thread.currentThread().getName(), System.nanoTime(), l.size()));
                   flushOnGoing.getAndSet(false);
                   writeLock.unlock();
               }
    
            }
    
        }
    
        private static boolean flushData() {
            System.out.println(String.format("%s thread %d time flushed %d flush size %d number of locks held", Thread.currentThread().getName(), System.nanoTime(), l.size(), writeLock.getHoldCount()));
            l.clear();
            return l.size()==0;
        }
    
        private static boolean addToDataAndCheckSize() {
            System.out.println(String.format("%s thread %d time add to data ", Thread.currentThread().getName(), System.nanoTime()));
            String value = UUID.randomUUID().toString();
            if (!flushOnGoing.get())
                l.add(value);
            System.out.println(String.format("%s thread %d inside add size==%d",Thread.currentThread().getName(), System.nanoTime(),l.size()));
            return l.size()==2;
        }
    
        public static void main(String[] args) {
    //        addToData();
    //        addToData();
    //        addToData();
            ExecutorService ex = Executors.newFixedThreadPool(2);
            List<CompletableFuture<String>> cfList = new ArrayList<>();
            for(int n =0; n<=5;n++){
                  cfList.add(CompletableFuture.supplyAsync(()->{
                      addToData();
                      return "D";
                      }, ex));
    
            }
            for(CompletableFuture cf:cfList)
                cf.join();
    
        }
    
    
    }
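One direction that keeps the critical section short is to swap the full list for a fresh one under the lock and flush the detached batch outside it. The sketch below is an illustration under stated assumptions, not a definitive fix: it assumes writes may continue into a new list while the old batch is being flushed (which relaxes the "no writes during flush" rule, since add and flush no longer share a data structure), it is still lock-based rather than non-blocking, and `SwappingBuffer`, `runDemo`, and the `flushed` queue standing in for the disk are hypothetical names. It also calls `shutdown()` on the executor; the listing above never does, and because the pool's threads are non-daemon the JVM stays alive after `main` returns, which is one possible explanation for the apparent deadlock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: swap the full list under a lock, flush it outside the lock.
final class SwappingBuffer {
    private final int limit;
    private final Object monitor = new Object();
    private List<String> current = new ArrayList<>(); // guarded by monitor
    // Stands in for the disk; thread-safe because flushes run outside the lock.
    final Queue<List<String>> flushed = new ConcurrentLinkedQueue<>();

    SwappingBuffer(int limit) {
        this.limit = limit;
    }

    void add(String value) {
        List<String> full = null;
        synchronized (monitor) {
            current.add(value);
            if (current.size() == limit) {
                full = current;                   // detach the full batch
                current = new ArrayList<>(limit); // later writers see an empty list
            }
        }
        if (full != null) {
            flush(full); // only this thread holds a reference to the detached batch
        }
    }

    private void flush(List<String> batch) {
        flushed.add(batch);
    }

    int pending() {
        synchronized (monitor) {
            return current.size();
        }
    }

    // Submits `tasks` writer tasks to a two-thread pool and waits for them to finish.
    static SwappingBuffer runDemo(int tasks, int addsPerTask, int limit) {
        SwappingBuffer buffer = new SwappingBuffer(limit);
        ExecutorService ex = Executors.newFixedThreadPool(2);
        try {
            for (int t = 0; t < tasks; t++) {
                ex.submit(() -> {
                    for (int i = 0; i < addsPerTask; i++) {
                        buffer.add("v");
                    }
                });
            }
        } finally {
            ex.shutdown(); // lets the pool threads exit once the queued tasks are done
        }
        try {
            if (!ex.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer;
    }

    public static void main(String[] args) {
        SwappingBuffer buffer = runDemo(6, 100, 25);
        System.out.println(buffer.flushed.size() + " batches flushed, " + buffer.pending() + " pending");
    }
}
```

The add and the size check stay atomic, so no list ever grows past the limit and every flushed batch holds exactly `limit` elements, while a slow flush no longer blocks other writers.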
    

    Upvotes: 0

    Views: 64
