Ido Barash

Reputation: 5122

Complex custom Collector with Java 8

I have a stream of objects which I would like to collect the following way.

Let's say we are handling forum posts:

class Post {
    private Date time;
    private Data data;
}

I want to create a list which groups posts by a period. If there were no posts for X minutes, create a new group.

class PostsGroup{
    List<Post> posts = new ArrayList<> ();
}

I want to get a List<PostsGroup> containing the posts grouped by the interval.

Example: interval of 10 minutes.

Posts:

[{time:x, data:{}}, {time:x + 3, data:{}}, {time:x + 12, data:{}}, {time:x + 45, data:{}}]

I want to get a list of post groups:

[
 {posts : [{time:x, data:{}}, {time:x + 3, data:{}}, {time:x + 12, data:{}}]},
 {posts : [{time:x + 45, data:{}}]}
]

Is this possible?

Upvotes: 8

Views: 2350

Answers (3)

Alexander Ivanchenko

Reputation: 28968

Since no one has provided a solution with a custom collector as it was required in the original problem statement, here is a collector-implementation that groups Post objects based on the provided time-interval.

The Date class mentioned in the question has been superseded by the java.time API since Java 8 and is not recommended for new projects. Hence, LocalDateTime is used instead.

Post & PostGroup

For testing purposes, I've used Post implemented as a Java 16 record (if you substitute it with a class, the overall solution will be fully compliant with Java 8):

public record Post(LocalDateTime dateTime) {}

Also, I've enhanced the PostsGroup object. My idea is that it should be capable of deciding whether an offered Post should be added to the list of posts or rejected, as the Information Expert principle suggests (in short: all manipulations of the data should happen only inside the class to which that data belongs).

To facilitate this functionality, two extra fields were added: interval of type Duration from the java.time package, which represents the maximum interval between the earliest post and the latest post in a group, and intervalBound of type LocalDateTime, which is initialized when the first post is added and is later used internally by the method isWithinInterval() to check whether an offered post fits into the interval.

public class PostsGroup {
    private Duration interval;
    private LocalDateTime intervalBound;
    private List<Post> posts = new ArrayList<>();
    
    public PostsGroup(Duration interval) {
        this.interval = interval;
    }
    
    public boolean tryAdd(Post post) {
        if (posts.isEmpty()) {
            intervalBound = post.dateTime().plus(interval);
            return posts.add(post);
        } else if (isWithinInterval(post)) {
            return posts.add(post);
        }
        return false;
    }
    
    public boolean isWithinInterval(Post post) {
        return post.dateTime().isBefore(intervalBound);
    }
    
    @Override
    public String toString() {
        return "PostsGroup{" + posts + '}';
    }
}

I'm making two assumptions:

  • All posts in the source are sorted by time (if that is not the case, introduce a sorted() operation in the pipeline before collecting the results);
  • Posts need to be collected into the minimum number of groups; as a consequence, it's not possible to split this task and execute the stream in parallel.
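
If the source is not guaranteed to be ordered, the first assumption can be satisfied by sorting inside the pipeline. Below is a minimal sketch, assuming a simplified Post that holds only a LocalDateTime; the names SortBeforeGrouping and sortedByTime are hypothetical and exist only for illustration.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the answer's code.
class SortBeforeGrouping {

    // Simplified stand-in for Post (a plain class keeps the sketch Java 8 compatible).
    static final class Post {
        private final LocalDateTime dateTime;

        Post(LocalDateTime dateTime) {
            this.dateTime = dateTime;
        }

        LocalDateTime dateTime() {
            return dateTime;
        }
    }

    // Returns the posts ordered by time, earliest first.
    static List<Post> sortedByTime(List<Post> posts) {
        return posts.stream()
                .sorted(Comparator.comparing(Post::dateTime))
                // In the real pipeline the grouping collector would take the place of toList().
                .collect(Collectors.toList());
    }
}
```

Note that sorted() is a stateful operation that buffers the whole stream, so this only makes sense for finite sources.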

Building a Custom Collector

We can create a custom collector either inline by using one of the versions of the static method Collector.of() or by defining a class that implements the Collector interface.

These parameters have to be provided while creating a custom collector:

  • Supplier Supplier<A> provides a mutable container which stores the elements of the stream. In this case, ArrayDeque (an implementation of the Deque interface) is handy as a container because it gives convenient access to the most recently added element, i.e. the latest PostsGroup.

  • Accumulator BiConsumer<A,T> defines how to add elements into the container provided by the supplier. For this task, it has to decide whether the next element from the stream (i.e. the next Post) should go into the last PostsGroup in the Deque, or whether a new PostsGroup needs to be allocated for it.

  • Combiner BinaryOperator<A> establishes a rule for merging two containers obtained while executing the stream in parallel. Since this operation is treated as not parallelizable, the combiner is implemented to throw an AssertionError in case of parallel execution.

  • Finisher Function<A,R> produces the final result by transforming the mutable container. The finisher function in the code below turns the container, a deque containing the result, into an immutable list.

Note: the Java 16 method Stream.toList() is used inside the finisher function. On Java 10+ it can be replaced with collect(Collectors.toUnmodifiableList()); on Java 8, with collect(Collectors.toList()), which does not guarantee an immutable result.

  • Characteristics allow providing additional information. For instance, Collector.Characteristics.UNORDERED indicates that the collector does not commit to preserving the encounter order of the input elements. The collector in this answer doesn't require any characteristics.

The method below is responsible for generating the collector based on the provided interval.

public static Collector<Post, ?, List<PostsGroup>> groupPostsByInterval(Duration interval) {
    
    return Collector.of(
        ArrayDeque::new,
        (Deque<PostsGroup> deque, Post post) -> {
            if (deque.isEmpty() || !deque.getLast().tryAdd(post)) { // if no groups have been created yet or if adding the post into the most recent group fails
                PostsGroup postsGroup = new PostsGroup(interval);
                postsGroup.tryAdd(post);
                deque.addLast(postsGroup);
            }
        },
        (Deque<PostsGroup> left, Deque<PostsGroup> right) -> { throw new AssertionError("should not be used in parallel"); },
        (Deque<PostsGroup> deque) -> deque.stream().toList()); // Java 16+; yields an unmodifiable list
}
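
For comparison, here is a sketch of the second option mentioned earlier: the same logic written as a class that implements the Collector interface. It assumes simplified, Java 8 compatible versions of Post and PostsGroup; the names IntervalGroupingSketch and PostsGroupCollector are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical wrapper class for illustration only.
class IntervalGroupingSketch {

    // Simplified stand-in for the Post record.
    static final class Post {
        private final LocalDateTime dateTime;
        Post(LocalDateTime dateTime) { this.dateTime = dateTime; }
        LocalDateTime dateTime() { return dateTime; }
    }

    // Simplified PostsGroup: accepts posts that fall before (first post + interval).
    static final class PostsGroup {
        private final Duration interval;
        private LocalDateTime intervalBound;
        private final List<Post> posts = new ArrayList<>();

        PostsGroup(Duration interval) { this.interval = interval; }

        boolean tryAdd(Post post) {
            if (posts.isEmpty()) {
                intervalBound = post.dateTime().plus(interval);
                return posts.add(post);
            }
            return post.dateTime().isBefore(intervalBound) && posts.add(post);
        }

        List<Post> posts() { return posts; }
    }

    // Class-based equivalent of the Collector.of(...) call.
    static final class PostsGroupCollector
            implements Collector<Post, Deque<PostsGroup>, List<PostsGroup>> {

        private final Duration interval;

        PostsGroupCollector(Duration interval) { this.interval = interval; }

        @Override
        public Supplier<Deque<PostsGroup>> supplier() {
            return ArrayDeque::new;
        }

        @Override
        public BiConsumer<Deque<PostsGroup>, Post> accumulator() {
            return (deque, post) -> {
                // Open a new group if there is none yet or the latest group rejects the post.
                if (deque.isEmpty() || !deque.getLast().tryAdd(post)) {
                    PostsGroup group = new PostsGroup(interval);
                    group.tryAdd(post);
                    deque.addLast(group);
                }
            };
        }

        @Override
        public BinaryOperator<Deque<PostsGroup>> combiner() {
            return (left, right) -> {
                throw new AssertionError("should not be used in parallel");
            };
        }

        @Override
        public Function<Deque<PostsGroup>, List<PostsGroup>> finisher() {
            return deque -> Collections.unmodifiableList(new ArrayList<>(deque));
        }

        @Override
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet(); // no characteristics needed
        }
    }
}
```

It would be used as posts.stream().collect(new IntervalGroupingSketch.PostsGroupCollector(Duration.ofMinutes(10))). The class form is more verbose than Collector.of(), but it gives the collector a name and makes each of the five functions easy to test separately.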

main() - demo

public static void main(String[] args) {
    List<Post> posts =
        List.of(new Post(LocalDateTime.of(2022,4,28,15,0)),
                new Post(LocalDateTime.of(2022,4,28,15,3)),
                new Post(LocalDateTime.of(2022,4,28,15,5)),
                new Post(LocalDateTime.of(2022,4,28,15,8)),
                new Post(LocalDateTime.of(2022,4,28,15,12)),
                new Post(LocalDateTime.of(2022,4,28,15,15)),
                new Post(LocalDateTime.of(2022,4,28,15,18)),
                new Post(LocalDateTime.of(2022,4,28,15,27)),
                new Post(LocalDateTime.of(2022,4,28,15,48)),
                new Post(LocalDateTime.of(2022,4,28,15,54)));
    
    Duration interval = Duration.ofMinutes(10);

    List<PostsGroup> postsGroups = posts.stream()
        .collect(groupPostsByInterval(interval));
    
    postsGroups.forEach(System.out::println);
}

Output:

PostsGroup{[Post[dateTime=2022-04-28T15:00], Post[dateTime=2022-04-28T15:03], Post[dateTime=2022-04-28T15:05], Post[dateTime=2022-04-28T15:08]]}
PostsGroup{[Post[dateTime=2022-04-28T15:12], Post[dateTime=2022-04-28T15:15], Post[dateTime=2022-04-28T15:18]]}
PostsGroup{[Post[dateTime=2022-04-28T15:27]]}
PostsGroup{[Post[dateTime=2022-04-28T15:48], Post[dateTime=2022-04-28T15:54]]}


Upvotes: 0

Tagir Valeev

Reputation: 100159

This problem could be easily solved using the groupRuns method of my StreamEx library:

long MAX_INTERVAL = TimeUnit.MINUTES.toMillis(10);
StreamEx.of(posts)
        .groupRuns((p1, p2) -> p2.time.getTime() - p1.time.getTime() <= MAX_INTERVAL)
        .map(PostsGroup::new)
        .toList();

I assume that you have a constructor

class PostsGroup {
    private List<Post> posts;

    public PostsGroup(List<Post> posts) {
        this.posts = posts;
    }
}

The StreamEx.groupRuns method takes a BiPredicate which is applied to two adjacent input elements and returns true if they must be grouped together. This method creates the stream of lists where each list represents the group. This method is lazy and works fine with parallel streams.
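
To illustrate the semantics only, here is a plain-Java sketch of grouping adjacent elements with a BiPredicate. It is eager and sequential, and it is not how StreamEx implements groupRuns; the class name GroupRunsSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration of the "group adjacent runs" idea; not StreamEx code.
class GroupRunsSketch {

    // An element joins the current run when sameGroup.test(previous, current) is true;
    // otherwise it starts a new run.
    static <T> List<List<T>> groupRuns(List<T> input, BiPredicate<? super T, ? super T> sameGroup) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T element : input) {
            if (current == null || !sameGroup.test(previous, element)) {
                current = new ArrayList<>();
                runs.add(current);
            }
            current.add(element);
            previous = element;
        }
        return runs;
    }

    public static void main(String[] args) {
        // Minutes from the question's example: x, x + 3, x + 12, x + 45 with a 10-minute gap.
        List<Integer> minutes = Arrays.asList(0, 3, 12, 45);
        System.out.println(groupRuns(minutes, (a, b) -> b - a <= 10)); // prints [[0, 3, 12], [45]]
    }
}
```

Because the predicate compares each element with its immediate predecessor, x + 12 stays in the first group (it is 9 minutes after x + 3), which matches the grouping asked for in the question.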

Upvotes: 3

OldCurmudgeon

Reputation: 65793

You need to retain state between stream entries and write yourself a grouping classifier. Something like this would be a good start.

class Post {

    private final long time;
    private final String data;

    public Post(long time, String data) {
        this.time = time;
        this.data = data;
    }

    @Override
    public String toString() {
        return "Post{" + "time=" + time + ", data=" + data + '}';
    }

}

public void test() {
    System.out.println("Hello");
    long t = 0;
    List<Post> posts = Arrays.asList(
            new Post(t, "One"),
            new Post(t + 1000, "Two"),
            new Post(t + 10000, "Three")
    );
    // Start a new group whenever the gap to the previous post exceeds 5 seconds.
    Map<Long, List<Post>> grouped = posts
            .stream()
            .collect(Collectors.groupingBy(new ClassifyByTimeBetween(5000)));
    grouped.entrySet().stream().forEach((e) -> {
        System.out.println(e.getKey() + " -> " + e.getValue());
    });

}

class ClassifyByTimeBetween implements Function<Post, Long> {

    final long delay;
    long currentGroupBy = -1;
    long lastDateSeen = -1;

    public ClassifyByTimeBetween(long delay) {
        this.delay = delay;
    }

    @Override
    public Long apply(Post p) {
        if (lastDateSeen >= 0) {
            if (p.time > lastDateSeen + delay) {
                // Grab this one.
                currentGroupBy = p.time;
            }
        } else {
            // First time - start there.
            currentGroupBy = p.time;
        }
        lastDateSeen = p.time;
        return currentGroupBy;
    }

}
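
The single-argument groupingBy makes no guarantees about the type of the returned Map, so the groups may not come back in time order. A possible variation, sketched below, passes LinkedHashMap::new as the map factory and returns the groups as a list. It assumes a sequential stream whose posts are already sorted by time, since the classifier is stateful; the names OrderedGapGroupingSketch, GapClassifier and groupByGap are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical variation for illustration only.
class OrderedGapGroupingSketch {

    static final class Post {
        final long time;
        final String data;

        Post(long time, String data) {
            this.time = time;
            this.data = data;
        }
    }

    // Same idea as ClassifyByTimeBetween: the key is the time of the first post in the current group.
    static final class GapClassifier implements Function<Post, Long> {
        private final long delay;
        private long currentGroupBy = -1;
        private long lastDateSeen = -1;

        GapClassifier(long delay) {
            this.delay = delay;
        }

        @Override
        public Long apply(Post p) {
            // Start a new group on the first post or when the gap exceeds the delay.
            if (lastDateSeen < 0 || p.time > lastDateSeen + delay) {
                currentGroupBy = p.time;
            }
            lastDateSeen = p.time;
            return currentGroupBy;
        }
    }

    // Assumes posts are sorted by time and the stream is sequential.
    static List<List<Post>> groupByGap(List<Post> posts, long delay) {
        Map<Long, List<Post>> grouped = posts.stream()
                .collect(Collectors.groupingBy(
                        new GapClassifier(delay),
                        LinkedHashMap::new,      // keeps groups in encounter order
                        Collectors.toList()));
        return new ArrayList<>(grouped.values());
    }
}
```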

Upvotes: 0
