Reputation: 3579
There is a writer that updates prices by calling the putPrice method. A reader uses getPrice to get the latest price. hasChangedMethod returns a boolean indicating whether the price has changed since getPrice was last invoked.
I am looking for the fastest solution. I am trying to achieve thread-safe, consistent reads and writes into the map at the key level.
I think that locking the whole map may cause a performance issue, which is why I decided to lock at the key level. Unfortunately it doesn't work as expected and blocks the whole map. Why? Could you please help me figure out what I am doing wrong here?
UPDATE:
I guess we can summarise this in two questions:
1. How do I provide free access to the rest of the keys while one is being updated?
2. How do I guarantee atomic operations in my methods, since they require multiple read/write operations? E.g. getPrice() gets the price and updates the hasChanged flag.
PriceHolder.java
public final class PriceHolder {
private ConcurrentMap<String, Price> prices;
public PriceHolder() {
this.prices = new ConcurrentHashMap<>();
//Receive starting prices..
Price EUR = new Price();
EUR.setHasChangedSinceLastRead(true);
EUR.setPrice(new BigDecimal(0));
Price USD = new Price();
USD.setHasChangedSinceLastRead(true);
USD.setPrice(new BigDecimal(0));
this.prices.put("EUR", EUR);
this.prices.put("USD", USD);
}
/** Called when a price ‘p’ is received for an entity ‘e’ */
public void putPrice(
String e,
BigDecimal p) throws InterruptedException {
synchronized (prices.get(e)) {
Price currentPrice = prices.get(e);
if (currentPrice != null && !currentPrice.getPrice().equals(p)) {
currentPrice.setHasChangedSinceLastRead(true);
currentPrice.setPrice(p);
} else {
Price newPrice = new Price();
newPrice.setHasChangedSinceLastRead(true);
newPrice.setPrice(p);
prices.put(e, newPrice);
}
}
}
/** Called to get the latest price for entity ‘e’ */
public BigDecimal getPrice(String e) {
Price currentPrice = prices.get(e);
if(currentPrice != null){
synchronized (prices.get(e)){
currentPrice.setHasChangedSinceLastRead(false);
prices.put(e, currentPrice);
}
return currentPrice.getPrice();
}
return null;
}
/**
* Called to determine if the price for entity ‘e’ has
* changed since the last call to getPrice(e).
*/
public boolean hasPriceChanged(String e) {
synchronized (prices.get(e)){
return prices.get(e) != null ? prices.get(e).isHasChangedSinceLastRead() : false;
}
}
}
Price.java
public class Price {
private BigDecimal price;
public boolean isHasChangedSinceLastRead() {
return hasChangedSinceLastRead;
}
public void setHasChangedSinceLastRead(boolean hasChangedSinceLastRead) {
this.hasChangedSinceLastRead = hasChangedSinceLastRead;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
private boolean hasChangedSinceLastRead = false;
}
Upvotes: 12
Views: 7325
Reputation: 33
I think you can put a lock on your specific key only.
import java.util.HashMap;
public class CustomHashmap<K, V> extends HashMap<K, V>{
private static final long serialVersionUID = 1L;
@Override
public V put(K key, V value) {
V val = null;
synchronized (key) {
val = super.put(key, value);
}
return val;
}
@Override
public V get(Object key) {
return super.get(key);
}
}
Upvotes: 0
Reputation: 8938
The best way to get what you want is probably to go ahead and use a ConcurrentMap for the map, and to put all other synchronization in your Price class. This will lead to simpler code, which is always highly valuable in a multithreaded environment to avoid subtle bugs, while also achieving your goals of simultaneous access to the map and tracking of whether there has been a write since the last read for each currency.
In the Price class, whenever you set a price, you also want to set hasChangedSinceLastRead; these two things go together as one operation that should be atomic. Whenever you read the price, you also want to clear hasChangedSinceLastRead; this you also want to be atomic. Thus, the class should only permit these two operations to modify hasChangedSinceLastRead, rather than leaving the logic to other classes, and the methods should be synchronized to ensure that price and hasChangedSinceLastRead cannot get out of sync due to access by multiple threads. The class should now look like this:
public class Price {
public boolean isHasChangedSinceLastRead() {
return hasChangedSinceLastRead;
}
// setHasChangedSinceLastRead() removed
public synchronized BigDecimal getPrice() {
hasChangedSinceLastRead = false;
return price;
}
public synchronized void setPrice(BigDecimal newPrice) {
if (null != price && price.equals(newPrice)) {
return;
}
price = newPrice;
hasChangedSinceLastRead = true;
}
private BigDecimal price;
private volatile boolean hasChangedSinceLastRead = false;
}
Note that you can either make isHasChangedSinceLastRead() synchronized, or hasChangedSinceLastRead volatile; I chose the latter, leaving isHasChangedSinceLastRead() unsynchronized, because synchronizing the method requires a full memory barrier, while making the variable volatile requires only the read half of a memory barrier when the variable is read.
The reason a read of hasChangedSinceLastRead requires some sort of memory barrier is that synchronized methods, blocks, and volatile access only guarantee effective execution order (the "happens before" relationship) with other synchronized methods, blocks, and volatile access. If isHasChangedSinceLastRead is not synchronized and the variable is not volatile, no "happens before" relationship exists; in this case isHasChangedSinceLastRead could return "true" but the thread calling it might not see the change in price. This is because the setting of price and hasChangedSinceLastRead in setPrice() might be seen in the reverse order by other threads if the "happens before" relationship has not been established.
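To make the ordering argument concrete, here is a minimal sketch of the same write-then-publish idea. The class and method names (VolatilePublication, publish, awaitPayload) are made up for illustration and are not part of the code in this answer; the plain field stands in for price and the volatile flag for hasChangedSinceLastRead.

```java
// Hypothetical illustration of publishing a plain field through a volatile flag.
class VolatilePublication {
    private int payload;             // plain field, like price
    private volatile boolean ready;  // volatile flag, like hasChangedSinceLastRead

    void publish(int value) {
        payload = value;             // 1. plain write
        ready = true;                // 2. volatile write: makes the write above visible to readers of the flag
    }

    /** Spins until the flag is seen as true, then reads the payload. */
    int awaitPayload() {
        while (!ready) {             // volatile read
            Thread.yield();
        }
        // Because the volatile read observed the volatile write, the earlier
        // plain write to payload "happens before" this read.
        return payload;
    }
}
```

If the flag were not volatile, the loop might never see the update, and even if it did, nothing would guarantee that the payload written before it is visible.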
Now that all of the necessary synchronization is in the ConcurrentMap and Price classes, you no longer need to do any synchronization at all in PriceHolder, and PriceHolder no longer has to worry about updating hasChangedSinceLastRead. The code is simplified to:
public final class PriceHolder {
private ConcurrentMap<String, Price> prices;
public PriceHolder() {
prices = new ConcurrentHashMap<>();
//Receive starting prices..
Price EUR = new Price();
EUR.setPrice(new BigDecimal(0));
this.prices.put("EUR", EUR);
Price USD = new Price();
USD.setPrice(new BigDecimal(0));
this.prices.put("USD", USD);
}
/** Called when a price ‘p’ is received for an entity ‘e’ */
public void putPrice(
String e,
BigDecimal p
) throws InterruptedException {
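Price currentPrice = prices.get(e);
if (null == currentPrice) {
Price newPrice = new Price();
// putIfAbsent returns the existing value, or null if newPrice was inserted
currentPrice = prices.putIfAbsent(e, newPrice);
if (null == currentPrice) {
currentPrice = newPrice;
}
}
currentPrice.setPrice(p);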
}
/** Called to get the latest price for entity ‘e’ */
public BigDecimal getPrice(String e) {
Price currentPrice = prices.get(e);
if (currentPrice != null){
return currentPrice.getPrice();
}
return null;
}
/**
* Called to determine if the price for entity ‘e’ has
* changed since the last call to getPrice(e).
*/
public boolean hasPriceChanged(String e) {
Price currentPrice = prices.get(e);
return null != currentPrice ? currentPrice.isHasChangedSinceLastRead() : false;
}
}
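As a rough, self-contained sketch of how this design behaves from the caller's side: the classes below (SyncPrice, SyncPriceHolder) are trimmed, renamed copies for illustration only, and the use of computeIfAbsent assumes Java 8 or newer.

```java
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Trimmed copy of the synchronized value class, for illustration only.
class SyncPrice {
    private BigDecimal price;
    private volatile boolean changedSinceLastRead;

    synchronized BigDecimal getPrice() {
        changedSinceLastRead = false;   // reading clears the flag
        return price;
    }

    synchronized void setPrice(BigDecimal newPrice) {
        if (price != null && price.equals(newPrice)) {
            return;                     // same value: leave the flag untouched
        }
        price = newPrice;
        changedSinceLastRead = true;    // a new value sets the flag
    }

    boolean hasChanged() {
        return changedSinceLastRead;
    }
}

// Hypothetical holder with no synchronization of its own.
class SyncPriceHolder {
    private final ConcurrentMap<String, SyncPrice> prices = new ConcurrentHashMap<>();

    void putPrice(String e, BigDecimal p) {
        // computeIfAbsent creates the entry atomically if it is missing
        prices.computeIfAbsent(e, k -> new SyncPrice()).setPrice(p);
    }

    BigDecimal getPrice(String e) {
        SyncPrice price = prices.get(e);
        return price != null ? price.getPrice() : null;
    }

    boolean hasPriceChanged(String e) {
        SyncPrice price = prices.get(e);
        return price != null && price.hasChanged();
    }
}
```

After putPrice("EUR", ...) the flag is set, a following getPrice("EUR") clears it, and putting the same value again leaves it cleared.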
Upvotes: 0
Reputation: 48874
hasChangedMethod returns a boolean identifying if price has been changed since last time getPrice has been invoked.
This is a problematic pattern, since hasPriceChanged essentially needs to return something different per-thread. If you can elaborate a little more on what you're actually trying to do (i.e. why you think you need this pattern) it may be possible to offer an alternative. For example, consider doing away with hasPriceChanged entirely, and simply treating this data structure as canonical and querying its current value every time.
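One way that alternative might look, as a sketch only: the shared map holds nothing but the latest price, and each reader remembers what it last saw. The names LatestPrices, PriceReader and pollChange are invented for this example, and each PriceReader is assumed to be used by a single thread.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Shared, canonical store: only the latest price per key, no "changed" flag.
class LatestPrices {
    private final ConcurrentMap<String, BigDecimal> prices = new ConcurrentHashMap<>();

    void putPrice(String e, BigDecimal p) {
        prices.put(e, p);
    }

    BigDecimal getPrice(String e) {
        return prices.get(e);
    }
}

// Hypothetical reader, owned by one thread, that tracks its own last-seen values.
class PriceReader {
    private final LatestPrices source;
    private final Map<String, BigDecimal> lastSeen = new HashMap<>();

    PriceReader(LatestPrices source) {
        this.source = source;
    }

    /** Returns the current price if it differs from what this reader last saw, else null. */
    BigDecimal pollChange(String e) {
        BigDecimal current = source.getPrice(e);
        if (current == null || current.equals(lastSeen.get(e))) {
            return null;
        }
        lastSeen.put(e, current);
        return current;
    }
}
```

Because the "seen" state lives in the reader, two readers never interfere with each other and the shared structure needs no compound operations.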
That said, here's how I might implement the behavior you're looking for. There may well be alternatives, this is just a first pass.
Keep a ConcurrentHashMap<String, ThreadLocal<Boolean>>; the ThreadLocal will store the status of get calls per-thread. I also use a separate, private map of locks.
ConcurrentHashMap<String, Price> pricesMap;
ConcurrentHashMap<String, ThreadLocal<Boolean>> seenMap;
ConcurrentHashMap<String, Object> lockMap;
private Object getLock(String key) {
return lockMap.computeIfAbsent(key, k -> new Object());
}
private ThreadLocal<Boolean> getSeen(String key) {
// the mapping function creates a fresh ThreadLocal only when the key is absent
return seenMap.computeIfAbsent(key,
k -> ThreadLocal.<Boolean>withInitial(() -> false));
}
public void putPrice(String e, BigDecimal p) {
synchronized (getLock(e)) {
// price has changed, clear the old state to mark all threads unseen
seenMap.remove(e);
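// create the Price on first use so an unknown key does not cause a NullPointerException
pricesMap.computeIfAbsent(e, k -> new Price()).setPrice(p);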
}
}
public BigDecimal getPrice(String e) {
synchronized (getLock(e)) {
// marks the price seen for this thread
getSeen(e).set(true);
Price price = pricesMap.get(e);
return price != null ? price.getPrice() : null;
}
}
public boolean hasPriceChanged(String e) {
synchronized (getLock(e)) {
return !getSeen(e).get();
}
}
Notice that while the data structure is thread-safe there is still a risk of a race condition here: you could call hasPriceChanged() and get back false, immediately after which the price is changed by another thread. Doing away with this hasPriceChanged() behavior will likely simplify your code.
Upvotes: 1
Reputation: 298539
The use of a ConcurrentMap heavily depends on the Java version. When you are using Java 8 or newer, you get almost everything for free:
public final class PriceHolder {
private ConcurrentMap<String, Price> prices;
public PriceHolder() {
this.prices = new ConcurrentHashMap<>();
//Receive starting prices..
Price EUR = new Price();
EUR.setHasChangedSinceLastRead(true);
EUR.setPrice(BigDecimal.ZERO);
Price USD = new Price();
USD.setHasChangedSinceLastRead(true);
USD.setPrice(BigDecimal.ZERO);
this.prices.put("EUR", EUR);
this.prices.put("USD", USD);
}
/** Called when a price ‘p’ is received for an entity ‘e’ */
public void putPrice(String e, BigDecimal p) {
prices.compute(e, (k, price)-> {
if(price==null) price=new Price();
price.setHasChangedSinceLastRead(true);
price.setPrice(p);
return price;
});
}
/** Called to get the latest price for entity ‘e’ */
public BigDecimal getPrice(String e) {
Price price = prices.computeIfPresent(e, (key, value) -> {
value.setHasChangedSinceLastRead(false);
return value;
});
return price==null? null: price.getPrice();
}
/**
* Called to determine if the price for entity ‘e’ has
* changed since the last call to getPrice(e).
*/
public boolean hasPriceChanged(String e) {
final Price price = prices.get(e);
return price!=null && price.isHasChangedSinceLastRead();
}
}
The compute… methods on a concurrent map lock the affected entry for the duration of the computation while letting updates of all other entries proceed. For simple get access like in hasPriceChanged, no additional synchronization is necessary as long as you call it only once in a method, i.e. keep the result in a local variable while examining.
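A small sketch of that per-entry atomicity, using a counter instead of a price so the effect is easy to check. ComputeDemo and countWithCompute are names made up for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ComputeDemo {
    /** Increments one key from several threads via compute and returns the final count. */
    static int countWithCompute(int threads, int incrementsPerThread) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // read-modify-write of a single entry, performed atomically
                    counts.compute("EUR", (key, old) -> old == null ? 1 : old + 1);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        }
        return counts.get("EUR");
    }
}
```

With a plain get followed by put, increments from different threads could be lost; with compute on a ConcurrentHashMap every increment is applied, so the result equals threads * incrementsPerThread.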
Before Java 8, things are more complicated. There, all that ConcurrentMap offers is a set of atomic update methods, which can be used to build higher-level update methods in a try-and-repeat fashion.
To use it cleanly, the best way is to make the value class immutable:
public final class Price {
private final BigDecimal price;
private final boolean hasChangedSinceLastRead;
Price(BigDecimal value, boolean changed) {
price=value;
hasChangedSinceLastRead=changed;
}
public boolean isHasChangedSinceLastRead() {
return hasChangedSinceLastRead;
}
public BigDecimal getPrice() {
return price;
}
}
Then use it to always construct a new object reflecting the desired new state and perform atomic updates using either putIfAbsent or replace:
public final class PriceHolder {
private ConcurrentMap<String, Price> prices;
public PriceHolder() {
this.prices = new ConcurrentHashMap<>();
//Receive starting prices..
Price EUR = new Price(BigDecimal.ZERO, true);
Price USD = EUR; // we can re-use immutable objects...
this.prices.put("EUR", EUR);
this.prices.put("USD", USD);
}
/** Called when a price ‘p’ is received for an entity ‘e’ */
public void putPrice(String e, BigDecimal p) {
Price old, _new=new Price(p, true);
do old=prices.get(e);
while(old==null? prices.putIfAbsent(e,_new)!=null: !prices.replace(e,old,_new));
}
/** Called to get the latest price for entity ‘e’ */
public BigDecimal getPrice(String e) {
for(;;) {
Price price = prices.get(e);
if(price==null) return null;
if(!price.isHasChangedSinceLastRead()
|| prices.replace(e, price, new Price(price.getPrice(), false)))
return price.getPrice();
}
}
/**
* Called to determine if the price for entity ‘e’ has
* changed since the last call to getPrice(e).
*/
public boolean hasPriceChanged(String e) {
final Price price = prices.get(e);
return price!=null && price.isHasChangedSinceLastRead();
}
}
Upvotes: 3
Reputation: 48874
- how do I provide free access to rest of the keys if one is in the update process.
Simply using a ConcurrentHashMap is sufficient to ensure free access to the keys; gets do not introduce any contention, and puts only lock a subset of keys rather than the whole map.
- How do I guarantee atomic operations of my methods since they require multiple operations read/write.
To ensure consistency you need to synchronize on some shared object (or use another locking mechanism, like ReentrantLock); I would suggest creating a ConcurrentHashMap<String, Object> of lock objects, so that you can do:
synchronized (locks.get(e)) { ... }
Just populate the map with new Object() instances. The risk with the pattern you use (locking on the Price objects) is that these objects must now persist and never be replaced. It's easier to enforce that by having a dedicated private collection of locks rather than overloading your value type as a lock mechanism.
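A minimal sketch of the lock-map idea, under a few assumptions of mine: the class name LockedPriceHolder is invented, locks are created lazily with computeIfAbsent (Java 8+) rather than pre-populated, and the price and the changed flag are kept in two separate maps purely to keep the example short.

```java
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LockedPriceHolder {
    // Dedicated lock objects, one per key; never replaced once created.
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, BigDecimal> prices = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Boolean> changed = new ConcurrentHashMap<>();

    private Object lockFor(String e) {
        return locks.computeIfAbsent(e, k -> new Object());
    }

    void putPrice(String e, BigDecimal p) {
        synchronized (lockFor(e)) {          // only blocks callers using the same key
            prices.put(e, p);
            changed.put(e, Boolean.TRUE);
        }
    }

    BigDecimal getPrice(String e) {
        synchronized (lockFor(e)) {
            BigDecimal p = prices.get(e);
            if (p != null) {
                changed.put(e, Boolean.FALSE);   // read and flag reset happen under one lock
            }
            return p;
        }
    }

    boolean hasPriceChanged(String e) {
        synchronized (lockFor(e)) {
            return changed.getOrDefault(e, Boolean.FALSE);
        }
    }
}
```

The value objects can be replaced freely here because the monitor is the lock object, not the value.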
As an aside, if you're trying to do money operations in Java, you should absolutely be using the Joda-Money library, rather than reinventing the wheel.
Upvotes: 1
Reputation: 1035
I would recommend using the observer pattern. No need to reinvent the wheel. See Observer and Observable.
I would also recommend looking into Condition, since there is no need to lock the complete object for all readers. Reading can be concurrent, but writing can't.
A concurrent collection doesn't magically synchronize everything. It only guarantees that its own methods are thread-safe. Once you leave the method scope the lock is released. Because you need finer control over synchronization, it is best to take this into your own hands and use a normal HashMap.
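For the observer idea, a rough sketch might look like the following. PriceListener and ObservablePrices are hypothetical names, not JDK types (java.util.Observable itself is deprecated since Java 9), and for brevity this version stores prices in a ConcurrentHashMap rather than a hand-synchronized HashMap.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback interface; not part of the JDK.
interface PriceListener {
    void priceChanged(String entity, BigDecimal newPrice);
}

class ObservablePrices {
    // CopyOnWriteArrayList allows iteration while listeners are being added.
    private final List<PriceListener> listeners = new CopyOnWriteArrayList<>();
    private final ConcurrentMap<String, BigDecimal> prices = new ConcurrentHashMap<>();

    void addListener(PriceListener listener) {
        listeners.add(listener);
    }

    void putPrice(String e, BigDecimal p) {
        BigDecimal previous = prices.put(e, p);   // atomically swaps in the new value
        if (!p.equals(previous)) {
            // Readers are told about the change instead of polling a flag.
            for (PriceListener listener : listeners) {
                listener.priceChanged(e, p);
            }
        }
    }

    BigDecimal getPrice(String e) {
        return prices.get(e);
    }
}
```

Note that with several writer threads the callbacks for one key may arrive out of order; if ordering matters, the notification would need to happen under a per-key lock.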
Some notes:
You are overusing the HashMap.get. Consider getting it once and storing it in a variable.
synchronized (prices.get(e))
This may return null, and you should check for it. Synchronizing on null throws a NullPointerException.
prices.put(e, currentPrice);
I'm not sure if this is intended, but this action is not needed. See this
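The first two notes can be sketched together as follows. NullSafeFlags and its nested Flag class are invented stand-ins for PriceHolder and Price; hasChangedUnsafe exists only to show the failure mode.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class NullSafeFlags {
    // Hypothetical mutable value, used as both data and monitor (mirrors Price).
    static final class Flag {
        boolean changed = true;
    }

    private final ConcurrentMap<String, Flag> flags = new ConcurrentHashMap<>();

    void register(String key) {
        flags.putIfAbsent(key, new Flag());
    }

    boolean hasChanged(String key) {
        Flag flag = flags.get(key);          // single lookup, kept in a local variable
        if (flag == null) {
            return false;                    // unknown key: nothing to lock on
        }
        synchronized (flag) {
            return flag.changed;
        }
    }

    /** The pattern from the question: fails for a key that is not in the map. */
    boolean hasChangedUnsafe(String key) {
        synchronized (flags.get(key)) {      // NullPointerException if the key is absent
            return flags.get(key).changed;
        }
    }
}
```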
Upvotes: 0
Reputation: 12630
How about something like
class AtomicPriceHolder {
private volatile BigDecimal value;
private volatile boolean dirtyFlag;
public AtomicPriceHolder( BigDecimal initialValue) {
this.value = initialValue;
this.dirtyFlag = true;
}
public synchronized void updatePrice( BigDecimal newPrice ) {
if ( this.value.equals( newPrice ) == false) {
this.value = newPrice;
this.dirtyFlag = true;
}
}
public boolean isDirty() {
return this.dirtyFlag;
}
public BigDecimal peek() {
return this.value;
}
public synchronized BigDecimal read() {
this.dirtyFlag = false;
return this.value;
}
}
...
public void updatePrice( String id, BigDecimal value ) {
AtomicPriceHolder holder;
synchronized( someGlobalSyncObject ) {
holder = prices.get(id);
if ( holder == null ) {
prices.put( id, new AtomicPriceHolder( value ) );
return;
}
}
holder.updatePrice( value );
}
Note though that it probably does not make any sense this way, because the actual atomic modification of the price's value is so fast that you cannot expect to gain anything from unlocking the map before.
The conditional operations "check if it's in the map, create a new one and insert if not" must be atomic, and should be done by locking the whole map for that brief period. Anything else would require a dedicated synchronization object for each key. These would have to be stored and managed somewhere, and access to that store would have to be synchronized again &c.
Just do the coarse-grained locking to ensure you have correctness and then move on.
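For comparison, a sketch of what the fully coarse-grained version might look like: a plain HashMap with every public method synchronized on the holder. CoarsePriceHolder and its nested Entry class are names chosen for this example.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

class CoarsePriceHolder {
    // Simple mutable record; only ever touched while holding the holder's monitor.
    private static final class Entry {
        BigDecimal price;
        boolean changed;
    }

    private final Map<String, Entry> prices = new HashMap<>();

    synchronized void putPrice(String e, BigDecimal p) {
        Entry entry = prices.get(e);
        if (entry == null) {                 // check-then-insert is safe under the single lock
            entry = new Entry();
            prices.put(e, entry);
        }
        if (!p.equals(entry.price)) {
            entry.price = p;
            entry.changed = true;
        }
    }

    synchronized BigDecimal getPrice(String e) {
        Entry entry = prices.get(e);
        if (entry == null) {
            return null;
        }
        entry.changed = false;               // read and flag reset are one atomic step
        return entry.price;
    }

    synchronized boolean hasPriceChanged(String e) {
        Entry entry = prices.get(e);
        return entry != null && entry.changed;
    }
}
```

Each critical section is only a map lookup and a couple of field writes, so the single lock is held very briefly; it is worth measuring before replacing it with something finer-grained.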
Upvotes: 1