antonpuz

Reputation: 3316

Creating a new accumulator on each update step of UDAF

I am implementing a UDAF based on the UDAF example. The update phase there looks like this:

    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)) {
            String inputKey = input.getString(0);
            Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
            // a fresh map is allocated on every call...
            Map<String, Map<String, Long>> newData = new HashMap<>();

            // ...and the entire previous buffer is copied into it
            if (!buffer.isNullAt(0)) {
                Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
                newData.putAll(currData);
            }
            newData.put(inputKey, inputValues);
            buffer.update(0, newData);
        }
    }

You can see that on every step a new HashMap (newData) is created and the data from the previous buffer is copied into it. It looks like an awful waste, having to create a new Map on every row and copy all the elements. So I tried the following (in my case the map has slightly different types):

    bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
    bufferJavaMap.put("aaaa", 1);   // throws UnsupportedOperationException
    buffer.update(0, bufferJavaMap);

I receive the following error:

    java.lang.UnsupportedOperationException
       at java.util.AbstractMap.put(AbstractMap.java:209)
       at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)

Isn't it possible to update the existing Map? What is the best way to update this Map?

Upvotes: 1

Views: 190

Answers (1)

Alper t. Turker

Reputation: 35229

> Isn't it possible to update the existing Map?

It is not possible, but the problem is more complex than the one you've identified. Spark makes a full copy of the structure on both get and update, so even removing the explicit copy wouldn't resolve the problem.
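
What getJavaMap hands you is a read-only view of Spark's internal map data (that's the java.util.AbstractMap.put in your stack trace), and update converts the Java map back into the internal representation. So inside a UserDefinedAggregateFunction, copy-and-replace is the only available pattern. A minimal sketch, using the Integer-valued map from your attempt:

    Map<String, Integer> view = buffer.<String, Integer>getJavaMap(0); // read-only view
    // view.put("aaaa", 1);                 // throws UnsupportedOperationException

    Map<String, Integer> updated = new HashMap<>(view); // the copy is unavoidable
    updated.put("aaaa", 1);
    buffer.update(0, updated);              // copied again into the internal row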

If performance matters, you should avoid using UserDefinedAggregateFunction with non-atomic types altogether.
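
The usual alternative is a typed Aggregator, whose buffer stays a plain JVM object between calls instead of being converted to and from an internal row on every update. A minimal sketch, assuming a bean-style input class (KeyedValues is a made-up name here) matching your key-plus-map schema, with Kryo encoders for the nested map:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.expressions.Aggregator;

    // Hypothetical input bean: a key column plus a map-of-counts column.
    // The name and fields are assumptions based on the question's schema.
    class KeyedValues implements Serializable {
        private String key;
        private HashMap<String, Long> values;

        public KeyedValues() { }

        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
        public HashMap<String, Long> getValues() { return values; }
        public void setValues(HashMap<String, Long> values) { this.values = values; }
    }

    class MergeMapAggregator extends Aggregator<KeyedValues,
            HashMap<String, Map<String, Long>>,
            HashMap<String, Map<String, Long>>> {

        @Override
        public HashMap<String, Map<String, Long>> zero() {
            return new HashMap<>();
        }

        @Override
        public HashMap<String, Map<String, Long>> reduce(
                HashMap<String, Map<String, Long>> buffer, KeyedValues input) {
            // The buffer is an ordinary mutable HashMap, so it can be
            // updated in place: no per-row copy as in the UDAF version.
            buffer.put(input.getKey(), input.getValues());
            return buffer;
        }

        @Override
        public HashMap<String, Map<String, Long>> merge(
                HashMap<String, Map<String, Long>> left,
                HashMap<String, Map<String, Long>> right) {
            left.putAll(right);
            return left;
        }

        @Override
        public HashMap<String, Map<String, Long>> finish(
                HashMap<String, Map<String, Long>> reduction) {
            return reduction;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Encoder<HashMap<String, Map<String, Long>>> bufferEncoder() {
            // Kryo keeps the buffer as a plain JVM object between calls
            // instead of converting it to and from an internal row.
            return Encoders.kryo((Class<HashMap<String, Map<String, Long>>>)
                    (Class<?>) HashMap.class);
        }

        @Override
        @SuppressWarnings("unchecked")
        public Encoder<HashMap<String, Map<String, Long>>> outputEncoder() {
            return Encoders.kryo((Class<HashMap<String, Map<String, Long>>>)
                    (Class<?>) HashMap.class);
        }
    }

You would then use it with the typed API, along the lines of ds.groupByKey(..., Encoders.STRING()).agg(new MergeMapAggregator().toColumn()).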

Upvotes: 1
