raisercostin

Reputation: 9199

Java8 stream with memoization

How can I reuse, in Java 8, values that were already computed while iterating over a stream (perhaps through some kind of memoization)?

If the stream is duplicated or supplied again, its values are recomputed. In some cases it would be preferable to trade memory for that CPU time. Collecting everything up front might not be a good idea either, since the stream is used to find the first item that satisfies a predicate.

Stream<Integer> all = Stream.of(1,2,3,4,5, ...<many other values>... )
      .map(x->veryLongTimeToComputeFunction(x));
System.out.println("fast find of 2"+all.filter(x->x>1).findFirst());

//each of the next two lines throws "java.lang.IllegalStateException: stream has already been operated upon or closed"
System.out.println("no find"+all.filter(x->x>10).findFirst());
System.out.println("find again"+all.filter(x->x>4).findFirst());

The question is similar to Copy a stream to avoid "stream has already been operated upon or closed" (java 8)

Upvotes: 5

Views: 1263

Answers (5)

Holger

Reputation: 298389

The canonical in-memory Stream source is a Collection. A simple Stream memoization, not capable of parallel processing, could be implemented as follows:

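import java.util.ArrayList;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamMemo {
    public static void main(String[] args) {
        Supplier<Stream<Integer>> s=memoize(
            IntStream.range(0, 10_000)
                     .map(x -> veryLongTimeToComputeFunction(x))
        );
        System.out.println("First item > 1  "+s.get().filter(x -> x>1 ).findFirst());
        System.out.println("First item > 10 "+s.get().filter(x -> x>10).findFirst());
        System.out.println("First item > 4  "+s.get().filter(x -> x>4 ).findFirst());
    }
    static int veryLongTimeToComputeFunction(int arg) {
        System.out.println("veryLongTimeToComputeFunction("+arg+")");
        return arg;
    }

    public static <T> Supplier<Stream<T>> memoize(BaseStream<T,?> stream) {
        Spliterator<T> sp=stream.spliterator();
        // Wraps the source spliterator and records every element it delivers
        class S extends Spliterators.AbstractSpliterator<T> {
            ArrayList<T> mem=new ArrayList<>();
            // SIZED/SUBSIZED are dropped: the size estimate is never updated as elements
            // are consumed, so an exact-size claim would be wrong on later calls
            S() { super(sp.estimateSize(),
                        sp.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); }
            public boolean tryAdvance(Consumer<? super T> action) {
                return sp.tryAdvance(item -> {
                    mem.add(item);          // remember the computed value
                    action.accept(item);
                });
            }
        }
        S s=new S();
        // Replay the remembered values first, then continue with the unconsumed rest of the source
        return () -> Stream.concat(s.mem.stream(), StreamSupport.stream(s, false));
    }
}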

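Take care to finish processing one Stream before requesting the next Stream from the supplier: all of them share the same source spliterator and the same memo list, so interleaving them would produce wrong results.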

Upvotes: 2

Sujit Kamthe

Reputation: 961

Java 8 streams are lazy. Their operations are evaluated in "vertical" order: each element passes through the whole pipeline before the next element is pulled from the source. What you want to achieve can be done with the following code:

Stream.of(1,2,3,4,5, ...<many other values>... )
    .map(x-> veryLongTimeToComputeFunction(x))
    .filter(x-> x > 1)
    .findFirst();

This makes sure that veryLongTimeToComputeFunction() is called only until the first matching element is found; after that, the operation terminates. In the worst case, veryLongTimeToComputeFunction() is called for all numbers, namely when the last number is the only one that matches the criteria.
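A small runnable sketch of this short-circuiting, assuming a hypothetical `veryLongTimeToComputeFunction` that just counts its invocations (the counter `CALLS` and the class name are illustrative, not from the question):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyFindFirst {
    // Counts invocations so the effect of laziness is visible
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for the question's expensive function
    static int veryLongTimeToComputeFunction(int x) {
        CALLS.incrementAndGet();
        return x;
    }

    public static void main(String[] args) {
        Optional<Integer> first = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(LazyFindFirst::veryLongTimeToComputeFunction)
            .filter(x -> x > 1)
            .findFirst();
        // Each element flows through map and filter before the next one is pulled,
        // so only 1 and 2 are computed before findFirst short-circuits
        System.out.println(first + " after " + CALLS.get() + " calls"); // Optional[2] after 2 calls
    }
}
```

Note that this avoids unnecessary work within one pipeline, but a second pipeline would still recompute 1 and 2.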

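You can also use parallel streams together with the findAny() method. This can speed up the search, but findAny() returns whichever matching element some thread finds, not necessarily the first one in encounter order.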
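A minimal sketch of the parallel variant, again assuming a hypothetical stand-in for `veryLongTimeToComputeFunction`; which match is returned may differ between runs:

```java
import java.util.Optional;
import java.util.stream.IntStream;

public class ParallelFindAny {
    // Hypothetical stand-in for the question's expensive function
    static int veryLongTimeToComputeFunction(int x) {
        return x;
    }

    static Optional<Integer> anyGreaterThan(int limit) {
        return IntStream.rangeClosed(1, 10_000).boxed()
            .parallel()   // elements may be processed by several worker threads
            .map(ParallelFindAny::veryLongTimeToComputeFunction)
            .filter(x -> x > limit)
            .findAny();   // any match, not necessarily the first in encounter order
    }

    public static void main(String[] args) {
        // Prints some value greater than 1; the exact value may vary between runs
        System.out.println(anyGreaterThan(1));
    }
}
```

In a parallel run, other threads may also compute elements beyond the match before the search is cancelled, so the total work is not strictly minimal.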

Upvotes: 0

Journeycorner

Reputation: 2542

Streams are not meant to be saved; they are meant to process data.

Example: you are watching a DVD. In Java terms, the DVD would be something like a collection, and the data transferred from your DVD player to your TV is a stream. You cannot save the stream, but you could burn a copy of the DVD; in Java terms, collect it.

There are other options:

  • extract/refactor your stream operations or predicates into a method that takes a stream as a parameter and returns a stream
  • use a caching framework: e.g. in Spring, methods can be annotated with @Cacheable. The first call executes the method; subsequent calls fetch the result from the cache for as long as the cache keeps the entry
  • if you are looking for non-blocking execution of your long-running task, have a look at RxJava
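The first option can be sketched as follows, assuming a hypothetical `computed` method that holds the reusable part of the pipeline and a `Supplier` that builds a fresh source each time. This avoids the IllegalStateException, but every call rebuilds the stream, so the expensive values are computed again rather than memoized:

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class RebuildPipeline {
    // Hypothetical stand-in for the question's expensive function
    static int veryLongTimeToComputeFunction(int x) {
        return x;
    }

    // The reusable part of the pipeline: takes a stream, returns a stream
    static Stream<Integer> computed(Stream<Integer> source) {
        return source.map(RebuildPipeline::veryLongTimeToComputeFunction);
    }

    // Each call builds a brand-new stream, so no stream is consumed twice
    static Optional<Integer> findFirst(Supplier<Stream<Integer>> source, Predicate<Integer> p) {
        return computed(source.get()).filter(p).findFirst();
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> source = () -> Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        System.out.println(findFirst(source, x -> x > 1));  // Optional[2]
        System.out.println(findFirst(source, x -> x > 10)); // Optional[11]
        System.out.println(findFirst(source, x -> x > 4));  // Optional[5]
    }
}
```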

Upvotes: 0

a.yekimov

Reputation: 326

Why not use memoization inside veryLongTimeToComputeFunction? You could pass the memo cache to the function as a parameter.
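A minimal sketch of that idea, assuming a `HashMap` used as the memo and a hypothetical `expensive` function standing in for the real computation (both names are illustrative). `Map.computeIfAbsent` computes a value only the first time its key is seen:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class MemoizedFunction {
    // Counts how often the expensive computation really runs
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical expensive computation
    static int expensive(int x) {
        CALLS.incrementAndGet();
        return x;
    }

    // The memo cache is passed in as a parameter
    static int veryLongTimeToComputeFunction(int x, Map<Integer, Integer> memo) {
        return memo.computeIfAbsent(x, MemoizedFunction::expensive);
    }

    static Optional<Integer> firstGreaterThan(int limit, Map<Integer, Integer> memo) {
        return IntStream.range(0, 10_000).boxed()
            .map(x -> veryLongTimeToComputeFunction(x, memo))
            .filter(x -> x > limit)
            .findFirst();
    }

    public static void main(String[] args) {
        Map<Integer, Integer> memo = new HashMap<>();
        System.out.println(firstGreaterThan(1, memo));  // computes 0, 1, 2
        System.out.println(firstGreaterThan(10, memo)); // reuses 0..2, computes 3..11
        System.out.println(firstGreaterThan(4, memo));  // everything already cached
        System.out.println(CALLS.get() + " expensive calls"); // prints "12 expensive calls"
    }
}
```

A plain `HashMap` is not safe for parallel streams; a `ConcurrentHashMap` would be the usual choice there.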

Upvotes: 1

Joe C

Reputation: 15704

I would suggest collecting your Stream to a list and then running your filters on the list's stream.
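A sketch of this approach, assuming a hypothetical counting stand-in for `veryLongTimeToComputeFunction`. A `List` can hand out a new Stream as often as needed; the trade-off, as the question notes, is that every element is computed up front, even those no later search needs:

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectThenFilter {
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for the question's expensive function
    static int veryLongTimeToComputeFunction(int x) {
        CALLS.incrementAndGet();
        return x;
    }

    // Every element is computed exactly once, eagerly
    static List<Integer> computeAll() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
            .map(CollectThenFilter::veryLongTimeToComputeFunction)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> all = computeAll();
        // Each all.stream() call creates a fresh Stream over the stored values
        Optional<Integer> a = all.stream().filter(x -> x > 1).findFirst();
        Optional<Integer> b = all.stream().filter(x -> x > 10).findFirst();
        Optional<Integer> c = all.stream().filter(x -> x > 4).findFirst();
        // prints "Optional[2] Optional[11] Optional[5] after 12 calls"
        System.out.println(a + " " + b + " " + c + " after " + CALLS.get() + " calls");
    }
}
```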

Upvotes: 0
