Harald

Reputation: 5093

What is the (kind of) inverse operation to Java's Stream.flatMap()?

The Stream.flatMap() operation transforms a stream of

a, b, c

into a stream that contains zero or more elements for each input element, e.g.

a1, a2, c1, c2, c3

Is there an opposite operation that batches up several elements into one new element?

Does it exist? Can I simulate it in any way?
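Here is a small Java sketch that reproduces the flatMap example above. The expansion rule is a hypothetical assumption chosen only to match the sample output: two elements for a, none for b, three for c. Note that b contributes an empty stream, so flatMap can also drop elements.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class FlatMapExample {
    // Hypothetical expansion rule, chosen only to reproduce the example:
    // "a" -> 2 elements, "c" -> 3 elements, anything else (here "b") -> none.
    static Stream<String> expand(String s) {
        int count = s.equals("a") ? 2 : s.equals("c") ? 3 : 0;
        return IntStream.rangeClosed(1, count).mapToObj(i -> s + i);
    }

    static List<String> run() {
        return Stream.of("a", "b", "c")
                .flatMap(FlatMapExample::expand) // "b" maps to an empty stream
                .collect(Collectors.toList());
    }
}
```

FlatMapExample.run() returns [a1, a2, c1, c2, c3].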

Upvotes: 22

Views: 8988

Answers (7)

TheJeff

Reputation: 4101

Building mostly on the StreamEx answer by user_3380739, you can use groupRuns (docs here):

StreamEx.of("a1", "a2", "c1", "c2", "c3").groupRuns((t, u) -> t.charAt(0) == u.charAt(0))
    .forEach(System.out::println);

// [a1, a2]
// [c1, c2, c3]
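If adding StreamEx is not an option, a comparable result can be sketched with only the JDK as a custom Collector. This is an eager sketch and not StreamEx's implementation. groupRuns here is a hypothetical helper name. Like the StreamEx version, it tests each pair of adjacent elements. The combiner joins the boundary runs of two partial results, so it should also give the same answer on parallel streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collector;
import java.util.stream.Stream;

class RunsCollector {
    // Collects adjacent elements into runs; a new run starts whenever
    // sameGroup.test(previous, current) is false.
    static <T> Collector<T, ?, List<List<T>>> groupRuns(BiPredicate<T, T> sameGroup) {
        return Collector.of(
                ArrayList::new,
                (List<List<T>> runs, T t) -> {
                    if (!runs.isEmpty()) {
                        List<T> last = runs.get(runs.size() - 1);
                        if (sameGroup.test(last.get(last.size() - 1), t)) {
                            last.add(t); // continue the current run
                            return;
                        }
                    }
                    List<T> run = new ArrayList<>();
                    run.add(t); // start a new run
                    runs.add(run);
                },
                (left, right) -> {
                    // Parallel merge: join the boundary runs if their adjacent elements belong together.
                    if (!left.isEmpty() && !right.isEmpty()) {
                        List<T> lastLeft = left.get(left.size() - 1);
                        List<T> firstRight = right.get(0);
                        if (sameGroup.test(lastLeft.get(lastLeft.size() - 1), firstRight.get(0))) {
                            lastLeft.addAll(firstRight);
                            left.addAll(right.subList(1, right.size()));
                            return left;
                        }
                    }
                    left.addAll(right);
                    return left;
                });
    }
}
```

For example, collecting Stream.of("a1", "a2", "c1", "c2", "c3") with groupRuns((a, b) -> a.charAt(0) == b.charAt(0)) yields [[a1, a2], [c1, c2, c3]].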

Upvotes: 1

Joshua Goldberg

Reputation: 5333

Similar to what StreamEx does, you could implement a Spliterator manually. For example,

collectByTwos(Stream.of(1, 2, 3, 4), (x, y) -> String.format("%d%d", x, y))

... returns a stream of "12", "34" using the code below:

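import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <X,Y> Stream<Y> collectByTwos(Stream<X> inStream, BiFunction<X,X,Y> mapping) {
    Spliterator<X> origSpliterator = inStream.spliterator();
    Iterator<X> origIterator = Spliterators.iterator(origSpliterator);

    boolean isParallel = inStream.isParallel();
    // An unknown size is reported as Long.MAX_VALUE; avoid overflowing it.
    long origSize = origSpliterator.estimateSize();
    long newSizeEst = origSize == Long.MAX_VALUE ? Long.MAX_VALUE : (origSize + 1) / 2;
    // Flags such as SIZED, SORTED or DISTINCT describe the source elements,
    // not the combined ones, so only ORDERED is carried over.
    int characteristics = origSpliterator.characteristics() & Spliterator.ORDERED;

    Spliterators.AbstractSpliterator<Y> lCombinedSpliterator =
            new Spliterators.AbstractSpliterator<Y>(newSizeEst, characteristics) {
        @Override
        public boolean tryAdvance(Consumer<? super Y> action) {
            if (! origIterator.hasNext()) {
                return false; // source exhausted
            }
            X lNext1 = origIterator.next();
            if (! origIterator.hasNext()) {
                throw new IllegalArgumentException("Trailing elements of the stream would be ignored.");
            }
            X lNext2 = origIterator.next();
            action.accept(mapping.apply(lNext1, lNext2)); // one output element per pair
            return true;
        }
    };
    return StreamSupport.stream(lCombinedSpliterator, isParallel)
            .onClose(inStream::close);
}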

(I suspect this may not be correct for parallel streams.)
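If the input is already a random-access List (an assumption, since the code above accepts any Stream), a simpler variant can avoid the hand-written Spliterator. It parallelizes naturally because each output element is computed from an index. This is a hypothetical overload and not part of the answer's code. It keeps the same behavior of rejecting an odd number of elements.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PairsFromList {
    // Combines elements (0,1), (2,3), ... of a random-access list.
    static <X, Y> Stream<Y> collectByTwos(List<X> list, BiFunction<X, X, Y> mapping) {
        if (list.size() % 2 != 0) {
            throw new IllegalArgumentException("Trailing elements of the list would be ignored.");
        }
        // Each index i produces one output element, so splitting for parallelism is safe.
        return IntStream.range(0, list.size() / 2)
                .mapToObj(i -> mapping.apply(list.get(2 * i), list.get(2 * i + 1)));
    }
}
```

With Arrays.asList(1, 2, 3, 4) and (x, y) -> String.format("%d%d", x, y), this yields "12", "34", even after .parallel().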

Upvotes: 0

Harald

Reputation: 5093

Finally I figured out that flatMap is its own "inverse", so to speak. I overlooked that flatMap does not necessarily increase the number of elements. It may also decrease the number of elements by emitting an empty stream for some of the elements. To implement a group-by operation, the function called by flatMap needs minimal internal state, namely the most recent element. It either returns an empty stream or, at the end of a group, it returns the group's reduced representative.

Here is a quick implementation where groupBorder must return true if the two elements passed in do not belong to the same group, i.e. there is a group border between them. The combiner is the group function that combines, for example, (1,a), (1,a), (1,a) into (3,a), given that your group elements are (int, String) tuples.

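import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Stream;

// Stateful function for Stream.flatMap(); only suitable for sequential streams.
public class GroupBy<X> implements Function<X, Stream<X>> {

  private final BiPredicate<X, X> groupBorder;
  private final BinaryOperator<X> combiner;
  private X latest = null; // combined representative of the current group

  public GroupBy(BiPredicate<X, X> groupBorder,
                 BinaryOperator<X> combiner) {
    this.groupBorder = groupBorder;
    this.combiner = combiner;
  }

  @Override
  public Stream<X> apply(X elem) {
    // TODO: add test on end marker as additional parameter for constructor
    if (elem == null) {
      // End marker: flush the last pending group, if any.
      return latest == null ? Stream.empty() : Stream.of(latest);
    }
    if (latest == null) {
      // The first element opens the first group.
      latest = elem;
      return Stream.empty();
    }
    if (groupBorder.test(latest, elem)) {
      // Group border: emit the finished group and start a new one with elem.
      Stream<X> result = Stream.of(latest);
      latest = elem;
      return result;
    }
    // Same group: fold elem into the representative.
    latest = combiner.apply(latest, elem);
    return Stream.empty();
  }
}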

There is one caveat, though: to emit the last group of the whole stream, an end marker must be appended as the last element of the stream. The above code assumes it is null, but an additional end-marker tester could be added.

I could not come up with a solution that does not rely on the end marker.

Furthermore, I did not convert between incoming and outgoing element types. For a unique operation, this would just work. For a count operation, a previous step would have to map individual elements to a counting object.
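As a sketch of the count operation just described, the following code maps each key to a hypothetical (count, key) tuple and appends the null end marker. It then feeds everything through a condensed copy of the GroupBy logic above. The Count class and the groupBy/countRuns helper names are assumptions for illustration. Because the function keeps state between calls, this only works on sequential streams.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GroupByUsage {
    // Hypothetical (count, key) tuple standing in for the (int, String) tuples above.
    static final class Count {
        final int n;
        final String key;
        Count(int n, String key) { this.n = n; this.key = key; }
        @Override public String toString() { return "(" + n + "," + key + ")"; }
    }

    // Condensed version of the GroupBy function above; null is the end marker.
    static <X> Function<X, Stream<X>> groupBy(BiPredicate<X, X> groupBorder, BinaryOperator<X> combiner) {
        return new Function<X, Stream<X>>() {
            private X latest = null;

            @Override
            public Stream<X> apply(X elem) {
                if (elem == null) {                    // end marker: flush the last group
                    X last = latest;
                    latest = null;
                    return last == null ? Stream.empty() : Stream.of(last);
                }
                if (latest == null) {                  // first element opens the first group
                    latest = elem;
                    return Stream.empty();
                }
                if (groupBorder.test(latest, elem)) {  // border: emit the finished group
                    X done = latest;
                    latest = elem;
                    return Stream.of(done);
                }
                latest = combiner.apply(latest, elem); // same group: combine
                return Stream.empty();
            }
        };
    }

    // Counts runs of equal adjacent keys.
    static List<String> countRuns(Stream<String> keys) {
        return Stream.concat(keys.map(k -> new Count(1, k)), Stream.of((Count) null))
                .flatMap(groupBy((Count a, Count b) -> !a.key.equals(b.key),
                                 (Count a, Count b) -> new Count(a.n + b.n, a.key)))
                .map(Count::toString)
                .collect(Collectors.toList());
    }
}
```

countRuns(Stream.of("a", "a", "a", "b", "a")) returns [(3,a), (1,b), (1,a)]. The last a forms its own group because only adjacent elements are grouped.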

Upvotes: 8

user_3380739

Reputation: 1244

Take a look at collapse in StreamEx:

StreamEx.of("a1", "a2", "c1", "c2", "c3").collapse((a, b) -> a.charAt(0) == b.charAt(0))
    .map(e -> e.substring(0, 1)).forEach(System.out::println);
// a
// c

Or my fork, which adds more functions such as groupBy, split, and sliding:

StreamEx.of("a1", "a2", "c1", "c2", "c3").splitToList(2).forEach(System.out::println);
// [a1, a2]
// [c1, c2]
// [c3]

StreamEx.of("a1", "a2", "c1", "c2", "c3").groupBy(e -> e.charAt(0))
    .forEach(System.out::println);
// a=[a1, a2]
// c=[c1, c2, c3]
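The fork's method names above are taken from the answer and not verified here. For comparison, here is a sketch of how the last two results could be obtained with only the JDK, assuming the data is already in a List. chunks and byFirstChar are hypothetical helper names. The chunks returned by subList are views backed by the original list. Collectors.groupingBy groups across the whole stream, not only adjacent runs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class JdkBatching {
    // Fixed-size batches (size > 0), similar in spirit to splitToList(2) above.
    static <T> List<List<T>> chunks(List<T> list, int size) {
        return IntStream.range(0, (list.size() + size - 1) / size)
                .mapToObj(i -> list.subList(i * size, Math.min(list.size(), (i + 1) * size)))
                .collect(Collectors.toList());
    }

    // Groups by first character; TreeMap keeps the keys sorted for stable output.
    static Map<Character, List<String>> byFirstChar(List<String> list) {
        return list.stream()
                .collect(Collectors.groupingBy(s -> s.charAt(0), TreeMap::new, Collectors.toList()));
    }
}
```

For a1, a2, c1, c2, c3, chunks(list, 2) gives [[a1, a2], [c1, c2], [c3]] and byFirstChar(list) gives {a=[a1, a2], c=[c1, c2, c3]}.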

Upvotes: 2

Tomasz Linkowski

Reputation: 4496

This is what I came up with:

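import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

interface OptionalBinaryOperator<T> extends BiFunction<T, T, Optional<T>> {
  // Merges t1 and t2 with binaryOperator if biPredicate accepts the pair, else returns empty.
  static <T> OptionalBinaryOperator<T> of(BinaryOperator<T> binaryOperator,
          BiPredicate<T, T> biPredicate) {
    return (t1, t2) -> biPredicate.test(t1, t2)
            ? Optional.of(binaryOperator.apply(t1, t2))
            : Optional.empty();
  }
}

class StreamUtils {
  // Eager: reduce() is a terminal operation. The accumulator has a side effect
  // (builder.add) and is not associative, so use this only with sequential streams.
  public static <T> Stream<T> reducePartially(Stream<T> stream,
          OptionalBinaryOperator<T> conditionalAccumulator) {
    Stream.Builder<T> builder = Stream.builder();
    stream.reduce((t1, t2) -> conditionalAccumulator.apply(t1, t2).orElseGet(() -> {
      builder.add(t1); // t1 cannot be merged with t2: emit it
      return t2;       // t2 starts the next partial result
    })).ifPresent(builder::add); // emit the last partial result
    return builder.build();
  }
}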

Unfortunately, I did not have the time to make it lazy, but it can be done by writing a custom Spliterator delegating to stream.spliterator() that would follow the logic above (instead of utilizing stream.reduce(), which is a terminal operation).
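Here is a minimal sketch of the lazy variant described above, assuming sequential use. reducePartiallyLazy is a hypothetical name. It takes a plain BiFunction<T, T, Optional<T>>, so an OptionalBinaryOperator can be passed directly. Instead of calling stream.reduce(), it pulls from stream.spliterator() and emits each merged value as soon as the next non-mergeable element arrives. Because of that, it can also work on infinite streams.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyStreamUtils {
    static <T> Stream<T> reducePartiallyLazy(Stream<T> stream,
            BiFunction<T, T, Optional<T>> conditionalAccumulator) {
        Spliterator<T> source = stream.spliterator();
        Iterator<T> it = Spliterators.iterator(source);
        Spliterator<T> merged = new Spliterators.AbstractSpliterator<T>(
                source.estimateSize(), source.characteristics() & Spliterator.ORDERED) {
            private T current;          // value accumulated so far
            private boolean hasCurrent; // whether 'current' holds a pending value

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!hasCurrent) {
                    if (!it.hasNext()) {
                        return false;   // nothing left to emit
                    }
                    current = it.next();
                    hasCurrent = true;
                }
                while (it.hasNext()) {
                    T next = it.next();
                    Optional<T> combined = conditionalAccumulator.apply(current, next);
                    if (combined.isPresent()) {
                        current = combined.get(); // keep merging
                    } else {
                        T result = current;
                        current = next;           // 'next' starts a new partial result
                        action.accept(result);
                        return true;
                    }
                }
                // Source exhausted: flush the last partial result.
                hasCurrent = false;
                T result = current;
                current = null;
                action.accept(result);
                return true;
            }
        };
        return StreamSupport.stream(merged, false).onClose(stream::close);
    }
}
```

For example, Stream.iterate(0, i -> i + 1).map(i -> i / 3) is infinite, but merging equal neighbors and applying limit(3) terminates with [0, 1, 2].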


PS. I just realized you wanted <T,U> conversion, and I wrote about <T,T> conversion. If you can first map from T to U, and then use the function above, then that's it (even if it's suboptimal).

If it's something more complex, the kind of condition for reducing/merging would need to be defined before proposing an API (e.g. Predicate<T>, BiPredicate<T,T>, BiPredicate<U,T>, or maybe even Predicate<List<T>>).

Upvotes: 0

Joop Eggen

Reputation: 109547

    IntStream.range(0, 10)
            .mapToObj(n -> IntStream.of(n, n / 2, n / 3))
            .reduce(IntStream.empty(), IntStream::concat)
            .forEach(System.out::println);

As you can see, the elements are mapped to streams too, and then concatenated into one large stream.
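The pipeline in this answer turns each input element into several output elements, so it behaves like flatMap rather than its inverse. The sketch below checks that it yields the same sequence as a single flatMap call. The Javadoc of Stream.concat and IntStream.concat also cautions that repeated concatenation can lead to deep call chains or even a StackOverflowError. That caution favors flatMap for long inputs.

```java
import java.util.stream.IntStream;

class ConcatVsFlatMap {
    // The answer's approach: map to IntStreams, then fold them together with concat.
    static int[] viaConcat() {
        return IntStream.range(0, 10)
                .mapToObj(n -> IntStream.of(n, n / 2, n / 3))
                .reduce(IntStream.empty(), IntStream::concat)
                .toArray();
    }

    // The same result with one flatMap call and no nested concatenation.
    static int[] viaFlatMap() {
        return IntStream.range(0, 10)
                .flatMap(n -> IntStream.of(n, n / 2, n / 3))
                .toArray();
    }
}
```

Both methods return the same 30 values, starting with 0, 0, 0, 1, 0, 0, 2, 1, 0.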

Upvotes: 1

Lino

Reputation: 19926

You can hack your way around it. See the following example:

Stream<List<String>> stream = Stream.of("Cat", "Dog", "Whale", "Mouse")
    .collect(Collectors.collectingAndThen(
        Collectors.partitioningBy(a -> a.length() > 3),
        map -> Stream.of(map.get(true), map.get(false))
    ));
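The same collect-then-stream trick generalizes from two batches to one batch per key if partitioningBy is swapped for groupingBy. This is a sketch under the assumption that an eager pass over the whole stream is acceptable. The method names are hypothetical, and TreeMap is used only to make the batch order predictable.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BatchingByKey {
    // The partitioningBy hack from the answer: exactly two batches (true first, then false).
    static Stream<List<String>> byLongWords(Stream<String> words) {
        return words.collect(Collectors.collectingAndThen(
                Collectors.partitioningBy(a -> a.length() > 3),
                map -> Stream.of(map.get(true), map.get(false))));
    }

    // Same idea with groupingBy: one batch per word length, ordered by length.
    static Stream<List<String>> byLength(Stream<String> words) {
        return words.collect(Collectors.collectingAndThen(
                Collectors.groupingBy(String::length, TreeMap::new, Collectors.toList()),
                map -> map.values().stream()));
    }
}
```

For Cat, Dog, Whale, Mouse, byLongWords yields [Whale, Mouse] then [Cat, Dog], and byLength yields [Cat, Dog] then [Whale, Mouse].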

Upvotes: 1
