Reputation: 1709
I have a large file that contains a list of items.
I would like to create batches of items and make an HTTP request with each batch (all of the items in a batch are needed as parameters in the HTTP request). I can do it very easily with a for loop, but as a Java 8 lover, I want to try writing this with Java 8's Stream framework (and reap the benefits of lazy processing).
Example:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
    batch.add(data.get(i));
    if (batch.size() == BATCH_SIZE) {
        process(batch);
        batch.clear(); // start a fresh batch once this one has been processed
    }
}
if (batch.size() > 0) process(batch);
I want to do something along the lines of:
lazyFileStream.group(500).map(processBatch).collect(toList())
What would be the best way to do this?
Upvotes: 129
Views: 116807
Reputation: 20560
Java 22 has a preview feature called Stream Gatherers (JEP 461; JEP 473 for the second preview in Java 23), which provides a built-in Gatherer that does exactly what you want, called windowFixed(int). Use it as follows:
lazyFileStream.gather(Gatherers.windowFixed(500))
.map(this::processBatch)
.toList();
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize)
Returns a Gatherer that gathers elements into windows -- encounter-ordered groups of elements -- of a fixed size. If the stream is empty then no window will be produced. The last window may contain fewer elements than the supplied window size.
Example:
// will contain: [[1, 2, 3], [4, 5, 6], [7, 8]]
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
Upvotes: 1
Reputation: 5112
You can also use RxJava:
RxJava v3:
int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
.buffer(batchSize)
.map(batch -> process(batch))
.blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));
Previous version:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
or
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
or
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
Upvotes: 13
Reputation: 4245
In all fairness, take a look at the elegant Vavr solution:
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Upvotes: 4
Reputation: 191
It could be easily done using Reactor:
Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
.map(line -> someProcessingOfSingleLine(line))
.buffer(BUFFER_SIZE)
.subscribe(apiService::makeHttpRequest);
Upvotes: 4
Reputation: 5828
You can use Apache Commons Collections' ListUtils.partition:
ListUtils.partition(listOfLines, 500).stream()
        .map(partition -> processBatch(partition))
        .collect(Collectors.toList());
The partitioning itself is done eagerly, but once the list is partitioned you get the benefits of working with streams (e.g. using parallel streams, adding filters, and so on). Other answers suggested more elaborate solutions, but sometimes readability and maintainability are more important (and sometimes they are not :-) )
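For example, a minimal sketch combining the eager partition with parallel processing and a filter (processBatch and listOfLines are the same hypothetical names as above):
// ListUtils is org.apache.commons.collections4.ListUtils
ListUtils.partition(listOfLines, 500)
        .parallelStream()                   // process the batches concurrently
        .filter(batch -> !batch.isEmpty())  // filters compose as usual
        .map(batch -> processBatch(batch))
        .collect(Collectors.toList());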
Upvotes: 3
Reputation:
Pure Java 8 example that works with parallel streams as well.
How to use:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
The method declaration and implementation:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);
    stream.forEach(element -> {
        List<ElementType> fullBatch;
        // synchronize so that parallel streams can safely share the one pending batch
        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                // the batch is full: copy it out and start a new one with this element
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }
        // process outside the lock so other threads are not blocked meanwhile
        batchProcessor.accept(fullBatch);
    });
    // flush the final, possibly partial, batch
    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Upvotes: 4
Reputation: 5313
You could also take a look at cyclops-react; I am the author of this library. It implements the jOOλ interface (and, by extension, JDK 8 Streams), but unlike JDK 8 Parallel Streams it has a focus on asynchronous operations (such as potentially blocking async I/O calls). JDK Parallel Streams, by contrast, focus on data parallelism for CPU-bound operations. It works by managing aggregates of Future-based tasks under the hood, but presents a standard extended Stream API to end users.
This sample code may help you get started:
LazyFutureStream.parallelCommonBuilder()
.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
There is a tutorial on batching here, and a more general tutorial here.
To use your own Thread Pool (which is probably more appropriate for blocking I/O), you could start processing with
LazyReact reactor = new LazyReact(40);
reactor.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
Upvotes: 8
Reputation: 449
With Java 8 and com.google.common.collect.Lists, you can do something like:
public class BatchProcessingUtil {
public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
List<List<T>> batches = Lists.partition(data, batchSize);
return batches.stream()
.map(processFunction) // Send each batch to the process function
.flatMap(Collection::stream) // flat results to gather them in 1 stream
.collect(Collectors.toList());
}
}
Here, T is the type of the items in the input list and U is the type of the items in the output list. You can use it like this:
List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
userKeys,
10, // Batch Size
partialKeys -> service.getUsers(partialKeys)
);
Upvotes: 1
Reputation: 220752
Note! This solution reads the whole file before running the forEach.
You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
Behind the scenes, zipWithIndex() is just:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
... whereas groupBy() is API convenience for:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(Disclaimer: I work for the company behind jOOλ)
Upvotes: 13
Reputation: 61
This is a pure Java solution that's evaluated lazily:
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize) {
    // a single-element wrapper list, just to make the current batch mutable from the lambda
    List<List<T>> currentBatch = new ArrayList<List<T>>();
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
        .sequential()
        .map(new Function<T, List<T>>() {
            public List<T> apply(T t) {
                currentBatch.get(0).add(t);
                // List.set returns the previous element, i.e. the now-full batch;
                // emit null (filtered out below) while the batch is still filling
                return currentBatch.get(0).size() == batchSize ? currentBatch.set(0, new ArrayList<>(batchSize)) : null;
            }
        }),
        // once the source is exhausted, emit the final partial batch, if any
        Stream.generate(() -> currentBatch.get(0).isEmpty() ? null : currentBatch.get(0))
            .limit(1)
    ).filter(Objects::nonNull);
}
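A quick usage sketch of the method above; the expected output is in the comment:
// prints [1, 2], [3, 4] and then the trailing partial batch [5]
partition(Stream.of(1, 2, 3, 4, 5), 2)
        .forEach(System.out::println);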
Upvotes: 1
Reputation: 9591
For completeness, here is a Guava solution.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
In the question, the collection is available, so a stream isn't needed; it can be written as:
Iterables.partition(data, batchSize).forEach(this::process);
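If you want the batches back as a Stream rather than consuming them directly, a minimal sketch using the JDK's StreamSupport on top of Guava's iterator (assuming, as in the first snippet, a Stream<String> named stream):
// wrap Guava's partitioned iterator back into a lazy Stream of batches
Stream<List<String>> batches = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(
                Iterators.partition(stream.iterator(), batchSize),
                Spliterator.ORDERED),
        false);
batches.forEach(this::process);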
Upvotes: 161
Reputation: 194
Simple example using Spliterator
// read file into stream, try-with-resources
static void processFile(String fileName) {
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        // skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while (true) {
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");
        }
    }
}
Bruce's answer is more comprehensive, but I was looking for something quick and dirty to process a bunch of files.
Upvotes: 1
Reputation: 1851
Pure Java 8 solution:
We can create a custom collector to do this elegantly, which takes in a batchSize and a Consumer to process each batch:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
Optionally then create a helper utility class:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
Example usage:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
I've posted my code on GitHub as well, if anyone wants to take a look.
Upvotes: 54
Reputation: 5443
We had a similar problem to solve. We wanted to take a stream that was larger than system memory (iterating through all objects in a database) and randomise the order as best we could - we thought it would be OK to buffer 10,000 items and randomise them.
The target was a function which took in a stream.
Of the solutions proposed here, there seemed to be a range of options. Our instinct was originally to use a custom collector, but that meant dropping out of streaming. The custom-collector solution above is very good, and we nearly used it.
Here's a solution which cheats by using the fact that Streams can give you an Iterator, which you can use as an escape hatch to let you do something extra that streams don't support. The Iterator is converted back to a stream using another bit of Java 8 StreamSupport sorcery.
import static java.util.Spliterator.ORDERED;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, ORDERED),
            false);
    }

    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        // only prepare a batch when the previous one has been handed out,
        // so that repeated hasNext() calls don't silently drop a batch
        if (currentBatch == null) {
            prepareNextBatch();
        }
        return !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = currentBatch;
        currentBatch = null;
        return batch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}
A simple example of using this would look like this:
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
The above prints
[A, B, C]
[D, E, F]
For our use case, we wanted to shuffle the batches and then keep them as a stream - it looked like this:
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
    Collections.shuffle(list);
    return list;
})
.flatMap(List::stream)
.forEach(System.out::println);
}
This outputs something like (it's randomised, so different every time)
A
C
B
E
D
F
The secret sauce here is that there's always a stream, so you can either operate on a stream of batches, or do something to each batch and then flatMap it back to a stream. Even better, all of the above only runs as the final forEach or collect or other terminating expressions PULL the data through the stream.
It turns out that iterator is a special type of terminating operation on a stream, and does not cause the whole stream to run and come into memory! Thanks to the Java 8 guys for a brilliant design!
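To see that laziness concretely, here's a small sketch: batching an infinite stream and taking just the first two batches returns immediately, because elements are only pulled as the terminal forEach asks for them.
// prints [0, 1, 2] and [3, 4, 5], then stops, even though the source is infinite
BatchingIterator.batchedStreamOf(Stream.iterate(0, i -> i + 1), 3)
        .limit(2)
        .forEach(System.out::println);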
Upvotes: 21
Reputation: 560
I wrote a custom Spliterator for scenarios like this. It will fill lists of a given size from the input Stream. The advantage of this approach is that it will perform lazy processing, and it will work with other stream functions.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
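A quick usage sketch for the batches helper above, with the expected output in the comment:
// prints [1, 2, 3], [4, 5, 6] and then the partial batch [7]
batches(Stream.of(1, 2, 3, 4, 5, 6, 7), 3)
        .forEach(System.out::println);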
Upvotes: 21
Reputation: 100139
A pure Java 8 implementation is also possible:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
Note that, unlike the jOOλ solution, this can work nicely in parallel (provided that your data is a random-access list).
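For example, a sketch of the parallel variant (data and process as in the question; note that the order in which batches are processed is then unspecified):
int BATCH = 500;
// each batch is an independent subList view over the same random-access list,
// so the batches can be processed concurrently without shared mutable state
IntStream.range(0, (data.size() + BATCH - 1) / BATCH)
        .parallel()
        .mapToObj(i -> data.subList(i * BATCH, Math.min(data.size(), (i + 1) * BATCH)))
        .forEach(batch -> process(batch));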
Upvotes: 78