Reputation: 313
I have 4 large files (around 1.5 GB each). I want to process these files by reading each line and converting it to a customer object. I have the following implementation.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
 * Reads gzip-compressed, comma-separated customer files from URLs and exposes
 * their data lines, one after another, as {@link CustomerFileLineItem} objects.
 */
public class CustomerDataAccess {

    public static void main(String[] args) throws IOException {
        CustomerFileItem john = new CustomerFileItem("CustFile1", "http://w.customer1.com");
        CustomerFileItem sarah = new CustomerFileItem("CustFile2", "http://w.customer2.com");
        CustomerFileItem charles = new CustomerFileItem("CustFile3", "http://w.customer3.com");
        List<CustomerFileItem> customers = Arrays.asList(john, sarah, charles);
        Iterator<CustomerFileLineItem> custList = new CustIterator(customers);
    }

    /**
     * Iterates over the data lines of several customer files as if they were
     * one continuous sequence. Only one file is open at a time; each file is
     * closed as soon as its last line has been read.
     *
     * <p>{@code forEachRemaining} is intentionally not overridden, so the
     * default implementation from {@link Iterator} is used.
     */
    public static class CustIterator implements Iterator<CustomerFileLineItem> {
        private static final int HEADER_LINES = 9; // 8 + 1 blank line

        /** Reader of the file currently being consumed; {@code null} when none is open. */
        BufferedReader bufferedReader;
        private int index = 0;
        private final List<CustomerFileItem> custFileItems = new ArrayList<>();
        /** Line read ahead by {@link #hasNext()} and not yet handed out by {@link #next()}. */
        private String nextLine;

        /**
         * Creates the iterator and eagerly opens the first file, if any.
         *
         * @param custFileItems files to read, in iteration order; copied defensively
         * @throws IOException if the first file cannot be opened or its header cannot be read
         */
        public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
            this.custFileItems.addAll(custFileItems);
            processNext();
        }

        /**
         * Closes the current reader (if any) and opens the next file, leaving
         * {@code bufferedReader} as {@code null} when no file is left.
         */
        private void processNext() throws IOException {
            // Drop the reference first so a failing close() cannot leave a dead reader behind.
            BufferedReader finished = bufferedReader;
            bufferedReader = null;
            if (finished != null) {
                finished.close();
            }
            if (index < custFileItems.size()) { // only update if there's another file
                bufferedReader = openPastHeader(custFileItems.get(index));
            }
            index++;
        }

