Reputation: 43
I'm running an Apache Beam pipeline that reads text files from Google Cloud Storage, parses those files, and writes the parsed data to BigQuery.
Ignoring the parsing and google_cloud_options here for the sake of brevity, my code is as follows (apache-beam 2.5.0 with GCP add-ons and Dataflow as the runner):
p = Pipeline(options=options)
lines = (p
         | 'read from file' >> beam.io.ReadFromText('some_gcs_bucket_path*')
         | 'parse xml to dict' >> beam.ParDo(XmlToDictFn())
         | 'write to bigquery' >> beam.io.WriteToBigQuery(
             'my_table',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
p.run()
This runs fine and successfully appends the relevant data to my BigQuery table for a small number of input files. However, when I ramp the number of input files up to around 800k, I get an error:
"Total size of the BoundedSource objects returned by BoundedSource.split() operation is larger than the allowable limit."
I found Troubleshooting apache beam pipeline import errors [BoundedSource objects is larger than the allowable limit], which recommends using ReadAllFromText instead of ReadFromText.
However, when I swap them out I get the following error:
Traceback (most recent call last):
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
xmltobigquery.run_dataflow()
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
result = p.apply(self, pvalueish, label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
return pvalue | 'ReadAllFiles' >> self._read_all_files
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
| 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
| 'RemoveRandomKeys' >> Map(lambda t: t[1]))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
windowing_saved = pcoll.windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'.
Any suggestions?
Upvotes: 4
Views: 8600
Reputation: 19860
To answer the original question: ReadFromText takes a file pattern as a constructor argument, while ReadAllFromText takes its file patterns as a pipeline input:
# ReadFromText
(p
 | beam.io.ReadFromText("myfile.csv"))

# ReadAllFromText
(p
 | beam.Create(["myfile1.csv", "myfile2.csv", "myfile3.csv"])
 | beam.io.ReadAllFromText())
Upvotes: 0
Reputation: 1044
I was facing the same issue. As Richardt mentioned, beam.Create has to be called explicitly. An additional challenge is how to use this pattern together with template parameters, because beam.Create only supports in-memory data, as described in the documentation.
Google Cloud Support helped me in this case and I want to share the solution with you. The trick is to create the pipeline with a dummy string and then use a mapping lambda to read the input at runtime:
class AggregateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the files to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output files to write results to')


def run():
    logging.info('Starting main function')
    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AggregateOptions)
    steps = (
        pipeline
        | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
        | 'Read Input Parameter' >> beam.Map(lambda x: options.input.get())  # get the real input param
        | 'Read Data' >> beam.io.ReadAllFromText()
        # ... other steps
    )
Hope this answer is helpful.
Upvotes: 6