Reputation: 31
I have an Apache Beam pipeline that reads a file from a GCS bucket and prints its contents. It works perfectly with the DirectRunner and prints the file's lines, but with the DataflowRunner it prints nothing, and there are no errors either.
Do we need to do anything special/different for the Dataflow runner?
The code looks like this:
import apache_beam as beam
from apache_beam.io.textio import ReadFromTextWithFilename

p = beam.Pipeline(options=pipeline_options)
read_file_pipe = (
    p
    | "Create {}".format(file_name) >> beam.Create(["Start"])
    | "Read File {}".format(file_name)
    >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
    | beam.Map(print)
)
p.run().wait_until_finish()
The command I use to launch it is:
python3 Test_Pipe.py --region us-central1 --output_project= --runner=DataflowRunner --project= --temp_location= --service_account_email= --experiments=use_network_tags=default-uscentral1 --subnetwork --no_use_public_ips
Upvotes: 3
Views: 1144
Reputation: 358
What solved my problem was changing how I read those files: I switched from ReadFromTextWithFilename to MatchFiles (apache_beam.io.fileio).
For a reason I still don't understand, ReadFromTextWithFilename doesn't behave as expected, and the docs aren't clear about it either.
My code looks like this now:
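from apache_beam.io import fileio
import apache_beam as beam


def _expand_items(element):
    # element is a fileio.ReadableFile produced by ReadMatches.
    # A single leading underscore avoids name mangling if this helper is
    # referenced from inside a class method.
    content = element.read_utf8()
    if content:
        items = content.splitlines()
        filename = element.metadata.path
        for item in items:
            # One (filename, line) pair per line, like ReadFromTextWithFilename.
            yield (filename, item)

...
    | "Match" >> fileio.MatchFiles(self.gcs_pattern, empty_match_treatment=fileio.EmptyMatchTreatment.ALLOW)
    | "Read" >> fileio.ReadMatches()
    | "Flatmap" >> beam.FlatMap(_expand_items)
    | "Reshuffle -- avoid fusion" >> beam.Reshuffle()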
My reference was this answer.
Upvotes: 0
Reputation: 6572
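You can use logging instead of print to solve your issue. With the DataflowRunner the Map step runs on remote worker VMs, not on the machine that launches the job, so print output never reaches your terminal. Messages written with the logging module show up in the job's worker logs, visible in the Dataflow monitoring interface and in Cloud Logging. Here is your code snippet with logging:
import logging

import apache_beam as beam
from apache_beam.io.textio import ReadFromTextWithFilename


def log_element(element):
    # Log the element on the worker and pass it through unchanged.
    logging.info(element)
    return element


p = beam.Pipeline(options=pipeline_options)
read_file_pipe = (
    p
    | "Create {}".format(file_name) >> beam.Create(["Start"])
    | "Read File {}".format(file_name)
    >> ReadFromTextWithFilename(file_path, skip_header_lines=1)
    | beam.Map(log_element)
)
p.run().wait_until_finish()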
Upvotes: 2