Johnny Willer

Reputation: 3917

How does a stream pipeline like IntPipeline work in Java?

I'm learning about Java 8 streams and some questions came to mind.

Suppose this code:

 new Random().ints().forEach(System.out::println);

Internally, at some point, it calls IntPipeline, which I think is responsible for generating those infinitely many ints. The stream implementation is hard to understand just by reading the Java source.

Can you give a brief explanation, or point me to some good, easy-to-understand material, about how streams are generated and how the operations in a pipeline are connected? For example, in the code above the integers are generated randomly; how is that connection made?

Upvotes: 2

Views: 2414

Answers (1)

Tagir Valeev

Reputation: 100239

The Stream implementation is separated into the Spliterator (input-specific code) and the pipeline (input-independent code). A Spliterator is similar to an Iterator. The main differences are the following:

  • It can split itself into two parts (the trySplit method). For an ordered spliterator the parts are a prefix and a suffix (for example, for an array they could be the first half and the last half). For unordered sources (like random numbers) both parts can simply generate some of the elements. The resulting parts can be split further (unless they become too small). This feature is crucial for parallel stream processing.

  • It can report its size, either exact or estimated. The exact size may be used to preallocate memory for some stream operations like toArray(), or simply returned to the caller (as count() does in Java 9). The estimated size is used in parallel stream processing to decide when to stop splitting.

  • It can report some characteristics like ORDERED, SORTED, DISTINCT, etc.

  • It implements internal iteration: instead of the two methods hasNext and next, you have the single method tryAdvance, which passes the next element to the provided Consumer and returns true, or returns false if there are no more elements left.
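The differences listed above can be observed directly on a JDK collection's spliterator. The following is a small sketch using an ArrayList; the class name SpliteratorDemo and the helper drain are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorDemo {

    // Drains a spliterator via tryAdvance and returns its elements as a string.
    static String drain(Spliterator<Integer> sp) {
        StringBuilder sb = new StringBuilder();
        // tryAdvance hands one element to the consumer and returns true,
        // or returns false once nothing is left.
        while (sp.tryAdvance(x -> sb.append(x).append(' '))) {
            // all work happens inside the consumer
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        Spliterator<Integer> rest = list.spliterator();

        // Size and characteristics are known before any element is consumed.
        System.out.println("exact size: " + rest.getExactSizeIfKnown());
        System.out.println("ORDERED: " + rest.hasCharacteristics(Spliterator.ORDERED));
        System.out.println("SIZED: " + rest.hasCharacteristics(Spliterator.SIZED));

        // For an ordered source, trySplit returns a prefix and 'rest' keeps the suffix.
        Spliterator<Integer> prefix = rest.trySplit();
        System.out.println("prefix: " + drain(prefix));
        System.out.println("suffix: " + drain(rest));
    }
}
```

On OpenJDK the ArrayList spliterator splits at the midpoint, so the prefix should be 1 2 3 4 and the suffix 5 6 7 8; the exact split point is an implementation detail, though, not something the Spliterator contract promises.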

There are also primitive specializations of the Spliterator interface (Spliterator.OfInt, etc.), which let you process primitive values like int, long or double efficiently.
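A minimal sketch of consuming a Spliterator.OfInt without boxing. IntStream.spliterator() returns such a spliterator; the class OfIntDemo and the method sum are hypothetical names.

```java
import java.util.Spliterator;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class OfIntDemo {

    // Sums all remaining ints of a Spliterator.OfInt without boxing them.
    static long sum(Spliterator.OfInt sp) {
        long[] total = {0};
        // The IntConsumer overload of forEachRemaining receives primitive ints,
        // so no Integer object is created per element.
        sp.forEachRemaining((IntConsumer) x -> total[0] += x);
        return total[0];
    }

    public static void main(String[] args) {
        Spliterator.OfInt sp = IntStream.rangeClosed(1, 100).spliterator();
        System.out.println(sum(sp)); // 5050
    }
}
```

The explicit (IntConsumer) cast is needed because OfInt overloads forEachRemaining for both IntConsumer and Consumer<? super Integer>, and an implicitly typed lambda would be ambiguous between the two.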

Thus, to create your own Stream data source you implement a Spliterator, then call StreamSupport.stream(mySpliterator, isParallel) to create the Stream, or StreamSupport.intStream/longStream/doubleStream for the primitive specializations. In fact, Random.ints calls StreamSupport.intStream, providing its own spliterator. You don't have to implement any of the Stream operations yourself. In general, the JDK implements each stream interface only once, and that single implementation serves all sources. There is a base abstract class AbstractPipeline and four implementations (ReferencePipeline for Stream, IntPipeline for IntStream, LongPipeline for LongStream and DoublePipeline for DoubleStream). But there are many more sources (Collection.stream(), Arrays.stream(), IntStream.range, String.chars(), BufferedReader.lines(), Files.lines(), Random.ints(), and so on, with even more to appear in Java 9). All of these sources are implemented with custom spliterators. Implementing a Spliterator is much simpler than implementing the whole stream pipeline (especially taking parallel processing into account), so this separation makes sense.
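The following sketch turns an int[] into an IntStream through a spliterator, the same route described above for Random.ints. Spliterators.spliterator and StreamSupport.intStream are real JDK methods; StreamSupportDemo and fromArray are illustrative names. The printed class names come from JDK internals (on OpenJDK they typically name an IntPipeline class), so treat them as an implementation detail rather than an API.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamSupportDemo {

    // Wraps an int[] in a spliterator (the input-specific part) and hands it
    // to StreamSupport, which builds the shared pipeline around it.
    static IntStream fromArray(int[] data, boolean parallel) {
        Spliterator.OfInt sp = Spliterators.spliterator(
                data, Spliterator.ORDERED | Spliterator.IMMUTABLE);
        return StreamSupport.intStream(sp, parallel);
    }

    public static void main(String[] args) {
        int[] doubled = fromArray(new int[] {3, 1, 2}, false)
                .map(x -> x * 2)
                .toArray();
        System.out.println(Arrays.toString(doubled)); // [6, 2, 4]

        // Different sources, same IntStream implementation behind them.
        // No elements are consumed here, so the infinite Random.ints() is safe.
        System.out.println(fromArray(new int[] {1}, false).getClass().getName());
        System.out.println(new Random().ints().getClass().getName());
    }
}
```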

If you want to create your own stream source, you can start by extending Spliterators.AbstractSpliterator. In this case you only have to implement tryAdvance and call the superclass constructor, providing the estimated size and some characteristics. AbstractSpliterator provides a default splitting behavior: it reads part of your source into an array (calling your tryAdvance method) and creates an array-based spliterator for this prefix. Of course, this strategy is not very performant and often affords only limited parallelism, but as a starting point it's OK. Later you can implement trySplit yourself with a better splitting strategy.
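As a sketch of the approach just described, here is a hypothetical int source that implements only tryAdvance. It extends Spliterators.AbstractIntSpliterator, the int counterpart of AbstractSpliterator, so that it can be passed to StreamSupport.intStream; the class SquaresSpliterator and its squares helper are made-up names.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical example source: emits the squares 0, 1, 4, ..., (n-1)^2.
// Only tryAdvance is written here; trySplit is inherited from AbstractIntSpliterator.
public class SquaresSpliterator extends Spliterators.AbstractIntSpliterator {
    private final int n;
    private int next = 0;

    SquaresSpliterator(int n) {
        // Estimated size and extra characteristics go to the superclass.
        super(n, Spliterator.ORDERED | Spliterator.IMMUTABLE);
        this.n = n;
    }

    @Override
    public boolean tryAdvance(IntConsumer action) {
        if (next >= n) {
            return false; // source exhausted
        }
        int i = next++;
        action.accept(i * i);
        return true;
    }

    static IntStream squares(int n, boolean parallel) {
        return StreamSupport.intStream(new SquaresSpliterator(n), parallel);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(squares(5, false).toArray())); // [0, 1, 4, 9, 16]
        // In parallel mode the inherited trySplit copies a batch of elements
        // into an array-based spliterator, which is how this source gets split.
        System.out.println(squares(10, true).sum()); // 285
    }
}
```

With the inherited trySplit, each split copies a whole batch of elements into an array, so small sources gain little parallelism; overriding trySplit is the way to split more evenly.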

Upvotes: 6
