CaptainHastings

Reputation: 1597

Lock Free: Splitting a compound, non-atomic operation into simpler atomic operations

I am writing a financial market data distributor application as part of my M.S. project. The application is almost finished, but it is not scaling well.

This is how it works: I subscribe to a "fake" exchange to get market data. After subscribing, I get an initial snapshot and then receive deltas continuously ...

1) Subscribe for IBM.
2) Snapshot : Buy:50|Size:1400||Sell:49|Size:1000
(At buy price of 50, 1400 shares and at sell price of 49, 1000 shares available)
3) Update1: -1|10||-2|25
(Buy price is now 49 (50-1) and buy size is 1410 (1400+10). Sell price is 47 (49-2) and sell size is 1025 (1000+25).)
4) Update2 ...
.
.
.
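The delta arithmetic in the steps above can be sketched as follows. This is only an illustration: `Quote` and its `apply` method are hypothetical names, not part of the application, and the sketch assumes each update carries four signed deltas (buy price, buy size, sell price, sell size) as in the example.

```java
public class QuoteDeltaDemo {

    /** Hypothetical immutable quote holding both sides of the book. */
    static final class Quote {
        final double buyPrice;
        final long buySize;
        final double sellPrice;
        final long sellSize;

        Quote(double buyPrice, long buySize, double sellPrice, long sellSize) {
            this.buyPrice = buyPrice;
            this.buySize = buySize;
            this.sellPrice = sellPrice;
            this.sellSize = sellSize;
        }

        /** Returns a new Quote with the deltas added; the receiver is unchanged. */
        Quote apply(double dBuyPrice, long dBuySize, double dSellPrice, long dSellSize) {
            return new Quote(buyPrice + dBuyPrice, buySize + dBuySize,
                             sellPrice + dSellPrice, sellSize + dSellSize);
        }

        @Override
        public String toString() {
            return "Buy:" + buyPrice + "|Size:" + buySize
                 + "||Sell:" + sellPrice + "|Size:" + sellSize;
        }
    }

    public static void main(String[] args) {
        Quote snapshot = new Quote(50, 1400, 49, 1000); // step 2
        Quote updated = snapshot.apply(-1, 10, -2, 25); // step 3
        System.out.println(updated); // prints "Buy:49.0|Size:1410||Sell:47.0|Size:1025"
    }
}
```

Returning a new object for each update, rather than mutating the old one, means a reader that already holds the previous quote never sees a half-applied delta.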

I have one class that processes the market data. The dataUpdate() method below is a callback that the "fake" exchange application calls on a single thread.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MarketDataProcessor {
    //Touched only by MarketDataThread, so a plain HashMap is enough
    Map<String, MarketData> marketDataMap     = new HashMap<String, MarketData>(1000);
    //Read by the distribution threads
    ConcurrentMap<String, MarketData> distMap = new ConcurrentHashMap<String, MarketData>();

    //**Always called by one thread (MarketDataThread)**
    public void dataUpdate( MarketDataUpdate dataUpdate ){

        try{
            //IBM
            String symbol   = dataUpdate.getSymbol();
            MarketData data = marketDataMap.get( symbol );

            if ( data == null ){
                //First message for this symbol: build it from the snapshot
                data = new MarketData( dataUpdate );
            }else{
                //Apply the delta here and save it ...
            }

            //Store the result in both cases so the next delta finds it
            marketDataMap.put( symbol, data );
            distMap.put( symbol, data );

        }catch( Exception e ){
            LOGGER.warn("Exception while processing market data.", e);
        }
    }
}

After getting market data from the exchange, I need to distribute it in a thread-safe manner. This is the method that doesn't scale well: it can be called by 20+ threads, and it uses an external lock to ensure atomicity.

public final double[] getData( String symbol, Side side ){
     double[] dataArray = {0.0, 0.0};

     synchronized( LOCK ){
        MarketData data = distMap.get( symbol );
        dataArray       = ( side == BUY ) ? getBuyData(data) : getSellData(data); 
     }

  return dataArray;
}

My proposed solution is to split the above method into two.

//No external lock as it uses a ConcurrentHashMap
public final MarketData getData( String symbol ){
       return distMap.get( symbol );
}

//State of this method is now confined to the
//stack of the calling thread, therefore thread safe.
public final double[] getData( MarketData data, Side side ){
       return ( side == BUY ) ? getBuyData(data) : getSellData(data); 
}

Granted, this changes the API and makes users call two methods instead of one, but does it not make the code thread-safe without using an external lock?

Thank you.

Upvotes: 2

Views: 209

Answers (3)

Michael Deardeuff

Reputation: 10697

While the other answers provide good advice, there is another option available that is lock free: use a persistent map (such as the one from pcollections). The reader threads would always have a consistent snapshot of the data; the writer thread would create new versions of the map, but without copying all the data.
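To illustrate the snapshot idea with only the JDK, here is a rough sketch that swaps an immutable map behind an `AtomicReference`. It is not a persistent map: unlike pcollections, it copies the whole map on every write, so it only shows the reader-side behaviour. `SnapshotMapDemo`, `publish`, and `snapshot` are made-up names, and a plain `Double` stands in for `MarketData`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotMapDemo {

    // Readers only ever see a fully built, unmodifiable map.
    private final AtomicReference<Map<String, Double>> current =
            new AtomicReference<>(Collections.<String, Double>emptyMap());

    /** Assumed to be called by the single writer thread only. */
    void publish(String symbol, double price) {
        // Full copy: O(n) per write. A persistent map would share structure instead.
        Map<String, Double> next = new HashMap<>(current.get());
        next.put(symbol, price);
        current.set(Collections.unmodifiableMap(next));
    }

    /** Lock-free read: returns whichever version is current right now. */
    Map<String, Double> snapshot() {
        return current.get();
    }

    public static void main(String[] args) {
        SnapshotMapDemo demo = new SnapshotMapDemo();
        demo.publish("IBM", 50.0);
        Map<String, Double> before = demo.snapshot();
        demo.publish("IBM", 49.0);
        // The old snapshot is untouched by the later write.
        System.out.println(before.get("IBM") + " -> " + demo.snapshot().get("IBM")); // prints "50.0 -> 49.0"
    }
}
```

With a real persistent map, the copy step would be replaced by an operation that returns a new version sharing most of its structure with the old one, which is what keeps writes cheap as the map grows.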

Upvotes: 0

Adrian Shum

Reputation: 40036

First, I have no idea what resource your LOCK is controlling access to. Which part of your code is unsafe, and why do you think using the LOCK there helps? Your new methods also do not have the same signature as your original one. Do you expect people to use them like

getData(getData(symbol), side)

?

If the original code without the lock is not thread safe, your new approach is no different.

Go back to your original implementation: the only resource you access in the LOCK-protected piece of code is distMap. No other place uses the LOCK. So what you are doing here is allowing only one thread at a time to get data from distMap. However, I don't see the rationale for that. First, distMap is a ConcurrentMap, so it is safe for multiple threads to access it. Second, for any kind of Map, simply getting data is thread safe as long as no one is changing the state at the same time. Normally, the synchronization control needs to cover the state-changing logic, which is where you put data into the map (though in this specific piece of code there is no reason to add extra synchronization control), but you are not doing so. I simply cannot understand the purpose of your LOCK.
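As a minimal sketch of a lock-free `getData`, the code below reads straight from the `ConcurrentHashMap`. It relies on two assumptions that the question does not confirm: `MarketData` is immutable, and the writer thread publishes a new instance for every update instead of mutating the one already in the map. The `MarketData` class here is a hypothetical stand-in, and returning `{0.0, 0.0}` for an unknown symbol is an arbitrary choice.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LockFreeReadDemo {

    enum Side { BUY, SELL }

    /** Hypothetical immutable stand-in for the question's MarketData. */
    static final class MarketData {
        private final double buyPrice, buySize, sellPrice, sellSize;

        MarketData(double buyPrice, double buySize, double sellPrice, double sellSize) {
            this.buyPrice = buyPrice;
            this.buySize = buySize;
            this.sellPrice = sellPrice;
            this.sellSize = sellSize;
        }

        /** Builds a fresh array per call, so callers never share mutable state. */
        double[] get(Side side) {
            return side == Side.BUY
                    ? new double[] { buyPrice, buySize }
                    : new double[] { sellPrice, sellSize };
        }
    }

    private final ConcurrentMap<String, MarketData> distMap = new ConcurrentHashMap<>();

    /** Writer side: replaces the whole entry with a new immutable object. */
    void publish(String symbol, MarketData data) {
        distMap.put(symbol, data);
    }

    /** Reader side: no lock; a single get() returns one consistent object. */
    double[] getData(String symbol, Side side) {
        MarketData data = distMap.get(symbol);
        return data == null ? new double[] { 0.0, 0.0 } : data.get(side);
    }

    public static void main(String[] args) {
        LockFreeReadDemo demo = new LockFreeReadDemo();
        demo.publish("IBM", new MarketData(50, 1400, 49, 1000));
        double[] buy = demo.getData("IBM", Side.BUY);
        System.out.println(buy[0] + " x " + buy[1]); // prints "50.0 x 1400.0"
    }
}
```

If `MarketData` were instead mutated in place by the writer thread, a reader could observe a price from one update and a size from another, and neither the original LOCK (which the writer never takes) nor the split methods would prevent that.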

A few comments: blindly adding synchronized/LOCK does not automatically make your code thread-safe. You need to know what is not thread-safe and apply synchronization control accordingly. Also, breaking a non-atomic method into atomic pieces is NOT going to make your code thread-safe. Think of it another way: almost every lowest-level action in Java is atomic. Does that mean your code is automatically thread safe because it calls a bunch of atomic actions? Obviously not.
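A small sketch of that last point, using a made-up per-symbol counter rather than the question's code: `get` and `put` on a `ConcurrentHashMap` are each atomic, yet calling one after the other is not, whereas `merge` performs the same read-modify-write as a single atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CompoundActionDemo {

    // Two atomic calls in sequence: another thread can write between them,
    // so concurrent callers may overwrite each other's increments.
    static void racyIncrement(ConcurrentMap<String, Integer> counts, String key) {
        Integer old = counts.get(key);
        counts.put(key, old == null ? 1 : old + 1);
    }

    // One atomic call: the map applies the update without interleaving.
    static void atomicIncrement(ConcurrentMap<String, Integer> counts, String key) {
        counts.merge(key, 1, Integer::sum);
    }

    /** Starts `threads` workers that each increment "IBM" `perThread` times. */
    static int run(boolean atomic, int threads, int perThread) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (atomic) {
                        atomicIncrement(counts, "IBM");
                    } else {
                        racyIncrement(counts, "IBM");
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts.getOrDefault("IBM", 0);
    }

    public static void main(String[] args) {
        System.out.println("atomic: " + run(true, 8, 10_000));  // always 80000
        System.out.println("racy:   " + run(false, 8, 10_000)); // can be lower; varies per run
    }
}
```

The racy version loses updates only when two threads interleave, so it may still print 80000 on some runs; the absence of a visible failure is not evidence of thread safety.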

Upvotes: 1

Bill K

Reputation: 62769

It's not a great idea to have many threads dumping into a single synchronized thread that takes time.

Could you dump those results into a synchronized queue to be analyzed later? A single thread can pull items out of the queue while many threads put them in.
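One way to sketch that hand-off is with a `BlockingQueue` from `java.util.concurrent`: several producer threads add items and one consumer thread takes them. The class name, the `runDemo` helper, and the string payloads are all invented for illustration; a real application would enqueue whatever it needs analyzed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffDemo {

    /** Runs the hand-off and returns everything the single consumer received. */
    static List<String> runDemo(int producers, int perProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>(); // touched only by the consumer until it finishes
        int total = producers * perProducer;

        // Single consumer: take() blocks until an item is available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    consumed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Many producers: add() never blocks on an unbounded LinkedBlockingQueue.
        Thread[] workers = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int id = p;
            workers[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.add("producer-" + id + "-item-" + i);
                }
            });
            workers[p].start();
        }

        try {
            for (Thread w : workers) {
                w.join();
            }
            consumer.join(); // after this, reading `consumed` from here is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 250).size()); // prints "1000"
    }
}
```

The queue is the only shared structure, so producers never wait on the consumer's processing time; they only contend briefly on the enqueue itself.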

Upvotes: 2
