Reputation: 2342
I currently have the code:
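```python
import sys
import uuid

import apache_beam as beam
from apache_beam.io.gcp import gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions

gs_folder = sys.argv[1]
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxx',
    job_name=f'xxx{uuid.uuid4()}',
    region='us-central1',
    temp_location='xxx')
gfs = gcs.GCSFileSystem(options)
p = beam.Pipeline(options=options)
# DeleteEmpty is the DoFn from the linked question (not shown here)
discover_empty = (
    p
    | 'Filenames' >> beam.Create(gfs.match([gs_folder])[0].metadata_list)
    | 'Reshuffle' >> beam.Reshuffle()
    | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))
p.run()
```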
This code is based on the question here. When I run it, I eventually get this warning:
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['gs://xx/xx']
That does not make much sense, since that is the folder I want the deletion to be performed on. Furthermore, the Dataflow job appears to run through successfully, but the files that are supposed to be deleted are not actually deleted. How am I supposed to pass the pipeline options arg here?
I also have a follow-up question about this process. It looks like beam.Create() runs locally, and then the job switches over to Dataflow. How can I make that part of the pipeline run on Dataflow?
Upvotes: 0
Views: 4762
Reputation: 87
You can pass them as command-line args, for example:
--db_port 5432 --db_pass XXXX
```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# parse your own CL args; anything unrecognized is left over for Beam
parser = argparse.ArgumentParser()
parser.add_argument('--db_pass')
parser.add_argument('--db_port')
parser.add_argument('--db_host')
args, beam_args = parser.parse_known_args()

# initializing Pipeline object with only the Beam-specific args
options = PipelineOptions(beam_args)
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)

args.db_pass  # your value
```
Upvotes: -1
Reputation: 3010
Make sure that you are passing the path as a flag, e.g. --input=gs://... That warning suggests your command-line invocation isn't what Beam expects: the bare gs path is being interpreted as a whole argument on its own.
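A minimal sketch of that suggestion, combined with the argparse.parse_known_args split used in the other answer. The flag name --input and the functions split_args and run are illustrative, not part of Beam. Per the Beam docs, PipelineOptions parses sys.argv when no flags list is given, which is how a bare positional path ends up reported as unparseable; passing only the leftover list avoids that. Beam is imported inside run only so split_args can be tried on its own.

```python
import argparse
import sys


def split_args(argv):
    """Return (our parsed args, leftover args meant for Beam)."""
    parser = argparse.ArgumentParser()
    # --input is an illustrative flag name for the folder to clean up
    parser.add_argument('--input', required=True)
    return parser.parse_known_args(argv)


def run(argv=None):
    # Beam is imported here only so split_args can be used without it
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    known_args, beam_args = split_args(sys.argv[1:] if argv is None else argv)
    # Pass an explicit list; with no flags, PipelineOptions parses sys.argv itself
    options = PipelineOptions(beam_args)
    with beam.Pipeline(options=options) as p:
        # placeholder step; the real transforms would consume known_args.input
        _ = p | 'Folder' >> beam.Create([known_args.input]) | beam.Map(print)
```

Calling run() from the script's entry point, an invocation would then look like `python main.py --input=gs://xx/xx --runner=DataflowRunner --project=xxx --region=us-central1 --temp_location=xxx` (main.py being a hypothetical script name), with every Beam option arriving as a normal flag.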
beam.Create runs as part of the pipeline, but the argument passed to it is evaluated locally, when the pipeline is constructed. To compute that in the pipeline instead, use beam.Create([None]) (a single dummy element) and then have a DoFn that runs the matching logic.
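A minimal sketch of moving the matching into the pipeline, assuming Beam's FileSystems.match and FileSystems.delete helpers. It is a variant of the approach above: the folder pattern itself is the single created element instead of a dummy None, and beam.FlatMap wraps the plain functions in DoFns. The names list_files, delete_if_empty and build_pipeline are hypothetical, the pattern is assumed to be a glob such as gs://xx/xx/*, and the optional filesystems parameter exists only so a stand-in can be passed when testing.

```python
def list_files(pattern, filesystems=None):
    """Yield FileMetadata for every file matching `pattern` (runs on a worker)."""
    if filesystems is None:
        # Imported lazily so a stand-in can be passed when testing
        from apache_beam.io.filesystems import FileSystems as filesystems
    for match_result in filesystems.match([pattern]):
        yield from match_result.metadata_list


def delete_if_empty(metadata, filesystems=None):
    """Delete the file if it has zero bytes; yield the deleted path."""
    if filesystems is None:
        from apache_beam.io.filesystems import FileSystems as filesystems
    if metadata.size_in_bytes == 0:
        filesystems.delete([metadata.path])
        yield metadata.path


def build_pipeline(p, pattern):
    """Wire the steps into pipeline `p`; `pattern` is e.g. 'gs://xx/xx/*'."""
    import apache_beam as beam

    return (
        p
        | 'Pattern' >> beam.Create([pattern])  # only the pattern string is created locally
        | 'List files' >> beam.FlatMap(list_files)
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Delete empty files' >> beam.FlatMap(delete_if_empty))
```

Since there is only one element, the listing itself happens on a single worker; the Reshuffle afterwards is what spreads the individual deletions across workers.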
Upvotes: 1