JacobmcDonald

Reputation: 21

How to read a text file and return an additional input field using TextIO?

I have a PCollection of KV where the key is a filename and the value is additional info about the file (e.g., the "Source" system that generated it). E.g.,

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

I need to read all lines from the files and pair each line with its "Source" field, returning a KV PCollection:

KV(line1 from X1.dat, "SourceX")
KV(line2 from X1.dat, "SourceX")
...
KV(line1 from Y1.dat, "SourceY")

I was able to achieve this by calling FileIO.match() followed by a DoFn in which I sequentially read each file and append its Source value (retrieved from a map passed as a side input).
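For reference, the per-file logic inside such a DoFn can be sketched in plain, self-contained Java (no Beam dependency here; in the real pipeline the file contents would come from a FileIO.ReadableFile and the map from a side input — the class and method names below are illustrative):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FileLinesWithSource {
    // Pair every line of a file's contents with the "Source" value looked up
    // by filename. Assumes newline-separated contents.
    static List<SimpleEntry<String, String>> linesWithSource(
            String filename, String contents, Map<String, String> fileToSource) {
        String source = fileToSource.get(filename);
        List<SimpleEntry<String, String>> out = new ArrayList<>();
        for (String line : contents.split("\n", -1)) {
            out.add(new SimpleEntry<>(line, source));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> sources = Map.of("gs://bucket1/dir1/X1.dat", "SourceX");
        System.out.println(
                linesWithSource("gs://bucket1/dir1/X1.dat", "line1\nline2", sources));
        // [line1=SourceX, line2=SourceX]
    }
}
```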

To get the benefit of parallel reading, could I use TextIO.readAll() to achieve this? TextIO.read() returns a PCollection without filename info. How can I join it back to the filename-to-Source mapping? I tried the WithKeys transform, but it isn't working ...

Upvotes: 0

Views: 323

Answers (2)

robertwb

Reputation: 5104

Currently using FileIO.match() as you are doing is the best way to accomplish this, but once https://github.com/apache/beam/pull/12645 is merged you'll be able to use the new ContextualTextIO transforms.

Note that computing line numbers in a distributed manner is inherently expensive; you might want to see if you can use byte offsets (much easier to compute, and ordered the same as line numbers) instead.
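To illustrate why offsets are cheaper: each line's starting byte offset can be computed locally as lines are read, with no coordination across workers. A minimal plain-Java sketch (not Beam code; assumes Unix line endings):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class LineOffsets {
    // Map each line to the byte offset at which it starts. Offsets sort in
    // the same order as line numbers but need only local information.
    static Map<Long, String> offsetsOf(String contents) {
        Map<Long, String> out = new LinkedHashMap<>();
        long offset = 0;
        for (String line : contents.split("\n", -1)) {
            out.put(offset, line);
            // Advance past the line plus the '\n' separator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(offsetsOf("foo\nbar\nbaz"));
        // {0=foo, 4=bar, 8=baz}
    }
}
```

In a distributed read, each split already knows the byte offset at which it begins, so per-line offsets fall out for free, whereas a line number depends on every byte before it in the file.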

Upvotes: 1

Pablo

Reputation: 11031

If I understand correctly, you want to read the file in parallel? Unfortunately, TextIO.readAll does not have this feature. You will have to use FileIO.match, and then write your DoFn to read the file in the custom way that you want.

This is because you cannot do a random seek into a file and still preserve line numbers.

Is reading files serially a bottleneck for your pipeline?

Upvotes: 0
