Reputation: 9383
Regarding a Supplier<Stream<T>> dataSrc, I would like to cache the Stream items for further traversals of the same sequence of elements. In this case, assume that dataSrc always produces the same sequence (e.g. getting a Stream<Integer> with the temperatures in Celsius for March; see the example usage below). Thus, option 1) is to first collect the Stream items, but this wastes one additional traversal just to add those items into a collection:
Supplier<Stream<T>> dataSrc = ...
List<T> cache = dataSrc.get().collect(toList()); // **Additional** traversal to collect items
cache.stream().reduce(…)                         // 1st traversal
cache.stream().reduce(…)                         // 2nd traversal
...                                              // Nth traversal
I would like to avoid the additional traversal to collect the items, as well as the explicit cache variable, and instead hide the cache inside the Supplier<> in such a way that on the first traversal the items are implicitly cached and on further traversals the items are read from that cache. I think this is similar to the idea of the cache() method of Project Reactor for reactive streams.
Thus, I am outlining an alternative in the following cache() method implementation, although it already has (at least) two problems: 1) onClose is not called when a traversal finishes (and I cannot figure out any way of detecting the end of a traversal); 2) if the first traversal never ends, then the cache will never be filled.
Supplier<Stream<T>> dataSrc = cache(...)
dataSrc.get().reduce(…) // 1st traversal
dataSrc.get().reduce(…) // 2nd traversal
... // Nth traversals
static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
    final List<T> cache = new ArrayList<>();
    final AtomicBoolean started = new AtomicBoolean();
    final AtomicBoolean isCached = new AtomicBoolean();
    return () -> {
        if (isCached.get()) return cache.stream();
        if (!started.getAndSet(true)) {
            return dataSrc
                .get()
                .peek(cache::add)
                .onClose(() -> isCached.set(true));
        }
        return dataSrc.get();
    };
}
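To make problem 1) concrete: onClose handlers only run when the stream's close() method is actually invoked, which terminal operations such as reduce() never do by themselves. A minimal sketch of the workaround, assuming the cache() draft above and a toy Stream.of(1, 2, 3) source (not part of the original example), would be to close the first stream explicitly, e.g. with try-with-resources, so that isCached gets set:
Supplier<Stream<Integer>> cached = cache(() -> Stream.of(1, 2, 3));

// The first stream is the only one with the onClose handler attached, so it
// must be closed explicitly; try-with-resources calls close() and sets isCached.
try (Stream<Integer> first = cached.get()) {
    first.forEach(n -> {});                  // items are recorded via peek(cache::add)
}

// From now on every get() is served from the in-memory cache.
cached.get().forEach(System.out::println);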
Is there any better approach to achieve a utility cache() function that returns a new Stream<T> which caches items on the first Stream traversal (without an implicit additional traversal to collect them first), and from which further Stream objects are created?
Here I am getting a stream with the temperatures in March from the World Weather Online API. To execute it you must include a dependency on AsyncHttpClient and a valid API key in the given URI.
Pattern pat = Pattern.compile("\\n");
boolean[] isEven = {true};
CompletableFuture<Stream<Integer>> temps = asyncHttpClient()
    .prepareGet("http://api.worldweatheronline.com/premium/v1/past-weather.ashx?q=37.017,-7.933&date=2018-03-01&enddate=2018-03-31&tp=24&format=csv&key=715b185b36034a4c879141841182802")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(pat::splitAsStream)
    .thenApply(str -> str
        .filter(w -> !w.startsWith("#"))      // Filter comments
        .skip(1)                              // Skip line: Not Available
        .filter(l -> isEven[0] = !isEven[0])  // Filter even lines
        .map(line -> line.substring(14, 16))  // Extract temperature in Celsius
        .map(Integer::parseInt)
    );
Note that a CompletableFuture<Stream<Integer>> is functionally compliant with a Supplier<Stream<Integer>>. However, although the CompletableFuture caches the resulting stream, that stream cannot be traversed twice.
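To make the "functionally compliant" remark concrete, the future can be adapted to the Supplier shape with a simple method reference (join() blocks until the response arrives and rethrows failures unchecked); this adaptation alone does not solve the problem, since every get() hands back the same single-use Stream:
// temps is the CompletableFuture<Stream<Integer>> built above.
Supplier<Stream<Integer>> dataSrc = temps::join; // every get() returns the SAME Stream instance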
Problem 1: The following code throws IllegalStateException: stream has already been operated upon or closed
out.println(temps.join().distinct().count());
out.println(temps.join().max(Integer::compare)); // throws IllegalStateException
Problem 2: Collecting it into a List induces a first traversal, and thus we end up with 3 traversals instead of 2:
CompletableFuture<List<Integer>> list = temps.thenApply(str -> str.collect(toList())); // 1st traversal to collect
out.println(list.join().stream().distinct().count());               // 2nd traversal
out.println(list.join().stream().distinct().max(Integer::compare)); // 3rd traversal
Goal: Store items in a cache on the first traversal. Every time the stream retrieves an item, it should store it in an internal cache that will be used on further traversals.
Supplier<Stream<Integer>> cache = Cache.of(temps::join);
out.println(cache.get().distinct().count());     // 1 traversal
out.println(cache.get().max(Integer::compare));  // 1 traversal from cache
Upvotes: 4
Views: 1888
Reputation: 9383
I think the only way to detect the end of a Stream traversal is through its iterator() or spliterator(). Thus, maybe a better option to get a replayable Stream is to record its items from its spliterator (done by the Recorder class of the example below) and then implement a new Spliterator that reads the items previously recorded (done by cacheIterator()). In this solution I made the getOrAdvance() method of Recorder synchronized to guarantee that just one resulting stream at a time gets a new item from the source.
So, Cache.of(dataSrc) creates a chain of:
dataSrc ----> Recorder ----> cacheIterator() ----> Stream
Side notes:
1) Cache.of() permits only limited parallelism. For better splitting support, the cacheIterator() below should return a proper Spliterator implementation instead, such as AbstractList.RandomAccessSpliterator.
2) The Recorder / cacheIterator() solution also works with infinite data sources that are short-circuited later. E.g. it can cache the items of the infinite stream nrs and print the output below, first without and then with the cache (i.e. nrsReplay):
Random rnd = new Random();
Supplier<Stream<String>> nrs = () -> Stream.generate(() -> rnd.nextInt(99)).map(Object::toString);
IntStream.range(1, 6).forEach(size -> out.println(nrs.get().limit(size).collect(joining(","))));
System.out.println();
Supplier<Stream<String>> nrsReplay = Cache.of(nrs);
IntStream.range(1, 6).forEach(size -> out.println(nrsReplay.get().limit(size).collect(joining(","))));
Output:
32
65,94
94,19,34
72,77,66,18
88,41,34,97,28

93
93,65
93,65,71
93,65,71,40
93,65,71,40,68
class Cache {
    public static <T> Supplier<Stream<T>> of(Supplier<Stream<T>> dataSrc) {
        final Spliterator<T> src = dataSrc.get().spliterator(); // !!! maybe it should be lazy and memoized !!!
        final Recorder<T> rec = new Recorder<>(src);
        return () -> {
            // CacheIterator starts on index 0 and reads data from src or
            // from an internal cache of Recorder.
            Spliterator<T> iter = rec.cacheIterator();
            return StreamSupport.stream(iter, false);
        };
    }

    static class Recorder<T> {
        final Spliterator<T> src;
        final List<T> cache = new ArrayList<>();
        final long estimateSize;
        boolean hasNext = true;

        public Recorder(Spliterator<T> src) {
            this.src = src;
            this.estimateSize = src.estimateSize();
        }

        public synchronized boolean getOrAdvance(final int index, Consumer<? super T> cons) {
            if (index < cache.size()) {
                // If it is in the cache then just get it from the corresponding index.
                cons.accept(cache.get(index));
                return true;
            } else if (hasNext)
                // If not in the cache then advance the src spliterator.
                hasNext = src.tryAdvance(item -> {
                    cache.add(item);
                    cons.accept(item);
                });
            return hasNext;
        }

        public Spliterator<T> cacheIterator() {
            return new Spliterators.AbstractSpliterator<T>(
                    estimateSize, src.characteristics()
            ) {
                int index = 0;

                public boolean tryAdvance(Consumer<? super T> cons) {
                    return getOrAdvance(index++, cons);
                }

                public Comparator<? super T> getComparator() {
                    return src.getComparator();
                }
            };
        }
    }
}
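As an additional illustration of the second side note (items already recorded come from the cache, while a longer replay keeps pulling from the source), here is a small deterministic sketch using a hypothetical infinite Stream.iterate source instead of the random nrs stream:
Supplier<Stream<Integer>> cached = Cache.of(() -> Stream.iterate(0, n -> n + 1));

// Each resulting stream keeps its own index, and the synchronized
// getOrAdvance() guarantees every item is pulled from the source only once.
System.out.println(cached.get().limit(3).collect(toList())); // [0, 1, 2]        -> pulled from the source
System.out.println(cached.get().limit(5).collect(toList())); // [0, 1, 2, 3, 4]  -> 0..2 from cache, 3..4 from source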
Upvotes: 2
Reputation: 9383
If using Project Reactor is an option, then you can simply convert the Supplier<Stream<T>> to a Flux<T>, which already provides the utility cache() method, and from then on use Flux<T> operations rather than Stream<T> operations.
Regarding the example of the original post, where temps is a CompletableFuture<Stream<Integer>> with the result of an HTTP request transformed into a sequence of temperatures in Celsius, we can perform both queries in the following way:
Flux<Integer> cache = Flux.fromStream(temps::join).cache();
cache.distinct().count().subscribe(out::println);
cache.reduce(Integer::max).subscribe(out::println);
This solution avoids: 1) IllegalStateException
on further traversals of this sequence; 2) a first traversal to collect items in a cache.
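If a blocking result is preferred over subscribe() (e.g. in a quick test), the Mono returned by count() or reduce() can also be awaited with block(); this is just an alternative way of consuming the same cached Flux:
// Same cached Flux as above; block() waits for the single result.
Flux<Integer> cache = Flux.fromStream(temps::join).cache();

Long distinctCount = cache.distinct().count().block();
Integer max = cache.reduce(Integer::max).block();

System.out.println(distinctCount);
System.out.println(max);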
Upvotes: 1
Reputation: 6801
You can use Guava's Suppliers#memoize function to turn a given Supplier into a caching ("memoizing") one: turn the Supplier<Stream<T>> into a Supplier<List<T>> that collects the stream, and memoize that. This would be your cache() method:
private static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
    Supplier<List<T>> memoized = Suppliers.memoize(() -> dataSrc.get().collect(toList()));
    return () -> memoized.get().stream();
}
(When mixing in Guava you might need to switch between Guava's com.google.common.base.Supplier and java.util.function.Supplier; they can easily be converted back and forth, but in this case it is not even necessary.)
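For illustration, a small sketch of that back-and-forth conversion using method references (assuming a reasonably recent Guava, where com.google.common.base.Supplier extends java.util.function.Supplier):
java.util.function.Supplier<String> javaSupplier = () -> "hello";

// Adapt the Java supplier to Guava's interface with a method reference:
com.google.common.base.Supplier<String> guavaSupplier = javaSupplier::get;

// And back again; in Guava 21+ the Guava interface already extends the Java one,
// so a plain assignment would work as well:
java.util.function.Supplier<String> backToJava = guavaSupplier::get;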
Example
Assume a simple Integer stream that returns the first 5 natural numbers and reports computation to stdout:
private static Supplier<Stream<Integer>> getDataSrc() {
    return () -> IntStream.generate(new IntSupplier() {
        private int i = 0;

        @Override
        public int getAsInt() {
            System.out.println("Computing next i: " + (i + 1));
            return i += 1;
        }
    }).limit(5).boxed();
}
Then running the non-memoized version
Supplier<Stream<Integer>> dataSrc = getDataSrc();
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));
yields
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
And running the memoized version
Supplier<Stream<Integer>> dataSrc = cache(getDataSrc());
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));
yields
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
Upvotes: 1