Amitoj

Reputation: 433

Reduce a Java stream to combine adjacent elements with equal property values

I have the following example data set that I want to transform/reduce with the Java Stream API, merging adjacent entries that have the same direction value:

Direction    int[]
IN           1, 2
OUT          3, 4
OUT          5, 6, 7
IN           8
IN           9
IN           10, 11
OUT          12, 13
IN           14

to

Direction    int[]
IN           1, 2
OUT          3, 4, 5, 6, 7
IN           8, 9, 10, 11
OUT          12, 13
IN           14
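To make the target transformation concrete, here is a plain sequential sketch (no streams) that turns the first table into the second. The record-based Tuple, the class name RunMergeDemo and the methods mergeRuns/sample are hypothetical names chosen for illustration, and records need Java 16+.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RunMergeDemo {
    enum Direction { IN, OUT }

    // Hypothetical stand-in for the question's Tuple class (Java 16+ record)
    record Tuple(Direction direction, int[] data) {}

    // Merges adjacent tuples that share a direction by concatenating their data
    static List<Tuple> mergeRuns(List<Tuple> input) {
        List<Tuple> result = new ArrayList<>();
        for (Tuple t : input) {
            int last = result.size() - 1;
            if (last >= 0 && result.get(last).direction() == t.direction()) {
                int[] prev = result.get(last).data();
                int[] merged = Arrays.copyOf(prev, prev.length + t.data().length);
                System.arraycopy(t.data(), 0, merged, prev.length, t.data().length);
                result.set(last, new Tuple(t.direction(), merged));
            } else {
                result.add(t); // direction changed (or first element): start a new run
            }
        }
        return result;
    }

    // The sample data set from the first table
    static List<Tuple> sample() {
        return List.of(
                new Tuple(Direction.IN, new int[] {1, 2}),
                new Tuple(Direction.OUT, new int[] {3, 4}),
                new Tuple(Direction.OUT, new int[] {5, 6, 7}),
                new Tuple(Direction.IN, new int[] {8}),
                new Tuple(Direction.IN, new int[] {9}),
                new Tuple(Direction.IN, new int[] {10, 11}),
                new Tuple(Direction.OUT, new int[] {12, 13}),
                new Tuple(Direction.IN, new int[] {14}));
    }

    public static void main(String[] args) {
        for (Tuple t : mergeRuns(sample())) {
            System.out.println(t.direction() + " " + Arrays.toString(t.data()));
        }
    }
}
```

Running main prints the five rows of the second table (IN [1, 2], OUT [3, 4, 5, 6, 7], IN [8, 9, 10, 11], OUT [12, 13], IN [14]). This loop can serve as a reference result when checking a stream-based version.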

Code that I've written so far:

enum Direction { IN, OUT }

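class Tuple {
  Direction direction;
  int[] data;

  Tuple(Direction direction, int[] data) {
      this.direction = direction;
      this.data = data;
  }

  Direction getDirection() { return direction; }
  int[] getData() { return data; }

  public Tuple merge(Tuple t) {
      return new Tuple(direction, concat(getData(), t.getData()));
  }
}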

private static int[] concat(int[] first, int[] second) {
    int[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
}

List<Tuple> reduce = tupleStream.reduce(new ArrayList<>(), WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
    System.out.println("combine");
    list1.addAll(list2);
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t) {
    System.out.println("add");
    if (list.size() == 0) {
        list.add(t);
    } else if (list.size() > 0) {
        int lastIndex = list.size() - 1;
        Tuple last = list.get(lastIndex);
        if (last.getDirection() == t.getDirection())
            list.set(lastIndex, last.merge(t));
        else
            list.add(t);
    }

    return list;
}

I believe there is a better and simpler way to achieve the same result.

The online examples and blog posts I've found for the Stream API's reduce/combine only use Integer::sum. I'm hoping to build this up for more complex scenarios.

Upvotes: 13

Views: 2353

Answers (5)

M. Justin

Reputation: 21347

Here's a solution that uses the groupRuns method of the StreamEx library to group the stream into runs of adjacent elements with the same direction value. It then merges each run into a single new Tuple.

List<Tuple> result = StreamEx.of(list)
        .groupRuns((t1, t2) -> t1.getDirection() == t2.getDirection())
        .map(ts -> new Tuple(
                ts.getFirst().getDirection(),
                ts.stream().flatMapToInt(t -> Arrays.stream(t.getData())).toArray()))
        .toList();
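If adding StreamEx isn't an option, the grouping step can be approximated in plain JDK code. The groupRuns helper below is a hypothetical stand-in that only mimics the idea of StreamEx's method on an in-memory List (it is eager and does not work on arbitrary streams); the per-run merge is the same flatMapToInt concatenation as above. The record Tuple and the class name are assumptions for illustration (Java 16+).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class GroupRunsSketch {
    enum Direction { IN, OUT }

    record Tuple(Direction direction, int[] data) {}

    // Hypothetical helper: splits a list into runs of adjacent elements for which
    // sameRun(previous, current) holds, loosely mimicking StreamEx.groupRuns
    static <T> List<List<T>> groupRuns(List<T> items, BiPredicate<? super T, ? super T> sameRun) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = null;
        for (T item : items) {
            if (current == null || !sameRun.test(current.get(current.size() - 1), item)) {
                current = new ArrayList<>(); // start a new run
                runs.add(current);
            }
            current.add(item);
        }
        return runs;
    }

    static List<Tuple> merge(List<Tuple> tuples) {
        return groupRuns(tuples, (t1, t2) -> t1.direction() == t2.direction())
                .stream()
                // each run becomes one Tuple holding the concatenated data
                .map(run -> new Tuple(
                        run.get(0).direction(),
                        run.stream().flatMapToInt(t -> Arrays.stream(t.data())).toArray()))
                .toList();
    }
}
```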

Upvotes: 0

Malte Hartwig

Reputation: 4553

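I think your solution is pretty nice already, especially since a reduction enables parallelism easily, unlike collecting into a shared outside container. But, as Holger pointed out, it's easier (and safer) to use collect instead of reduce: reduce expects an identity value that is never modified, whereas your accumulator mutates the ArrayList passed as the identity, which breaks once the stream runs in parallel. Furthermore, the conditions in the accumulator can be simplified a bit, and in the combiner you forgot to merge the last element of the first list with the first element of the second list: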

List<Tuple> reduce = tupleStream.collect(ArrayList::new, WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2)
{
    if (!list2.isEmpty())
    {
        add(list1, list2.remove(0)); // merge lists in the middle if necessary
        list1.addAll(list2);         // add all the rest
    }
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t)
{
    int lastIndex = list.size() - 1;
    if (list.isEmpty() || list.get(lastIndex).getDirection() != t.getDirection())
    {
        list.add(t);
    }
    else
    {
        list.set(lastIndex, list.get(lastIndex).merge(t));
    }
    return list;
}

Instead of using indexes to access the first and last elements, you could even use a LinkedList and its addFirst/addLast/removeFirst/removeLast methods.
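A minimal sketch of that LinkedList variant, assuming a record-based Tuple with the question's merge logic (Java 16+); the class name LinkedListCollect and the squash method are hypothetical. It keeps the same accumulator/combiner structure as above but uses getLast/removeLast/removeFirst instead of index arithmetic, and runs the collect either sequentially or in parallel.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;

class LinkedListCollect {
    enum Direction { IN, OUT }

    record Tuple(Direction direction, int[] data) {
        // Same idea as the question's merge: concatenate into a new array
        Tuple merge(Tuple t) {
            int[] merged = Arrays.copyOf(data, data.length + t.data.length);
            System.arraycopy(t.data, 0, merged, data.length, t.data.length);
            return new Tuple(direction, merged);
        }
    }

    // Accumulator: extend the last run if the direction matches, else start a new run
    static void add(LinkedList<Tuple> list, Tuple t) {
        if (!list.isEmpty() && list.getLast().direction() == t.direction()) {
            list.addLast(list.removeLast().merge(t));
        } else {
            list.addLast(t);
        }
    }

    // Combiner: merge the run spanning the boundary, then append the rest
    static void combine(LinkedList<Tuple> left, LinkedList<Tuple> right) {
        if (!right.isEmpty()) {
            add(left, right.removeFirst());
            left.addAll(right);
        }
    }

    static List<Tuple> squash(List<Tuple> tuples, boolean parallel) {
        Stream<Tuple> stream = parallel ? tuples.parallelStream() : tuples.stream();
        LinkedList<Tuple> result =
                stream.collect(LinkedList::new, LinkedListCollect::add, LinkedListCollect::combine);
        return result;
    }
}
```

Because the combiner re-runs add on the first element of the right-hand list, a run that was split across two parallel chunks is joined back together, so the parallel result should match the sequential one.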

Upvotes: 5

Eugene

Reputation: 121048

How about this? First, define a small helper method:

private static Tuple mergeTwo(Tuple left, Tuple right) {
    int[] leftArray = left.getData();
    int[] rightArray = right.getData();
    int[] result = new int[leftArray.length + rightArray.length];
    System.arraycopy(leftArray, 0, result, 0, leftArray.length);
    System.arraycopy(rightArray, 0, result, leftArray.length, rightArray.length);
    return new Tuple(left.getDirection(), result);
}

This is close to your concat/merge, but combined into a single method: it merges two Tuples into one.

Then add a helper method that produces the needed Collector; you can put it in a utility class so that it can be reused:

private static Collector<Tuple, ?, List<Tuple>> mergedTuplesCollector() {
    class Acc {

        // Tuples in encounter order; the run currently being built is at the tail
        ArrayDeque<Tuple> deque = new ArrayDeque<>();

        void add(Tuple elem) {
            Tuple tail = deque.peekLast();
            if (tail == null || tail.getDirection() != elem.getDirection()) {
                deque.offerLast(elem);
            } else {
                deque.offerLast(mergeTwo(deque.pollLast(), elem));
            }
        }

        Acc merge(Acc right) {

            Tuple lastLeft = deque.peekLast();
            Tuple firstRight = right.deque.peekFirst();

            // Join the run that spans the boundary between the two partial results
            if (lastLeft != null && firstRight != null
                    && lastLeft.getDirection() == firstRight.getDirection()) {
                deque.offerLast(mergeTwo(deque.pollLast(), right.deque.pollFirst()));
            }
            deque.addAll(right.deque); // append whatever remains of the right side

            return this;
        }

        public List<Tuple> finisher() {
            return new ArrayList<>(deque);
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

And usage would be, for example:

List<Tuple> merged = tuples.stream()
            .parallel()
            .collect(mergedTuplesCollector());

Upvotes: 3

Flown

Reputation: 11740

I've got two ideas on this topic. The first is to get the indices, like in this answer, and group the elements accordingly.
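A minimal sketch of an index-based grouping, assuming the tuples are already in a random-access List (the linked answer is not reproduced here, so this is one possible reading of the idea, not its code). It finds every index where the direction changes, then turns each index range into one merged Tuple. The record Tuple, the class name IndexGrouping and the squash method are hypothetical (Java 16+).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

class IndexGrouping {
    enum Direction { IN, OUT }

    record Tuple(Direction direction, int[] data) {}

    static List<Tuple> squash(List<Tuple> tuples) {
        int n = tuples.size();
        // Run boundaries: 0, every index whose direction differs from its predecessor, and n
        int[] bounds = IntStream.rangeClosed(0, n)
                .filter(i -> i == 0 || i == n
                        || tuples.get(i - 1).direction() != tuples.get(i).direction())
                .toArray();
        // Each pair of consecutive boundaries [from, to) is one run
        return IntStream.range(0, bounds.length - 1)
                .mapToObj(b -> tuples.subList(bounds[b], bounds[b + 1]))
                .map(run -> new Tuple(
                        run.get(0).direction(),
                        run.stream().flatMapToInt(t -> Arrays.stream(t.data())).toArray()))
                .toList();
    }
}
```

For an empty list the only boundary is 0, so no runs are produced and the result is empty.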

The second idea: if you already have a Stream, use a custom Collector (similar to the other solutions, though backed by a Deque):

private Collector<Tuple, ?, List<Tuple>> squashTuples() {
  return new Collector<Tuple, Deque<Tuple>, List<Tuple>>() {
    @Override
    public Supplier<Deque<Tuple>> supplier() {
      return ArrayDeque::new;
    }

    @Override
    public BiConsumer<Deque<Tuple>, Tuple> accumulator() {
      return (acc, e) -> {
        Objects.requireNonNull(e);
        if (!acc.isEmpty() && acc.peekLast().getDirection() == e.getDirection()) {
          acc.offerLast(acc.pollLast().merge(e));
        } else {
          acc.offerLast(e);
        }
      };
    }

    @Override
    public BinaryOperator<Deque<Tuple>> combiner() {
      return (left, right) -> {
        if (!left.isEmpty() && !right.isEmpty() && left.peekLast().getDirection() == right.peekFirst().getDirection()) {
          left.offerLast(left.pollLast().merge(right.pollFirst()));
        }
        left.addAll(right);
        return left;
      };
    }

    @Override
    public Function<Deque<Tuple>, List<Tuple>> finisher() {
      return ArrayList::new;
    }

    @Override
    public Set<Characteristics> characteristics() {
      return EnumSet.noneOf(Characteristics.class);
    }
  };
}

Upvotes: 0

ernest_k

Reputation: 45339

This is an alternative approach that uses slightly different data structures.

If it's an option, changing from int[] to List<Integer> allows for more flexibility (and avoids creating and copying arrays multiple times):

class Tuple {
    Direction direction;
    List<Integer> data;

    Direction getDirection() { return direction; }
    List<Integer> getData() { return data; }
}

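The following helper does the merging on a Deque: it returns the data list of the last tuple if that tuple has the given direction, and otherwise appends a new, empty tuple and returns its list: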

private static List<Integer> next(Deque<Tuple> t, Direction d) {
    if (!t.isEmpty() && t.peekLast().getDirection() == d) {
        return t.peekLast().getData();
    } else {
        Tuple next = new Tuple();
        next.direction = d;
        next.data = new ArrayList<>();
        t.addLast(next);
        return next.data;
    }
}

And with that, the stream can look as simple as:

Deque<Tuple> deq = new LinkedList<>(); //the final collection of tuples

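// Sequential only: next() mutates the shared deque.
// Pair is assumed to be org.apache.commons.lang3.tuple.Pair (Apache Commons Lang).
tuples.stream()
      .flatMap(tp -> tp.getData().stream()
                       .map(d -> Pair.of(tp.getDirection(), d)))
      .forEach(el -> next(deq, el.getLeft()).add(el.getRight()));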
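A dependency-free sketch of the same Deque-based idea, assuming no Pair class is available: since each Tuple already holds a List<Integer>, the whole list can be appended to the current run at once instead of flat-mapping into per-element pairs. The class name ListTupleSketch and the squash method are hypothetical. Like the forEach version above, it is sequential only, because it mutates a single shared Deque.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ListTupleSketch {
    enum Direction { IN, OUT }

    // Tuple holding a List<Integer>, as in the answer above
    static class Tuple {
        final Direction direction;
        final List<Integer> data;

        Tuple(Direction direction, List<Integer> data) {
            this.direction = direction;
            this.data = data;
        }
    }

    static List<Tuple> squash(List<Tuple> tuples) {
        Deque<Tuple> deq = new ArrayDeque<>();
        for (Tuple tp : tuples) {
            Tuple last = deq.peekLast();
            if (last != null && last.direction == tp.direction) {
                last.data.addAll(tp.data); // extend the current run in place
            } else {
                // copy the list so the input tuple is never mutated
                deq.addLast(new Tuple(tp.direction, new ArrayList<>(tp.data)));
            }
        }
        return new ArrayList<>(deq);
    }
}
```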

Upvotes: 1
