Mark

Reputation: 23

Java 8 Stream: Defining collectors based on other collectors

I'm new to the Java 8 Stream API, but I'd like to use it for the following problem. Say I have a POJO called InputRecord with name, fieldA, and fieldB properties, where each instance represents one row of the following:

name | fieldA | fieldB
----------------------
A    | 100    | 1.1
A    | 150    | 2.0
B    | 200    | 1.5
A    | 120    | 1.3

InputRecord would look like:

public class InputRecord {
    private String name;
    private Integer fieldA;
    private BigDecimal fieldB;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getFieldA() {
        return fieldA;
    }

    public void setFieldA(Integer fieldA) {
        this.fieldA = fieldA;
    }

    public BigDecimal getFieldB() {
        return fieldB;
    }

    public void setFieldB(BigDecimal fieldB) {
        this.fieldB = fieldB;
    }
}

Those four records above need to be combined into two records grouped by name, where:

  1. Property fieldA is summed
  2. Property fieldB is summed
  3. Each combined record includes a fieldC property, which is the product of the accumulated sums of fieldA and fieldB.

Therefore the results for above would be:

name | sumOfFieldA | sumOfFieldB | fieldC (sumOfFieldA*sumOfFieldB)
-------------------------------------------------------------------
A    | 370         | 4.4         | 1628
B    | 200         | 1.5         | 300

A different POJO called OutputRecord would represent each row record of the combined records:

public class OutputRecord {
    private String name;
    private Integer sumOfFieldA;
    private BigDecimal sumOfFieldB;
    private BigDecimal fieldC;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getSumOfFieldA() {
        return sumOfFieldA;
    }

    public void setSumOfFieldA(Integer sumOfFieldA) {
        this.sumOfFieldA = sumOfFieldA;
    }

    public BigDecimal getSumOfFieldB() {
        return sumOfFieldB;
    }

    public void setSumOfFieldB(BigDecimal sumOfFieldB) {
        this.sumOfFieldB = sumOfFieldB;
    }

    public BigDecimal getFieldC() {
        return fieldC;
    }

    public void setFieldC(BigDecimal fieldC) {
        this.fieldC = fieldC;
    }
}

What are some good approaches/solutions for transforming a list of InputRecords into a list of OutputRecords?

I looked at the following question to see if it would help, but I got stuck trying to combine the collectors for fieldA and fieldB into a new collector for fieldC: Java 8 Stream: groupingBy with multiple Collectors

Collector<InputRecord, ?, Integer> fieldACollector = Collectors.summingInt(InputRecord::getFieldA);
Collector<InputRecord, ?, BigDecimal> fieldBCollector = Collectors.reducing(BigDecimal.ZERO, InputRecord::getFieldB, BigDecimal::add);

List<Collector<InputRecord, ?, ?>> collectors = Arrays.asList(fieldACollector, fieldBCollector); // need a fieldCCollector object in the list

The collectors object would then be used to create a complexCollector object (as per the accepted answer by Tagir Valeev in the above link).

Upvotes: 2

Views: 607

Answers (4)

123-xyz

Reputation: 637

Instead of defining a custom Collector (which I think is complicated and hard to maintain), I think a general utility method that combines multiple Collectors is much better. For example:

public static <T, A1, A2, R1, R2> Collector<T, Tuple2<A1, A2>, Tuple2<R1, R2>> combine(final Collector<? super T, A1, R1> collector1,
        final Collector<? super T, A2, R2> collector2) {
 ...
}
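
The body of combine is elided above. As a rough idea of what such a utility could look like using only the JDK, here is a minimal sketch; it is not AbacusUtil's implementation. The Box holder, the use of Map.Entry in place of Tuple2, the CombineSketch class, and the InputRecord constructor are all assumptions made for illustration.

```java
import java.math.BigDecimal;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CombineSketch {

    // Hypothetical mutable holder for the two downstream accumulators.
    static final class Box<A1, A2> {
        A1 first;
        A2 second;

        Box(A1 first, A2 second) {
            this.first = first;
            this.second = second;
        }
    }

    // Trimmed-down stand-in for the question's InputRecord; the constructor is an assumption.
    static final class InputRecord {
        private final String name;
        private final Integer fieldA;
        private final BigDecimal fieldB;

        InputRecord(String name, Integer fieldA, BigDecimal fieldB) {
            this.name = name;
            this.fieldA = fieldA;
            this.fieldB = fieldB;
        }

        String getName() { return name; }
        Integer getFieldA() { return fieldA; }
        BigDecimal getFieldB() { return fieldB; }
    }

    // Feeds every element to both collectors and returns their results as a pair.
    static <T, A1, A2, R1, R2> Collector<T, ?, Map.Entry<R1, R2>> combine(
            Collector<? super T, A1, R1> c1, Collector<? super T, A2, R2> c2) {
        Supplier<Box<A1, A2>> supplier =
                () -> new Box<>(c1.supplier().get(), c2.supplier().get());
        BiConsumer<Box<A1, A2>, T> accumulator = (box, element) -> {
            c1.accumulator().accept(box.first, element);
            c2.accumulator().accept(box.second, element);
        };
        // Used when partial results from a parallel stream are joined.
        BinaryOperator<Box<A1, A2>> combiner = (left, right) -> {
            left.first = c1.combiner().apply(left.first, right.first);
            left.second = c2.combiner().apply(left.second, right.second);
            return left;
        };
        Function<Box<A1, A2>, Map.Entry<R1, R2>> finisher = box -> new SimpleImmutableEntry<>(
                c1.finisher().apply(box.first), c2.finisher().apply(box.second));
        return Collector.of(supplier, accumulator, combiner, finisher);
    }

    // The four rows from the question.
    static List<InputRecord> sample() {
        return Arrays.asList(
                new InputRecord("A", 100, new BigDecimal("1.1")),
                new InputRecord("A", 150, new BigDecimal("2.0")),
                new InputRecord("B", 200, new BigDecimal("1.5")),
                new InputRecord("A", 120, new BigDecimal("1.3")));
    }

    // Groups by name and computes both sums in a single pass per group.
    static Map<String, Map.Entry<Integer, BigDecimal>> sumsByName(List<InputRecord> records) {
        Collector<InputRecord, ?, Integer> fieldACollector =
                Collectors.summingInt(InputRecord::getFieldA);
        Collector<InputRecord, ?, BigDecimal> fieldBCollector =
                Collectors.reducing(BigDecimal.ZERO, InputRecord::getFieldB, BigDecimal::add);
        return records.stream().collect(
                Collectors.groupingBy(InputRecord::getName, combine(fieldACollector, fieldBCollector)));
    }

    public static void main(String[] args) {
        System.out.println(sumsByName(sample()));
    }
}
```

Each map value holds the fieldA sum as its key and the fieldB sum as its value, so fieldC can be derived from the pair when the OutputRecord is built.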

Working with the combine method, the solution will be:

Collector<InputRecord, ?, Integer> fieldACollector = MoreCollectors.summingInt(InputRecord::getFieldA);
Collector<InputRecord, ?, BigDecimal> fieldBCollector = MoreCollectors.reducing(BigDecimal.ZERO, InputRecord::getFieldB, BigDecimal::add);

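// Requires an OutputRecord(name, sumOfFieldA, sumOfFieldB) constructor, which is not
// shown in the question; fieldC would have to be computed there from the two sums.
inputRecords.stream().collect(MoreCollectors.groupingBy(InputRecord::getName,
                            MoreCollectors.combine(fieldACollector, fieldBCollector)))
        .entrySet().stream()
        .map(e -> new OutputRecord(e.getKey(), e.getValue()._1, e.getValue()._2))
        .collect(Collectors.toList());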

A sample implementation of combine is available in AbacusUtil, where the pipeline above can also be written as:

StreamEx.of(inputRecords)
        .groupBy(InputRecord::getName, MoreCollectors.combine(fieldACollector, fieldBCollector))
        .map(e -> new OutputRecord(e.getKey(), e.getValue()._1, e.getValue()._2)).toList();
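
If Java 12 or later is an option, the JDK's own Collectors.teeing covers the two-collector case without a third-party library. The following is a minimal sketch under that assumption; the TeeingSketch class, the constructors on both record classes, and the use of a TreeMap (only to get a stable, name-sorted result) are illustrative choices rather than part of the question.

```java
import java.math.BigDecimal;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class TeeingSketch {

    // Trimmed-down stand-in for the question's InputRecord; the constructor is an assumption.
    static final class InputRecord {
        private final String name;
        private final Integer fieldA;
        private final BigDecimal fieldB;

        InputRecord(String name, Integer fieldA, BigDecimal fieldB) {
            this.name = name;
            this.fieldA = fieldA;
            this.fieldB = fieldB;
        }

        String getName() { return name; }
        Integer getFieldA() { return fieldA; }
        BigDecimal getFieldB() { return fieldB; }
    }

    // Trimmed-down stand-in for the question's OutputRecord; the constructor is an assumption.
    static final class OutputRecord {
        private final String name;
        private final Integer sumOfFieldA;
        private final BigDecimal sumOfFieldB;
        private final BigDecimal fieldC;

        OutputRecord(String name, Integer sumOfFieldA, BigDecimal sumOfFieldB, BigDecimal fieldC) {
            this.name = name;
            this.sumOfFieldA = sumOfFieldA;
            this.sumOfFieldB = sumOfFieldB;
            this.fieldC = fieldC;
        }

        String getName() { return name; }
        Integer getSumOfFieldA() { return sumOfFieldA; }
        BigDecimal getSumOfFieldB() { return sumOfFieldB; }
        BigDecimal getFieldC() { return fieldC; }
    }

    // The four rows from the question.
    static List<InputRecord> sample() {
        return Arrays.asList(
                new InputRecord("A", 100, new BigDecimal("1.1")),
                new InputRecord("A", 150, new BigDecimal("2.0")),
                new InputRecord("B", 200, new BigDecimal("1.5")),
                new InputRecord("A", 120, new BigDecimal("1.3")));
    }

    static List<OutputRecord> summarize(List<InputRecord> records) {
        Collector<InputRecord, ?, Integer> sumA = Collectors.summingInt(InputRecord::getFieldA);
        Collector<InputRecord, ?, BigDecimal> sumB =
                Collectors.reducing(BigDecimal.ZERO, InputRecord::getFieldB, BigDecimal::add);
        // teeing (Java 12+) runs both collectors over the same elements and pairs the results.
        Collector<InputRecord, ?, Map.Entry<Integer, BigDecimal>> bothSums =
                Collectors.teeing(sumA, sumB, (a, b) -> new SimpleImmutableEntry<>(a, b));

        // TreeMap keeps the groups sorted by name.
        Map<String, Map.Entry<Integer, BigDecimal>> sumsByName = records.stream()
                .collect(Collectors.groupingBy(InputRecord::getName, TreeMap::new, bothSums));

        // fieldC is derived once per group, after both sums are final.
        return sumsByName.entrySet().stream()
                .map(e -> {
                    int a = e.getValue().getKey();
                    BigDecimal b = e.getValue().getValue();
                    return new OutputRecord(e.getKey(), a, b, b.multiply(BigDecimal.valueOf(a)));
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (OutputRecord r : summarize(sample())) {
            System.out.println(r.getName() + " | " + r.getSumOfFieldA() + " | "
                    + r.getSumOfFieldB() + " | " + r.getFieldC());
        }
    }
}
```

Computing fieldC only in the final mapping step avoids multiplying intermediate sums that are thrown away anyway.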

Upvotes: 0

Ianislav Trendafilov

Reputation: 56

You can use Stream.reduce(..) to merge two records into a single one. Note that it creates a number of temporary objects that the JVM then has to garbage collect.

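// Assumes InputRecord has a (name, fieldA, fieldB) constructor.
// String arguments keep the decimal values exact; new BigDecimal(1.1) would not.
Collection<InputRecord> input = Arrays.asList(
        new InputRecord("A", 100, new BigDecimal("1.1")),
        new InputRecord("A", 150, new BigDecimal("2.0")),
        new InputRecord("B", 200, new BigDecimal("1.5")),
        new InputRecord("A", 120, new BigDecimal("1.3")));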

Collection<OutputRecord> output = input.stream()
        // group records for particular Name into a List
        .collect(Collectors.groupingBy(InputRecord::getName))
        .values().stream()
        // Reduce every List to a single record by summing its fields
        .map(records -> records.stream()
                .reduce((a, b) ->
                        new InputRecord(a.getName(),
                                a.getFieldA() + b.getFieldA(),
                                a.getFieldB().add(b.getFieldB()))))
        .filter(Optional::isPresent)
        .map(Optional::get)
        // Finally transform the InputRecord to OutputRecord
        .map(record -> new OutputRecord(record.getName(),
                record.getFieldA(),
                record.getFieldB(),
                record.getFieldB().multiply(new BigDecimal(record.getFieldA()))))
        .collect(Collectors.toList());
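
When building BigDecimal sample data for a pipeline like this, the choice of constructor matters: new BigDecimal(double) keeps the exact binary value of the double, whereas the String constructor keeps the decimal digits as written. A small sketch of the difference, with class and method names made up for illustration:

```java
import java.math.BigDecimal;

final class BigDecimalLiteralSketch {

    // Sums 1.1 + 2.0 + 1.3 using the double constructor; 1.1 and 1.3 are not
    // exactly representable as doubles, so the result is not exactly 4.4.
    static BigDecimal sumFromDoubles() {
        return new BigDecimal(1.1).add(new BigDecimal(2.0)).add(new BigDecimal(1.3));
    }

    // Same sum using the String constructor; the result is exactly 4.4.
    static BigDecimal sumFromStrings() {
        return new BigDecimal("1.1").add(new BigDecimal("2.0")).add(new BigDecimal("1.3"));
    }

    public static void main(String[] args) {
        System.out.println(sumFromDoubles());
        System.out.println(sumFromStrings());
    }
}
```

BigDecimal.valueOf(double) is another option when the input really is a double, since it goes through the double's canonical string form.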

Upvotes: 1

Eugene

Reputation: 120848

To me the cleanest way is to build a custom collector for this. It takes a fair amount of code, but you can hide it behind a method, so the final operation looks like this:

// List.of requires Java 9+; on Java 8 use Arrays.asList instead
Collection<OutputRecord> output = List.of(first, second, third, fourth)
            .stream()
            .parallel()
            .collect(toOutputRecords());

While the actual toOutputRecords would be:

private static Collector<InputRecord, ?, Collection<OutputRecord>> toOutputRecords() {
    class Acc {

        Map<String, OutputRecord> map = new HashMap<>();

        void add(InputRecord elem) {
            String value = elem.getName();
            // constructor without fieldC since you compute it at the end
            OutputRecord record = new OutputRecord(value, elem.getFieldA(), elem.getFieldB());
            mergeIntoMap(map, value, record);
        }

        Acc merge(Acc right) {
            Map<String, OutputRecord> leftMap = map;
            Map<String, OutputRecord> rightMap = right.map;

            for (Entry<String, OutputRecord> entry : rightMap.entrySet()) {
                mergeIntoMap(leftMap, entry.getKey(), entry.getValue());
            }
            return this;
        }

        private void mergeIntoMap(Map<String, OutputRecord> map, String value, OutputRecord record) {

            map.merge(value, record, (left, right) -> {
                left.setSumOfFieldA(left.getSumOfFieldA() + right.getSumOfFieldA());
                left.setSumOfFieldB(left.getSumOfFieldB().add(right.getSumOfFieldB()));

                return left;
            });
        }

        public Collection<OutputRecord> finisher() {
            for (Entry<String, OutputRecord> e : map.entrySet()) {
                OutputRecord output = e.getValue();
                output.setFieldC(output.getSumOfFieldB().multiply(BigDecimal.valueOf(output.getSumOfFieldA())));
            }
            return map.values();
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}
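
The collector above leans on Map.merge inside mergeIntoMap, both when adding an element and when joining two partial maps. For readers who have not used it, here is a minimal sketch of its behaviour on plain integers; the MapMergeSketch class and its method are hypothetical and only mirror the fieldA sums from the question.

```java
import java.util.HashMap;
import java.util.Map;

final class MapMergeSketch {

    // Sums values per name; names[i] is paired with values[i].
    static Map<String, Integer> sumByName(String[] names, int[] values) {
        Map<String, Integer> sums = new HashMap<>();
        for (int i = 0; i < names.length; i++) {
            // If the name is absent, the value is stored as-is;
            // otherwise the old and new values are combined with Integer::sum.
            sums.merge(names[i], values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(sumByName(
                new String[] {"A", "A", "B", "A"},
                new int[] {100, 150, 200, 120}));
    }
}
```

In the collector, the remapping function mutates and returns the left OutputRecord instead of summing integers, but the absent-or-combine logic is the same.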

Upvotes: 3

Chota Bheem

Reputation: 1116

You can use the three-argument Stream.collect (supplier, accumulator, combiner) to build a map of OutputRecords, keyed by name, from the list of InputRecords.

Map<String, OutputRecord> result = inputRecords.stream().collect(() -> new HashMap<>(),
                (HashMap<String, OutputRecord> map, InputRecord inObj) -> {
                    OutputRecord out = map.get(inObj.getName());
                    if (out == null) {
                        out = new OutputRecord();
                        out.setName(inObj.getName());
                        out.setSumOfFieldA(inObj.getFieldA());
                        out.setSumOfFieldB(inObj.getFieldB());
                    } else {

                        Integer s = out.getSumOfFieldA();
                        out.setSumOfFieldA(s + inObj.getFieldA());
                        BigDecimal bd = out.getSumOfFieldB();
                        out.setSumOfFieldB(bd.add(inObj.getFieldB()));
                    }
                    out.setFieldC(out.getSumOfFieldB().multiply(new BigDecimal(out.getSumOfFieldA())));
                    map.put(out.getName(), out);

                }, (HashMap<String, OutputRecord> out1, HashMap<String, OutputRecord> out2) -> {
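                    // Typically only invoked for parallel streams: merge the per-name
                    // partial sums rather than overwriting them, as putAll would
                    out2.forEach((name, right) -> out1.merge(name, right, (left, r) -> {
                        left.setSumOfFieldA(left.getSumOfFieldA() + r.getSumOfFieldA());
                        left.setSumOfFieldB(left.getSumOfFieldB().add(r.getSumOfFieldB()));
                        left.setFieldC(left.getSumOfFieldB().multiply(new BigDecimal(left.getSumOfFieldA())));
                        return left;
                    }));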
                });

        System.out.println(result);

Upvotes: 0
