Reputation: 433
I have the following example data set that I want to transform/reduce using the Java Stream API, based on the value of direction:
Direction int[]
IN 1, 2
OUT 3, 4
OUT 5, 6, 7
IN 8
IN 9
IN 10, 11
OUT 12, 13
IN 14
to
Direction int[]
IN 1, 2
OUT 3, 4, 5, 6, 7
IN 8, 9, 10, 11
OUT 12, 13
IN 14
Code that I've written so far:
enum Direction { IN, OUT }
class Tuple {
Direction direction;
int[] data;
public Tuple merge(Tuple t) {
return new Tuple(direction, concat(getData(), t.getData()));
}
}
private static int[] concat(int[] first, int[] second) {
int[] result = Arrays.copyOf(first, first.length + second.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}
List<Tuple> reduce = tupleStream.reduce(new ArrayList<>(), WDParser::add, WDParser::combine);
private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
System.out.println("combine");
list1.addAll(list2);
return list1;
}
private static List<Tuple> add(List<Tuple> list, Tuple t) {
System.out.println("add");
if (list.size() == 0) {
list.add(t);
} else if (list.size() > 0) {
int lastIndex = list.size() - 1;
Tuple last = list.get(lastIndex);
if (last.getDirection() == t.getDirection())
list.set(lastIndex, last.merge(t));
else
list.add(t);
}
return list;
}
I believe there is a better and simpler way to achieve the same result.
The online examples and blog posts I've found for the Java Stream API's reduce/combine only use Integer::sum. I'm hoping to build this up for more complex scenarios.
Upvotes: 13
Views: 2353
Reputation: 21347
Here's a solution that uses the groupRuns method of the StreamEx library to group the stream into runs of elements with the same direction value. It then merges each run into a single new Tuple.
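List<Tuple> result = StreamEx.of(list)
    // adjacent tuples with the same direction end up in the same List<Tuple>
    .groupRuns((t1, t2) -> t1.getDirection() == t2.getDirection())
    .map(ts -> new Tuple(
        ts.get(0).getDirection(), // get(0) instead of getFirst(), which needs Java 21+
        ts.stream().flatMapToInt(t -> Arrays.stream(t.getData())).toArray()))
    .toList();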
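For readers who would rather not add a dependency, the run-grouping idea itself can be sketched in plain Java. The names below (RunGrouping, splitIntoRuns) are made up for this illustration and are not part of StreamEx; the sketch only shows what "grouping adjacent elements into runs" means, using the direction sequence from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class RunGrouping {
    // Hypothetical helper: splits the input into maximal runs of adjacent
    // elements for which sameRun(previous, current) holds.
    static <T> List<List<T>> splitIntoRuns(List<T> input, BiPredicate<T, T> sameRun) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T element : input) {
            if (current == null || !sameRun.test(previous, element)) {
                current = new ArrayList<>(); // start a new run
                runs.add(current);
            }
            current.add(element);
            previous = element;
        }
        return runs;
    }

    public static void main(String[] args) {
        List<String> directions =
                Arrays.asList("IN", "OUT", "OUT", "IN", "IN", "IN", "OUT", "IN");
        // prints [[IN], [OUT, OUT], [IN, IN, IN], [OUT], [IN]]
        System.out.println(splitIntoRuns(directions, String::equals));
    }
}
```

Each inner list corresponds to one merged Tuple in the desired output.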
Upvotes: 0
Reputation: 4553
I think your solution is pretty nice already, especially since a reduction makes parallelism easy compared to collecting into a shared outside container. But it's easier, and more correct for a mutable container like a list, to use collect instead of reduce, as Holger pointed out. Furthermore, the conditions in the accumulator can be simplified a bit, and you forgot to merge the last and first elements in the combiner:
List<Tuple> reduce = tupleStream.collect(ArrayList::new, WDParser::add, WDParser::combine);
private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2)
{
if (!list2.isEmpty())
{
add(list1, list2.remove(0)); // merge lists in the middle if necessary
list1.addAll(list2); // add all the rest
}
return list1;
}
private static List<Tuple> add(List<Tuple> list, Tuple t)
{
int lastIndex = list.size() - 1;
if (list.isEmpty() || list.get(lastIndex).getDirection() != t.getDirection())
{
list.add(t);
}
else
{
list.set(lastIndex, list.get(lastIndex).merge(t));
}
return list;
}
Instead of using indexes to access the first/last element, you could even use a LinkedList and its addFirst/addLast and removeFirst/removeLast methods.
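A minimal sketch of that LinkedList variant follows. The question does not show Tuple's constructor or getters, so the Tuple here is an assumed shape filled in for illustration, and the class name LinkedListMerge and the helpers mergeRuns and sample are likewise hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

class LinkedListMerge {
    enum Direction { IN, OUT }

    // Assumed shape of the question's Tuple (constructor and getters filled in).
    static final class Tuple {
        private final Direction direction;
        private final int[] data;

        Tuple(Direction direction, int... data) {
            this.direction = direction;
            this.data = data;
        }

        Direction getDirection() { return direction; }
        int[] getData() { return data; }

        Tuple merge(Tuple t) {
            int[] result = Arrays.copyOf(data, data.length + t.data.length);
            System.arraycopy(t.data, 0, result, data.length, t.data.length);
            return new Tuple(direction, result);
        }

        @Override
        public String toString() { return direction + Arrays.toString(data); }
    }

    // Accumulator: merge into the last tuple when the direction matches, else append.
    static void add(LinkedList<Tuple> list, Tuple t) {
        if (list.isEmpty() || list.getLast().getDirection() != t.getDirection()) {
            list.addLast(t);
        } else {
            list.addLast(list.removeLast().merge(t));
        }
    }

    // Combiner: stitch the boundary between two partial results, then append the rest.
    static void combine(LinkedList<Tuple> left, LinkedList<Tuple> right) {
        if (!right.isEmpty()) {
            add(left, right.removeFirst());
            left.addAll(right);
        }
    }

    static List<Tuple> mergeRuns(List<Tuple> tuples) {
        LinkedList<Tuple> merged = tuples.parallelStream()
                .collect(LinkedList::new, LinkedListMerge::add, LinkedListMerge::combine);
        return merged;
    }

    // The sample data from the question.
    static List<Tuple> sample() {
        return Arrays.asList(
                new Tuple(Direction.IN, 1, 2), new Tuple(Direction.OUT, 3, 4),
                new Tuple(Direction.OUT, 5, 6, 7), new Tuple(Direction.IN, 8),
                new Tuple(Direction.IN, 9), new Tuple(Direction.IN, 10, 11),
                new Tuple(Direction.OUT, 12, 13), new Tuple(Direction.IN, 14));
    }

    public static void main(String[] args) {
        // prints [IN[1, 2], OUT[3, 4, 5, 6, 7], IN[8, 9, 10, 11], OUT[12, 13], IN[14]]
        System.out.println(mergeRuns(sample()));
    }
}
```

Removing from the front of a LinkedList is cheap, whereas remove(0) on an ArrayList shifts every remaining element, which is the practical reason to consider this variant.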
Upvotes: 5
Reputation: 121048
How about this? First, define a small helper method:
private static Tuple mergeTwo(Tuple left, Tuple right) {
int[] leftArray = left.getData();
int[] rightArray = right.getData();
int[] result = new int[leftArray.length + rightArray.length];
System.arraycopy(leftArray, 0, result, 0, leftArray.length);
System.arraycopy(rightArray, 0, result, leftArray.length, rightArray.length);
return new Tuple(left.getDirection(), result);
}
This is close to your concat/merge, I guess, but as a single method: basically a way to merge two Tuples together.
Next, a helper method to produce the needed Collector. You can put this into a utility class so that it can be reused:
private static Collector<Tuple, ?, List<Tuple>> mergedTuplesCollector() {
    class Acc {
        ArrayDeque<Tuple> deque = new ArrayDeque<>();

        // Always work at the tail so the deque keeps encounter order.
        void add(Tuple elem) {
            Tuple last = deque.peekLast();
            if (last == null || last.getDirection() != elem.getDirection()) {
                deque.offerLast(elem);
            } else {
                deque.offerLast(mergeTwo(deque.pollLast(), elem));
            }
        }

        Acc merge(Acc right) {
            Tuple lastLeft = deque.peekLast();
            Tuple firstRight = right.deque.peekFirst();
            // Merge the boundary tuples only if both sides are non-empty and the directions match.
            if (lastLeft != null && firstRight != null
                    && lastLeft.getDirection() == firstRight.getDirection()) {
                deque.offerLast(mergeTwo(deque.pollLast(), right.deque.pollFirst()));
            }
            deque.addAll(right.deque); // append whatever remains of the right side
            return this;
        }

        public List<Tuple> finisher() {
            return new ArrayList<>(deque);
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}
And usage would be, for example:
List<Tuple> merged = tuples.stream()
.parallel()
.collect(mergedTuplesCollector());
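Because the combiner only runs when the stream is actually split, mistakes in it are easy to miss. One hedged way to sanity-check a collector like this is to compare the sequential and parallel results on the question's data. The sketch below is self-contained, so it carries its own assumed Tuple (the original does not show its constructor), and the names CombinerCheck, mergedTuples and sample are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class CombinerCheck {
    enum Direction { IN, OUT }

    // Assumed Tuple shape, filled in for illustration.
    static final class Tuple {
        final Direction direction;
        final int[] data;

        Tuple(Direction direction, int... data) {
            this.direction = direction;
            this.data = data;
        }

        @Override
        public String toString() { return direction + Arrays.toString(data); }
    }

    static Tuple mergeTwo(Tuple left, Tuple right) {
        int[] result = Arrays.copyOf(left.data, left.data.length + right.data.length);
        System.arraycopy(right.data, 0, result, left.data.length, right.data.length);
        return new Tuple(left.direction, result);
    }

    static void add(ArrayDeque<Tuple> acc, Tuple t) {
        Tuple last = acc.peekLast();
        if (last != null && last.direction == t.direction) {
            acc.addLast(mergeTwo(acc.pollLast(), t));
        } else {
            acc.addLast(t);
        }
    }

    static ArrayDeque<Tuple> combine(ArrayDeque<Tuple> left, ArrayDeque<Tuple> right) {
        if (!right.isEmpty()) {
            add(left, right.pollFirst()); // stitch the boundary
            left.addAll(right);           // then append the rest
        }
        return left;
    }

    static Collector<Tuple, ?, List<Tuple>> mergedTuples() {
        return Collector.<Tuple, ArrayDeque<Tuple>, List<Tuple>>of(
                ArrayDeque::new, CombinerCheck::add, CombinerCheck::combine, ArrayList::new);
    }

    // The sample data from the question.
    static List<Tuple> sample() {
        return Arrays.asList(
                new Tuple(Direction.IN, 1, 2), new Tuple(Direction.OUT, 3, 4),
                new Tuple(Direction.OUT, 5, 6, 7), new Tuple(Direction.IN, 8),
                new Tuple(Direction.IN, 9), new Tuple(Direction.IN, 10, 11),
                new Tuple(Direction.OUT, 12, 13), new Tuple(Direction.IN, 14));
    }

    public static void main(String[] args) {
        String sequential = sample().stream().collect(mergedTuples()).toString();
        String parallel = sample().parallelStream().collect(mergedTuples()).toString();
        System.out.println(sequential);
        System.out.println("parallel matches sequential: " + sequential.equals(parallel));
    }
}
```

A matching result on one small input is not a proof, but a mismatch immediately points at the combiner.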
Upvotes: 3
Reputation: 11740
I've got two ideas on this topic. The first is to get the indices, as in this answer, and group the elements accordingly.
The second idea: if you already have a Stream, a custom Collector should be used (similar to the other solutions, though using a Deque):
private Collector<Tuple, ?, List<Tuple>> squashTuples() {
return new Collector<Tuple, Deque<Tuple>, List<Tuple>>() {
@Override
public Supplier<Deque<Tuple>> supplier() {
return ArrayDeque::new;
}
@Override
public BiConsumer<Deque<Tuple>, Tuple> accumulator() {
return (acc, e) -> {
Objects.requireNonNull(e);
if (!acc.isEmpty() && acc.peekLast().getDirection() == e.getDirection()) {
acc.offerLast(acc.pollLast().merge(e));
} else {
acc.offerLast(e);
}
};
}
@Override
public BinaryOperator<Deque<Tuple>> combiner() {
return (left, right) -> {
if (!left.isEmpty() && !right.isEmpty() && left.peekLast().getDirection() == right.peekFirst().getDirection()) {
left.offerLast(left.pollLast().merge(right.pollFirst()));
}
left.addAll(right);
return left;
};
}
@Override
public Function<Deque<Tuple>, List<Tuple>> finisher() {
return ArrayList::new;
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.noneOf(Characteristics.class);
}
};
}
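The first idea mentioned at the top of this answer is only described, and the answer it links to is not reproduced here, so the following is just one possible reading of it: find the indices where the direction changes, then build one merged Tuple per index range. The Tuple shape and the names IndexGrouping, mergeByIndex and sample are assumptions made for this sketch, which also presumes a random-access list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexGrouping {
    enum Direction { IN, OUT }

    // Assumed Tuple shape, filled in for illustration.
    static final class Tuple {
        private final Direction direction;
        private final int[] data;

        Tuple(Direction direction, int... data) {
            this.direction = direction;
            this.data = data;
        }

        Direction getDirection() { return direction; }
        int[] getData() { return data; }

        @Override
        public String toString() { return direction + Arrays.toString(data); }
    }

    static List<Tuple> mergeByIndex(List<Tuple> tuples) {
        int n = tuples.size();
        // Boundaries: index 0, every index whose direction differs from its
        // predecessor, and n itself as the closing boundary.
        int[] bounds = IntStream.rangeClosed(0, n)
                .filter(i -> i == 0 || i == n
                        || tuples.get(i).getDirection() != tuples.get(i - 1).getDirection())
                .toArray();
        // Each adjacent pair of boundaries [from, to) is one run to merge.
        return IntStream.range(0, bounds.length - 1)
                .mapToObj(k -> new Tuple(
                        tuples.get(bounds[k]).getDirection(),
                        tuples.subList(bounds[k], bounds[k + 1]).stream()
                                .flatMapToInt(t -> Arrays.stream(t.getData()))
                                .toArray()))
                .collect(Collectors.toList());
    }

    // The sample data from the question.
    static List<Tuple> sample() {
        return Arrays.asList(
                new Tuple(Direction.IN, 1, 2), new Tuple(Direction.OUT, 3, 4),
                new Tuple(Direction.OUT, 5, 6, 7), new Tuple(Direction.IN, 8),
                new Tuple(Direction.IN, 9), new Tuple(Direction.IN, 10, 11),
                new Tuple(Direction.OUT, 12, 13), new Tuple(Direction.IN, 14));
    }

    public static void main(String[] args) {
        // prints [IN[1, 2], OUT[3, 4, 5, 6, 7], IN[8, 9, 10, 11], OUT[12, 13], IN[14]]
        System.out.println(mergeByIndex(sample()));
    }
}
```

Unlike the Collector, this needs the whole input as a List up front, which is why the Collector is the better fit when only a Stream is available.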
Upvotes: 0
Reputation: 45339
This is an alternative approach that uses slightly different data structures.
If this is an option, changing from int[] to List<Integer> allows for more flexibility (not to mention avoiding creating/copying arrays multiple times):
class Tuple {
Direction direction;
List<Integer> data;
}
And the following function does the merging on a Deque collection:
private static List<Integer> next(Deque<Tuple> t, Direction d) {
if (!t.isEmpty() && t.peekLast().getDirection() == d) {
return t.peekLast().getData();
} else {
Tuple next = new Tuple();
next.direction = d;
next.data = new ArrayList<>();
t.addLast(next);
return next.data;
}
}
And with that, the stream can be as simple as the following (it mutates deq from inside forEach, so it must stay sequential):
Deque<Tuple> deq = new LinkedList<>(); //the final collection of tuples
tuples.stream()
.flatMap(tp -> tp.getData().stream()
.map(d -> Pair.of(tp.getDirection(), Integer.valueOf(d))))
.forEach(el -> next(deq, el.getLeft()).add(el.getRight()));
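The same List<Integer> idea can also be expressed without a side-effecting forEach, as a mutable reduction via collect. This is only a sketch under assumptions: the Tuple constructor, the toString, and the names ListTupleMerge, mergeRuns and sample are invented for illustration, and input tuples are copied so they are never modified.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListTupleMerge {
    enum Direction { IN, OUT }

    // Assumed Tuple shape with a growable List<Integer> payload.
    static final class Tuple {
        final Direction direction;
        final List<Integer> data;

        Tuple(Direction direction, Integer... values) {
            this.direction = direction;
            this.data = new ArrayList<>(Arrays.asList(values));
        }

        @Override
        public String toString() { return direction + data.toString(); }
    }

    // Accumulator: append to the last tuple's list, opening a fresh tuple
    // whenever the direction changes.
    static void add(ArrayDeque<Tuple> acc, Tuple t) {
        Tuple last = acc.peekLast();
        if (last == null || last.direction != t.direction) {
            last = new Tuple(t.direction); // fresh tuple, so inputs are never mutated
            acc.addLast(last);
        }
        last.data.addAll(t.data);          // grows in place, no manual array copying
    }

    // Combiner: stitch the boundary, then append the rest of the right side.
    static void combine(ArrayDeque<Tuple> left, ArrayDeque<Tuple> right) {
        if (!right.isEmpty()) {
            add(left, right.pollFirst());
            left.addAll(right);
        }
    }

    static List<Tuple> mergeRuns(List<Tuple> tuples) {
        ArrayDeque<Tuple> merged = tuples.stream()
                .collect(ArrayDeque::new, ListTupleMerge::add, ListTupleMerge::combine);
        return new ArrayList<>(merged);
    }

    // The sample data from the question.
    static List<Tuple> sample() {
        return Arrays.asList(
                new Tuple(Direction.IN, 1, 2), new Tuple(Direction.OUT, 3, 4),
                new Tuple(Direction.OUT, 5, 6, 7), new Tuple(Direction.IN, 8),
                new Tuple(Direction.IN, 9), new Tuple(Direction.IN, 10, 11),
                new Tuple(Direction.OUT, 12, 13), new Tuple(Direction.IN, 14));
    }

    public static void main(String[] args) {
        // prints [IN[1, 2], OUT[3, 4, 5, 6, 7], IN[8, 9, 10, 11], OUT[12, 13], IN[14]]
        System.out.println(mergeRuns(sample()));
    }
}
```

Because the state lives in the container that collect creates, this version no longer depends on an external deque.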
Upvotes: 1