        /** Opens the file behind {@code item} and skips its header lines. */
        private static BufferedReader openPastHeader(CustomerFileItem item) throws IOException {
            InputStream raw = new URL(item.url).openStream();
            try {
                // default buffer size is 8 KB
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), UTF_8));
                for (int i = 0; i < HEADER_LINES; i++) {
                    reader.readLine();
                }
                return reader;
            } catch (IOException | RuntimeException e) {
                // The GZIP wrapper may never have been created, so close the raw stream directly.
                try {
                    raw.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /**
         * Reads one line ahead to decide whether another element exists, moving
         * on to the next file whenever the current one is exhausted.
         *
         * @throws UncheckedIOException if reading or opening a file fails; the
         *         current reader is closed and the iteration ends
         */
        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            }
            try {
                while (bufferedReader != null) {
                    nextLine = bufferedReader.readLine();
                    if (nextLine != null) {
                        return true;
                    }
                    // at end of current file, try to get the next one
                    processNext();
                }
                return false; // no more files left
            } catch (IOException e) {
                BufferedReader failed = bufferedReader;
                bufferedReader = null;
                if (failed != null) {
                    try {
                        failed.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                throw new UncheckedIOException(e);
            }
        }

        /**
         * Returns the next line as a {@link CustomerFileLineItem}.
         *
         * @return the parsed line, or {@code null} if the line is malformed
         *         (best-effort behaviour: a bad line does not abort the iteration)
         * @throws NoSuchElementException if all files are exhausted
         * @throws UncheckedIOException if reading fails
         */
        @Override
        public CustomerFileLineItem next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String line = nextLine;
            nextLine = null;
            try {
                return new CustomerFileLineItem(line);
            } catch (IllegalArgumentException malformed) {
                // Deliberately tolerated: malformed lines are reported as null elements.
                return null;
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /** One data line of a customer file, split into its four comma-separated fields. */
    public static class CustomerFileLineItem {
        private static final int NUMBER_OF_FIELDS = 4;
        final String id;
        final String productNumber;
        final String usageType;
        final String operation;

        /**
         * Parses a single comma-separated line.
         *
         * @param line the raw line; must contain exactly {@value #NUMBER_OF_FIELDS} fields
         * @throws IllegalArgumentException if the number of fields differs
         */
        public CustomerFileLineItem(final String line) {
            String[] strings = line.split(",");
            if (strings.length != NUMBER_OF_FIELDS) {
                throw new IllegalArgumentException(String.format("Malformed customer file line: %s", line));
            }
            // Indices are zero-based: four fields occupy positions 0..3.
            this.id = strings[0];
            this.productNumber = strings[1];
            this.usageType = strings[2];
            this.operation = strings[3];
        }
    }

    /** Name and download URL of one customer file. */
    static class CustomerFileItem {
        private final String fileName;
        private final String url;

        public CustomerFileItem(String fileName, String url) {
            this.fileName = fileName;
            this.url = url;
        }
    }
}
In one of my use cases I want to use streams on the output list (custList). But I know I can't use streams directly with an Iterator
. How can I convert it to a Spliterator
? Or how can I implement in a Spliterator the same logic that I implemented with the Iterator?
Upvotes: 0
Views: 1134
Reputation: 298539
TL;DR You don’t need to implement an Iterator
or Spliterator
, you can simply use a Stream
in the first place:
private static final int HEADER_LINES = 9; // 8 + 1 blank line
// Lazily concatenates the data lines of every customer file into one stream.
// flatMap closes each per-file stream once its contents have been consumed,
// which triggers the onClose handler and thereby closes the reader.
Stream<CustomerFileLineItem> stream = customers.stream()
    .flatMap(custFileItem -> {
        InputStream raw = null;
        try {
            raw = new URL(custFileItem.url).openStream();
            BufferedReader br = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(raw), UTF_8));
            // read the first few lines
            for (int i = 0; i < HEADER_LINES; i++) br.readLine();
            return br.lines().onClose(() -> {
                try { br.close(); }
                catch(IOException ex) { throw new UncheckedIOException(ex); }
            });
        } catch(IOException ex) {
            // Opening or header skipping failed before any stream was returned,
            // so nobody else will close the connection: release it here.
            if (raw != null) {
                try { raw.close(); }
                catch(IOException ex2) { ex.addSuppressed(ex2); }
            }
            throw new UncheckedIOException(ex);
        }
    })
    .map(CustomerFileLineItem::new);
But for completeness, addressing the question literally:
First of all, you should not add a method definition like
// Anti-pattern, quoted for illustration only: this override unconditionally throws
// and thereby replaces the default forEachRemaining provided by the Iterator interface.
@Override
public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
throw new UnsupportedOperationException();
}
This method will surely backfire when you use the Stream API, as that’s where most non-short-circuiting operations will end up.
There is not even a reason to add it. When you don’t declare the method, you’ll get a reasonable default method from the Iterator
interface.
Once you have fixed this issue, you can easily convert the Iterator
to a Spliterator
using Spliterators.spliteratorUnknownSize(Iterator, int)
.
But there is no reason to do so. Your code becomes simpler when implementing Spliterator
in the first place:
/**
 * Spliterator over the data lines of several gzip-compressed customer files,
 * traversed one file after another. Only one file is open at a time.
 *
 * <p>A line that {@code CustomerFileLineItem} rejects propagates its exception
 * to the caller of {@link #tryAdvance}.
 */
