lys

Reputation: 1027

ERRNO2 for WriteToText files in a Dataflow pipeline

I have a branching pipeline with multiple ParDo transforms that are merged and written to text file records in a GCS bucket.

I am receiving the following messages after my pipeline crashes:

It looks like the pipeline can't find the file it's been writing to. It seems to run fine until a certain point, when the error occurs. I'd like to wrap the failing step in a try/except or set a breakpoint, but I'm not even sure how to discover the root cause.

Is there a way to write just a single file, or to only open a file for writing once? The pipeline is spamming thousands of output files into this bucket, which I'd like to eliminate and which may be a factor.

with beam.Pipeline(argv=pipeline_args) as p:
    csvlines = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
          | 'Parse CSV to Dictionary' >> beam.ParDo(Split())
          | 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
          | 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
    )

    b1 = csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn())
    b2 = csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn())
    b3 = csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn())
    b4 = csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn())
    b5 = csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn())
    b6 = csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn())

    output = (
        (b1, b2, b3, b4, b5, b6) | 'Merge PCollections' >> beam.Flatten()
        | 'WriteToText' >> beam.io.WriteToText(known_args.output)
    )
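
For the single-file part of the question: WriteToText accepts a num_shards argument, so I assume something like the sketch below (untested against this pipeline) would collapse the output into one file, at the cost of parallel writes. I don't know yet whether the sharding is actually related to the error.

    # Sketch only: force a single output shard, i.e. one output file.
    # This trades away parallel writes, so it can bottleneck large outputs.
    merged = (b1, b2, b3, b4, b5, b6) | 'Merge PCollections' >> beam.Flatten()
    merged | 'WriteToText' >> beam.io.WriteToText(known_args.output, num_shards=1)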

Upvotes: 0

Views: 511

Answers (1)

lys

Reputation: 1027

This question is linked to a previous question which contains more detail about the implementation. The solution suggested there was to create an instance of google.cloud.storage.Client() in the start_bundle() of each DoFn. The client connects to the same GCS bucket that is given via the args in WriteToText(known_args.output):

class DownloadFilesDoFn(beam.DoFn):
    def __init__(self):
        # Import here so the module is available when the DoFn
        # is unpickled on the Dataflow workers.
        import re
        self.gcs_path_regex = re.compile(r'gs://([^/]+)/(.*)')

    def start_bundle(self):
        # Create a fresh storage client for each bundle.
        import google.cloud.storage
        self.gcs = google.cloud.storage.Client()

    def process(self, element):
        # Split the gs://bucket/path URL into bucket name and blob path.
        self.file_match = self.gcs_path_regex.match(element['Url'])
        self.bucket = self.gcs.get_bucket(self.file_match.group(1))
        self.blob = self.bucket.get_blob(self.file_match.group(2))
        # Download the blob's contents into memory.
        self.f = self.blob.download_as_bytes()

It's likely that the cause of this error is related to having too many open client connections. I'm not clear on good practice for this, since it's been suggested elsewhere that you can set up network connections in this way for each bundle.

Adding the following finish_bundle() method to remove the client object from memory at the end of each bundle should help close some unnecessary lingering connections:

    def finish_bundle(self):
        # Drop only the per-bundle client. The compiled regex from __init__
        # must be kept, because __init__ is not re-run for the next bundle
        # when the runner reuses this DoFn instance.
        del self.gcs
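
For what it's worth, recent Beam versions also provide DoFn.setup() and DoFn.teardown(), which run once per DoFn instance on the worker rather than once per bundle. Moving the client there would reuse a single connection across bundles instead of opening and closing one per bundle; a minimal sketch of that alternative (my assumption about good practice, not part of the linked question's fix):

    import apache_beam as beam

    class DownloadFilesDoFn(beam.DoFn):
        def setup(self):
            # Runs once per DoFn instance on the worker (Beam 2.13+),
            # so the client and its connections are reused across bundles.
            import google.cloud.storage
            self.gcs = google.cloud.storage.Client()

        def teardown(self):
            # Best-effort hook when the instance is discarded; dropping the
            # reference lets the client's HTTP session be garbage-collected.
            self.gcs = None

        # process() stays the same as above.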

Upvotes: 1
