Toby Eggitt

Reputation: 1872

Produce a Stream from a Stream and an element, Java 8

I'm working on getting my head around some of the Java 8 Stream features. I'm tolerably familiar with FP, having written some Lisp thirty years ago, and I think I might be trying to do something this new facility isn't really targeted at. Anyway, if the question is stupid, I'm happy to learn the error of my ways.

I'll give a specific problem, though it's really a general concept I'm trying to resolve.

Suppose I want to get a Stream containing every third element of a Stream. In regular FP I would (approximately) create a recursive function that concatenates the first element of the list with the result of calling itself on the remainder of the list after dropping two elements (roughly the shape sketched after the list below). Easy enough. But to do this with a stream, I feel like I want one of two tools:

1) a means of having an operation extract multiple items from the stream for processing (then I'd just grab three, use the first, and dump the rest)

2) a means of making a Supplier that takes an item and a Stream and creates a Stream. Then it feels like I could create a downstream stream out of the first item and the shortened stream, though it's still unclear to me if this would do the necessary recursive magic to actually work.
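
To be concrete about the FP shape I mentioned above, here's roughly what I mean, written over plain lists (my own illustration only; the names are mine, and this is not the stream version I'm after):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Classic FP recursion: keep the head, drop the next two, recurse on the rest
static <T> List<T> everyThird(List<T> list) {
    if (list.isEmpty()) return Collections.emptyList();
    List<T> result = new ArrayList<>();
    result.add(list.get(0));
    List<T> rest = list.size() > 3
            ? list.subList(3, list.size())
            : Collections.emptyList();
    result.addAll(everyThird(rest));
    return result;
}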

BEGIN EDIT

So, there's some interesting and useful feedback; thanks, everyone. In particular, the comments have helped me clarify, a bit better, what my head is trying to get around.

First, conceptually at least, having or needing knowledge of order in a sequence should not prevent fully parallelizable operations. An example sprang to mind: the convolution operations that graphics folks are inclined to do. Imagine blurring an image. Each pixel's new value is computed from the pixels near it, but those neighboring pixels are only read, never modified.
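
To make that concrete, here's a rough sketch of what I mean (my own illustration: a horizontal 3-pixel box blur over a grayscale image held in an int array). Every output pixel reads its neighbours but writes only its own slot, so each index can be processed independently:

import java.util.stream.IntStream;

// Each output pixel depends on neighbouring input pixels, which are read-only,
// so the work parallelises cleanly across pixel indices.
static int[] boxBlur(int[] src, int width) {
    int[] dst = new int[src.length];
    IntStream.range(0, src.length).parallel().forEach(i -> {
        int x = i % width;
        int left = x > 0 ? src[i - 1] : src[i];           // clamp at left edge
        int right = x < width - 1 ? src[i + 1] : src[i];  // clamp at right edge
        dst[i] = (left + src[i] + right) / 3;
    });
    return dst;
}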

It's my understanding (very shaky at this stage, for sure!) that the streams mechanism is the primary entry point to the wonderful world of VM-managed parallelism, and that iterators are still what they always were (yes? no?). If that's correct, then using an iterator to solve the problem I'm waffling around doesn't seem great.

So, at this point, at least, the suggestion to create a chunking spliterator seems the most promising, but boy, does the code that supports that example seem like hard work! I think I'd rather do it with a ForkJoin mechanism, despite it being "old hat" now :)

Anyway, still interested in any more insights folks wish to offer.

END EDIT

Any thoughts? Am I trying to use these Streams to do something they're not intended for, or am I missing something obvious?

Cheers, Toby.

Upvotes: 3

Views: 715

Answers (4)

Tagir Valeev

Reputation: 100169

My StreamEx library enhances the standard Stream API. In particular, it adds a headTail method which allows recursive definition of custom operations. It takes a function which receives the stream's head (its first element) and tail (a stream of the remaining elements), and returns the resulting stream, which is used in place of the original. For example, you can define the every3 operation as follows:

public static <T> StreamEx<T> every3(StreamEx<T> input) {
    // Take the head, drop the next two elements, recurse on the remainder,
    // then prepend the head to the recursively defined stream.
    return input.headTail(
        (first, tail1) -> tail1.<T>headTail(
            (second, tail2) -> tail2.headTail(
                (third, tail3) -> every3(tail3))).prepend(first));
}

Here prepend is also used, which simply prepends the given element to the stream (this operation is a natural companion to headTail).
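
For example, assuming StreamEx is on the classpath, a quick check of the operation looks like this:

// Prints [1, 4, 7, 10]
System.out.println(
    every3(StreamEx.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).toList());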

In general, using headTail you can define almost any intermediate operation you want, including existing ones and new ones. You may find some samples here.

Note that I implemented a mechanism which optimizes the tails in such recursive operation definitions, so a properly defined operation will not exhaust the stack when processing a long stream.

Upvotes: 1

David Soroko

Reputation: 9086

You can use BatchingSpliterator (or peek at its source). Then, given aStream, you can create a stream that consists of lists of size 3 (except possibly the last one) and use the first element of each list:

Stream<T> aStream = ...;
Stream<List<T>> batchedStream =  
    new BatchingSpliterator.Builder().wrap(aStream).batchSize(3).stream();

batchedStream.map(l -> l.get(0)). ...

You can also "go parallel":

batchedStream.parallel().map(l -> l.get(0)). ...
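
If you would rather avoid the dependency, a rough equivalent can be sketched with the JDK's own Spliterators.AbstractSpliterator (my sketch, not BatchingSpliterator's actual implementation; it does not implement trySplit, so unlike BatchingSpliterator it will not split for parallel processing):

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Wraps a stream so that each advance of the new spliterator emits one batch
// of up to 'size' consecutive elements from the source.
static <T> Stream<List<T>> batched(Stream<T> source, int size) {
    Spliterator<T> src = source.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
                src.estimateSize(), Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && src.tryAdvance(batch::add)) {
                    // keep pulling until the batch is full or the source is drained
                }
                if (batch.isEmpty()) {
                    return false;
                }
                action.accept(batch);
                return true;
            }
        }, false);
}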

Upvotes: 0

Marko Topolnik

Reputation: 200148

Java's streams are nothing like FP's (lazy) sequences. If you are familiar with Clojure, the difference is exactly like the difference between a lazy seq and a reducer. Whereas the lazy seq packages all processing with each element individually, and thus allows retrieving individually processed elements, a reducer collapses the complete sequence in a single atomic operation.

Specifically for the example you have described, consider relying on a stream partitioning transformation, as described in detail here. You would then easily do

partition(originalStream, 3).map(xs -> xs.get(0));

resulting in a stream containing every third element of the original (for a source of 1 through 9, that's 1, 4, 7).

This would maintain efficiency, laziness, and parallelizability.

Upvotes: 0

sprinter

Reputation: 27946

One of the things to keep in mind is that Stream was primarily designed as a way of taking advantage of parallel processing. An implication of this is that streams come with a number of conditions aimed at giving the VM a lot of freedom to process the elements in any convenient order. An example is the insistence that reduction functions be associative. Another is that local variables captured by lambdas must be effectively final. These kinds of conditions mean the stream items can be evaluated and collected in any order.
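
For example, a reduction like the following is safe to run in parallel precisely because addition is associative, so the VM may group the partial sums however it likes:

import java.util.stream.IntStream;

// (a + b) + c == a + (b + c), so chunks can be summed independently and merged
int sum = IntStream.rangeClosed(1, 100)
        .parallel()
        .reduce(0, Integer::sum);   // 5050 regardless of evaluation order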

A natural consequence of this is that the best use cases for Stream involve no dependencies between the values of the stream. Something such as mapping a stream of integers to their cumulative values is trivial in languages like LISP but a pretty unnatural fit for Java streams (see this question).

There are clever ways of getting around some of these restrictions by using sequential to force the Stream to not be parallel, but my experience has been that these are more trouble than they are worth. If your problem involves an essentially sequential series of items in which state is required to process the values, then I recommend using traditional collections and iteration. The code will end up being clearer and will perform no worse, given that the stream cannot be parallelised anyway.

Having said all that, if you really want to do this then the most straightforward way is to have a collector that stores every third item then sends them out as a stream again:

class EveryThird {

    private final List<Integer> list = new ArrayList<>();
    private int count = 0;

    // Keep every third element seen (the 1st, 4th, 7th, ...)
    public void accept(Integer i) {
        if (count++ % 3 == 0)
            list.add(i);
    }

    // Note: this merge is only correct when the left-hand chunk's count is a
    // multiple of 3; in a parallel stream the right-hand chunk's selections
    // may otherwise be offset. It is fine for sequential use.
    public EveryThird combine(EveryThird other) {
        list.addAll(other.list);
        count += other.count;
        return this;
    }

    public Stream<Integer> stream() {
        return list.stream();
    }
}

This can then be used like:

IntStream.range(0, 10000)
    .collect(EveryThird::new, EveryThird::accept, EveryThird::combine)
    .stream()

But that's not really what collectors are designed for, and this approach is pretty inefficient as it unnecessarily collects the whole stream. As stated above, my recommendation is to use traditional iteration for this sort of situation.
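
For completeness, a minimal sketch of that traditional iteration (my illustration, assuming the source data is already in a List):

import java.util.ArrayList;
import java.util.List;

// A plain indexed loop: step by 3 and keep each element we land on
static <T> List<T> everyThird(List<T> source) {
    List<T> result = new ArrayList<>();
    for (int i = 0; i < source.size(); i += 3) {
        result.add(source.get(i));
    }
    return result;
}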

Upvotes: 1
