Reputation: 315
Lets say I have huge webserver log file that does not fit in memory. I need to stream this file to a mapreduce method and save to database. I do this using Java 8 stream api. For example, I get a list after the mapreduce process such as, consumption by client, consumption by ip, consumption by content. But, my needs are not that like that given in my example. Since I cannot share code, I just want to give basic example.
By Java 8 Stream Api, I want to read file exactly once, get 3 lists at the same time, while I am streaming file, parallel or sequential. But parallel would be good. Is there any way to do that?
Upvotes: 6
Views: 2925
Reputation: 4553
I have adapted the answer to this question to your case. The custom Spliterator will "split" the stream into multiple streams that collect by different properties:
@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}
public static class ForkingSpliterator<T>
extends AbstractSpliterator<T>
{
private Spliterator<T> sourceSpliterator;
private List<BlockingQueue<T>> queues = new ArrayList<>();
private boolean sourceDone;
@SafeVarargs
private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
{
super(Long.MAX_VALUE, 0);
sourceSpliterator = source.spliterator();
for (Consumer<Stream<T>> fork : consumers)
{
LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
queues.add(queue);
new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
}
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
return !sourceDone;
}
private class ForkedConsumer
extends AbstractSpliterator<T>
{
private BlockingQueue<T> queue;
private ForkedConsumer(BlockingQueue<T> queue)
{
super(Long.MAX_VALUE, 0);
this.queue = queue;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
while (queue.peek() == null)
{
if (sourceDone)
{
// element is null, and there won't be no more, so "terminate" this sub stream
return false;
}
}
// push to consumer pipeline
action.accept(queue.poll());
return true;
}
}
}
You can use it as follows:
streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
new Row("content2", "client1", "location1", 2),
new Row("content1", "client1", "location2", 3),
new Row("content2", "client2", "location2", 4),
new Row("content1", "client2", "location2", 5)),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
Collectors.groupingBy(Row::getContent,
Collectors.summingInt(Row::getConsumption))))),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
Collectors.groupingBy(Row::getLocation,
Collectors.summingInt(Row::getConsumption))))),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
Collectors.groupingBy(Row::getLocation,
Collectors.summingInt(Row::getConsumption))))));
// Output
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}
Note that you can do pretty much anything you want with your the copies of the stream. As per your example, I used a stacked groupingBy
collector to group the rows by two properties and then summed up the int property. So the result will be a Map<String, Map<String, Integer>>
. But you could also use it for other scenarios:
rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))
Upvotes: 4
Reputation: 121048
Generally collecting to anything other than standard API's gives you is pretty easy via a custom Collector
. In your case collecting to 3 lists at a time (just a small example that compiles, since you can't share your code either):
private static <T> Collector<T, ?, List<List<T>>> to3Lists() {
class Acc {
List<T> left = new ArrayList<>();
List<T> middle = new ArrayList<>();
List<T> right = new ArrayList<>();
List<List<T>> list = Arrays.asList(left, middle, right);
void add(T elem) {
// obviously do whatever you want here
left.add(elem);
middle.add(elem);
right.add(elem);
}
Acc merge(Acc other) {
left.addAll(other.left);
middle.addAll(other.middle);
right.addAll(other.right);
return this;
}
public List<List<T>> finisher() {
return list;
}
}
return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}
And using it via:
Stream.of(1, 2, 3)
.collect(to3Lists());
Obviously this custom collector does not do anything useful, but just an example of how you could work with it.
Upvotes: 7