Reputation: 937
I have a Dataflow job with a fan-out of steps, each of which writes its results to a different folder on GCS. During a batch job execution, hundreds of files are written per folder.
I'd like to identify when the FileIO step is completed in order to run Java code that loads the entire content of the folder into a BigQuery table.
I know I can do it per written file with Cloud Functions and a Pub/Sub notification, but I'd prefer to do it only once, at the completion of the entire folder.
Thanks!
Upvotes: 0
Views: 1182
Reputation: 396
@Daniel Oliveira suggested an approach that you can follow, but in my opinion it is not the best way.
Two reasons why I beg to differ with him:
- Narrow scope for handling job failures: consider a situation where your Dataflow job succeeds but your load into BigQuery fails. Because of this tight coupling, you won't be able to re-run just the second job.
- Performance of the second job will become a bottleneck: in a production scenario, as your file sizes grow, the load job will become a bottleneck for other dependent processes.
Since, as you already mentioned, you cannot write directly to BigQuery in the same job, I suggest the following approach:
- Create another Beam job that loads all the files into BigQuery. You can refer to this for reading multiple files in Beam; a rough sketch of such a job follows this list.
- Orchestrate both jobs with Cloud Composer using the Dataflow Java Operator or the Dataflow Template Operator. Set the Airflow trigger rule to 'all_success' and make the load job depend on the Dataflow job (e.g. job2.set_upstream(job1)). Please refer to the Airflow documentation here.
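For what it's worth, here is a minimal sketch (in Java, since your original job is a Java pipeline) of what that second load job could look like. The file pattern, table name and the simple "id,name" line format are assumptions you would replace with your own:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class LoadFolderToBigQuery {

  // Assumption: each line is a simple "id,name" record; adapt this to your real file format.
  static class ParseLineFn extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<TableRow> out) {
      String[] parts = line.split(",", 2);
      out.output(new TableRow().set("id", parts[0]).set("name", parts.length > 1 ? parts[1] : ""));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadAllFiles", TextIO.read().from("gs://my-bucket/output/folder-a/*"))  // placeholder pattern
     .apply("ParseLines", ParDo.of(new ParseLineFn()))
     .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
         .to("my-project:my_dataset.my_table")  // placeholder table
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)  // table assumed to exist
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
  }
}

You would typically pass the input pattern and table name as pipeline options so Composer can supply them per folder.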
I hope this helps.
Upvotes: 1
Reputation: 1421
There are two ways you could do this:
Run your pipeline and, on your pipeline result, call waitUntilFinish (wait_until_finish in Python) to delay execution until your pipeline is complete, as follows:
pipeline.run().waitUntilFinish();
You can verify whether the pipeline completed successfully based on the result of waitUntilFinish, and from there you can load the contents of the folders to BigQuery. The only caveat to this approach is that your code isn't part of the Dataflow pipeline, so if you rely on the elements of your pipeline for that step it will be tougher.
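For example, a sketch of that approach, assuming CSV files under a hypothetical gs://my-bucket/output/folder-a/ prefix and an existing target table (the google-cloud-bigquery client does the load here, not Beam):

import org.apache.beam.sdk.PipelineResult;
import com.google.cloud.bigquery.*;  // google-cloud-bigquery client library, assumed to be on the classpath

PipelineResult result = pipeline.run();
PipelineResult.State state = result.waitUntilFinish();

if (state == PipelineResult.State.DONE) {
  // Load everything the pipeline wrote to the folder with a single BigQuery load job.
  BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
  TableId tableId = TableId.of("my_dataset", "my_table");  // placeholder dataset/table
  LoadJobConfiguration loadConfig =
      LoadJobConfiguration.newBuilder(tableId, "gs://my-bucket/output/folder-a/*")  // placeholder URI
          .setFormatOptions(FormatOptions.csv())  // or json()/avro(), depending on your files
          .build();
  Job loadJob = bigquery.create(JobInfo.of(loadConfig));
  loadJob = loadJob.waitFor();  // blocks until the load job finishes
  // inspect loadJob.getStatus().getError() to see whether the load succeeded
}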
The second option keeps everything inside the pipeline: the result of the FileIO.Write transform is a WriteFilesResult, which lets you get a PCollection containing the filenames of all written files by calling getPerDestinationOutputFilenames. From there you can continue your pipeline with transforms that write all those files to BigQuery. Here's an example in Java:
// PCollection of the written filenames, keyed by destination
WriteFilesResult<DestinationT> result = files.apply(FileIO.write()...);
result.getPerDestinationOutputFilenames().apply(...);
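A possible continuation of that example, where the written filenames are read back and streamed into BigQuery (ParseLineToTableRowFn and the table name are placeholders, not something from the original transform):

result.getPerDestinationOutputFilenames()          // PCollection<KV<DestinationT, String>>
    .apply(Values.create())                        // keep just the file names
    .apply(FileIO.matchAll())                      // match each written file
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles())                     // read the lines of every file
    .apply(ParDo.of(new ParseLineToTableRowFn()))  // your own DoFn: String -> TableRow
    .apply(BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")      // placeholder table
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));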
The equivalent in Python seems to be called FileResult, but I can't find good documentation for that one.
Upvotes: 1