HKS

Reputation: 31

Same Apache Beam code works in Direct Runner but not in Dataflow runner

I have a piece of Apache Beam pipeline code that reads a file from a GCS bucket and prints it. It works perfectly with the DirectRunner and prints the file contents, but with the Dataflow runner it prints nothing, and there are no errors either.

Do we need to do anything special/different for the Dataflow runner?

The code looks like this:

  p = beam.Pipeline(options=pipeline_options)
  read_file_pipe = (
      p
      | "Create {}".format(file_name) >> beam.Create(["Start"])
      | "Read File {}".format(file_name)
      >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
      | beam.Map(print)
  )

  p.run().wait_until_finish()

The command I use to launch it is: python3 Test_Pipe.py --region us-central1 --output_project= --runner=DataflowRunner --project= --temp_location= --service_account_email= --experiments=use_network_tags=default-uscentral1 --subnetwork --no_use_public_ips

Upvotes: 3

Views: 1144

Answers (2)

no-stale-reads

Reputation: 358

What solved my problem was changing the approach to reading those files: I switched from ReadFromTextWithFilename to MatchFiles (from apache_beam.io.fileio).

For a reason that I still don't know, ReadFromTextWithFilename doesn't have the expected behaviour here, and the docs aren't clear about it either.

My code looks like this now:

import apache_beam as beam
from apache_beam.io import fileio

def __expand_items(element):
    # Emit one (filename, line) pair per line of the file.
    content = element.read_utf8()
    if content:
        filename = element.metadata.path
        for item in content.splitlines():
            yield (filename, item)

...
| "Match" >> fileio.MatchFiles(self.gcs_pattern, empty_match_treatment=fileio.EmptyMatchTreatment.ALLOW)
| "Read" >> fileio.ReadMatches()
| "Flatmap" >> beam.FlatMap(__expand_items)
| "Reshuffle -- avoid fusion" >> beam.Reshuffle()

My reference was this answer.
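To see what the FlatMap step yields, here is a plain-Python sketch of the same expansion logic applied to an in-memory stand-in; the FakeReadableFile class and the file path are hypothetical, mimicking the ReadableFile objects that fileio.ReadMatches emits:

```python
class FakeReadableFile:
    """Minimal stand-in for the ReadableFile emitted by fileio.ReadMatches."""
    def __init__(self, path, text):
        self._text = text
        # ReadableFile exposes the matched path via element.metadata.path.
        self.metadata = type("Metadata", (), {"path": path})()

    def read_utf8(self):
        return self._text

def expand_items(element):
    # Same logic as __expand_items above: one (filename, line) pair per line.
    content = element.read_utf8()
    if content:
        filename = element.metadata.path
        for item in content.splitlines():
            yield (filename, item)

pairs = list(expand_items(FakeReadableFile("gs://bucket/a.csv", "x\ny")))
# pairs == [("gs://bucket/a.csv", "x"), ("gs://bucket/a.csv", "y")]
```

Each downstream transform then receives a (filename, line) tuple, which is the behaviour the question expected from ReadFromTextWithFilename.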

Upvotes: 0

Mazlum Tosun

Reputation: 6572

You can use logging instead of print to solve your issue. Here is your code snippet adapted to use logging:

import logging

def log_element(element):
    logging.info(element)

    return element

p = beam.Pipeline(options=pipeline_options)
read_file_pipe = (
    p
    | "Create {}".format(file_name) >> beam.Create(["Start"])
    | "Read File {}".format(file_name)
    >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
    | beam.Map(log_element)
)

p.run().wait_until_finish()
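The reason this helps: on Dataflow the pipeline runs on remote workers, so print output is not streamed back to the console where you launched the job, whereas logging entries show up in the job's worker logs in Cloud Logging. A minimal stand-alone sketch of the pass-through logger, independent of Beam so it can be tried locally:

```python
import logging

logging.basicConfig(level=logging.INFO)

def log_element(element):
    # Log the element, then return it unchanged so any
    # downstream transforms still receive it.
    logging.info(element)
    return element
```

Because log_element returns its input, beam.Map(log_element) behaves as an identity transform with a logging side effect, so it can be dropped into the middle of a pipeline without changing its output.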

Upvotes: 2
