Gaspium

Reputation: 125

How to implement a lazily computed fluent API like Java 8 Stream

I would like to work out a simple implementation equivalent to Java 8 Stream that would let me explore the development of lazily computed query algorithms (such as map(), filter(), reduce(), etc.). NOTE: It is not my goal to achieve a better solution than Stream; my only goal is to understand Stream internals.

However, every implementation I found is based on Iterable<T>, such as the solutions presented in the following answers:

Still, I do not feel comfortable with any of these solutions because:

  1. they are too verbose.
  2. they are not open to new query methods: adding a new query method requires structural modifications.
  3. apart from accepting lambdas as query arguments, they do not take advantage of other Java 8 features such as first-class functions or default methods.
  4. none of them uses the Spliterator<T> approach that Stream<T> is built on.

I know that Spliterator<T> was designed to allow partitioning and parallel processing, but I think its single traversal method (boolean tryAdvance(Consumer<? super T> action)) could be exploited to build alternatives to those listed above. Moreover, as stated by Brian Goetz:

Spliterator is a better Iterator, even without parallelism. (They're also generally just easier to write and harder to get wrong.)
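To make the tryAdvance idea concrete, here is a minimal sketch (not taken from any of the answers) that drains a java.util.Spliterator sequentially using nothing but tryAdvance. The class name SpliteratorLoop and the helper drain are made up for illustration; the JDK's own Spliterator.forEachRemaining does essentially the same loop by default.

```java
import java.util.*;

public class SpliteratorLoop {
    // Pulls every element out of a Spliterator one at a time, the way a
    // sequential pipeline pulls from its source.
    static <T> List<T> drain(Spliterator<T> sp) {
        List<T> out = new ArrayList<>();
        // tryAdvance passes the next element (if any) to the consumer and
        // reports whether there was one: no separate hasNext()/next() pair.
        while (sp.tryAdvance(out::add)) { }
        return out;
    }

    public static void main(String[] args) {
        Spliterator<String> sp = Arrays.asList("a", "b", "c").spliterator();
        System.out.println(drain(sp)); // [a, b, c]
    }
}
```

Compared with an Iterator, the "is there more?" check and the "give me the element" step collapse into one call, which is part of why a Spliterator is harder to get wrong.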

So, is it possible to develop a more readable, simpler, more concise and more flexible implementation of a lazily computed query API based on the same principles as Stream<T> (except for the parallel processing part)?

If so, how? I would like to see implementations simpler than those listed above, ideally taking advantage of new Java 8 features.

Requirements:

The reason for my question? I think the best way to learn a query API such as Stream is to try implementing the same methods myself. I have already done this successfully when I was learning .NET LINQ. Of course I did not achieve a better implementation than LINQ, but it helped me understand its internals. So I am trying to follow the same approach to learn Stream.

This is not so unusual. Many workshops for other technologies follow this approach, such as the functional-javascript-workshop, in which most exercises ask you to implement existing methods such as map(), filter(), reduce(), call(), bind(), etc.

Selected Answer: For now I have chosen Miguel Gamboa's answer rather than Tagir Valeev's answer, because the latter does not allow implementing findAny() or findFirst() without traversing all the elements through the forEach() of dataSrc. However, I think Tagir Valeev's answer has other merits: the concise implementation of some intermediate operations, and performance, since the forEach()-based approach reduces the overhead of the iteration code that mediates access to the data structure internals, as cited by Brian Goetz in point 2 of his answer.

Upvotes: 1

Views: 3083

Answers (3)

user_3380739

Reputation: 1254

First of all, I have to say I love the design of the lambda and Stream APIs. The JDK implementation is also great and high-performance. I'm not sure whether implementing it yourself is a good way to learn it, but I did implement the Stream API, both sequential and parallel, in my open source library abacus-common. Here is the source code on GitHub: Stream. I can't say how good it is compared with the JDK implementation, but personally I think the implementation is pretty straightforward and simple, and it is also high-performance.

Disclosure: I'm the developer of abacus-common.

Upvotes: 0

Tagir Valeev

Reputation: 100269

It's quite easy to implement the subset of stateless operations without short-circuiting support. You just need to stick with internal iteration throughout. The basic building block is the forEach operation, which performs a given action for every input element. The body of the forEach method is the only thing that changes between stages, so we can either make an abstract class with an abstract forEach method, or accept a function that is actually the body of forEach. I'll stick with the second approach:

public final class MyStream<T> {
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}
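For comparison, the first alternative mentioned above (an abstract class whose subclasses override forEach) might look roughly like this. It is only a sketch: AbsStream, AbstractStreamSketch and sumOfDoubles are hypothetical names, and only range() and map() are shown.

```java
import java.util.function.*;

public class AbstractStreamSketch {
    // Each stage is an anonymous subclass that overrides forEach, instead of
    // storing a Consumer<Consumer<T>> as MyStream does.
    static abstract class AbsStream<T> {
        abstract void forEach(Consumer<T> cons);

        static AbsStream<Integer> range(int from, int to) {
            return new AbsStream<Integer>() {
                @Override
                void forEach(Consumer<Integer> cons) {
                    for (int i = from; i < to; i++) cons.accept(i);
                }
            };
        }

        <U> AbsStream<U> map(Function<T, U> mapper) {
            AbsStream<T> upstream = this; // the stage we pull from
            return new AbsStream<U>() {
                @Override
                void forEach(Consumer<U> cons) {
                    upstream.forEach(e -> cons.accept(mapper.apply(e)));
                }
            };
        }
    }

    static int sumOfDoubles(int n) {
        int[] sum = {0};
        AbsStream.range(0, n).map(x -> x * 2).forEach(x -> sum[0] += x);
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumOfDoubles(5)); // 0 + 2 + 4 + 6 + 8 = 20
    }
}
```

Both designs behave the same; the lambda-based MyStream simply avoids the anonymous-class boilerplate for every new stage.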

Now let's create some simple sources:

public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}

Now intermediate operations. They just need to adapt a consumer received by action to perform something else:

public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}
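skip() above already keeps per-traversal state in a local array. The same pattern extends to other stateful stages; the sketch below adds a hypothetical distinct() to a trimmed copy of MyStream (DistinctSketch and collect are illustrative names). The HashSet is created inside the lambda, so each traversal starts with fresh state and reuse still works.

```java
import java.util.*;
import java.util.function.*;

public class DistinctSketch {
    // Trimmed copy of the forEach-based MyStream, plus a distinct() stage.
    static final class MyStream<T> {
        private final Consumer<Consumer<T>> action;
        MyStream(Consumer<Consumer<T>> action) { this.action = action; }
        void forEach(Consumer<T> cons) { action.accept(cons); }

        @SafeVarargs
        static <T> MyStream<T> of(T... elements) {
            return new MyStream<>(Arrays.asList(elements)::forEach);
        }

        // Stateful stage: the "seen" set is allocated once per traversal.
        MyStream<T> distinct() {
            return new MyStream<>(cons -> {
                Set<T> seen = new HashSet<>();
                forEach(e -> {
                    if (seen.add(e)) cons.accept(e); // add() is false for duplicates
                });
            });
        }
    }

    static List<Integer> collect(MyStream<Integer> s) {
        List<Integer> out = new ArrayList<>();
        s.forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        MyStream<Integer> s = MyStream.of(3, 1, 3, 2, 1).distinct();
        System.out.println(collect(s)); // [3, 1, 2]
        System.out.println(collect(s)); // [3, 1, 2] again: state is per traversal
    }
}
```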

Now let's create some terminal operations using forEach:

public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}

public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    return b.isPresent ? Optional.of(b.val) : Optional.empty();
}

public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}
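Mutable reductions fit the same forEach-based shape. Below is a sketch of a simplified collect(supplier, accumulator) on a trimmed MyStream: it mirrors the first two parameters of the JDK's Stream.collect(Supplier, BiConsumer, BiConsumer) and drops the combiner, which only parallel execution needs. CollectSketch and toList() here are illustrative additions, not part of the answer.

```java
import java.util.*;
import java.util.function.*;

public class CollectSketch {
    static final class MyStream<T> {
        private final Consumer<Consumer<T>> action;
        MyStream(Consumer<Consumer<T>> action) { this.action = action; }
        void forEach(Consumer<T> cons) { action.accept(cons); }

        static MyStream<Integer> range(int from, int to) {
            return new MyStream<>(cons -> {
                for (int i = from; i < to; i++) cons.accept(i);
            });
        }

        // Creates one container, then folds every element into it in place.
        <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator) {
            R container = supplier.get();
            forEach(e -> accumulator.accept(container, e));
            return container;
        }

        List<T> toList() {
            return this.<List<T>>collect(ArrayList::new, List::add);
        }
    }

    public static void main(String[] args) {
        System.out.println(MyStream.range(0, 5).toList()); // [0, 1, 2, 3, 4]
    }
}
```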

So we have our stream now. Let's try it:

System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
                           .reduce(0, Integer::sum));
// 30

System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
                           .flatMap(x -> MyStream.of(", ", x))
                           .skip(1).reduce("", String::concat));
// a, stream, of, some, strings

System.out.println(MyStream.range(0, 100)
                           .filter(x -> x % 3 == 0).count());
// 34

And so on. Such an implementation is very simple, yet pretty close to what's going on in the actual Stream API. Of course, when you add short-circuiting, parallel streams, primitive specializations and more stateful operations, things become much more complicated.
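To see why short-circuiting is the hard part, here is a sketch of a naive limit() on a trimmed MyStream (LimitSketch, limit and run are hypothetical names). It forwards only the first n elements, but a Consumer has no way to tell the source to stop, so the source loop still visits every element. This is the same limitation the question's selected-answer note raises for findFirst().

```java
import java.util.*;
import java.util.function.*;

public class LimitSketch {
    static final class MyStream<T> {
        private final Consumer<Consumer<T>> action;
        MyStream(Consumer<Consumer<T>> action) { this.action = action; }
        void forEach(Consumer<T> cons) { action.accept(cons); }

        // Naive limit(): drops everything after the first n elements, but
        // cannot stop the upstream loop early.
        MyStream<T> limit(long n) {
            return new MyStream<>(cons -> {
                long[] passed = {0};
                forEach(e -> {
                    if (passed[0] < n) {
                        passed[0]++;
                        cons.accept(e);
                    }
                });
            });
        }
    }

    // Returns {elements taken, elements produced by the source}.
    static int[] run() {
        int[] produced = {0};
        MyStream<Integer> source = new MyStream<>(cons -> {
            for (int i = 0; i < 100; i++) {
                produced[0]++;
                cons.accept(i);
            }
        });
        List<Integer> taken = new ArrayList<>();
        source.limit(3).forEach(taken::add);
        return new int[] {taken.size(), produced[0]};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("taken=" + r[0] + ", produced=" + r[1]); // taken=3, produced=100
    }
}
```

With an infinite source, this limit() would never return, whereas the JDK's Stream.iterate(0, i -> i + 1).limit(3) terminates.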

Note that, unlike the Stream API, this MyStream can be reused many times:

MyStream<Integer> range = MyStream.range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
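For contrast, a java.util.stream.Stream instance can be consumed only once: a second terminal operation on it throws IllegalStateException. The usual workaround is to keep a Supplier that builds a fresh stream each time. StreamReuse, secondUseFails and countTwice are illustrative names.

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamReuse {
    // Returns true if a second terminal operation on the same Stream fails.
    static boolean secondUseFails() {
        Stream<Integer> s = IntStream.range(0, 3).boxed();
        s.forEach(x -> { });          // first terminal operation: fine
        try {
            s.forEach(x -> { });      // second one on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;              // "stream has already been operated upon or closed"
        }
    }

    // Re-running a JDK pipeline means asking the source for a new stream.
    static long countTwice() {
        Supplier<IntStream> range = () -> IntStream.range(0, 10);
        return range.get().count() + range.get().count();
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
        System.out.println(countTwice());     // 20
    }
}
```

MyStream avoids this because it stores a recipe for traversal (the Consumer<Consumer<T>>) rather than a traversal already in progress.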

Upvotes: 5

Miguel Gamboa

Reputation: 9383

Using a functional programming style and taking advantage of Java 8 default methods, we can achieve a short and clean implementation of a lazily computed query API. For instance, check out how easily you can implement the map() and forEach() methods in the Queryable type below, and then use it like this:

List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
     .map(String::length)
     .forEach(System.out::println);

If you replace the Queryable.of(data) call with data.stream(), you will get the same result. The following example illustrates an implementation of the map() and forEach() methods. See the Queryable repository for the complete solution and a more detailed description.

UPDATED following @srborlongan's comment: changed the forEach signature from forEach(Consumer<T>) to forEach(Consumer<? super T>), and changed of from of(Collection<T>) to of(Iterable<T>).

@FunctionalInterface
public interface Queryable<T>{

  abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance

  static <T> boolean truth(Consumer<T> c, T item){
    c.accept(item);
    return true;
  }

  public static <T> Queryable<T> of(Iterable<T> data) {
    final Iterator<T> dataSrc = data.iterator();
    return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
  }

  public default void forEach(Consumer<? super T> action) {
    while (tryAdvance(action)) { }
  }

  public default <R> Queryable<R> map(Function<T, R> mapper) {
    return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
  }
}
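Since the question's selected-answer note hinges on findFirst(), here is a hedged sketch of how filter() and findFirst() could be added to a tryAdvance-based Queryable. These are my own illustrations, not code from the Queryable repository; QueryableSketch and firstEven are hypothetical names. The traced source records every element it hands out, which shows traversal stopping right after the first match.

```java
import java.util.*;
import java.util.function.*;

public class QueryableSketch {
    @FunctionalInterface
    interface Queryable<T> {
        boolean tryAdvance(Consumer<? super T> action);

        static <T> Queryable<T> of(Iterable<T> data) {
            Iterator<T> it = data.iterator();
            return action -> {
                if (!it.hasNext()) return false;
                action.accept(it.next());
                return true;
            };
        }

        // Keeps pulling from upstream until one element passes the predicate
        // (and is pushed to action) or upstream is exhausted.
        default Queryable<T> filter(Predicate<? super T> pred) {
            return action -> {
                boolean[] found = {false};
                while (!found[0] && tryAdvance(item -> {
                    if (pred.test(item)) {
                        action.accept(item);
                        found[0] = true;
                    }
                })) { }
                return found[0];
            };
        }

        // Short-circuits: asks upstream for at most one element.
        default Optional<T> findFirst() {
            List<T> holder = new ArrayList<>(1);
            return tryAdvance(holder::add) ? Optional.of(holder.get(0)) : Optional.empty();
        }
    }

    // Records each element the source hands out in "pulled".
    static Optional<Integer> firstEven(List<Integer> data, List<Integer> pulled) {
        Queryable<Integer> src = Queryable.of(data);
        Queryable<Integer> traced = action -> src.tryAdvance(x -> {
            pulled.add(x);
            action.accept(x);
        });
        return traced.filter(x -> x % 2 == 0).findFirst();
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        Optional<Integer> first = firstEven(Arrays.asList(1, 2, 3, 4, 5, 6), pulled);
        System.out.println(first + " " + pulled); // Optional[2] [1, 2]
    }
}
```

Only 1 and 2 are pulled from the source: the pull-based tryAdvance lets findFirst() stop early, which the push-based forEach design cannot do.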

Upvotes: 1
