Reputation: 1814
I am trying to chunk a text file (let's say, a log file) so that only a certain number of rows is picked at a time for processing (let's say we are splitting the log file into smaller ones). I wrote this code in imperative style:
package utils;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Consumer;
public class FileUtils {
public static void main(String[] args) {
readFileInChunks("D:\\demo.txt", 10000, System.out::println);
}
public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) {
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
StringBuilder lines = new StringBuilder();
String line, firstLine = null;
int i;
for (i = 0; (line = br.readLine()) != null; i++) {
if (firstLine == null)
firstLine = line;
lines.append(line + "\n");
if ((i + 1) % chunkSize == 0) {
processor.accept(lines);
lines = new StringBuilder(firstLine + "\n");
}
}
if (lines.length() > 0) { // check the content; != on Strings only compares references
processor.accept(lines);
}
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
After all these years of coding in an iterative style, I can't come up with a Java 8 streams-based, functional-style implementation of this method.
Is it possible to make the readFileInChunks method return a Stream<String> of chunks? Or to implement readFileInChunks in a functional way?
Upvotes: 1
Views: 3749
Reputation: 298429
First, pick the right tool for the job. If you want to process a text file in chunks, it’s much simpler to read the file in chunks than to read it line by line only to reassemble the lines afterwards. If you want the chunks clipped to a line boundary, it’s still simpler to search for the line break closest to the chunk boundary than to process every line break. Note that chunkSize counts characters here, not lines.
public static void readFileInChunks(
String filePath, int chunkSize, Consumer<? super CharSequence> processor) {
CharBuffer buf=CharBuffer.allocate(chunkSize);
try(FileReader r = new FileReader(filePath)) {
readMore: for(;;) {
while(buf.hasRemaining()) if(r.read(buf)<0) break readMore;
buf.flip();
int oldLimit=buf.limit();
for(int p=oldLimit-1; p>0; p--)
if(buf.charAt(p)=='\n' || buf.charAt(p)=='\r') {
buf.limit(p+1);
break;
}
processor.accept(buf);
buf.position(buf.limit()).limit(oldLimit);
buf.compact();
}
if(buf.position()>0) {
buf.flip();
processor.accept(buf);
}
} catch (IOException e) {
e.printStackTrace();
}
}
This code might look more complicated at first glance, but it is copy-free. If you want to allow the consumer to keep a reference to the received object or to perform concurrent processing, just change the lines processor.accept(buf); to processor.accept(buf.toString()); so that the actual buffer is not passed to the consumer. This is mandatory if you want to provide the same functionality as a stream. For a stream, the loop has to be converted into a function that can provide the next item on request:
public static Stream<String> fileInChunks(
String filePath, int chunkSize) throws IOException {
FileChannel ch=FileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
CharsetDecoder dec = Charset.defaultCharset().newDecoder();
long size = (long)(ch.size()*dec.averageCharsPerByte());
Reader r = Channels.newReader(ch, dec, chunkSize);
return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
(size+chunkSize-1)/chunkSize, Spliterator.ORDERED|Spliterator.NONNULL) {
CharBuffer buf=CharBuffer.allocate(chunkSize);
public boolean tryAdvance(Consumer<? super String> processor) {
CharBuffer buf=this.buf;
if(buf==null) return false;
boolean more=true;
while(buf.hasRemaining() && more) try {
if(r.read(buf)<0) more=false;
} catch(IOException ex) { throw new UncheckedIOException(ex); }
if(more) {
buf.flip();
int oldLimit=buf.limit();
for(int p=oldLimit-1; p>0; p--)
if(buf.charAt(p)=='\n' || buf.charAt(p)=='\r') {
buf.limit(p+1);
break;
}
processor.accept(buf.toString());
buf.position(buf.limit()).limit(oldLimit);
buf.compact();
return true;
}
this.buf=null;
if(buf.position()>0) {
buf.flip();
processor.accept(buf.toString());
return true;
}
return false;
}
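}, false).onClose(() -> { // release the reader and its channel when the caller closes the stream
try { r.close(); } catch(IOException ex) { throw new UncheckedIOException(ex); }
});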
}
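To illustrate why the toString() copy matters once a consumer keeps what it receives, here is a minimal sketch. The class BufferReuseDemo and its collect method are made up for this illustration and are not part of the code above; the fixed buffer size of 8 is likewise just an assumption that fits the sample inputs.

```java
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;

class BufferReuseDemo {
    // Fills one reusable buffer with each input in turn and keeps either the
    // live buffer (copy == false) or an independent String snapshot (copy == true).
    static List<String> collect(String[] inputs, boolean copy) {
        CharBuffer buf = CharBuffer.allocate(8); // inputs must fit into 8 chars
        List<CharSequence> kept = new ArrayList<>();
        for (String s : inputs) {
            buf.clear();
            buf.put(s);
            buf.flip();
            kept.add(copy ? buf.toString() : buf);
        }
        // Render what the "consumer" still sees after all chunks were delivered.
        List<String> result = new ArrayList<>();
        for (CharSequence cs : kept) {
            result.add(cs.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(new String[] {"one", "two"}, false)); // [two, two]
        System.out.println(collect(new String[] {"one", "two"}, true));  // [one, two]
    }
}
```

Without the copy, every kept reference points at the same buffer and therefore shows only the most recent chunk. That is harmless for a consumer that processes each chunk immediately, but not for a stream whose elements may be collected.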
Upvotes: 2
Reputation: 546
I have created and tested a solution using Java 8, shown below:
package com.grs.stackOverFlow.pack01;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class FileUtils {
public static void main(String[] args) throws IOException {
readFileInChunks("src/com/grs/stackOverFlow/pack01/demo.txt", 3, System.out::println);
}
public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) throws IOException {
List<String> lines = Files.readAllLines(Paths.get(filePath));
if (lines.isEmpty()) return; // nothing to process
String firstLine = lines.get(0);
// the first line is repeated in every chunk, so the chunked content starts at the second line
List<String> body = lines.subList(1, lines.size());
// round up so that a trailing partial chunk is not dropped; always emit at least one chunk
int splitCount = Math.max(1, (body.size() + chunkSize - 1) / chunkSize);
for (int i = 0; i < splitCount; i++) {
// the number of lines to skip follows from the loop index, so no mutable static counter is needed
String result = body.stream()
.skip((long) i * chunkSize)
.limit(chunkSize)
.collect(Collectors.joining("\n"));
processor.accept(new StringBuilder("chunk no. = " + (i + 1) + "\n" + firstLine + "\n" + result));
}
}
}
Upvotes: 1
Reputation: 112
You can define a custom iterator and construct a stream based on it:
public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(filePath));
Iterator<String> iter = new Iterator<String>() {
String nextChunk = null;
@Override
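public boolean hasNext() {
// a chunk is already buffered: do not read again, so repeated hasNext() calls lose no data
if (nextChunk != null) return true;
StringBuilder sb = new StringBuilder();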
for (int i = 0; i < chunkSize; i++) {
try {
String nextLine = br.readLine();
if (nextLine == null) break;
sb.append(nextLine).append("\n");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
if (sb.length() == 0) {
nextChunk = null;
return false;
} else {
nextChunk = sb.toString();
return true;
}
}
@Override
public String next() {
if (nextChunk != null || hasNext()) {
String chunk = nextChunk;
nextChunk = null;
return chunk;
} else {
throw new NoSuchElementException();
}
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL), false)
.onClose(() -> {
try {
br.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
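Since the returned stream only closes the reader through its onClose handler, the caller has to close the stream, typically with try-with-resources. The following sketch shows the difference with a tiny in-memory stream; CloseHandlerDemo and its two methods are hypothetical names used only for this demonstration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class CloseHandlerDemo {
    // try-with-resources calls Stream.close(), which runs the onClose handler.
    static boolean closedAfterTryWithResources() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> chunks = Stream.of("a\n", "b\n").onClose(() -> closed.set(true))) {
            chunks.forEach(chunk -> { /* process the chunk */ });
        }
        return closed.get();
    }

    // A terminal operation alone does not close the stream, so the handler never runs.
    static boolean closedAfterTerminalOpOnly() {
        AtomicBoolean closed = new AtomicBoolean(false);
        Stream.of("a\n", "b\n").onClose(() -> closed.set(true)).forEach(chunk -> { });
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedAfterTryWithResources()); // true
        System.out.println(closedAfterTerminalOpOnly());   // false
    }
}
```

Applied to the method above, that means wrapping the call in try (Stream<String> chunks = readFileInChunks(path, size)) { ... } so that the BufferedReader is released.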
Another option is to use the protonpack library, which offers the zipWithIndex method:
public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException {
return new TreeMap<>(StreamUtils.zipWithIndex(Files.lines(Paths.get(filePath)))
.collect(Collectors.groupingBy(el -> el.getIndex() / chunkSize)))
.values().stream()
.map(list -> list.stream()
.map(el -> el.getValue())
.collect(Collectors.joining("\n")));
}
The second solution is more compact, but it collects all lines in a map while grouping them (and then copies them into a TreeMap in order to have the chunks in the right order), so it is not suited for processing very large files.
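If the lines are already in memory anyway, the same index-based grouping idea can be sketched with the JDK alone, without protonpack. This is only an illustration under that assumption; IndexChunks and chunk are names invented here, and the approach shares the memory limitation just mentioned.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexChunks {
    // Splits an in-memory list of lines into chunks of at most chunkSize lines,
    // joining the lines of each chunk with "\n".
    static List<String> chunk(List<String> lines, int chunkSize) {
        int chunkCount = (lines.size() + chunkSize - 1) / chunkSize; // round up
        return IntStream.range(0, chunkCount)
                .mapToObj(i -> String.join("\n",
                        lines.subList(i * chunkSize,
                                Math.min(lines.size(), (i + 1) * chunkSize))))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Five lines with a chunk size of two give three chunks; the last holds one line.
        chunk(Arrays.asList("a", "b", "c", "d", "e"), 2)
                .forEach(c -> System.out.println("[" + c.replace("\n", "\\n") + "]"));
    }
}
```

Because subList only creates views, the chunks stay in order without the intermediate map and TreeMap.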
Upvotes: 1
Reputation: 120978
One thing you could do is write a custom collector that builds these chunks and then sends them to the consumer, for example like this (not compiled, just a sample):
private static final class ToChunksCollector<T> implements Collector<T, List<StringBuilder>, List<StringBuilder>> {
private final int chunkSize;
public ToChunksCollector(int chunkSize) {
this.chunkSize = chunkSize;
}
@Override
public Supplier<List<StringBuilder>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<StringBuilder>, T> accumulator() {
return (list, line) -> {
if (list.size() == 0) {
list.add(new StringBuilder());
}
StringBuilder lastBuilder = list.get(list.size() - 1);
String[] linesInCurrentBuilder = lastBuilder.toString().split("\n");
// no more room
if (linesInCurrentBuilder.length == chunkSize) {
String lastLine = linesInCurrentBuilder[chunkSize - 1];
StringBuilder builder = new StringBuilder();
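builder.append(lastLine).append("\n");
builder.append(line).append("\n"); // also keep the incoming line, otherwise it is silently dropped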
list.add(builder);
} else {
lastBuilder.append(line).append("\n");
}
};
}
@Override
public BinaryOperator<List<StringBuilder>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}
@Override
public Function<List<StringBuilder>, List<StringBuilder>> finisher() {
return Function.identity();
}
// TODO add the relevant characteristics
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics() {
return EnumSet.noneOf(Characteristics.class);
}
}
And then usage:
public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) {
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
List<StringBuilder> builder = br.lines().collect(new ToChunksCollector<>(chunkSize));
builder.stream().forEachOrdered(processor);
} catch (IOException e) {
e.printStackTrace();
}
}
Upvotes: 0