Reputation: 1629
I have to process a fixed-width file with a predefined record layout. Multiple record types exist, and the first character of each record determines its type. Because the file is fixed width, a whole record does not always fit on one line, so the second character is a sequence number within the record. For example:
0This is the header record------------------------------------
1This is another record always existing out of one lin--------
21This is a record that can be composed out of multiple parts.
22This is the second part of record type 2--------------------
21This is a new record of type 2, first part.-----------------
22This is the second part of record type 2--------------------
23This is the third part of record type 2---------------------
...
With the Stream API, I would like to parse this file:
Stream<String> lines = Files.lines(Paths.get(args[1]));
lines.map(line -> RecordFactory.createRecord(line)).collect(Collectors.toList());
But since this stream delivers the file line by line, the mapping of record type 2 is incomplete when only its first line has been parsed (record type 2, sequence 1). The next line (record type 2, sequence 2) should be appended to the result of the previous mapping.
How can I solve this problem with lambdas without having to sacrifice thread safety?
Upvotes: 5
Views: 2166
Reputation: 137084
Operating on consecutive elements matching a predicate is not easily achievable currently with the Stream API.
One option would be to use the StreamEx library, which offers the groupRuns operation:
Returns a stream consisting of lists of elements of this stream where adjacent elements are grouped according to supplied predicate.
The following code groups together consecutive lines where the record part number of a line is strictly greater than that of the previous line. The part number is extracted with a regular expression that captures the digits following the first digit (the record type, which is ignored).
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import one.util.streamex.StreamEx;

private static final Pattern PATTERN = Pattern.compile("\\d(\\d+)");

public static void main(String[] args) throws IOException {
    try (StreamEx<String> stream = StreamEx.ofLines(Paths.get("..."))) {
        List<Record> records =
            stream.groupRuns((s1, s2) -> getRecordPart(s2) > getRecordPart(s1))
                  .map(RecordFactory::createRecord)
                  .toList();
    }
}

private static int getRecordPart(String str) {
    Matcher matcher = PATTERN.matcher(str);
    if (matcher.find()) {
        return Integer.parseInt(matcher.group(1));
    }
    return 1; // if the pattern didn't find anything, the record is on a single line
}
This assumes that your RecordFactory would create a Record from a List<String> and not from a String. Note that this solution can be run in parallel, although it would probably be better to store the content of the file into a List and post-process that list if you want better parallel performance (at the cost of memory).
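The "load into a List and post-process" variant mentioned above can be sketched with a plain loop that performs the same adjacent-grouping that groupRuns does. The class and method names here, and the anchored regex, are illustrative choices, not part of StreamEx:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecordGrouper {

    // Anchored variant of the part-number regex: the first digit is the
    // record type, the captured digits are the sequence number.
    private static final Pattern PART = Pattern.compile("^\\d(\\d+)");

    // Groups consecutive lines into records: a new record starts whenever
    // the part number does not increase relative to the previous line.
    public static List<List<String>> groupLines(List<String> lines) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = null;
        int previousPart = Integer.MAX_VALUE; // forces a new group on the first line
        for (String line : lines) {
            int part = getRecordPart(line);
            if (part > previousPart) {
                current.add(line);            // continuation of the same record
            } else {
                current = new ArrayList<>();  // start of a new record
                current.add(line);
                groups.add(current);
            }
            previousPart = part;
        }
        return groups;
    }

    private static int getRecordPart(String str) {
        Matcher matcher = PART.matcher(str);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : 1;
    }
}
```

Each inner List<String> could then be handed to a RecordFactory accepting a List<String>, as assumed above.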
Upvotes: 4
Reputation: 12942
I think you have to write your own implementation of the Collector interface, for example a Collector<String, List<String>, List<String>>. This collector would accumulate lines into a temporary list (the accumulator) and only add a record to the result once it is complete. The implementation will not be easy, especially if you want to run it in parallel, because you also have to implement a correct combiner. It also keeps the collected lines in memory, so a very large file could be a problem. An alternative is to build a pipeline using a bounded queue, but that is not straightforward with streams; you may want to check https://github.com/jOOQ/jOOL.
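A sequential-only version of such a collector could be sketched as follows. The names (GroupingCollector, byRecord) and the part-number heuristic are assumptions for illustration; the combiner simply rejects parallel use instead of merging partial results, which sidesteps the hard part the answer mentions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collector;

public class GroupingCollector {

    private static final Pattern PART = Pattern.compile("^\\d(\\d+)");

    // Accumulates lines into List<List<String>>, starting a new inner list
    // whenever the line's part number does not increase.
    public static Collector<String, List<List<String>>, List<List<String>>> byRecord() {
        return Collector.of(
            ArrayList::new,
            (groups, line) -> {
                if (groups.isEmpty() || part(line) <= part(lastLine(groups))) {
                    groups.add(new ArrayList<>()); // start a new record
                }
                groups.get(groups.size() - 1).add(line);
            },
            (left, right) -> {
                // Merging two partial groupings correctly would require knowing
                // whether a record spans the split point, so this sketch is
                // deliberately sequential-only.
                throw new UnsupportedOperationException("sequential only");
            }
        );
    }

    private static String lastLine(List<List<String>> groups) {
        List<String> last = groups.get(groups.size() - 1);
        return last.get(last.size() - 1);
    }

    private static int part(String line) {
        Matcher m = PART.matcher(line);
        return m.find() ? Integer.parseInt(m.group(1)) : 1;
    }
}
```

Usage would be lines.collect(GroupingCollector.byRecord()) on a sequential stream.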
Upvotes: 0