chinabuffet

Reputation: 5578

Apache Beam TextIO.Read with line number

Is it possible to get the line number along with each line that TextIO.Read puts into the PCollection? For context, I'm processing a CSV file and need the line number of each line.

If this isn't possible with TextIO.Read, it seems like it should be possible with a custom Read or transform, but I'm having trouble figuring out where to begin.

Upvotes: 4

Views: 3132

Answers (1)

f.loris

Reputation: 1041

You can use FileIO to read the file yourself and keep track of the line number as you read from the ReadableFile.

A simple solution can look as follows:

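import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.FlatMapElements;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;

// p is the Pipeline
p
    // Match the file(s) and turn each match into a ReadableFile
    .apply(FileIO.match().filepattern("/file.csv"))
    .apply(FileIO.readMatches())
    // Read each file sequentially and emit one element per line
    .apply(FlatMapElements
            .into(strings())
            .via((FileIO.ReadableFile f) -> {
                List<String> result = new ArrayList<>();
                try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                    int lineNr = 1; // line numbers are 1-based
                    String line = br.readLine();
                    while (line != null) {
                        // Prefix the line with its number, e.g. "1,<first line>"
                        result.add(lineNr + "," + line);
                        line = br.readLine();
                        lineNr++;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Error while reading", e);
                }
                return result;
            }));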

The solution above simply prepends the line number (starting at 1) and a comma to each input line.
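If you want to try the numbering logic outside of a pipeline, and see how a later step could get the number back out of the prefixed string, here is a minimal sketch using only the JDK. The class name NumberedLines and the helpers numberLines and splitNumbered are made up for illustration and are not part of Beam; the in-memory channel just stands in for what f.open() returns.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of Beam.
public class NumberedLines {

    // Same loop as in the lambda above: prefix each line with its 1-based number.
    static List<String> numberLines(ReadableByteChannel channel) throws IOException {
        List<String> result = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(Channels.newReader(channel, "UTF-8"))) {
            int lineNr = 1;
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                result.add(lineNr + "," + line);
                lineNr++;
            }
        }
        return result;
    }

    // Split at the first comma only, so commas inside the CSV line are left alone.
    static Map.Entry<Integer, String> splitNumbered(String numbered) {
        int comma = numbered.indexOf(',');
        int lineNr = Integer.parseInt(numbered.substring(0, comma));
        return new SimpleEntry<>(lineNr, numbered.substring(comma + 1));
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for the channel returned by ReadableFile.open()
        byte[] csv = "id,name\n1,alice\n2,bob\n".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(csv));
        for (String numbered : numberLines(channel)) {
            Map.Entry<Integer, String> e = splitNumbered(numbered);
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Inside the pipeline you could do the same split in a downstream transform, or emit a key/value pair instead of a concatenated string so that no parsing is needed at all.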

Upvotes: 1
