Miguel Gamboa

Reputation: 9383

How to cache and replay the items of a Supplier<Stream<T>>

Given a Supplier<Stream<T>> dataSrc, I would like to cache the Stream items for further traversals of the same sequence of elements. Assume that dataSrc always produces the same sequence (e.g. a Stream<Integer> with the temperatures in Celsius for March; see the usage example below). Option 1) is to collect the Stream items first, but that wastes an extra traversal just to add the items to a collection:

Supplier<Stream<T>> dataSrc = ...
List<T> cache = dataSrc.get().collect(toList()); // **Additional** traversal to collect items
cache.stream().reduce(…) // 1st traversal
cache.stream().reduce(…) // 2nd traversal
... // Nth  traversals
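To make option 1 concrete, here is a minimal runnable sketch. The sample values and the peek counter are illustrative assumptions, not part of the question. It shows that the source is consumed exactly once, by collect(...), before any real query runs, and that the later reductions only read the list.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectFirst {
    // Returns {pulledAfterCollect, pulledAtEnd, sum, max}.
    static int[] run() {
        // Counts how many elements are pulled from the data source.
        AtomicInteger pulled = new AtomicInteger();
        // Hypothetical data source that always yields the same sequence.
        Supplier<Stream<Integer>> dataSrc =
                () -> Stream.of(18, 21, 19, 23, 20).peek(t -> pulled.incrementAndGet());

        List<Integer> cache = dataSrc.get().collect(Collectors.toList()); // additional traversal
        int pulledAfterCollect = pulled.get();

        int sum = cache.stream().reduce(0, Integer::sum);                 // 1st traversal (list)
        int max = cache.stream().reduce(Integer.MIN_VALUE, Integer::max); // 2nd traversal (list)
        return new int[] {pulledAfterCollect, pulled.get(), sum, max};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.printf("pulled=%d/%d sum=%d max=%d%n", r[0], r[1], r[2], r[3]);
    }
}
```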

I would like to avoid both the additional traversal and the explicit cache variable by hiding the cache inside the Supplier<>, so that on the first traversal the items are implicitly cached and on further traversals they are read from that cache. I think this is similar to the idea behind the cache() method of Project Reactor for reactive streams.

Thus, I am outlining an alternative in the following cache() method implementation, although it already has at least two problems: 1) the onClose handler is not called when a traversal finishes (and I cannot figure out any way of detecting the end of a traversal); 2) if the first traversal never finishes, the cache will never be filled.

Supplier<Stream<T>> dataSrc = cache(...)
dataSrc.get().reduce(…) // 1st traversal
dataSrc.get().reduce(…) // 2nd traversal
... // Nth  traversals

static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
    final List<T> cache = new ArrayList<>();
    final AtomicBoolean started = new AtomicBoolean();
    final AtomicBoolean isCached = new AtomicBoolean();
    return () -> {
        if(isCached.get()) return cache.stream();
        if(!started.getAndSet(true)) {
            return dataSrc
                .get()
                .peek(cache::add)
                .onClose(() -> isCached.set(true));
        }
        return dataSrc.get();
    };
}
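Problem 1) can be reproduced in isolation: a handler registered with Stream.onClose runs only when close() is called on the stream (explicitly or through try-with-resources), never merely because a terminal operation completed. A minimal check, with an illustrative three-element stream:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class OnCloseCheck {
    // Returns {closedAfterTerminalOp, closedAfterExplicitClose}.
    static boolean[] run() {
        AtomicBoolean closed = new AtomicBoolean();
        Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> closed.set(true));

        s.reduce(0, Integer::sum);      // terminal operation: the close handler does NOT run
        boolean afterTerminalOp = closed.get();

        s.close();                      // only an explicit close() runs the handler
        return new boolean[] {afterTerminalOp, closed.get()};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("after reduce: " + r[0] + ", after close: " + r[1]);
    }
}
```

This is why the isCached flag in the attempt above is never set by a plain reduce(...).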

Question:

Is there a better approach to implementing a utility cache() function that returns a new Stream<T> which caches items on the first Stream traversal (without an additional up-front traversal to collect them), so that further Stream objects are created from that cache?

Usage Example:

Here I am getting a stream with the temperatures in March from the World Weather Online API. To execute it you must include an AsyncHttpClient dependency and a valid API key in the given URI.

Pattern pat = Pattern.compile("\\n");
boolean [] isEven = {true};
CompletableFuture<Stream<Integer>> temps = asyncHttpClient()
    .prepareGet("http://api.worldweatheronline.com/premium/v1/past-weather.ashx?q=37.017,-7.933&date=2018-03-01&enddate=2018-03-31&tp=24&format=csv&key=715b185b36034a4c879141841182802")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(pat::splitAsStream)
    .thenApply(str -> str
            .filter(w -> !w.startsWith("#")) // Filter comments
            .skip(1)                         // Skip line: Not Available
            .filter(l -> isEven[0] = !isEven[0]) // Keep every second line (2nd, 4th, ...)
            .map(line -> line.substring(14, 16)) // Extract temperature in Celsius
            .map(Integer::parseInt)
    );

Note that a CompletableFuture<Stream<Integer>> is functionally compatible with a Supplier<Stream<Integer>> (e.g. temps::join). However, although the CompletableFuture caches the resulting stream, that stream cannot be traversed twice.

Problem 1: The following code throws IllegalStateException: stream has already been operated upon or closed

out.println(temps.join().distinct().count());
out.println(temps.join().max(Integer::compare)); // throws IllegalStateException
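A network-free reproduction of Problem 1, assuming an already completed future with made-up temperatures in place of the HTTP call. Because temps::join returns the same Stream instance on every get(), the second terminal operation hits an already consumed pipeline:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;

class ReuseCheck {
    // Returns true if the second traversal fails with IllegalStateException.
    static boolean secondTraversalFails() {
        // Stand-in for the HTTP result: a completed future holding one Stream.
        CompletableFuture<Stream<Integer>> temps =
                CompletableFuture.completedFuture(Stream.of(14, 17, 17, 12));
        Supplier<Stream<Integer>> src = temps::join; // every get() returns the SAME Stream

        System.out.println(src.get().distinct().count()); // first traversal succeeds
        try {
            src.get().max(Integer::compare);              // reuses the consumed Stream
            return false;
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails());
    }
}
```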

Problem 2: Collecting it into a List induces a first traversal, so we end up with 3 traversals instead of 2:

CompletableFuture<List<Integer>> list = temps.thenApply(str -> str.collect(toList()));
out.println(list.join().stream().distinct().count()); // 2 traversals
out.println(list.join().stream().distinct().max(Integer::compare));// 1 traversal

Goal: store items in a cache during the first traversal. Every time the stream retrieves an item, it should store it in an internal cache that is used on further traversals.

Supplier<Stream<Integer>> cache = Cache.of(temps::join);
out.println(cache.get().distinct().count()); // 1 traversal
out.println(cache.get().max(Integer::compare)); // 1 traversal from cache

Upvotes: 4

Views: 1888

Answers (3)

Miguel Gamboa

Reputation: 9383

I think the only way to detect the end of a Stream traversal is through its iterator() or spliterator(). Thus, a better option to get a replayable Stream may be to record the items from its iterator (done by the Recorder class in the example below) and then implement a new Spliterator that reads the previously recorded items (done by cacheIterator()). In this solution the getOrAdvance() method of Recorder is synchronized to guarantee that only one resulting stream at a time gets a new item from the source.

So, Cache.of(dataSrc) creates a chain of:

dataSrc ----> Recorder ----> cacheIterator() ----> Stream

Side notes:

  1. The resulting stream from the method Cache.of() permits only limited parallelism. For better splitting support, cacheIterator() below should instead return a Spliterator implementation with real trySplit() support, for example one modeled on AbstractList.RandomAccessSpliterator (a non-public JDK class, so it cannot be reused directly).
  2. Although it is not a requirement, the Recorder/cacheIterator() solution also works with infinite data sources that are short-circuited later.

For example, it can cache the items of the infinite stream nrs. The following code prints the output below, first without the cache and then with it (i.e. nrsReplay):

Random rnd = new Random();
Supplier<Stream<String>> nrs = () -> Stream.generate(() -> rnd.nextInt(99)).map(Object::toString);
IntStream.range(1, 6).forEach(size -> out.println(nrs.get().limit(size).collect(joining(","))));
System.out.println();
Supplier<Stream<String>> nrsReplay = Cache.of(nrs);
IntStream.range(1, 6).forEach(size -> out.println(nrsReplay.get().limit(size).collect(joining(","))));

Output:

32
65,94
94,19,34
72,77,66,18
88,41,34,97,28

93
93,65
93,65,71
93,65,71,40
93,65,71,40,68

class Cache {

    public static <T> Supplier<Stream<T>> of(Supplier<Stream<T>> dataSrc) {
        final Spliterator<T> src = dataSrc.get().spliterator(); // Eager for now; maybe it should be lazy and memoized
        final Recorder<T> rec = new Recorder<>(src);
        return () -> {
            // CacheIterator starts on index 0 and reads data from src or
            // from an internal cache of Recorder.
            Spliterator<T> iter = rec.cacheIterator();
            return StreamSupport.stream(iter, false);
        };
    }

    static class Recorder<T> {
        final Spliterator<T> src;
        final List<T> cache = new ArrayList<>();
        final long estimateSize;
        boolean hasNext = true;

        public Recorder(Spliterator<T> src) {
            this.src = src;
            this.estimateSize = src.estimateSize();
        }

        public synchronized boolean getOrAdvance(
                final int index,
                Consumer<? super T> cons) {
            if (index < cache.size()) {
                // If it is in cache then just get it from the corresponding index.
                cons.accept(cache.get(index));
                return true;
            } else if (hasNext)
                // If not in cache then advance the src iterator
                hasNext = src.tryAdvance(item -> {
                    cache.add(item);
                    cons.accept(item);
                });
            return hasNext;
        }

        public Spliterator<T> cacheIterator() {
            return new Spliterators.AbstractSpliterator<T>(
                    estimateSize, src.characteristics()
            ) {
                int index = 0;
                public boolean tryAdvance(Consumer<? super T> cons) {
                    return getOrAdvance(index++, cons);
                }
                public Comparator<? super T> getComparator() {
                    return src.getComparator();
                }
            };
        }
    }
}
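The comment in Cache.of() suggests making the source lazy. Below is a minimal self-contained sketch of that variant, assuming sequential use only. It keeps the same Recorder idea, but calls dataSrc.get() only when the first element is actually requested, and it reports ORDERED with an unknown size instead of copying the source's characteristics. The names LazyReplay and demo() are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyReplay {
    // Records items pulled from the source; the source stream is created lazily.
    static final class Recorder<T> {
        private final Supplier<Stream<T>> dataSrc;
        private final List<T> cache = new ArrayList<>();
        private Spliterator<T> src;     // created on the first element request
        private boolean hasNext = true;

        Recorder(Supplier<Stream<T>> dataSrc) {
            this.dataSrc = dataSrc;
        }

        synchronized boolean getOrAdvance(int index, Consumer<? super T> cons) {
            if (index < cache.size()) { // already recorded: replay it
                cons.accept(cache.get(index));
                return true;
            }
            if (!hasNext) {             // source already exhausted
                return false;
            }
            if (src == null) {          // lazy: dataSrc.get() runs once, on demand
                src = dataSrc.get().spliterator();
            }
            hasNext = src.tryAdvance(item -> {
                cache.add(item);        // record, then forward to this replay
                cons.accept(item);
            });
            return hasNext;
        }
    }

    static <T> Supplier<Stream<T>> of(Supplier<Stream<T>> dataSrc) {
        Recorder<T> rec = new Recorder<>(dataSrc);
        return () -> StreamSupport.stream(
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    private int index = 0; // this replay's position in the recording

                    @Override
                    public boolean tryAdvance(Consumer<? super T> cons) {
                        return rec.getOrAdvance(index++, cons);
                    }
                }, false);
    }

    // Returns {getsBeforeUse, sourceGets, itemsPulled, firstSize, secondSize, prefixMatches}.
    static int[] demo() {
        AtomicInteger gets = new AtomicInteger();
        AtomicInteger pulled = new AtomicInteger();
        Supplier<Stream<Integer>> nrs = () -> {
            gets.incrementAndGet();
            return Stream.iterate(1, i -> i + 1).peek(i -> pulled.incrementAndGet());
        };
        Supplier<Stream<Integer>> replay = of(nrs);
        int getsBefore = gets.get();
        List<Integer> first = replay.get().limit(3).collect(Collectors.toList());
        List<Integer> second = replay.get().limit(5).collect(Collectors.toList());
        int prefix = second.subList(0, 3).equals(first) ? 1 : 0;
        return new int[] {getsBefore, gets.get(), pulled.get(), first.size(), second.size(), prefix};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [0, 1, 5, 3, 5, 1]
    }
}
```

In the demo the infinite source is opened once and only 5 items are ever pulled from it: the second replay reads items 1 to 3 from the recording and pulls just items 4 and 5.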

Upvotes: 2

Miguel Gamboa

Reputation: 9383

If using Project Reactor is an option, then you can simply convert the Supplier<Stream<T>> into a Flux<T>, which already provides the cache() utility method, and from then on use Flux<T> operations rather than Stream<T> operations.

Regarding the example of the original post, where temps is a CompletableFuture<Stream<Integer>> holding the result of an HTTP request transformed into a sequence of temperatures in Celsius, we can perform both queries as follows:

Flux<Integer> cache = Flux.fromStream(temps::join).cache();
cache.distinct().count().subscribe(out::println);
cache.reduce(Integer::max).subscribe(out::println);

This solution avoids 1) the IllegalStateException on further traversals of the sequence and 2) a first traversal just to collect the items into a cache.

Upvotes: 1

Jens Hoffmann

Reputation: 6801

You can use Guava's Suppliers#memoize function to turn a given Supplier into a caching ("memoizing") one.

  1. Turn your dataSrc Supplier<Stream<T>> into a Supplier<List<T>> that collects the stream
  2. Wrap it with Suppliers#memoize

This would be your cache() method:

private static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
  Supplier<List<T>> memoized = Suppliers.memoize(() -> dataSrc.get().collect(toList()));
  return () -> memoized.get().stream();
}

(When mixing in Guava you might need to switch between Guava's com.google.common.base.Supplier and java.util.function.Supplier; they can easily be converted back and forth, but in this case it is not even necessary.)
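If pulling in Guava is not desired, the same memoize-then-stream shape can be sketched with the JDK alone. The memoize helper below is hypothetical (not a JDK or Guava API) and uses the common double-checked-locking pattern on a volatile flag. Like the Guava version, it still collects the whole stream on the first get().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class MemoCache {
    // Hypothetical helper: caches the first result of delegate.get().
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private volatile boolean initialized;
            private T value;

            @Override
            public T get() {
                if (!initialized) {
                    synchronized (this) {
                        if (!initialized) {
                            value = delegate.get();
                            initialized = true; // volatile write publishes value
                        }
                    }
                }
                return value;
            }
        };
    }

    // Same shape as the Guava-based cache() above, minus the Guava dependency.
    static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
        Supplier<List<T>> memoized = memoize(() -> dataSrc.get().collect(Collectors.toList()));
        return () -> memoized.get().stream();
    }

    // Returns {computationsAfterFirst, computationsAfterSecond, firstSum, secondSum}.
    static int[] demo() {
        AtomicInteger computed = new AtomicInteger();
        Supplier<Stream<Integer>> src =
                () -> IntStream.rangeClosed(1, 5).peek(i -> computed.incrementAndGet()).boxed();
        Supplier<Stream<Integer>> cached = cache(src);
        int first = cached.get().mapToInt(Integer::intValue).sum();
        int afterFirst = computed.get();
        int second = cached.get().mapToInt(Integer::intValue).sum();
        return new int[] {afterFirst, computed.get(), first, second};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [5, 5, 15, 15]
    }
}
```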

Example

Assume a simple Integer stream that returns the first 5 natural numbers and reports computation to stdout:

private static Supplier<Stream<Integer>> getDataSrc() {
    return () -> IntStream.generate(new IntSupplier() {
        private int i = 0;

        @Override
        public int getAsInt() {
            System.out.println("Computing next i: " + (i + 1));
            return i += 1;
        }
    }).limit(5).boxed();
}

Then running the non-memoized version

Supplier<Stream<Integer>> dataSrc = getDataSrc();
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));

yields

Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]

And running the memoized version

Supplier<Stream<Integer>> dataSrc = cache(getDataSrc());
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));

yields

Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]

Upvotes: 1
