Reputation: 1
How can I identify processed files in Data flow Job? I am using a wildcard to read files from cloud storage. but every time when the job runs, it re-read all files.
This is a batch Job and following is sample reading TextIO that I am using.
PCollection<String> filePColection = pipeline.apply("Read files from Cloud Storage ", TextIO.read().from("gs://bucketName/TrafficData*.txt"));
Upvotes: 0
Views: 347
Reputation: 11041
To see a list of files that match your wildcard you can use gsutils
, which is the Cloud Storage command line utility. You'd do the following:
gsutils ls gs://bucketName/TrafficData*.txt
Now, when it comes to running a batch job multiple times, your pipeline has no way to know which files it has analyzed already or not. To avoid analyzing new files you could do either of the following:
Define a Streaming job, and use TextIO
's watchForNewFiles
functionality. You would have to leave your job to run for as long as you want to keep processing files.
Find a way to provide your pipeline with files that have already been analyzed. For this, every time you run your pipeline you could generate a list of files to analyze, put it into a PCollection
, read each with TextIO.readAll()
, and store the list of analyzed files somewhere. Later, when you run your pipeline again you can use this list as a blacklist for files that you don't need to run again.
Let me know in the comments if you want to work out a solution around one of these two options.
Upvotes: 0