Reputation: 12397
I have a block of code provided below:
Map<String, BigDecimal> salesMap = new HashMap<>();
orderItems.parallelStream().forEach(orderItem -> {
    synchronized (this) {
        int itemId = orderItem.getItemId();
        Item item = settingsClient.getItemByItemId(itemId);
        String revenueCenterName = itemIdAndRevenueCenterNameMap.get(itemId);
        updateSalesMap(salesMap, "Gross Sales: " + revenueCenterName, orderItem.getNetSales().toPlainString());
    }
});
private void updateSalesMap(Map<String,BigDecimal> salesMap, String key, String amount) {
    BigDecimal bd = getSalesAmount(salesMap, key);
    int scale = 2;
    if (StringUtils.isBlank(amount)) {
        amount = "0.00";
    }
    BigDecimal addMe = BigDecimal.valueOf(Double.valueOf(amount)).setScale(scale, RoundingMode.HALF_UP);
    salesMap.put(key, bd.add(addMe));
}
The code works fine, but if I don't use the synchronized block, I end up with varying data in the map. As far as I know, streams are thread-safe, so I'm curious about what's happening. I tried using ConcurrentHashMap, but nothing seemed to change.
My guess is that the map data is not written to RAM and that reads and writes happen in each thread's cache instead, so we end up with differing data.
Is that correct? If so, I will use the volatile keyword instead of a synchronized block.
Note: I just found out that I can't declare a variable volatile inside a method.
Upvotes: 0
Views: 129
Reputation: 12397
I made updateSalesMap thread-safe, and that works for me:
protected synchronized void updateSalesMap(Map<String, BigDecimal> salesMap, String s, String amount) {
    BigDecimal bd = updateSalesAmount(salesMap, s);
    int scale = 2;
    if (StringUtils.isBlank(amount)) {
        amount = "0.00";
    }
    BigDecimal addMe = BigDecimal.valueOf(Double.valueOf(amount)).setScale(scale, RoundingMode.HALF_UP);
    salesMap.put(s, bd.add(addMe));
}
Upvotes: 0
Reputation: 3131
As far as I know, streams are thread-safe, so I'm curious about what's happening.
They are, as long as you only operate on the stream itself. The problem is that you are trying to modify another variable at the same time (the map, in this case). The idea of streams is that the operations on each element are completely independent; see the idea of functional programming.
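To illustrate what "independent operations" can look like here, below is a minimal sketch that builds the totals as the result of the stream instead of mutating a shared map from inside forEach. It is not the asker's code: the Sale class and the sample values are hypothetical stand-ins for the order items, and Collectors.toMap with a merge function is just one way to express this.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SalesByCollector {

    // Hypothetical stand-in for the question's order item.
    static final class Sale {
        final String revenueCenterName;
        final BigDecimal netSales;

        Sale(String revenueCenterName, BigDecimal netSales) {
            this.revenueCenterName = revenueCenterName;
            this.netSales = netSales;
        }
    }

    // The map is produced by the collector, so the lambdas touch no shared state.
    static Map<String, BigDecimal> totals(List<Sale> sales) {
        return sales.parallelStream()
                .collect(Collectors.toMap(
                        sale -> "Gross Sales: " + sale.revenueCenterName,
                        sale -> sale.netSales.setScale(2, RoundingMode.HALF_UP),
                        BigDecimal::add)); // combines amounts that share a key
    }

    public static void main(String[] args) {
        List<Sale> sales = List.of(
                new Sale("Bar", new BigDecimal("10.50")),
                new Sale("Bar", new BigDecimal("4.25")),
                new Sale("Kitchen", new BigDecimal("7.00")));
        System.out.println(totals(sales));
    }
}
```

With this shape there is nothing to synchronize, because each element is only mapped to a key and a value, and the collector takes care of combining partial results.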
I tried to use ConcurrentHashMap but it seems nothing changed.
The issue comes from your approach. The general idea is that individual operations on ConcurrentHashMap are atomic and thread-safe. However, if you perform two thread-safe operations one after the other, the combination is neither atomic nor thread-safe. You need to synchronize it yourself or come up with another solution.
In the updateSalesMap() method, you first get a value from the map, do some calculations, and then update the value. This sequence of operations isn't atomic, so performing it on a ConcurrentHashMap won't change much.
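The get-then-put problem can be made concrete with a small sketch. Rather than relying on real thread timing, it replays on a single thread one unlucky interleaving of two threads, here called A and B. The key and the amounts are made up for illustration.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LostUpdateDemo {

    // Replays one possible interleaving of two threads that each run
    // "get, add, put" on the same key of a ConcurrentHashMap.
    static BigDecimal replayInterleaving() {
        Map<String, BigDecimal> salesMap = new ConcurrentHashMap<>();
        String key = "Gross Sales: Bar"; // hypothetical key
        salesMap.put(key, new BigDecimal("10.00"));

        BigDecimal readByA = salesMap.get(key); // A reads 10.00
        BigDecimal readByB = salesMap.get(key); // B reads 10.00 before A has written

        salesMap.put(key, readByA.add(new BigDecimal("5.00"))); // A writes 15.00
        salesMap.put(key, readByB.add(new BigDecimal("3.00"))); // B writes 13.00, overwriting A's update

        return salesMap.get(key);
    }

    public static void main(String[] args) {
        // The correct total would be 18.00, but A's 5.00 has been lost.
        System.out.println(replayInterleaving()); // prints 13.00
    }
}
```

Every single get and put above is thread-safe on its own; the update is lost only because another thread can slip in between the read and the write.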
One possible way to make this update safe under concurrency would be to use ConcurrentHashMap.compute() (see the Javadocs).
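A minimal sketch of how updateSalesMap might look with compute() follows. It makes a few assumptions that are not in the question: the blank check uses plain String methods instead of StringUtils, the amount is parsed with new BigDecimal(String) rather than through Double, and the helper totalAfterParallelUpdates exists only to exercise the method.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class AtomicSalesUpdate {

    // Adds amount to the running total for key. The parameter is typed as
    // ConcurrentHashMap because its compute() is atomic per key; the default
    // Map.compute() makes no such promise.
    static void updateSalesMap(ConcurrentHashMap<String, BigDecimal> salesMap, String key, String amount) {
        String text = (amount == null || amount.trim().isEmpty()) ? "0.00" : amount;
        BigDecimal addMe = new BigDecimal(text).setScale(2, RoundingMode.HALF_UP);
        // The read, the addition, and the write happen as one atomic step.
        salesMap.compute(key, (k, current) -> current == null ? addMe : current.add(addMe));
    }

    // Hypothetical helper: applies the same update from many threads.
    static BigDecimal totalAfterParallelUpdates(int count) {
        ConcurrentHashMap<String, BigDecimal> salesMap = new ConcurrentHashMap<>();
        IntStream.range(0, count).parallel()
                .forEach(i -> updateSalesMap(salesMap, "Gross Sales: Bar", "1.25"));
        return salesMap.get("Gross Sales: Bar");
    }

    public static void main(String[] args) {
        System.out.println(totalAfterParallelUpdates(1000)); // prints 1250.00
    }
}
```

Since this is a plain "add to the existing value" update, ConcurrentHashMap.merge(key, addMe, BigDecimal::add) would express the same thing a little more compactly.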
Upvotes: 3
Reputation: 1199
You are doing a read operation with getSalesAmount(salesMap, key) and a write operation with salesMap.put(key, bd.add(addMe)) in separate statements. Splitting the update this way makes it non-atomic, and that does not change regardless of the kind of Map you use. The synchronized block will of course solve this.
Alternatively, you can use ConcurrentHashMap's compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) for the kind of atomicity you are looking for.
Upvotes: 2