Theo

Reputation: 132972

How to specify multiple input paths to a Dataflow job

I want to run a Dataflow job over multiple inputs from Google Cloud Storage, but the paths I want to pass to the job can't be specified with just the * glob operator.

Consider these paths:

gs://bucket/some/path/20160208/input1
gs://bucket/some/path/20160208/input2
gs://bucket/some/path/20160209/input1
gs://bucket/some/path/20160209/input2
gs://bucket/some/path/20160210/input1
gs://bucket/some/path/20160210/input2
gs://bucket/some/path/20160211/input1
gs://bucket/some/path/20160211/input2
gs://bucket/some/path/20160212/input1
gs://bucket/some/path/20160212/input2

I want my job to work on the files in the 20160209, 20160210 and 20160211 directories, but not on 20160208 (the first) and 20160212 (the last). In reality there are a lot more dates, and I want to be able to specify an arbitrary range of dates for my job to work on.

The docs for TextIO.Read say:

Standard Java Filesystem glob patterns ("*", "?", "[..]") are supported.

But I can't get this to work. There's a link to Java Filesystem glob patterns, which in turn links to getPathMatcher(String), which lists all the globbing options. One of them is {a,b,c}, which looks exactly like what I need. However, if I pass gs://bucket/some/path/201602{09,10,11}/* to TextIO.Read#from I get "Unable to expand file pattern".

Maybe the docs mean that only *, ? and […] are supported, and if that is the case, how can I construct a glob that Dataflow will accept and that can match an arbitrary date range like the one I describe above?

Update: I've figured out that I can write a chunk of code so that I can pass in the path prefixes as a comma-separated list, create an input from each, and use the Flatten transform, but that seems like a very inefficient way of doing it. It looks like the first step reads all the input files and immediately writes them out again to the temporary location on GCS. Only when all the inputs have been read and written does the actual processing start. This step is completely unnecessary in the job I'm writing. I want the job to read the first file, start processing it, read the next, and so on. This just caused a ton of other problems. I'll try to make it work, but it feels like a dead end because of the initial rewriting.

Upvotes: 3

Views: 1422

Answers (1)

Kenn Knowles

Reputation: 6033

The docs do, indeed, mean that only *, ?, and [...] are supported. This means that arbitrary subsets or ranges in alphabetical or numeric order cannot be expressed as a single glob.

Here are some approaches that might work for you:

  1. If the date represented in the file path is also present in the records in the files, then the simplest solution is to read them all and use a Filter transform to select the date range you are interested in.
  2. The approach you tried, with many reads in separate TextIO.Read transforms that are then flattened, is OK for small sets of files; our tf-idf example does this. You can express arbitrary numerical ranges with a small number of globs, so this need not be one read per file (for example, the two-digit range "23 through 67" is 2[3-9] plus [3-5][0-9] plus 6[0-7]).
  3. If the subset of files is more arbitrary, then the number of globs/filenames may exceed the maximum graph size, and the last recommendation is to put the list of files into a PCollection and use a ParDo transform to read each file and emit its contents.
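
As a rough sketch of option 2, the plain-Java helper below builds one glob per day for an inclusive date range, using only * so each string stays within the supported syntax. The class and method names (GlobRanges, perDayGlobs, matchesAny) are made up for illustration and are not part of the Dataflow SDK. The matchesAny check uses java.nio's local glob matcher, on the assumption that it treats *, ? and [...] the same way Dataflow does; it is only there to confirm that the three globs from the "23 through 67" example cover exactly that range.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GlobRanges {
    // Hypothetical helper: one glob per day, start and end inclusive.
    // Assumes directories are named yyyyMMdd, as in the question.
    static List<String> perDayGlobs(String prefix, LocalDate start, LocalDate end) {
        DateTimeFormatter fmt = DateTimeFormatter.BASIC_ISO_DATE; // e.g. 20160209
        List<String> globs = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            globs.add(prefix + d.format(fmt) + "/*");
        }
        return globs;
    }

    // Local sanity check only: true if any of the globs matches the name.
    // Uses java.nio's matcher, not Dataflow's own pattern expansion.
    static boolean matchesAny(List<String> globs, String name) {
        for (String g : globs) {
            PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + g);
            if (m.matches(Paths.get(name))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The three directories the question wants to read.
        for (String g : perDayGlobs("gs://bucket/some/path/",
                LocalDate.of(2016, 2, 9), LocalDate.of(2016, 2, 11))) {
            System.out.println(g);
        }

        // Check that the three globs match 23..67 and nothing else in 00..99.
        List<String> range = Arrays.asList("2[3-9]", "[3-5][0-9]", "6[0-7]");
        for (int i = 0; i < 100; i++) {
            String name = String.format("%02d", i);
            boolean expected = i >= 23 && i <= 67;
            if (matchesAny(range, name) != expected) {
                throw new AssertionError("unexpected result for " + name);
            }
        }
    }
}
```

Each string returned by perDayGlobs would go to its own TextIO.Read.from(...), with the resulting PCollections combined using Flatten. That wiring is left out here because it needs the Dataflow SDK on the classpath. For long date ranges, collapsing days into character-class globs as in the "23 through 67" example keeps the number of reads down.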

I hope this helps!

Upvotes: 2
