Dario Villalon

Reputation: 41

Issue reading function in Apache Beam on Dataflow

I am working with Apache Beam on Google Dataflow and I'm calling three functions:

| "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
| "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
| "Unnest 3" >> beam.Map(lambda record: dict_level0(record))

But when I run the job in Dataflow I get an error saying the name is not defined.

Here is my code:

import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

# this creates the output and generates the template
pipeline_options = {
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',  # make sure to specify the region correctly
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4'
}


pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

table_schema = 'airport:STRING, list_delayed_num:INTEGER, list_delayed_time:INTEGER'
table = 'c3t-tango-dev:dataflow.flights_aggr'

class Filter(beam.DoFn):
    def process(self, record):
        if int(record[8]) > 0:
            return [record]

def dict_level1(record):
    dict_ = {}
    dict_['airport'] = record[0]
    dict_['list'] = record[1]
    return (dict_)

def unnest_dict(record):
    def expand(key, value):
        if isinstance(value, dict):
            return [(key + '_' + k, v) for k, v in unnest_dict(value).items()]
        else:
            return [(key, value)]

    items = [item for k, v in record.items() for item in expand(k, v)]
    return dict(items)

def dict_level0(record):
    #print("Record in dict_level0:", record)
    dict_ = {}
    dict_['airport'] = record['airport']
    dict_['list_Delayed_num'] = record['list_Delayed_num'][0]
    dict_['list_Delayed_time'] = record['list_Delayed_time'][0]
    return (dict_)

with beam.Pipeline(options=pipeline_options) as p1:
    serviceAccount = "./composer/dags/c3t-tango-dev-591728f351ee.json"
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceAccount

    Delayed_time = (
        p1
        | "Import Data time" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                     skip_header_lines=1)
        | "Split by comma time" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays time" >> beam.ParDo(Filter())
        | "Create a key-value time" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Sum by key time" >> beam.CombinePerKey(sum)
    )

    Delayed_num = (
        p1
        | "Import Data" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                 skip_header_lines=1)
        | "Split by comma" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays" >> beam.ParDo(Filter())
        | "Create a key-value" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Count by key" >> beam.combiners.Count.PerKey()
    )

    Delay_table = (
      {'Delayed_num': Delayed_num, 'Delayed_time': Delayed_time}
      | "Group By" >> beam.CoGroupByKey()
      | "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
      | "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
      | "Unnest 3" >> beam.Map(lambda record: dict_level0(record))
      #| beam.Map(print)
      | "Write to BQ" >> beam.io.WriteToBigQuery(
        table,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location="gs://dario-dev-gcs/dataflow-course/staging")
    )

p1.run()

I ran this code and it generated a template in GCS. I then created a Dataflow job from that custom template, pointing it at the template location, but when the job runs I get this error:

File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in NameError: name 'dict_level1' is not defined


Upvotes: 0

Views: 125

Answers (1)

kiran mathew

Reputation: 2363

To resolve the above error, set the --save_main_session pipeline option to True.

Error:

  File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in NameError: name 'dict_level1' is not defined

When you run the pipeline locally, for example with the DirectRunner, this error may not occur. It happens when your DoFns or transforms use names from the global namespace (such as the module-level functions dict_level1, unnest_dict and dict_level0 here) that are not available on the Dataflow workers. For more information refer to this link.
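As a minimal sketch, assuming the options dictionary from your question, you could add the flag directly to that dictionary (or set it through SetupOptions) so that module-level names such as dict_level1 are pickled with the main session and shipped to the workers:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = {
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4',
    'save_main_session': True,  # pickle the main session so global names exist on the workers
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

# Equivalent alternative: set it on the typed options view
# pipeline_options.view_as(SetupOptions).save_main_session = True

With save_main_session enabled, the pickled main session is restored on each worker, so functions defined at module level in your launch script can be resolved when the Map lambdas call them.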

Upvotes: 0
