baloo
baloo

Reputation: 7775

How to iterate all files in google cloud storage to be used as dataflow input?

Use case

I want to parse multiple files from Cloud storage and insert the results into a BigQuery table.

Selecting one particular file to read works fine. However I'm struggling when switching out the one file to instead include all files by using the * glob pattern.

I'm executing the job like this:

python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test

This is the first Dataflow experiment and I'm not sure how to debug it or what best practices there are for a pipeline like this.

Expected outcome

I would expect that the job gets uploaded to Dataflow runner and that gathering the list of files and iterating each would happen in the cloud at run time. I would expect to be able to pass the contents of all files in the same way as I do when reading one file.

Actual outcome

The job blocks already at the point of trying to submit it to the Cloud Dataflow runner.

Contents of batch.py

"""A metric sink workflow."""

from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions

class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.

        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if not "DataPoints" in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point

def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Upvotes: 3

Views: 2781

Answers (1)

chamikara
chamikara

Reputation: 2024

We do size estimation of sources at job submission so that Dataflow service can use that information when initializing the job (for example, to determine initial number of workers). To estimate size of a glob we need to expand the glob. This could take some time (I believe several minutes for GCS) if the glob expands into more than 100k files. We'll look into ways in which we can improve user experience here.

Upvotes: 1

Related Questions