Reputation: 7775
I want to parse multiple files from Cloud Storage and insert the results into a BigQuery table.
Selecting one particular file to read works fine. However, I'm struggling when I replace the single file with the * glob pattern to include all files.
I'm executing the job like this:
python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test
This is my first Dataflow experiment, and I'm not sure how to debug it or what the best practices are for a pipeline like this.
I would expect that the job gets uploaded to Dataflow runner and that gathering the list of files and iterating each would happen in the cloud at run time. I would expect to be able to pass the contents of all files in the same way as I do when reading one file.
Instead, the job already hangs at the point of submitting it to the Cloud Dataflow runner.
"""A metric sink workflow."""
from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions


class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """

    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.

        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if "DataPoints" not in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point


def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Upvotes: 3
Views: 2781
Reputation: 2024
We estimate the size of sources at job submission so that the Dataflow service can use that information when initializing the job (for example, to determine the initial number of workers). To estimate the size of a glob, we need to expand it. This can take some time (I believe several minutes for GCS) if the glob expands into more than 100k files. We'll look into ways to improve the user experience here.
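To illustrate why this step scales with the number of matched files, here is a minimal sketch that uses the local filesystem as a stand-in for GCS. It is not the actual Beam or Dataflow implementation; the names estimate_glob_size and demo are hypothetical and exist only for this example. The point is that the glob has to be expanded first, and then one size lookup is needed per file, so a pattern matching 100k+ objects means 100k+ lookups before the job can even be submitted.

```python
import glob
import os
import tempfile


def estimate_glob_size(pattern):
    """Expand a glob and sum the sizes of the matching files.

    Local-filesystem stand-in (an assumption for illustration) for what
    has to happen with a GCS pattern: the work grows linearly with the
    number of matched files.
    """
    # Step 1: expand the glob into concrete file paths.
    paths = [p for p in glob.glob(pattern) if os.path.isfile(p)]
    # Step 2: one size lookup per matched file.
    total_bytes = sum(os.path.getsize(p) for p in paths)
    return len(paths), total_bytes


def demo():
    """Create three 10-byte files and estimate the size of a glob over them."""
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(3):
            with open(os.path.join(tmp, "part-%d.json" % i), "w") as handle:
                handle.write("x" * 10)
        return estimate_glob_size(os.path.join(tmp, "*"))


if __name__ == "__main__":
    file_count, total_bytes = demo()
    print("files: %d, bytes: %d" % (file_count, total_bytes))
```

Against remote object storage each lookup is a network call rather than a local stat, which is where the minutes go for very large globs.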
Upvotes: 1