public static class CustIterator
        extends Spliterators.AbstractSpliterator<CustomerFileLineItem> {
    private static final int HEADER_LINES = 9; // 8 + 1 blank line

    /** Reader of the file currently being consumed; {@code null} when none is open. */
    BufferedReader bufferedReader;
    /** Files still waiting to be opened, in traversal order. */
    private final ArrayDeque<CustomerFileItem> custFileItems;

    /**
     * Creates the spliterator and eagerly opens the first file, if any.
     *
     * @param custFileItems files to read, in traversal order; copied defensively
     * @throws IOException if the first file cannot be opened or its header cannot be read
     */
    public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
        // The total number of lines is unknown up front, hence Long.MAX_VALUE.
        super(Long.MAX_VALUE, ORDERED|NONNULL);
        this.custFileItems = new ArrayDeque<>(custFileItems);
        processNext();
    }

    /**
     * Passes the next data line to {@code action}, switching to the next file
     * whenever the current one is exhausted.
     *
     * @return {@code false} once every file has been read completely
     * @throws UncheckedIOException if reading or opening a file fails; the
     *         current reader is closed and subsequent calls return {@code false}
     */
    @Override
    public boolean tryAdvance(Consumer<? super CustomerFileLineItem> action) {
        if (bufferedReader == null) return false;
        try {
            String line = bufferedReader.readLine();
            while (line == null) {
                processNext();
                if (bufferedReader == null) return false;
                line = bufferedReader.readLine();
            }
            action.accept(new CustomerFileLineItem(line));
            return true;
        } catch (IOException ex) {
            // Clear the field before closing so a failing close() cannot leave a
            // half-dead reader behind for the next call.
            BufferedReader failed = bufferedReader;
            bufferedReader = null;
            if (failed != null) {
                try {
                    failed.close();
                } catch (IOException ex2) {
                    ex.addSuppressed(ex2);
                }
            }
            throw new UncheckedIOException(ex);
        }
    }

    /**
     * Closes the current reader (if any) and opens the next queued file,
     * leaving {@code bufferedReader} as {@code null} when the queue is empty.
     */
    private void processNext() throws IOException {
        BufferedReader finished = bufferedReader;
        bufferedReader = null;
        if (finished != null) {
            finished.close();
        }
        if (!custFileItems.isEmpty()) { // only update if there's another file
            CustomerFileItem custFileItem = custFileItems.remove();
            InputStream raw = new URL(custFileItem.url).openStream();
            try {
                // default buffer size is 8 KB
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(new GZIPInputStream(raw), UTF_8));
                // read the first few lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    reader.readLine();
                }
                bufferedReader = reader;
            } catch (IOException | RuntimeException ex) {
                // The GZIP wrapper may never have been created, so close the raw stream directly.
                try {
                    raw.close();
                } catch (IOException ex2) {
                    ex.addSuppressed(ex2);
                }
                throw ex;
            }
        }
    }
}
But, as said at the beginning, you don’t even need to implement a Spliterator
here.
Upvotes: 4
Reputation: 44486
Every Iterable<T>
object has the following methods:
Iterator<T> iterator()
returning Iterator<T>
default Spliterator<T> spliterator()
(default method) returning Spliterator<T>
Therefore, you want to create an Iterable<T>
back from the Iterator<T>
, which requires overriding its only abstract (non-default) method:
// Adapter that exposes the already-created iterator as an Iterable.
// Every call to iterator() hands back the same custList instance, so a second
// traversal would continue from wherever the first one stopped.
Iterable<CustomerFileLineItem> iterable = new Iterable<CustomerFileLineItem>() {
@Override
public Iterator<CustomerFileLineItem> iterator() {
return custList;
}
};
This can be shortened into a lambda expression resulting in:
// Iterable has a single abstract method, iterator(), so a lambda returning custList suffices.
Iterable<CustomerFileLineItem> iterable = () -> custList;
// spliterator() is the default method inherited from Iterable.
Spliterator<CustomerFileLineItem> spliterator = iterable.spliterator();
... so the Stream can easily be created:
// The second argument is the "parallel" flag; false requests a sequential stream.
Stream<CustomerFileLineItem> stream = StreamSupport.stream(spliterator, false);
Upvotes: 1