Reputation: 5122
I have a stream of objects which I would like to collect in the following way.
Let's say we are handling forum posts:
class Post {
    private Date time;
    private Data data;
}
I want to create a list which groups posts by a period. If there were no posts for X
minutes, create a new group.
class PostsGroup {
    List<Post> posts = new ArrayList<>();
}
I want to get a List<PostsGroup> containing the posts grouped by the interval.
Example: interval of 10 minutes.
Posts:
[{time:x, data:{}}, {time:x + 3, data:{}}, {time:x + 12, data:{}}, {time:x + 45, data:{}}]
I want to get a list of post groups:
[
{posts : [{time:x, data:{}}, {time:x + 3, data:{}}, {time:x + 12, data:{}}]},
{posts : [{time:x + 45, data:{}}]}
]
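The first group stays open until X + 22, i.e. 10 minutes after its last post at X + 12. Then a new post was received at X + 45, which starts a new group. Is this possible?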
Upvotes: 8
Views: 2350
Reputation: 28968
Since no one has provided a solution with a custom collector, as was required in the original problem statement, here is a collector implementation that groups Post objects based on the provided time interval.
The Date class mentioned in the question has been obsolete since Java 8 and is not recommended for new projects. Hence, LocalDateTime will be used instead.
For testing purposes, I've used Post implemented as a Java 16 record (if you substitute it with a class and swap the few newer API calls, such as toList() in the finisher and List.of() in the demo, the solution can be made to run on Java 8):
public record Post(LocalDateTime dateTime) {}
Also, I've enhanced the PostsGroup object. My idea is that it should be capable of deciding whether the offered Post should be added to the list of posts or rejected, as the Information Expert principle suggests (in short: all manipulations with the data should happen only inside the class to which that data belongs).
To facilitate this functionality, two extra fields were added: interval of type Duration from the java.time package, which represents the maximum interval between the earliest post and the latest post in a group, and intervalBound of type LocalDateTime, which is initialized when the first post is added and is later used internally by the method isWithinInterval() to check whether the offered post fits into the interval.
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class PostsGroup {
    private final Duration interval;
    private LocalDateTime intervalBound; // exclusive upper bound, set when the first post is added
    private final List<Post> posts = new ArrayList<>();

    public PostsGroup(Duration interval) {
        this.interval = interval;
    }

    // Adds the post if the group is empty or the post fits into the interval; returns false otherwise.
    public boolean tryAdd(Post post) {
        if (posts.isEmpty()) {
            intervalBound = post.dateTime().plus(interval);
            return posts.add(post);
        } else if (isWithinInterval(post)) {
            return posts.add(post);
        }
        return false;
    }

    public boolean isWithinInterval(Post post) {
        return post.dateTime().isBefore(intervalBound);
    }

    @Override
    public String toString() {
        return "PostsGroup{" + posts + '}';
    }
}
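I'm making two assumptions:
the posts arrive sorted by time (if they don't, add a sorted() operation to the pipeline before collecting the results);
the stream is processed sequentially, because each grouping decision depends on the posts that came before it.
We can create a custom collector either inline, by using one of the versions of the static method Collector.of(), or by defining a class that implements the Collector interface.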
These parameters have to be provided while creating a custom collector:
Supplier (Supplier<A>) provides a mutable container that stores the elements of the stream. In this case, ArrayDeque (as an implementation of the Deque interface) is handy as a container because it gives convenient access to the most recently added element, i.e. the latest PostsGroup.
Accumulator (BiConsumer<A,T>) defines how to add elements to the container provided by the supplier. For this task, it holds the logic that determines whether the next element from the stream (i.e. the next Post) should go into the last PostsGroup in the Deque, or whether a new PostsGroup needs to be allocated for it.
Combiner (BinaryOperator<A>) establishes a rule for merging two containers obtained while executing the stream in parallel. Since this operation is treated as not parallelizable, the combiner is implemented to throw an AssertionError in case of parallel execution.
Finisher (Function<A,R>) produces the final result by transforming the mutable container. The finisher function in the code below turns the container, a deque containing the result, into an immutable list.
Note: the Java 16 method toList() is used inside the finisher function. On Java 10 and later it can be replaced with collect(Collectors.toUnmodifiableList()), and on Java 8 with collect(Collectors.toList()), which makes no guarantee of immutability.
Characteristics provide additional hints about the collector. For example, Collector.Characteristics.UNORDERED denotes that the order in which partial results of the reduction are produced while executing in parallel is not significant. In this case, the collector doesn't require any characteristics.
The method below is responsible for generating the collector based on the provided interval.
public static Collector<Post, ?, List<PostsGroup>> groupPostsByInterval(Duration interval) {
    return Collector.of(
        ArrayDeque::new,                                         // supplier
        (Deque<PostsGroup> deque, Post post) -> {                // accumulator
            // start a new group if none exists yet or if the most recent group rejects the post
            if (deque.isEmpty() || !deque.getLast().tryAdd(post)) {
                PostsGroup postsGroup = new PostsGroup(interval);
                postsGroup.tryAdd(post);
                deque.addLast(postsGroup);
            }
        },
        (Deque<PostsGroup> left, Deque<PostsGroup> right) -> {   // combiner
            throw new AssertionError("should not be used in parallel");
        },
        (Deque<PostsGroup> deque) -> deque.stream().toList());   // finisher
}
main() - demo
public static void main(String[] args) {
    List<Post> posts =
        List.of(new Post(LocalDateTime.of(2022,4,28,15,0)),
                new Post(LocalDateTime.of(2022,4,28,15,3)),
                new Post(LocalDateTime.of(2022,4,28,15,5)),
                new Post(LocalDateTime.of(2022,4,28,15,8)),
                new Post(LocalDateTime.of(2022,4,28,15,12)),
                new Post(LocalDateTime.of(2022,4,28,15,15)),
                new Post(LocalDateTime.of(2022,4,28,15,18)),
                new Post(LocalDateTime.of(2022,4,28,15,27)),
                new Post(LocalDateTime.of(2022,4,28,15,48)),
                new Post(LocalDateTime.of(2022,4,28,15,54)));

    Duration interval = Duration.ofMinutes(10);

    List<PostsGroup> postsGroups = posts.stream()
        .collect(groupPostsByInterval(interval));

    postsGroups.forEach(System.out::println);
}
Output:
PostsGroup{[Post[dateTime=2022-04-28T15:00], Post[dateTime=2022-04-28T15:03], Post[dateTime=2022-04-28T15:05], Post[dateTime=2022-04-28T15:08]]}
PostsGroup{[Post[dateTime=2022-04-28T15:12], Post[dateTime=2022-04-28T15:15], Post[dateTime=2022-04-28T15:18]]}
PostsGroup{[Post[dateTime=2022-04-28T15:27]]}
PostsGroup{[Post[dateTime=2022-04-28T15:48], Post[dateTime=2022-04-28T15:54]]}
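For completeness, here is a rough sketch of the class-based alternative mentioned earlier (a class that implements the Collector interface). It is only an illustration under a few assumptions: to stay self-contained it collects bare LocalDateTime values rather than Post objects, and, unlike PostsGroup, it measures the gap to the previous element (the rule described in the question) instead of a fixed window from the first post of the group. The names GapGroupingCollector and maxGap are made up for this example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical collector: starts a new group whenever the gap between two
// consecutive timestamps exceeds maxGap. Assumes sorted, sequential input.
class GapGroupingCollector
        implements Collector<LocalDateTime, Deque<List<LocalDateTime>>, List<List<LocalDateTime>>> {

    private final Duration maxGap;

    GapGroupingCollector(Duration maxGap) {
        this.maxGap = maxGap;
    }

    @Override
    public Supplier<Deque<List<LocalDateTime>>> supplier() {
        return ArrayDeque::new;
    }

    @Override
    public BiConsumer<Deque<List<LocalDateTime>>, LocalDateTime> accumulator() {
        return (groups, time) -> {
            List<LocalDateTime> last = groups.peekLast();
            // open a new group if there is none yet or the gap to the previous timestamp is too large
            if (last == null
                    || Duration.between(last.get(last.size() - 1), time).compareTo(maxGap) > 0) {
                last = new ArrayList<>();
                groups.addLast(last);
            }
            last.add(time);
        };
    }

    @Override
    public BinaryOperator<Deque<List<LocalDateTime>>> combiner() {
        // merging partial results is deliberately unsupported in this sketch
        return (left, right) -> {
            throw new UnsupportedOperationException("sequential streams only");
        };
    }

    @Override
    public Function<Deque<List<LocalDateTime>>, List<List<LocalDateTime>>> finisher() {
        return groups -> Collections.unmodifiableList(new ArrayList<>(groups));
    }

    @Override
    public Set<Collector.Characteristics> characteristics() {
        return Collections.emptySet();
    }

    public static void main(String[] args) {
        LocalDateTime x = LocalDateTime.of(2022, 4, 28, 15, 0);
        List<List<LocalDateTime>> groups =
                Stream.of(x, x.plusMinutes(3), x.plusMinutes(12), x.plusMinutes(45))
                      .collect(new GapGroupingCollector(Duration.ofMinutes(10)));
        groups.forEach(System.out::println); // two groups: three timestamps, then one
    }
}
```

With the timestamps from the question (x, x + 3, x + 12, x + 45 and a 10-minute gap), this yields the two groups the question asks for. Whether the fixed window or the gap rule is the right one depends on how "interval" is meant to be read.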
Upvotes: 0
Reputation: 100159
This problem could be easily solved using the groupRuns method of my StreamEx library:
long MAX_INTERVAL = TimeUnit.MINUTES.toMillis(10);
StreamEx.of(posts)
    .groupRuns((p1, p2) -> p2.time.getTime() - p1.time.getTime() <= MAX_INTERVAL)
    .map(PostsGroup::new)
    .toList();
I assume that you have a constructor:
class PostsGroup {
    private List<Post> posts;

    public PostsGroup(List<Post> posts) {
        this.posts = posts;
    }
}
The StreamEx.groupRuns method takes a BiPredicate which is applied to two adjacent input elements and returns true if they must be grouped together. It creates a stream of lists where each list represents a group. The method is lazy and works fine with parallel streams.
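For readers who don't have StreamEx on the classpath, the adjacent-pair idea can be sketched in plain Java. This is a simplified, eager, sequential illustration and not how StreamEx implements it; the class AdjacentRuns and its helper are hypothetical names, and plain minute offsets stand in for posts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class AdjacentRuns {

    // Hypothetical helper (not part of StreamEx): splits the input into runs of
    // adjacent elements for which sameGroup.test(previous, current) is true.
    static <T> List<List<T>> groupRuns(List<T> items, BiPredicate<? super T, ? super T> sameGroup) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T item : items) {
            // the first element, or a failed predicate, starts a new run
            if (current == null || !sameGroup.test(previous, item)) {
                current = new ArrayList<>();
                runs.add(current);
            }
            current.add(item);
            previous = item;
        }
        return runs;
    }

    public static void main(String[] args) {
        // minute offsets from the question: x, x + 3, x + 12, x + 45
        List<Long> minutes = Arrays.asList(0L, 3L, 12L, 45L);
        System.out.println(groupRuns(minutes, (a, b) -> b - a <= 10));
        // prints [[0, 3, 12], [45]]
    }
}
```

Because the predicate only ever sees two neighbours, the grouping follows the gap rule from the question: a run continues as long as each element is close enough to the one before it.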
Upvotes: 3
Reputation: 65793
You need to retain state between stream entries and write yourself a grouping classifier. Something like this would be a good start.
class Post {
    private final long time;
    private final String data;

    public Post(long time, String data) {
        this.time = time;
        this.data = data;
    }

    @Override
    public String toString() {
        return "Post{" + "time=" + time + ", data=" + data + '}';
    }
}

public void test() {
    System.out.println("Hello");
    long t = 0;
    List<Post> posts = Arrays.asList(
            new Post(t, "One"),
            new Post(t + 1000, "Two"),
            new Post(t + 10000, "Three")
    );
    // Start a new group whenever more than 5 seconds pass between posts.
    Map<Long, List<Post>> grouped = posts
            .stream()
            .collect(Collectors.groupingBy(new ClassifyByTimeBetween(5000)));
    grouped.forEach((key, value) -> System.out.println(key + " -> " + value));
}

// Stateful classifier: only meaningful for a sequential stream whose posts are ordered by time.
class ClassifyByTimeBetween implements Function<Post, Long> {
    final long delay;
    long currentGroupBy = -1;
    long lastDateSeen = -1;

    public ClassifyByTimeBetween(long delay) {
        this.delay = delay;
    }

    @Override
    public Long apply(Post p) {
        if (lastDateSeen >= 0) {
            if (p.time > lastDateSeen + delay) {
                // Gap exceeded - this post starts a new group.
                currentGroupBy = p.time;
            }
        } else {
            // First time - start there.
            currentGroupBy = p.time;
        }
        lastDateSeen = p.time;
        return currentGroupBy;
    }
}
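One detail worth noting about the approach above: the single-argument Collectors.groupingBy makes no promise about the iteration order of the map it returns, so the groups may not print in time order. A possible variation, sketched here under the assumption that raw millisecond timestamps stand in for Post objects, passes LinkedHashMap::new as the map factory so groups keep their insertion order. The names OrderedGapGrouping, byGap and group are hypothetical, and the classifier is still stateful, so this only makes sense for a sequential stream over time-ordered input.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class OrderedGapGrouping {

    // Hypothetical stateful classifier over raw timestamps (milliseconds):
    // returns the timestamp that opened the current group.
    static Function<Long, Long> byGap(long delay) {
        return new Function<Long, Long>() {
            private long groupKey = -1;
            private long lastSeen = -1;

            @Override
            public Long apply(Long time) {
                // first element, or gap exceeded: this timestamp opens a new group
                if (lastSeen < 0 || time > lastSeen + delay) {
                    groupKey = time;
                }
                lastSeen = time;
                return groupKey;
            }
        };
    }

    static Map<Long, List<Long>> group(List<Long> times, long delay) {
        return times.stream() // sequential on purpose: the classifier keeps state
                .collect(Collectors.groupingBy(byGap(delay), LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(group(Arrays.asList(0L, 1000L, 10000L), 5000));
        // prints {0=[0, 1000], 10000=[10000]}
    }
}
```

The three timestamps mirror the posts "One", "Two" and "Three" from the test method above, with the same 5-second delay.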
Upvotes: 0