Aleksandr Dubinsky

Reputation: 23515

Collect successive pairs from a stream

Given an object or primitive stream such as { 0, 1, 2, 3, 4 }, how can I most elegantly transform it into the following form (assuming, of course, that I've defined a class Pair)?

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

Upvotes: 123

Views: 96470

Answers (23)

M. Justin

Reputation: 21256

The Gatherers.windowSliding gatherer, finalized in Java 24 (and available as a preview feature in Java 22 and 23), supports this:

Stream.of(0, 1, 2, 3, 4)
        .gather(Gatherers.windowSliding(2))
        .map(list -> new Pair(list.get(0), list.get(1)))
        .toList();

This uses the new Stream.gather method with the new windowSliding gatherer to convert the initial Stream<Integer> ([0, 1, 2, 3, 4]) to a pairwise Stream<List<Integer>> ([[0, 1], [1, 2], [2, 3], [3, 4]]). Each of these lists is then transformed to a Pair using the existing Stream.map method.

Javadocs

Gatherer:

An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. […]

[…]

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.

Stream.gather:

Returns a stream consisting of the results of applying the given gatherer to the elements of this stream.

Gatherers.windowSliding

Returns a Gatherer that gathers elements into windows -- encounter-ordered groups of elements -- of a given size, where each subsequent window includes all elements of the previous window except for the least recent, and adds the next element in the stream. […]

Example:

// will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
List<List<Integer>> windows2 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();

// will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
List<List<Integer>> windows6 =
    Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();

Upvotes: 5

Jan Porowski

Reputation: 21

You could use Flux:

Stream<String> someStream = Stream.of("A", "B", "C", "D");
Flux<String> someFlux = Flux.fromStream(someStream);

someFlux.zipWith(someFlux.skip(1))
    .map(t -> t.getT1().concat(t.getT2()))
    .subscribe(System.out::println);

The result would be:

AB
BC
CD

Upvotes: 2

Roger Keays

Reputation: 3227

The solutions here seem a little complicated or depend on third-party libraries. This problem can be solved with an intermediate stream that collects pairs:

public static <T> Stream<List<T>> pairs(Stream<T> stream) {
    Iterator<T> iterator = stream.iterator();
    if (iterator.hasNext()) {
        T first = iterator.next();
        if (iterator.hasNext()) {
            return Stream.iterate(
                List.of(first, iterator.next()),
                prev -> iterator.hasNext() ? List.of(prev.get(1), iterator.next()) : null)
                .takeWhile(prev -> prev != null);
        }
    }
    return Stream.empty();
}

Examples:

pairs(Stream.of()).toList();      // []
pairs(Stream.of(1)).toList();     // []
pairs(Stream.of(1,2)).toList();   // [[1, 2]]
pairs(Stream.of(1,2,3)).toList(); // [[1, 2], [2, 3]]
pairs(Stream.of("a","b","c","d")).toList();  // [[a, b], [b, c], [c, d]]

In this solution, Stream.iterate uses an accumulator in much the same way as reduce, except that it creates an intermediate stream rather than being a terminal operation. So laziness and infinite streams are supported.
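To illustrate the laziness claim, here is a minimal sketch that feeds an infinite source into the same technique and takes only the first few pairs. The class name LazyPairsDemo and the helper firstPairs are made up for this illustration; the pairs method is repeated (slightly flattened) so that the block compiles on its own.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPairsDemo {

    // Same technique as the pairs(...) method above, repeated so the sketch is self-contained.
    static <T> Stream<List<T>> pairs(Stream<T> stream) {
        Iterator<T> iterator = stream.iterator();
        if (!iterator.hasNext()) return Stream.empty();
        T first = iterator.next();
        if (!iterator.hasNext()) return Stream.empty();
        return Stream.iterate(
                List.of(first, iterator.next()),
                prev -> iterator.hasNext() ? List.of(prev.get(1), iterator.next()) : null)
            .takeWhile(prev -> prev != null);
    }

    // Hypothetical helper: pairs over the natural numbers, cut off after `count` pairs.
    static List<List<Integer>> firstPairs(int count) {
        return pairs(Stream.iterate(0, i -> i + 1))   // infinite source
            .limit(count)                             // only `count` pairs are ever computed
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPairs(3)); // [[0, 1], [1, 2], [2, 3]]
    }
}
```

Note that List.of rejects null elements, so this sketch assumes the source stream contains no nulls.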

Upvotes: 1

Evgeni Sergeev

Reputation: 23611

Streams.zip(..) is available in Guava, for those who depend on it.

Example:

// zip is lazy, so a terminal operation is needed before anything is printed
Streams.zip(list.stream(),
            list.stream().skip(1),
            (a, b) -> a + " " + b)
       .forEach(System.out::println);

Upvotes: 6

Beezer

Reputation: 1108

I finally figured out a way of tricking Stream.reduce into dealing neatly with pairs of values. Many use cases need this facility, which JDK 8 does not provide naturally:

public static int ArithGeo(int[] arr) {
    // Collect the differences and ratios of successive elements
    List<Integer> diffList = new ArrayList<>();
    List<Integer> divList = new ArrayList<>();
    Arrays.stream(arr).reduce((left, right) -> {
        diffList.add(right-left);
        divList.add(right/left);
        return right;
    });
    //Arithmetic
    if(diffList.stream().distinct().count() == 1) {
        return 1;
    }
    //Geometric
    if(divList.stream().distinct().count() == 1) {
        return 2;
    }
    return -1;
}

The trick I use is the return right; statement.

Upvotes: 1

mishadoff

Reputation: 10789

This is not elegant; it's a hackish solution, but it works for infinite streams:

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
    new Function<Integer, Pair>() {
        Integer previous;

        @Override
        public Pair apply(Integer integer) {
            Pair pair = null;
            if (previous != null) pair = new Pair(previous, integer);
            previous = integer;
            return pair;
        }
    }).skip(1); // drop first null

Now you can limit your stream to the length you want

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

P.S. I hope there is a better solution, something like Clojure's (partition 2 1 stream).

Upvotes: 18

Tomek Rękawek

Reputation: 9304

I've implemented a spliterator wrapper which takes every n elements T from the original spliterator and produces List<T>:

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {

    private final Spliterator<T> wrappedSpliterator;

    private final int n;

    private final Deque<T> deque;

    private final Consumer<T> dequeConsumer;

    public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
        this.wrappedSpliterator = wrappedSpliterator;
        this.n = n;
        this.deque = new ArrayDeque<>();
        this.dequeConsumer = deque::addLast;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        deque.pollFirst();
        fillDeque();
        if (deque.size() == n) {
            List<T> list = new ArrayList<>(deque);
            action.accept(list);
            return true;
        } else {
            return false;
        }
    }

    private void fillDeque() {
        while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
            ;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return wrappedSpliterator.estimateSize();
    }

    @Override
    public int characteristics() {
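        // The window count differs from the element count, and windows are not sorted,
        // so SIZED, SUBSIZED and SORTED must not be inherited from the wrapped spliterator.
        return wrappedSpliterator.characteristics()
                & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.SORTED);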
    }
}

The following method may be used to create a consecutive stream:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
    Spliterator<E> spliterator = stream.spliterator();
    Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
    return StreamSupport.stream(wrapper, false);
}

Sample usage:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
    .map(list -> new Pair(list.get(0), list.get(1)))
    .forEach(System.out::println);

Upvotes: 17

Rob Philipp

Reputation: 261

For calculating successive differences in the time (x-values) of a time-series, I use the stream's collect(...) method:

final List< Long > intervals = timeSeries.data().stream()
                    .map( TimeSeries.Datum::x )
                    .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine )
                    .intervals();

Where the DifferenceCollector is something like this:

public class DifferenceCollector implements LongConsumer
{
    private final List< Long > intervals = new ArrayList<>();
    private Long lastTime;

    @Override
    public void accept( final long time )
    {
        if( Objects.isNull( lastTime ) )
        {
            lastTime = time;
        }
        else
        {
            intervals.add( time - lastTime );
            lastTime = time;
        }
    }

    public void combine( final DifferenceCollector other )
    {
        intervals.addAll( other.intervals );
        lastTime = other.lastTime;
    }

    public List< Long > intervals()
    {
        return intervals;
    }
}

You could probably modify this to suit your needs.
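As a rough illustration of how such a collector behaves on concrete input, here is a self-contained sketch that runs it over a LongStream. The class DifferenceDemo and the helper differences are invented for this example, and the collector is a compact copy of the one above. One caveat to be aware of: combine simply appends the other collector's intervals, so the difference across a chunk boundary would be lost if the stream were run in parallel. The sketch therefore keeps the stream sequential.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;

class DifferenceDemo {

    // Compact copy of the collector above, nested so the sketch compiles on its own.
    static class DifferenceCollector implements LongConsumer {
        private final List<Long> intervals = new ArrayList<>();
        private Long lastTime; // null until the first value has been seen

        @Override
        public void accept(long time) {
            if (lastTime != null) {
                intervals.add(time - lastTime);
            }
            lastTime = time;
        }

        // Only reliable for sequential streams: the gap between the two chunks is not recorded.
        void combine(DifferenceCollector other) {
            intervals.addAll(other.intervals);
            lastTime = other.lastTime;
        }

        List<Long> intervals() {
            return intervals;
        }
    }

    // Hypothetical helper: successive differences of the given values.
    static List<Long> differences(long... times) {
        return LongStream.of(times)
            .collect(DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine)
            .intervals();
    }

    public static void main(String[] args) {
        System.out.println(differences(10, 13, 20, 21)); // [3, 7, 1]
    }
}
```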

Upvotes: 1

SamTebbs33

Reputation: 5647

You can do this with the Stream.reduce() method (I haven't seen any other answers using this technique).

public static <T> List<Pair<T, T>> consecutive(List<T> list) {
    List<Pair<T, T>> pairs = new LinkedList<>();
    list.stream().reduce((a, b) -> {
        pairs.add(new Pair<>(a, b));
        return b;
    });
    return pairs;
}
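For readers without a Pair class at hand, here is a runnable sketch of the same reduce technique that uses java.util.Map.Entry as a stand-in pair type (my substitution, not part of the answer; the class name ReducePairs is also made up). Because the lambda mutates an external list, this only behaves as intended on a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ReducePairs {

    // Map.Entry stands in for Pair here; Map.entry rejects null keys and values.
    static <T> List<Map.Entry<T, T>> consecutive(List<T> list) {
        List<Map.Entry<T, T>> pairs = new ArrayList<>();
        list.stream()                  // must stay sequential: the lambda has a side effect
            .reduce((a, b) -> {
                pairs.add(Map.entry(a, b));
                return b;              // carry the right-hand element into the next step
            });
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(consecutive(List.of(0, 1, 2, 3))); // [0=1, 1=2, 2=3]
    }
}
```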

Upvotes: 24

walkeros

Reputation: 4942

You can achieve that by using a bounded queue to store the elements that flow through the stream (this is based on an idea I described in detail here: Is it possible to get next element in the Stream?)

The example below first defines an instance of the BoundedQueue class, which stores the elements going through the stream (if you don't like the idea of extending LinkedList, refer to the link mentioned above for an alternative and more generic approach). You then simply combine two subsequent elements into an instance of Pair:

public class TwoSubsequentElems {
  public static void main(String[] args) {
    List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4));

    class BoundedQueue<T> extends LinkedList<T> {
      public BoundedQueue<T> save(T curElem) {
        if (size() == 2) { // we need to know only two subsequent elements
          pollLast(); // remove last to keep only requested number of elements
        }

        offerFirst(curElem);

        return this;
      }

      public T getPrevious() {
        return (size() < 2) ? null : getLast();
      }

      public T getCurrent() {
        return (size() == 0) ? null : getFirst();
      }
    }

    BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>();

    final List<Pair<Integer>> answer = input.stream()
      .map(i -> streamHistory.save(i))
      .filter(e -> e.getPrevious() != null)
      .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent()))
      .collect(Collectors.toList());

    answer.forEach(System.out::println);
  }
}

Upvotes: -1

Rob Marrowstone

Reputation: 1264

As others have observed, there is, due to the nature of the problem, some statefulness required.

I was faced with a similar problem, in which I wanted what was essentially the Oracle SQL function LEAD. My attempt to implement that is below.

/**
 * Stream that pairs each element in the stream with the next subsequent element.
 * The final pair will have only the first item, the second will be null.
 */
<T> Spliterator<Pair<T>> lead(final Stream<T> stream)
{
    final Iterator<T> input = stream.sequential().iterator();

    final Iterable<Pair<T>> iterable = () ->
    {
        return new Iterator<Pair<T>>()
        {
            Optional<T> current = getOptionalNext(input);

            @Override
            public boolean hasNext()
            {
                return current.isPresent();
            }

            @Override
            public Pair<T> next()
            {
                Optional<T> next = getOptionalNext(input);
                final Pair<T> pair = next.isPresent()
                    ? new Pair(current.get(), next.get())
                    : new Pair(current.get(), null);
                current = next;

                return pair;
            }
        };
    };

    return iterable.spliterator();
}

private <T> Optional<T> getOptionalNext(final Iterator<T> iterator)
{
    return iterator.hasNext()
        ? Optional.of(iterator.next())
        : Optional.empty();
}

Upvotes: -1

John McClean

Reputation: 5313

You can do this in cyclops-react (I contribute to this library), using the sliding operator.

  LazyFutureStream.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

Or

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

Assuming the Pair constructor can accept a Collection with 2 elements.

If you wanted to group by 4, and increment by 2 that is also supported.

     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
                .sliding(4,2)
                .forEach(System.out::println);

Equivalent static methods for creating a sliding view over java.util.stream.Stream are also provided in the cyclops-streams StreamUtils class.

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

Note: for single-threaded operation, ReactiveSeq would be more appropriate. LazyFutureStream extends ReactiveSeq but is primarily geared towards concurrent / parallel use (it is a Stream of Futures).

LazyFutureStream extends ReactiveSeq, which extends Seq from the awesome jOOλ (which in turn extends java.util.stream.Stream), so the solutions Lukas presents would also work with either Stream type. For anyone interested, the primary differences between the window and sliding operators are the obvious relative power / complexity trade-off and their suitability for use with infinite streams (sliding doesn't consume the stream, but buffers as it flows).
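The "buffers as it flows" behaviour can be approximated with JDK-only code. The sketch below is not the cyclops-react implementation; it is a hypothetical helper, sliding(source, size, step), that keeps at most size elements in a deque and therefore also works on an infinite source. It assumes the source contains no nulls (ArrayDeque rejects them) and drops a trailing partial window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SlidingSketch {

    // Hypothetical helper: windows of `size` elements, advancing by `step` elements each time.
    static <T> Stream<List<T>> sliding(Stream<T> source, int size, int step) {
        Iterator<T> in = source.iterator();
        Iterator<List<T>> windows = new Iterator<List<T>>() {
            private final Deque<T> buffer = new ArrayDeque<>();
            private int toDrop = 0; // elements still to discard before the next window

            @Override
            public boolean hasNext() {
                // Discard what the previous window leaves behind, first from the buffer,
                // then (if step > size) directly from the source.
                while (toDrop > 0 && !buffer.isEmpty()) { buffer.pollFirst(); toDrop--; }
                while (toDrop > 0 && in.hasNext()) { in.next(); toDrop--; }
                // Refill until the buffer holds a full window or the source ends.
                while (buffer.size() < size && in.hasNext()) { buffer.addLast(in.next()); }
                return toDrop == 0 && buffer.size() == size;
            }

            @Override
            public List<T> next() {
                if (!hasNext()) throw new NoSuchElementException();
                List<T> window = new ArrayList<>(buffer); // copy, so later shifts don't alter it
                toDrop = step;
                return window;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(windows, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        System.out.println(sliding(Stream.of(0, 1, 2, 3, 4), 2, 1)
            .collect(Collectors.toList())); // [[0, 1], [1, 2], [2, 3], [3, 4]]
        System.out.println(sliding(Stream.iterate(0, i -> i + 1), 4, 2)
            .limit(2)
            .collect(Collectors.toList())); // [[0, 1, 2, 3], [2, 3, 4, 5]]
    }
}
```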

Upvotes: 8

Kedar Mhaswade

Reputation: 4695

This is an interesting problem. Is my hybrid attempt below any good?

public static void main(String[] args) {
    List<Integer> list = Arrays.asList(1, 2, 3);
    Iterator<Integer> first = list.iterator();
    first.next();
    if (first.hasNext())
        list.stream()
        .skip(1)
        .map(v -> new Pair(first.next(), v))
        .forEach(System.out::println);
}

I believe it does not lend itself to parallel processing, and hence may be disqualified.

Upvotes: -1

frhack

Reputation: 5112

We can use RxJava (very powerful reactive extension library)

IntStream intStream  = IntStream.iterate(1, n -> n + 1);

Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1);

pairObservable.take(10).forEach(b -> {
            b.forEach(n -> System.out.println(n));
            System.out.println();
        });

The buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items.

Upvotes: 2

Lukas Eder

Reputation: 221105

Finding successive pairs

If you're willing to use a third party library and don't need parallelism, then jOOλ offers SQL-style window functions as follows

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window()
   .filter(w -> w.lead().isPresent())
   .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
   .toList()
);

Yielding

[(0, 1), (1, 2), (2, 3), (3, 4)]

The lead() function accesses the next value in traversal order from the window.

Finding successive triples / quadruples / n-tuples

A question in the comments was asking for a more general solution, where not pairs but n-tuples (or possibly lists) should be collected. Here's thus an alternative approach:

int n = 3;

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

Yielding a list of lists

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

Without the filter(w -> w.count() == n), the result would be

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

Disclaimer: I work for the company behind jOOλ

Upvotes: 5

Tagir Valeev

Reputation: 100279

My StreamEx library, which extends standard streams, provides a pairMap method for all stream types. For primitive streams it does not change the stream type, but it can be used to perform some calculations. The most common usage is to calculate differences:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

For object streams you can create any other object type. My library does not provide any new user-visible data structures like Pair (that is part of the library's concept). However, if you have your own Pair class and want to use it, you can do the following:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

Or if you already have some Stream:

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

This functionality is implemented using a custom spliterator. It has quite low overhead and can parallelize nicely. Of course, it works with any stream source, not just a random-access list or array like many other solutions. In many tests it performs really well. Here's a JMH benchmark where we find all input values preceding a larger value using different approaches (see this question).

Upvotes: 39

assylias

Reputation: 328735

The operation is essentially stateful so not really what streams are meant to solve - see the "Stateless Behaviors" section in the javadoc:

The best approach is to avoid stateful behavioral parameters to stream operations entirely

One solution here is to introduce state in your stream through an external holder for the previous element, although it will only work with a sequential stream.

public static void main(String[] args) {
    Stream<String> strings = Stream.of("a", "b", "c", "c");
    AtomicReference<String> previous = new AtomicReference<>();
    List<Pair> collect = strings.map(n -> {
                            String p = previous.getAndSet(n);
                            return p == null ? null : new Pair(p, n);
                        })
                        .filter(p -> p != null)
                        .collect(toList());
    System.out.println(collect);
}


static class Pair<T> {
    private T left, right;
    Pair(T left, T right) { this.left = left; this.right = right; }
    @Override public String toString() { return "{" + left + "," + right + '}'; }
}

Upvotes: 3

DArt

Reputation: 45

I agree with @aepurniet, but instead of map you have to use mapToObj:

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);

Upvotes: -3

Alexis C.

Reputation: 93862

The proton-pack library provides the windowed functionality. Given a Pair class and a Stream, you can do it like this:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
                                                  .map(l -> new Pair<>(l.get(0), l.get(1)))
                                                  .moreStreamOps(...);

Now the pairs stream contains:

(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on

Upvotes: 5

Stuart Marks

Reputation: 132460

The Java 8 streams library is primarily geared toward splitting streams into smaller chunks for parallel processing, so stateful pipeline stages are quite limited, and doing things like getting the index of the current stream element and accessing adjacent stream elements are not supported.

A typical way to solve these problems, with some limitations, of course, is to drive the stream by indexes and rely on having the values being processed in some random-access data structure like an ArrayList from which the elements can be retrieved. If the values were in arrayList, one could generate the pairs as requested by doing something like this:

    IntStream.range(1, arrayList.size())
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEach(System.out::println);

Of course the limitation is that the input cannot be an infinite stream. This pipeline can be run in parallel, though.
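As a rough check of the claim that this pipeline can run in parallel, here is a self-contained sketch of the index-driven approach. It uses java.util.Map.Entry as a stand-in for Pair (my assumption; any pair type would do), and the class name IndexedPairs is made up. Collecting to a list keeps encounter order even when the stream is parallel.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexedPairs {

    // Map.Entry stands in for Pair here; Map.entry rejects null keys and values.
    // The list should offer cheap random access (e.g. ArrayList) for this to be efficient.
    static <T> List<Map.Entry<T, T>> pairs(List<T> list) {
        return IntStream.range(1, list.size())    // empty range when the list has fewer than 2 elements
            .parallel()                           // safe: each index is processed independently
            .mapToObj(i -> Map.entry(list.get(i - 1), list.get(i)))
            .collect(Collectors.toList());        // preserves encounter order
    }

    public static void main(String[] args) {
        System.out.println(pairs(List.of(0, 1, 2, 3, 4))); // [0=1, 1=2, 2=3, 3=4]
    }
}
```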

Upvotes: 85

gadget

Reputation: 1978

An elegant solution would be to use zip. Something like:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
Stream<Pair> pairStream = Streams.zip(input.stream(),
                                      input.stream().substream(1),
                                      (a, b) -> new Pair(a, b)
);

This is pretty concise and elegant, however it uses a list as an input. An infinite stream source cannot be processed this way.

Another (much more troublesome) issue is that zip, together with the entire Streams class, was recently removed from the API. The above code only works with b95 or older releases. So with the latest JDK I would say there is no elegant FP-style solution, and right now we can only hope that zip will be reintroduced to the API in some way.

Upvotes: 0

jpvee

Reputation: 973

In your case, I would write my custom IntFunction which keeps track of the last int passed and use that to map the original IntStream.

import java.util.function.IntFunction;
import java.util.stream.IntStream;

public class PairFunction implements IntFunction<PairFunction.Pair> {

  public static class Pair {

    private final int first;
    private final int second;

    public Pair(int first, int second) {
      this.first = first;
      this.second = second;
    }

    @Override
    public String toString() {
      return "[" + first + "|" + second + "]";
    }
  }

  private int last;
  private boolean first = true;

  @Override
  public Pair apply(int value) {
    Pair pair = !first ? new Pair(last, value) : null;
    last = value;
    first = false;
    return pair;
  }

  public static void main(String[] args) {

    IntStream intStream = IntStream.of(0, 1, 2, 3, 4);
    final PairFunction pairFunction = new PairFunction();
    intStream.mapToObj(pairFunction)
        .filter(p -> p != null) // filter out the null
        .forEach(System.out::println); // display each Pair

  }

}

Upvotes: 0

Savv

Reputation: 431

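Run a for loop from 0 to length-1 over your elements (this assumes they are already in an array, here named stream, rather than in a java.util.stream.Stream):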

for(int i = 0 ; i < stream.length-1 ; i++)
{
    Pair pair = new Pair(stream[i], stream[i+1]);
    // then add your pair to an array
}

Upvotes: -7
