La-comadreja

Reputation: 5755

Preventing concurrent modification to an object

I have 2 routes in Apache Camel: MultimapRoute, which initializes a Multimap object at program start from the results of a database query, and UpdatingRoute, a listener for database updates that updates the Multimap object whenever the database is updated. I would like to ensure that the MultimapRoute runs first and all the changes from the UpdatingRoute are processed in order. I would also like to ensure that all the concurrency issues are taken care of.

Here is the MultimapRoute class:

public class MultimapRoute implements org.apache.camel.Processor {
    static Multimap<String, String> m = ArrayListMultimap.create();
    static ReentrantLock l = new ReentrantLock();

    public void process(Exchange exchange) throws Exception {
        try {
            l.lock();
            //Query the database and fill out the map initially
        } finally {
            if (l.isHeldByCurrentThread()) l.unlock();
        }
    }
}

Here is the UpdatingRoute class:

public class UpdatingRoute implements org.apache.camel.Processor {
    public void process(Exchange exchange) throws Exception {
        try {
            MultimapRoute.l.lock();
            //Update the database based on latest event
        } finally {
            if (MultimapRoute.l.isHeldByCurrentThread())
                MultimapRoute.l.unlock();
        }
    }
}

I'm not sure what the thread safety, or other, problem is with this approach to multithreading, or how to make the program thread-safe. But I was told this was not the right way to handle multithreading for this particular application. Thanks so much for your help.

Upvotes: 0

Views: 397

Answers (1)

Alexey Malev

Reputation: 6533

First, the two routes need some way to communicate with each other in order to implement this:

"I would like to ensure that the MultimapRoute runs first and all the changes from the UpdatingRoute are processed in order."

To do this, I recommend using a Condition created from the ReentrantLock.
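A minimal, JDK-only sketch of that gate, in case it helps to see the Condition in isolation. The class and method names (InitGate, markInitialized, awaitInitialized, isInitialized) are hypothetical and not part of your code or of Camel; the initial-load route would call markInitialized() once, and every updater would call awaitInitialized() before touching the map.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: blocks updaters until the initial load has finished.
class InitGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition initialized = lock.newCondition();
    private boolean done = false; // guarded by lock

    // Called once by the initial-load route after the map is filled.
    public void markInitialized() {
        lock.lock();
        try {
            done = true;
            initialized.signalAll(); // wake every waiting updater
        } finally {
            lock.unlock();
        }
    }

    // Called by each updater before it touches the map.
    public void awaitInitialized() throws InterruptedException {
        lock.lock();
        try {
            while (!done) { // loop guards against spurious wakeups
                initialized.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Reads the flag under the lock so the value is never stale.
    public boolean isInitialized() {
        lock.lock();
        try {
            return done;
        } finally {
            lock.unlock();
        }
    }
}
```

The flag is needed as well as the Condition: an updater that arrives after the signal has already been sent would otherwise wait forever.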

Now, the second part of your question:

"I'm not sure what the thread safety, or other, problem is with this approach to multithreading, or how to make the program thread-safe."

First of all, you need to be clear about what exactly you mean by making it thread-safe. My guess is that you want no other thread to be able to access the map m once some thread has entered process() (that is, once it has acquired the lock). To ensure this, change the way the map is accessed. The locking itself is fine as it stands, but the reference to the map is visible outside these two classes, so other code could touch the map without taking the lock. Here is my suggestion with both ideas implemented:

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;

public class MultimapRoute implements org.apache.camel.Processor {
    private static final Multimap<String, String> m = ArrayListMultimap.create();
    // Package-private so UpdatingRoute can share them (assumes both classes are in the same package).
    static final ReentrantLock l = new ReentrantLock();
    static final Condition multimapRouteFinished = l.newCondition();
    // Guarded by l: only read or write this flag while holding the lock.
    static boolean routeFinishedFlag = false;

    public void process(Exchange exchange) throws Exception {
        l.lock();
        try {
            // Query the database and load the result into newData.
            // Placeholder assumption: here the data is taken from the message body.
            Object newData = exchange.getIn().getBody();
            performMapUpdate(newData);
            routeFinishedFlag = true;
            // Wake every updater waiting for the initial load, not just one of them.
            multimapRouteFinished.signalAll();
        } finally {
            l.unlock();
        }
    }

    public static void performMapUpdate(Object newData) {
        l.lock(); // reentrant, so this is safe even when the caller already holds l
        try {
            // Here do some magic with the map and the newData object. You may need to change the method signature; it is just an illustration of the approach.
        } finally {
            l.unlock();
        }
    }
}

public class UpdatingRoute implements org.apache.camel.Processor {
    public void process(Exchange exchange) throws Exception {
        MultimapRoute.l.lock();
        try {
            // Block until MultimapRoute has finished the initial load.
            while (!MultimapRoute.routeFinishedFlag) {
                MultimapRoute.multimapRouteFinished.await();
            }
            // Build newData from the latest event.
            // Placeholder assumption: here the event is taken from the message body.
            Object newData = exchange.getIn().getBody();
            MultimapRoute.performMapUpdate(newData);
        } catch (InterruptedException e) {
            // Someone interrupted this thread while it was waiting on the condition; restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            MultimapRoute.l.unlock();
        }
    }
}
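
To illustrate the point about not leaking the map reference, here is a small self-contained sketch that keeps both the map and its lock private. It uses a plain JDK Map<String, List<String>> as a stand-in for the Guava Multimap so that it runs without extra dependencies; the name GuardedMultimap and its two methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: the map and its lock never leave this class.
class GuardedMultimap {
    private final Map<String, List<String>> map = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    // Adds one value under the key while holding the lock.
    public void put(String key, String value) {
        lock.lock();
        try {
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } finally {
            lock.unlock();
        }
    }

    // Returns a copy, so callers cannot mutate or iterate the live list.
    public List<String> get(String key) {
        lock.lock();
        try {
            List<String> values = map.get(key);
            return values == null ? new ArrayList<>() : new ArrayList<>(values);
        } finally {
            lock.unlock();
        }
    }
}
```

Because every access goes through put() or get(), no caller can read or modify the map without holding the lock.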

Upvotes: 2
