Reputation: 3316
I am implementing a UDAF according to the UDAF example. The update phase there looks like this:
public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
        String inputKey = input.getString(0);
        Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
        // Build a fresh map, copy the previous buffer contents into it,
        // add the new entry, then write the whole map back to the buffer.
        Map<String, Map<String, Long>> newData = new HashMap<>();
        if (!buffer.isNullAt(0)) {
            Map<String, Map<String, Long>> currData =
                buffer.<String, Map<String, Long>>getJavaMap(0);
            newData.putAll(currData);
        }
        newData.put(inputKey, inputValues);
        buffer.update(0, newData);
    }
}
You can see that on every step a new HashMap is created (newData) and the data from the previous buffer is copied into it. It looks like an awful waste to create a new Map and copy all the elements on every update. So I tried to update the buffer's Map in place (in my case the Map has slightly different types):
Map<String, Integer> bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
bufferJavaMap.put("aaaa", 1); // mutate the existing Map instead of copying
buffer.update(0, bufferJavaMap);
I receive the following error:
java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:209)
at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)
Isn't it possible to update the existing Map? What is the best method to update this Map?
Upvotes: 1
Views: 190
Reputation: 35229
Isn't it possible to update the existing Map?
It is not possible, but the problem is more complex than the one identified in your code. Spark makes a full copy of the structure on both get and update, so even removing the explicit copy wouldn't resolve the problem.
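The stack trace also shows where the direct put fails: getJavaMap hands back a read-only Java view of the buffer's map, and java.util.AbstractMap.put is a default implementation that always throws. A minimal, Spark-free sketch of the same failure mode (the class name is made up for illustration):

import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class ReadOnlyMapDemo {
    public static void main(String[] args) {
        // A Map built on AbstractMap without overriding put() rejects
        // mutation, just like the view returned by getJavaMap.
        Map<String, Integer> readOnly = new AbstractMap<String, Integer>() {
            @Override
            public Set<Entry<String, Integer>> entrySet() {
                return Collections.emptySet();
            }
        };
        readOnly.put("aaaa", 1); // throws java.lang.UnsupportedOperationException
    }
}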
If performance is a concern, you should avoid using UserDefinedAggregateFunction with non-atomic types.
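The answer doesn't spell out an alternative, but a common one for this kind of aggregation is the typed Aggregator API, which lets the buffer stay a plain JVM object that you can mutate in place. The sketch below is one possible shape, not the answerer's code: the KeyValue input bean and the Kryo encoders are assumptions for illustration.

import java.io.Serializable;
import java.util.HashMap;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

public class MergeMapAggregator
        extends Aggregator<MergeMapAggregator.KeyValue, HashMap<String, Long>, HashMap<String, Long>> {

    // Hypothetical input bean standing in for the (key, value) columns.
    public static class KeyValue implements Serializable {
        private String key;
        private Long value;
        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
        public Long getValue() { return value; }
        public void setValue(Long value) { this.value = value; }
    }

    @Override
    public HashMap<String, Long> zero() {
        return new HashMap<>();
    }

    @Override
    public HashMap<String, Long> reduce(HashMap<String, Long> buffer, KeyValue input) {
        buffer.put(input.getKey(), input.getValue()); // in-place update, no copying
        return buffer;
    }

    @Override
    public HashMap<String, Long> merge(HashMap<String, Long> left, HashMap<String, Long> right) {
        left.putAll(right);
        return left;
    }

    @Override
    public HashMap<String, Long> finish(HashMap<String, Long> reduction) {
        return reduction;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Encoder<HashMap<String, Long>> bufferEncoder() {
        // Kryo keeps the buffer as a real JVM object between reduce calls.
        return Encoders.kryo((Class<HashMap<String, Long>>) (Class<?>) HashMap.class);
    }

    @Override
    @SuppressWarnings("unchecked")
    public Encoder<HashMap<String, Long>> outputEncoder() {
        return Encoders.kryo((Class<HashMap<String, Long>>) (Class<?>) HashMap.class);
    }
}

On Spark 3.0+ this can be registered for DataFrames with functions.udaf(new MergeMapAggregator(), Encoders.bean(MergeMapAggregator.KeyValue.class)); on a typed Dataset it can be applied via new MergeMapAggregator().toColumn().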
Upvotes: 1