Reputation: 125
I would like to figure out a simple implementation equivalent to the Java 8 Stream that would let me explore the development of lazily computed query algorithms (such as map(), filter(), reduce(), etc.). NOTE: It is not my goal to achieve a better solution than Stream. Rather, my only goal is to understand Stream internals.
Yet, every implementation that I found is based on Iterable<T>, such as the solutions presented in the following answers:
Yet, I do not feel comfortable with any of these solutions because:
Spliterator<T> approach that is used on Stream<T>.
I know that Spliterator<T> was designed to allow partitioning and parallel processing, but I think that its unique iteration method (boolean tryAdvance(Consumer<? super T>)) could be exploited to build alternatives to the ones listed above. Moreover, as stated by Brian Goetz:
Spliterator is a better Iterator, even without parallelism. (They're also generally just easier to write and harder to get wrong.)
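To make the quoted point concrete, here is a minimal sketch of purely sequential traversal with Spliterator.tryAdvance, with no parallelism involved. The class SpliteratorDemo and the helper firstLongerThan are hypothetical names used only for illustration; only Spliterator, its tryAdvance method and List.spliterator() come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;

// Hypothetical demo class; not part of the question or of any answer.
class SpliteratorDemo {

    // Returns the first word longer than minLength, pulling one element at a time.
    static Optional<String> firstLongerThan(List<String> words, int minLength) {
        Spliterator<String> it = words.spliterator();
        String[] found = {null};
        // tryAdvance returns false once the source is exhausted, so one call
        // plays the role of both hasNext() and next() on an Iterator.
        while (found[0] == null && it.tryAdvance(w -> {
            if (w.length() > minLength) found[0] = w;
        })) { }
        return Optional.ofNullable(found[0]);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "stream", "of", "some", "strings");
        System.out.println(firstLongerThan(words, 4)); // Optional[stream]
    }
}
```

The loop stops as soon as a match is stored, so the remaining elements are never visited.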
So, is it possible to develop a more readable, simpler, more concise and more flexible implementation of a lazily computed query API based on the same principles as Stream<T> (except the parallel processing part)?
If yes, how can you do it? I would like to see simpler implementations than the ones listed above, if possible taking advantage of new Java 8 features.
Requirements:
Iterable<T> approach.
The reason for my question? I think the best way to learn a query API such as Stream is to try to implement those same methods myself. I have already done this successfully when I was learning .NET LINQ. Of course, I did not achieve a better implementation than LINQ, but it helped me understand the internals. So, I am trying to follow the same approach to learn Stream.
This is not so unusual. There are many workshops following this approach for other technologies, such as the functional-javascript-workshop, most of whose exercises ask for the implementation of existing methods such as map(), filter(), reduce(), call(), bind(), etc.
Selected Answer: For now I have chosen Miguel Gamboa’s answer over Tagir Valeev’s answer, because the latter does not allow the implementation of findAny() or findFirst() without completely traversing all the elements through the forEach() of dataSrc. However, I think that Tagir Valeev’s answer has other merits regarding the concise implementation of some intermediate operations and also performance, since the forEach()-based approach reduces the overhead of the iteration code that mediates access to the data structure internals, as cited by Brian Goetz in point 2 of his answer.
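The short-circuiting argument can be sketched as follows. This is an illustration under assumptions, not code from either answer: PullSource is a hypothetical pull-style interface modelled on tryAdvance, and elementsVisitedByFindFirst is a hypothetical helper that counts how many elements the source hands out before findFirst() returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical names throughout; a sketch of the idea only.
class FindFirstDemo {

    // Pull-style source: each call hands at most one element to the action.
    interface PullSource<T> {
        boolean tryAdvance(Consumer<? super T> action);

        // Short-circuits naturally: a single tryAdvance call is enough.
        default Optional<T> findFirst() {
            List<T> box = new ArrayList<>(1);
            tryAdvance(box::add);
            return box.isEmpty() ? Optional.empty() : Optional.of(box.get(0));
        }

        static <T> PullSource<T> of(Iterable<T> data) {
            Iterator<T> it = data.iterator();
            return action -> {
                if (!it.hasNext()) return false;
                action.accept(it.next());
                return true;
            };
        }
    }

    // Counts the elements pulled from the source while findFirst() runs.
    static int elementsVisitedByFindFirst(Iterable<Integer> data) {
        AtomicInteger visited = new AtomicInteger();
        PullSource<Integer> src = PullSource.of(data);
        PullSource<Integer> counting = action -> src.tryAdvance(e -> {
            visited.incrementAndGet();
            action.accept(e);
        });
        counting.findFirst();
        return visited.get();
    }

    public static void main(String[] args) {
        System.out.println(elementsVisitedByFindFirst(Arrays.asList(1, 2, 3, 4, 5))); // 1
    }
}
```

With a source that only exposes forEach(), the same findFirst() would have to let the traversal run to the end, because the consumer has no way to tell the source to stop.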
Upvotes: 1
Views: 3083
Reputation: 1254
First of all, I have to say I love the design of lambdas and the Stream API. The implementation in the JDK is also great and performs well. I'm not sure whether learning the implementation by writing it yourself is a good idea or not. But I did implement the Stream API in my open source library abacus-common, both sequential and parallel. The source code is on GitHub: Stream. I can't say how good it is compared with the implementation in the JDK. But personally, I think the implementation is pretty straightforward and simple, and it also performs well.
Disclosure: I'm the developer of abacus-common.
Upvotes: 0
Reputation: 100269
It's quite easy to implement the subset of stateless operations without short-circuiting support. You just need to take care to always stick with internal iteration. The basic building block is the forEach operation, which performs a given action for every input element. The body of the forEach method is the only thing that changes between stages. So we can either make an abstract class with an abstract forEach method, or accept a function that is actually the body of forEach. I'll stick with the second approach:
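import java.util.*;
import java.util.function.*;

public final class MyStream<T> {
    // The whole traversal: pushes every element into the consumer it is given
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}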
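For comparison, the first approach mentioned above (an abstract class with an abstract forEach method) might look roughly like the sketch below. AbstractMyStream is a hypothetical name, and only of() and map() are shown; each stage becomes an anonymous subclass that overrides forEach.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the abstract-class variant; not code from the answer.
abstract class AbstractMyStream<T> {

    // Each stage supplies its own traversal by overriding forEach.
    public abstract void forEach(Consumer<T> cons);

    public static <T> AbstractMyStream<T> of(Iterable<T> elements) {
        return new AbstractMyStream<T>() {
            @Override
            public void forEach(Consumer<T> cons) {
                elements.forEach(cons);
            }
        };
    }

    public <U> AbstractMyStream<U> map(Function<T, U> mapper) {
        AbstractMyStream<T> upstream = this;
        return new AbstractMyStream<U>() {
            @Override
            public void forEach(Consumer<U> cons) {
                // Adapt the downstream consumer, then let the upstream stage drive it.
                upstream.forEach(e -> cons.accept(mapper.apply(e)));
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        AbstractMyStream.of(Arrays.asList(1, 2, 3)).map(x -> x * 2).forEach(out::add);
        System.out.println(out); // [2, 4, 6]
    }
}
```

The behaviour is the same as with the function-based design; the difference is mostly the amount of boilerplate per stage.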
Now let's create some simple sources:
public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}
Now intermediate operations. They just need to adapt a consumer received by action to perform something else:
public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}
Now let's create some terminal operations using forEach:
public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}
public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    // a value exists only if at least one element was seen
    return b.isPresent ? Optional.of(b.val) : Optional.empty();
}
public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}
So we have our stream now. Let's try it:
System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
        .reduce(0, Integer::sum));
// 30
System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
        .flatMap(x -> MyStream.of(", ", x))
        .skip(1).reduce("", String::concat));
// a, stream, of, some, strings
System.out.println(MyStream.range(0, 100)
        .filter(x -> x % 3 == 0).count());
// 34
And so on. Such an implementation is very simple, yet pretty close to what's going on in the actual Stream API. Of course, when you add short-circuiting, parallel streams, primitive specializations and more stateful operations, things get much more complicated.
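As one possible illustration of the extra complexity that short-circuiting brings, the sketch below changes the sink type from Consumer<T> to Predicate<T>, so that a stage can return false to ask the source to stop. This is an assumption-laden variant written for this example, not how MyStream above or the JDK does it; ShortCircuitStream, limit(), findFirst() and visitedBeforeFirst() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical variant of MyStream: the sink returns false to request a stop.
final class ShortCircuitStream<T> {
    // The action pushes elements into the sink until the sink returns false.
    private final Consumer<Predicate<T>> action;

    ShortCircuitStream(Consumer<Predicate<T>> action) {
        this.action = action;
    }

    static ShortCircuitStream<Integer> range(int from, int to) {
        return new ShortCircuitStream<>(sink -> {
            for (int i = from; i < to; i++) {
                if (!sink.test(i)) return; // downstream asked to stop
            }
        });
    }

    <U> ShortCircuitStream<U> map(Function<T, U> mapper) {
        // Every stage must now forward the stop signal upstream.
        return new ShortCircuitStream<>(sink -> action.accept(e -> sink.test(mapper.apply(e))));
    }

    ShortCircuitStream<T> limit(long n) {
        return new ShortCircuitStream<>(sink -> {
            if (n <= 0) return;
            long[] remaining = {n};
            action.accept(e -> sink.test(e) && --remaining[0] > 0);
        });
    }

    Optional<T> findFirst() {
        List<T> box = new ArrayList<>(1);
        action.accept(e -> { box.add(e); return false; }); // stop after one element
        return box.isEmpty() ? Optional.empty() : Optional.of(box.get(0));
    }

    void forEach(Consumer<T> cons) {
        action.accept(e -> { cons.accept(e); return true; }); // never stops early
    }

    // Counts how many elements reach map() before findFirst() returns.
    static int visitedBeforeFirst() {
        AtomicInteger visited = new AtomicInteger();
        range(0, 100).map(x -> { visited.incrementAndGet(); return x * 2; }).findFirst();
        return visited.get();
    }

    public static void main(String[] args) {
        System.out.println(visitedBeforeFirst()); // 1
        range(0, 100).limit(3).forEach(System.out::println); // 0, 1, 2
    }
}
```

Even in this small sketch, every source and every intermediate operation has to cooperate with the stop signal, which hints at why the full feature set is harder.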
Note that unlike the Stream API, this MyStream can be reused many times:
MyStream<Integer> range = MyStream.range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
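For contrast, here is a small sketch of what happens with java.util.stream.Stream: a second terminal operation on the same stream throws IllegalStateException. The class and method names are hypothetical; the Supplier-based workaround shown is a common idiom, offered here only as an illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class; Stream and Supplier are the standard JDK types.
class StreamReuseDemo {

    // Returns true if traversing the same java.util.stream.Stream twice fails.
    static boolean secondTraversalFails() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.forEach(x -> { });
        try {
            s.forEach(x -> { }); // the stream has already been consumed
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // A Supplier that builds a fresh Stream per traversal avoids the problem.
    static long countTwice(List<Integer> data) {
        Supplier<Stream<Integer>> fresh = data::stream;
        return fresh.get().count() + fresh.get().count();
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails());             // true
        System.out.println(countTwice(Arrays.asList(1, 2, 3))); // 6
    }
}
```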
Upvotes: 5
Reputation: 9383
Using functional-style programming and taking advantage of Java 8 default methods, we can achieve a short and clean solution for a lazily computed query API. For instance, check out how easily you can implement the map() and forEach() methods in the Queryable type below, and then you can use it like this:
List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
    .map(String::length)
    .forEach(System.out::println);
If you replace the Queryable.of(data) call with data.stream() you will get the same result. The following example illustrates an implementation of the map() and forEach() methods. For the complete solution and a more detailed description, see the Queryable repository.
UPDATED with @srborlongan's comment: changed the forEach signature from forEach(Consumer<T>) to forEach(Consumer<? super T>) and changed of from of(Collection<T>) to of(Iterable<T>).
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;

@FunctionalInterface
public interface Queryable<T>{

    abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance

    // Passes the item to the consumer and returns true, so it can be chained with &&
    static <T> boolean truth(Consumer<T> c, T item){
        c.accept(item);
        return true;
    }

    public static <T> Queryable<T> of(Iterable<T> data) {
        final Iterator<T> dataSrc = data.iterator();
        return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
    }

    public default void forEach(Consumer<? super T> action) {
        while (tryAdvance(action)) { }
    }

    public default <R> Queryable<R> map(Function<T, R> mapper) {
        return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
    }
}
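Other operations can be added in the same style. As a hedged sketch, a filter() for a tryAdvance-based interface could keep pulling from the upstream until one element passes the predicate or the source is exhausted. LazyQuery below is a hypothetical stand-in that mirrors the Queryable interface above so that the example compiles on its own; filter() and evenLengths() are not taken from the answer or from the Queryable repository.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class FilterDemo {

    // Hypothetical stand-in mirroring the Queryable interface shown above.
    @FunctionalInterface
    interface LazyQuery<T> {
        boolean tryAdvance(Consumer<? super T> action);

        static <T> LazyQuery<T> of(Iterable<T> data) {
            Iterator<T> it = data.iterator();
            return action -> {
                if (!it.hasNext()) return false;
                action.accept(it.next());
                return true;
            };
        }

        default void forEach(Consumer<? super T> action) {
            while (tryAdvance(action)) { }
        }

        default <R> LazyQuery<R> map(Function<T, R> mapper) {
            return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
        }

        // Sketch of filter: pull until one element passes or the source ends.
        default LazyQuery<T> filter(Predicate<? super T> pred) {
            return action -> {
                boolean[] passed = {false};
                while (!passed[0] && tryAdvance(item -> {
                    if (pred.test(item)) {
                        passed[0] = true;
                        action.accept(item);
                    }
                })) { }
                return passed[0]; // false only when the upstream is exhausted
            };
        }
    }

    // Collects the even word lengths, computed lazily element by element.
    static List<Integer> evenLengths(List<String> words) {
        List<Integer> out = new ArrayList<>();
        LazyQuery.of(words).map(String::length).filter(n -> n % 2 == 0).forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(evenLengths(Arrays.asList("functional", "super", "formula"))); // [10]
    }
}
```

Unlike map(), filter() cannot forward a single tryAdvance call one-to-one, because rejected elements must be skipped without signalling the end of the sequence.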
Upvotes: 1