Reputation: 305
My team runs several hourly/daily Dataflow jobs, which mostly read from and write to GCS (in other words, we have dozens of recurring Dataflow jobs scheduled within a day). Some jobs read data from GCS that was produced by previous jobs. Roughly once or twice a week, we face the following issue:
Job A successfully runs, and writes its output to GCS at gs://A/output/.
(The main issue we identified, which occurs between Job A and Job B, is described in the next paragraph.)
Job B reads from gs://A/output to process some data, but throws an exception because either a temp file was removed while the job was running, or, due to the temp files, the keys in the data are no longer unique (this happens, for example, when Job B creates a MapView of that data).
As far as we were able to debug, the reason is the following:
By the time Job A finishes (as indicated by its pipeline status, for example), all 'temp' files under gs://A/output/ should have been renamed to the file name(s) specified by the job.
Yet some of these temp files linger for a few minutes after Job A finishes -- sometimes we even see them a few hours after Job A completes, so we often have to remove them manually.
For instance, we typically see only one or two lingering temp files out of ~7,500 files in the directory; they usually go away within an hour, but sometimes they stay for a few hours.
We are wondering about the following:
Is our understanding correct that all "temp" files in the GCS output directory should be renamed before Job A "finishes" (e.g., the monitoring UI says it succeeded, the worker pool has been stopped, etc.)? In other words, is the completion of a job an indication that the temp files are gone?
If yes, is it a bug that we see temp files long after the job has completed?
If no, how can we (the users) know that the job has "really" completed, in the sense that its output directory no longer contains temp files? (Should we check it with our own file-pattern-matching script, along the lines of the sketch below, or something like that?)
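For context, here is a minimal sketch of the kind of check we have in mind, using the google-cloud-storage Java client. The bucket "A", the prefix "output/", and the "temp" substring match are placeholders; we do not know the exact temp-file naming convention, so the pattern would need to be adjusted:

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class TempFileCheck {
      public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        boolean tempFound = false;
        // List everything under the job's output prefix.
        // "A" and "output/" are placeholders for the real bucket and path.
        for (Blob blob : storage.list("A", Storage.BlobListOption.prefix("output/")).iterateAll()) {
          // Assumption: lingering temp shards can be recognized by a "temp" substring
          // in the object name; adjust to whatever the job actually produces.
          if (blob.getName().contains("temp")) {
            System.out.println("Lingering temp file: " + blob.getName());
            tempFound = true;
          }
        }
        System.out.println(tempFound ? "Output directory is not clean yet." : "No temp files found.");
      }
    }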
I searched with GCS and Dataflow as keywords, but found nothing close to the issue we are having. I might have missed something, though, so any help will be really appreciated!
Upvotes: 0
Views: 1579
Reputation: 17913
Sorry for the trouble. This is a bug in TextIO.Write: when deleting temporary files, it is subject to GCS's eventual consistency and fails to find and delete all of them.
Fortunately, it still looks at all the correct files when copying temporary files to their final location, so there is no data loss.
We will look into providing a fix.
Note that, again due to GCS's eventual consistency, Job B can also fail to read some of the outputs produced by Job A. This will remain true even with the potential fix, and Dataflow has no easy way of addressing it right now. However, the chances of this decrease as you increase the interval between finishing A and starting B.
I would recommend, if possible, joining A and B into a single pipeline and representing this data as an intermediate PCollection. If you need this data materialized as text on GCS for other purposes (e.g., manual inspection, serving, or processing by a different tool), you can still do that from the joint pipeline; just do not use GCS to pass data from one pipeline to another.
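For illustration, here is a rough sketch of what that joint pipeline could look like with the Dataflow Java SDK (1.x-style API). The GCS paths and the pass-through DoFns are placeholders for your actual input location and transforms:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class JointPipeline {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // "Job A" stage: read the raw input and apply A's processing.
        // The input path and the pass-through DoFn are placeholders.
        PCollection<String> aOutput = p
            .apply(TextIO.Read.from("gs://A/input/*"))
            .apply(ParDo.of(new DoFn<String, String>() {
              @Override
              public void processElement(ProcessContext c) {
                c.output(c.element());  // placeholder for Job A's real logic
              }
            }));

        // Optionally materialize A's output on GCS for inspection or other tools,
        // but do not use these files to hand data to the next stage.
        aOutput.apply(TextIO.Write.to("gs://A/output/part"));

        // "Job B" stage consumes the PCollection directly, so it never depends on
        // listing A's files on GCS.
        aOutput
            .apply(ParDo.of(new DoFn<String, String>() {
              @Override
              public void processElement(ProcessContext c) {
                c.output(c.element());  // placeholder for Job B's real logic
              }
            }))
            .apply(TextIO.Write.to("gs://B/output/part"));

        p.run();
      }
    }

The key point is that the "Job B" stage receives aOutput as a PCollection, while the TextIO.Write to gs://A/output/ exists only for external consumers.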
Upvotes: 3