jmoore255

Naming a BigQuery Table from Template Runtime Parameters (Python, Apache Beam, Dataflow)

I am working on a project in Python with Apache Beam on Dataflow, and I need to name the BigQuery tables using the runtime parameters supplied when launching a Dataflow template.

I have had no luck so far: I get either the definition of the runtime parameter or an empty string.

Basically, I need something like this to work:

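import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions)


class CustomPipelineOptions(PipelineOptions):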

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')
def run():
    def rewrite_values(element):
        """ Rewrite default env values"""
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("table name: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.Map(rewrite_values))  # Map, not FlatMap: rewrite_values returns one string

... pipeline processing, running the pipeline, etc. ...

save_data_bigquery = (table_data | "Get all numbers" >> beam.ParDo(GetAllNumbers())
                      | 'Flat items' >> beam.FlatMap(flat_item)
                      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project_id,
                                                                       dataset="defined_dataset",
                                                                       table=table_name,  # <-- this is where I need the runtime table name
                                                                       schema="id:STRING",
                                                                       batch_size=10000)
                      )

Naming the table in the WriteToBigQuery transform is where I am having trouble. I have also tried using custom_options.table_name, declaring the variables as global, etc.

I have also created a custom DoFn that writes to BigQuery, although WriteToBigQuery would be my preferred method.

Answers (1)

Bhaskar Bhuyan

I tried writing a BQ_writer DoFn class and calling the actual WriteToBigQuery inside it.

import logging

import apache_beam as beam


class BQ_writer(beam.DoFn):
    def __init__(self, schema, output):
        self.output = output
        self.schema = schema

    def process(self, element):
        schema_l = self.schema.get()
        output_table_l = self.output.get()
        logging.info('Writing to table and schema: {}  {}'.format(output_table_l,schema_l))
        beam.io.WriteToBigQuery(output_table_l,
                                schema=schema_l,
                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

and then in the pipeline:

| 'WriteToBigQuery' >> beam.ParDo(BQ_writer(useroptions.schema,useroptions.output))

This ran fine and the pipeline builds without errors, but the data cannot be found in the BigQuery table. Maybe WriteToBigQuery cannot be used inside a ParDo function. Suggestions on how to proceed from here are welcome.
