Khalifeh
Khalifeh

Reputation: 11

How to synchronize operations?

I have this implementation for two-phase locking. The problem is it works perfect in most of the scenarios but not all of them. I figure out the problem come because of my usage of Synchronized which is used in wrong way some how! I need the order of operations to be like this:

  1. The thread who unlock any lock should finish this phase (ul method) completely before any other thread start locking the same lock.
  2. The thread who was waiting for a lock when it wakes up should get all the waiters of that lock before it starts executing it's operations.
  3. any addEdge or removeEdge need to be synchronized because I'm using the same graph object with all threads.

Here is the code:

class LockTable{
     private HashMap<Integer,MyLock> locks;
     public LockTable(){
         locks= new HashMap<Integer,MyLock>();   
    }

/*******************************************************************************
 * Acquire a shared/read lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void rl (int tid, int oid) throws InterruptedException
{

    MyLock lock=null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock

            if((lock != null) && (lock.lock.isWriteLocked())){

               wait = true;
            }
            if(lock == null){
             getIt = true;
             lock = new MyLock(tid, true);
             lock.lock.readLock().lock();
             lock.readers.add(tid);
             locks.put(oid, lock);
          }

        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
    }//synch
    if (wait){
        synchronized(this){
            //System.out.println("Transaction " + tid + " is waiting..");
            lock.waiters.add(tid);
            lock.waitersType.add('s');
            Main.g.addEdge(tid, lock.tid);
            //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
        }//sync
         if(Main.g.hasCycle()){
            //System.out.println("Detect Cycle in rl..Executing Restart");
            restart(tid);
        }
        //to exclude the restarted thread
        if(!Main.trans[tid].terminate){
            lock.lock.readLock().lock();
            lock.readers.add(tid);
          synchronized(this){
            lock.waitersType.remove(lock.waiters.indexOf(tid));
            lock.waiters.remove(lock.waiters.indexOf(tid));
            }//sync
            for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
            }//for
        }//if terminate
        else
            return;
    }
   else
    if(!getIt){
              //System.out.println("Getting Shared Lock without Waiting");
              lock.lock.readLock().lock();
              lock.readers.add(tid);
              for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
              } 
    }

} // rl

/*******************************************************************************
 * Acquire an exclusive/write lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void wl (int tid, int oid) throws InterruptedException
{
     //type to determine the last lock type in order
     //to be able to remove the edges from waitfor graph
    int type = 0;
    MyLock lock = null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock
            if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
            {
                wait = true;
            }
            if(lock == null){
                getIt = true;
                lock = new MyLock(tid);
                lock.lock.writeLock().lock();
                locks.put(oid,lock);
            }
        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
  }
     if (wait){
            //System.out.println("Transaction " + tid + " is waiting..");
           synchronized(this) {
            if(lock.lock.isWriteLocked()){
                Main.g.addEdge(tid, lock.tid);
                //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
               }
            else{
                type = 1;
                for(int reader : lock.readers){
                     Main.g.addEdge(tid, reader);
                     //System.out.println("Edge has been added "+tid + "==>" + reader);
                }
                }//else
            if(Main.g.hasCycle()){
               //System.out.println("Detect Cycle");
               restart(tid);
             }//if
            //System.out.println("Graph: "+Main.g.toString());
         }//sync



       if(!Main.trans[tid].terminate){
            synchronized(this){
            lock.waiters.add(tid);
            lock.waitersType.add('w');
           }
            //System.out.println("I'm waiting here in wl");
            lock.lock.writeLock().lock();
            lock.tid = tid;
            //System.out.println("Wakeup..");
            synchronized(this){
                lock.waitersType.remove(lock.waiters.indexOf(tid));
                lock.waiters.remove(lock.waiters.indexOf(tid));
                //System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
           }//sync
                for(int i =0 ; i < lock.waiters.size();i++){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                   // System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    //System.out.println("Graph: "+Main.g.toString());
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//for
         }
        else
            return; 
    }// if(wait) ==> for the lock to be released
    else
        if(!getIt){
            lock.lock.writeLock().lock();
            lock.tid = tid;
            for(int i =0 ; i < lock.waiters.size();i++){
                synchronized(Main.g){
                Main.g.addEdge(lock.waiters.get(i), tid);
                //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                }
                if(Main.g.hasCycle())
                    restart(lock.waiters.get(i));
            }//for

     }

} // wl

void restart(int tid){
 synchronized(this) {
    Main.rollBack++;
    MyLock lock;
    List<Integer> toRemove = new ArrayList();
    for(int i : locks.keySet()){
       lock = locks.get(i);
       //for all the locks in the lock table delete the restarted thread from the waiters list
       if(lock.waiters.contains(tid)){
           lock.waitersType.remove(lock.waiters.indexOf(tid));
           lock.waiters.remove(lock.waiters.indexOf(tid));
        }

           //lock.sem.release();
           if(lock.lock.isWriteLockedByCurrentThread()){


               //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), lock.tid);
               //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
               lock.lock.writeLock().unlock();
                //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());
               toRemove.add(i);

           }
       if(!lock.lock.isWriteLocked())
           if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), tid);
                   // lock.numberOfReaders --;
                  //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
                  lock.readers.remove(lock.readers.indexOf(tid));
                  lock.lock.readLock().unlock();
                  //System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
                  toRemove.add(i);  

              }//if
    }//for
    Main.g.removeEdges(tid);
    Main.trans[tid].terminate = true;
    //System.out.println("Transaction" + tid + " restarted");

    }//sync
}

/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void ul (int tid, int oid)
{
   MyLock lock = null;
    boolean error = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);                    // find the lock
            if( lock == null)
                System.out.println("println: lock not found");
        } catch(Exception e) {
            System.out.println("lock not found");   // lock not found
        } // try
    }
        if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                      }//sync
                   //System.out.println("tid: " + tid + " unlock object: " + oid);
                   lock.lock.writeLock().unlock();
                  //print out 
                  //System.out.println("done with unlock");
                  //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
      }// if lock != null
        else
          if((lock != null) && (lock.readers.size()>0)){
              synchronized(this){
              if(lock.readers.contains(tid)){
                lock.readers.remove(lock.readers.indexOf(tid));
                }
                //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), tid);
                      }
                lock.lock.readLock().unlock();
                //System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
                //System.out.println("number of write holders: " + lock.lock.readLock().);
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());

              }//if lock.readers
          }//if
    if (error) 
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul

Upvotes: 1

Views: 793

Answers (1)

rodion
rodion

Reputation: 15029

There are couple of problems with your usage of synchronized.

  1. Except very rare cases, all read/write (especially write) access to a shared object must be synchronized on the same object. You are synchronizing on Main.g when you write to it in some places, but then there are places where it's not synchronized (e.g inside if(wait) of wl method).

  2. As far as I understood the objective of your code, the state of lock.{waiters|readers} has to be in sync with the graph's state (Main.g). This means that whenever you make a change to, say, waiters and then update Main.g accordingly, those two operations must be performed atomically. To do this you need to wrap them in one un-broken synchronized statement. I can see that you understand this concept, because you are doing this in some places, but then you seem to missing it out in others. For example: in rl method, inside the if(!getIt){ you update lock.readers and then Main.g without any synchronization.

Overall, I cannot give you anything specific, since the code is quite complicated so it's hard to tell what is its intention. But I think you can resolve some locking problems by locking larger sections of your code. For example you could just synchronize the whole methods rl, wl and ul on this by adding synchronized keyword to the method declaration, and get rid of all fine-grained locking inside the methods. Fine-grained locking can give you better performance, but at a cost of high complexity. I would recommend you start with simpler locking schemes first, and then gradually improve them (if you really really need to!).

Upvotes: 1

Related Questions