Considering the following function:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
List<CDR> cdrs = new ArrayList<CDR>();
Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
cdrs.sort(cdrsorter);
}
which lists the CDR files in a directory and calls readCDRP() on each of them:
private void readCDRP(List<CDR> cdrs, File file) {
final CDR cdr = new CDR(file.getName());
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
List<String> lines = bfr.lines().collect(Collectors.toList());
lines.parallelStream().forEach(e -> {
String[] data = e.split(",", -1);
CDREntry entry = new CDREntry(file.getName());
for (int i = 0; i < data.length; i++) {
entry.setField(i, data[i]);
}
cdr.addEntry(entry);
});
if (cdr != null) {
cdrs.add(cdr);
}
} catch (IOException e) {
e.printStackTrace();
}
}
What I observe is that occasionally, but NOT every time, I get an ArrayIndexOutOfBoundsException either in readCDRP() on the following line (which is odd, since the CDR's entry list is an ArrayList):
cdr.addEntry(entry);
or on the last line of execute4(), where I apply the sorting.
I suspect the issue is that the outer parallelStream in execute4() is not isolated in memory from the inner parallelStream inside readCDRP(), and that the two somehow seem to share data incorrectly. I say "seem" because I can't confirm it; it's just a hunch.
My questions are: is my code fundamentally buggy from a JDK 8 perspective? Is there a workaround that keeps the same flow, for example using a CountDownLatch? Or is this a limitation of the ForkJoinPool?
Thanks for any response.
EDIT (1): addEntry() is a method of the CDR class:
class CDR {
public final String fileName;
private final List<CDREntry> entries = new ArrayList<CDREntry>();
public CDR(String fileName) {
super();
this.fileName = fileName;
}
public List<CDREntry> getEntries() {
return entries;
}
public List<CDREntry> addEntry(CDREntry e) {
entries.add(e);
return entries;
}
public String getFileName() {
return this.fileName;
}
}
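Your code is broken from a thread-safety point of view. In readCDRP you add elements to the cdrs list, which is an ArrayList and does not support concurrent writes. The same applies to cdr.addEntry(), which adds to the CDR's own ArrayList from inside the inner parallel stream. That is why it breaks.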
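If you want to keep the forEach-based flow the question asks about, one minimal workaround (a sketch, not the answer's recommendation) is to make every list that is written from inside a parallel stream thread-safe, for example with Collections.synchronizedList. The class and method names below (SyncListDemo, Cdr, build) are hypothetical stand-ins for the question's CDR code, and fake entries replace real file reading.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

// Sketch: keep the nested parallel forEach flow, but make every list
// that is written concurrently a synchronized list.
class SyncListDemo {

    // Hypothetical stand-in for the question's CDR class.
    static final class Cdr {
        final String fileName;
        // synchronizedList makes each add() atomic, so concurrent
        // addEntry() calls from the inner parallel stream are safe.
        final List<String> entries = Collections.synchronizedList(new ArrayList<>());

        Cdr(String fileName) { this.fileName = fileName; }

        void addEntry(String entry) { entries.add(entry); }
    }

    // Builds fileCount fake CDRs with linesPerFile entries each, using
    // nested parallel streams like execute4()/readCDRP() do.
    static List<Cdr> build(int fileCount, int linesPerFile) {
        // The outer list is also written concurrently, so it is synchronized too.
        List<Cdr> cdrs = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, fileCount).parallel().forEach(f -> {
            Cdr cdr = new Cdr("file" + f);
            IntStream.range(0, linesPerFile).parallel()
                     .forEach(line -> cdr.addEntry("line" + line));
            cdrs.add(cdr);
        });
        // forEach has returned, so all writes are finished; this sort runs
        // on the calling thread only.
        cdrs.sort((a, b) -> a.fileName.compareTo(b.fileName));
        return cdrs;
    }

    public static void main(String[] args) {
        List<Cdr> cdrs = build(50, 1_000);
        System.out.println(cdrs.size() + " CDRs, first has "
                + cdrs.get(0).entries.size() + " entries");
    }
}
```

Note that the order of entries inside each CDR is still nondeterministic, and every add() now takes a lock, so this trades throughput for correctness; the collect-based approach avoids both issues.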
A better approach would be to have readCDR return a CDR object and do something like:
List<CDR> cdrs = Arrays.stream(files)
.parallel()
.map(this::readCDR)
.collect(Collectors.toList());
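To see the pattern in isolation, here is a self-contained sketch that parses in-memory "files" instead of real ones. FakeFile, Cdr, readCdr and readAll are hypothetical names; the point is that readCdr only builds and returns a value, and Collectors.toList() does the accumulation safely.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the collect-based approach: each task returns its result and
// the collector merges the per-thread partial results.
class CollectDemo {

    // Hypothetical in-memory replacement for a File.
    static final class FakeFile {
        final String name;
        final String content;
        FakeFile(String name, String content) { this.name = name; this.content = content; }
    }

    // Hypothetical parsed result: file name plus number of non-empty lines.
    static final class Cdr {
        final String fileName;
        final int entryCount;
        Cdr(String fileName, int entryCount) { this.fileName = fileName; this.entryCount = entryCount; }
    }

    // Stand-in for readCDR(File): touches no shared state, just returns a value.
    static Cdr readCdr(FakeFile file) {
        int entries = (int) Arrays.stream(file.content.split("\n"))
                                  .filter(line -> !line.isEmpty())
                                  .count();
        return new Cdr(file.name, entries);
    }

    static List<Cdr> readAll(List<FakeFile> files) {
        return files.parallelStream()
                    .map(CollectDemo::readCdr)
                    // toList() merges partial lists safely and keeps the
                    // encounter order of the source list.
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FakeFile> files = Arrays.asList(
                new FakeFile("a.CDR", "1,2,3\n4,5,6\n"),
                new FakeFile("b.CDR", "7,8,9\n"));
        for (Cdr cdr : readAll(files)) {
            System.out.println(cdr.fileName + ": " + cdr.entryCount);
        }
    }
}
```

Running main should print "a.CDR: 2" followed by "b.CDR: 1"; the output order matches the input list even though the work runs in parallel.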
Also, using parallel streams for IO related operations is generally a bad idea, but that is another discussion.
When you start programming in a functional style, you should prefer immutable objects that can be fully created via their constructor (or perhaps via the builder pattern or a factory method). So your CDREntry class may look like this:
class CDREntry {
private final String[] fields;
private final String name;
public CDREntry(String name, String[] fields) {
this.name = name;
this.fields = fields;
}
// Add getters and whatever
}
And your CDR class may look like this:
class CDR {
private final String fileName;
private final List<CDREntry> entries;
public CDR(String fileName, List<CDREntry> entries) {
this.fileName = fileName;
this.entries = entries;
}
public List<CDREntry> getEntries() {
return entries;
}
public String getFileName() {
return this.fileName;
}
}
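The classes above are immutable only as long as callers do not keep references to the array or list they passed in. A sketch of a stricter variant, assuming defensive copies are acceptable for your data sizes: getField and fieldCount are hypothetical getters (the answer leaves them as "Add getters and whatever"), and ImmutableDemo exists only to show the behaviour.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch: CDREntry/CDR with defensive copies, so state cannot change after construction.
final class CDREntry {
    private final String name;
    private final String[] fields;

    CDREntry(String name, String[] fields) {
        this.name = name;
        this.fields = fields.clone(); // the caller keeps its own array
    }

    String getName() { return name; }

    String getField(int i) { return fields[i]; }  // hypothetical getter

    int fieldCount() { return fields.length; }    // hypothetical getter
}

final class CDR {
    private final String fileName;
    private final List<CDREntry> entries;

    CDR(String fileName, List<CDREntry> entries) {
        this.fileName = fileName;
        // Copy, then wrap: later changes to the argument are not seen,
        // and callers of getEntries() cannot add or remove entries.
        this.entries = Collections.unmodifiableList(new ArrayList<>(entries));
    }

    String getFileName() { return fileName; }

    List<CDREntry> getEntries() { return entries; }
}

class ImmutableDemo {
    public static void main(String[] args) {
        String[] raw = {"a", "b"};
        List<CDREntry> list = new ArrayList<>();
        list.add(new CDREntry("x.CDR", raw));
        CDR cdr = new CDR("x.CDR", list);
        raw[0] = "changed"; // does not affect the stored entry
        list.clear();       // does not affect the CDR
        System.out.println(cdr.getEntries().size() + " " + cdr.getEntries().get(0).getField(0));
    }
}
```

The copies cost one array and one list allocation per object; in return, a CDR can be shared freely between threads once it has been constructed.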
With such classes, things become easier. The rest of the code can be rewritten like this:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File dir, String name) ->
name.endsWith("CDR")); // FilenameFilter gets the parent directory and the file name; filter on the name
List<CDR> cdrs = Arrays.stream(files).parallel()
.map(this::readCDRP).sorted(cdrsorter)
.collect(Collectors.toList());
}
private CDR readCDRP(File file) {
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
// I'm not sure that collecting lines into list
// before main processing was actually necessary
return bfr.lines().parallel() // lines() already returns a Stream, so use parallel(), not parallelStream()
.map(e -> new CDREntry(file.getName(), e.split(",", -1)))
.collect(Collectors.collectingAndThen(
Collectors.toList(), list -> new CDR(file.getName(), list)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
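The rewritten pipeline can be exercised end to end against a temporary directory. This is a sketch under a few assumptions: CdrEntry and Cdr are minimal hypothetical versions of the classes above, the comparator sorts by file name in place of the question's cdrsorter, and the inner per-file stream is left sequential here.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class PipelineDemo {

    static final class CdrEntry {   // hypothetical minimal CDREntry
        final String[] fields;
        CdrEntry(String[] fields) { this.fields = fields; }
    }

    static final class Cdr {        // hypothetical minimal CDR
        final String fileName;
        final List<CdrEntry> entries;
        Cdr(String fileName, List<CdrEntry> entries) { this.fileName = fileName; this.entries = entries; }
    }

    // Reads one file into a Cdr; no shared state is modified.
    static Cdr read(File file) {
        try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
            return bfr.lines()
                      .map(line -> new CdrEntry(line.split(",", -1)))
                      .collect(Collectors.collectingAndThen(
                              Collectors.toList(), list -> new Cdr(file.getName(), list)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<Cdr> readAll(File dir) {
        // FilenameFilter: test the file name, not the directory argument.
        File[] files = dir.listFiles((d, name) -> name.endsWith("CDR"));
        if (files == null) throw new IllegalArgumentException(dir + " is not a directory");
        return Arrays.stream(files).parallel()
                     .map(PipelineDemo::read)
                     .sorted(Comparator.comparing((Cdr c) -> c.fileName)) // stand-in for cdrsorter
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("cdr-demo");
        Files.write(dir.resolve("b.CDR"), Arrays.asList("1,2,3", "4,5,"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("a.CDR"), Arrays.asList("x,y"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("notes.txt"), Arrays.asList("ignored"), StandardCharsets.UTF_8);
        for (Cdr cdr : readAll(dir.toFile())) {
            System.out.println(cdr.fileName + ": " + cdr.entries.size() + " entries");
        }
    }
}
```

Because sorted() and collect() are part of the same pipeline, no list is ever shared between worker threads; notes.txt is skipped by the filter.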
In general, remember that forEach is usually not the cleanest way to solve such tasks. It can be helpful when you integrate streams into legacy code, but otherwise it should be avoided.
You are using a parallel stream with a lambda that has side effects (the lambda updates the ArrayList 'cdrs'). Try to use a Collector or a reduction operation instead.
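A small sketch of both alternatives on CDR-like lines; ReductionDemo, lineLengths and totalFields are hypothetical names. The collector builds the result list itself, and the reduction is safe in parallel because 0 is an identity for Integer::sum and addition is associative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: replacing a side-effecting forEach with a collector or a reduction.
class ReductionDemo {

    // Collector: the stream builds the list; no shared ArrayList is mutated.
    static List<Integer> lineLengths(List<String> lines) {
        return lines.parallelStream()
                    .map(String::length)
                    .collect(Collectors.toList());
    }

    // Reduction: identity 0 plus the associative Integer::sum gives the
    // same result in parallel as sequentially.
    static int totalFields(List<String> lines) {
        return lines.parallelStream()
                    .map(line -> line.split(",", -1).length)
                    .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "d,e", "f");
        System.out.println(lineLengths(lines)); // [5, 3, 1]
        System.out.println(totalFields(lines)); // 6
    }
}
```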