Reputation: 103
I am new to Apache Beam on the DataflowRunner. I am trying to load a base table and then perform CDC against it with a delta table (after loading the delta file into the delta table).
I am getting the error message below:
File "beamETL4.py", line 81, in process_id: TypeError: tuple indices must be integers, not str [while running 'FlatMap(process_id)']
Any pointers will help; I am still learning.
Details of the code:
About the data:
The files contain 3 columns.
Column names: id, name, salary.
Data types: int, string, int.
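For illustration, a single line is parsed into a row like this (sample values, not real data):
line = '1,John,1000'
row = dict(zip(('id', 'name', 'salary'), line.split(',')))
# row == {'id': '1', 'name': 'John', 'salary': '1000'}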
Below is my code module
"""
Author :
Vidya
Modification History :
17-Dec-2019 Vidya Initial Draft
"""
from __future__ import absolute_import
# Import Libraries
import argparse
import logging
import re
import traceback
import warnings
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import List, Any
warnings.filterwarnings('ignore')
# Define custom class DataIngestion
class DataIngestion():
"""A helper class the load the file to the big query table."""
def __init__(self):
pass
def parse_method(self, input_string):
# Strip out carriage return, newline and quote characters.
values = re.split(",",
re.sub('\r\n', '', re.sub(u'"', '', input_string)))
row = dict(
zip(('id', 'name', 'salary'), values)
)
return row
class DataLakeComparison:
"""helper class """
def __init__(self):
pass
def base_query(self):
base_query = """
SELECT
id,
name,
salary
FROM CDC.base
"""
return base_query
def delta_query(self):
delta_query = """
SELECT
id,
name,
salary
FROM CDC.delta
"""
return delta_query
def process_id(self, id, data):
"""This function performs the join of the two datasets."""
result = list(data['delta']) # type: List[Any]
if not data['base']:
logging.info('id is missing in base')
return
if not data['delta']:
logging.info(' id is missing in delta')
return
base = {}
try:
base = data['base'][0]
except KeyError as err:
traceback.print_exc()
logging.error("id Not Found error: %s", err)
for delta in result:
delta.update(base)
return result
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
default='gs://input-cobalt/delta1.csv'
)
parser.add_argument(
'--output',
dest='output',
required=False,
help='Output BQ table to load the delta file ',
default='CDC.delta'
)
parser.add_argument(
'--output2',
dest='output2',
required=False,
help='Output BQ table to load the base table',
default='CDC.base'
)
# Parse arguments from command line.
known_args, pipeline_args = parser.parse_known_args(argv)
data_ingestion = DataIngestion()
# Instantiate pipeline
options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)
(p
| 'Read from a File' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
| 'String To BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='id:INTEGER,name:STRING,salary:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
)
datalakecomparison = DataLakeComparison()
base_data = datalakecomparison.base_query()
delta_data = datalakecomparison.delta_query()
base_data = (
p
| 'Read Base from BigQuery' >> beam.io.Read(
beam.io.BigQuerySource(query=base_data, use_standard_sql=True))
| 'Map id in base' >> beam.Map(
lambda row: (
row['id'], row
)))
delta_data = (
p
| 'Read Delta from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=delta_data, use_standard_sql=True))
| 'Map id in delta' >> beam.Map(
lambda row: (
row['id'], row
)))
result = {'base': base_data, 'delta': delta_data} | beam.CoGroupByKey()
joined = result | beam.FlatMap(datalakecomparison.process_id(result))
joined | 'Write Data to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output2,
schema='id:INTEGER,name:STRING,salary:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
p.run().wait_until_finish()
# main function
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Upvotes: 0
Views: 300
Reputation: 1731
I believe there are two issues:
You're not allowed to mutate your inputs within a DoFn, but the code delta.update(base) mutates the input argument data. This could be causing an unintended side effect which later manifests in the error you're getting. Please create a shallow copy of each row before updating it.
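A minimal sketch of what that could look like, keeping your signature and your merge order (base values overwrite delta values), with a copy so the grouped input is never mutated:
def process_id(self, id, data):
    """Joins the base and delta rows for one id without mutating the inputs."""
    if not data['base']:
        logging.info('id %s is missing in base', id)
        return
    if not data['delta']:
        logging.info('id %s is missing in delta', id)
        return
    base = data['base'][0]
    for delta in data['delta']:
        merged = dict(delta)  # shallow copy; the input element stays untouched
        merged.update(base)   # base values overwrite delta values, as in your code
        yield merged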
Did you mean to use beam.FlatMapTuple(datalakecomparison.process_id) instead of beam.FlatMap(datalakecomparison.process_id(result))? The CoGroupByKey will produce records like: (7, {'base': [{'id': 7, 'name': 'name1', 'salary': 1}], 'delta': [{'id': 7, 'name': 'name1', 'salary': 2}]}). For the above example, process_id will be invoked with id=7 and data={'base': [{'id': 7, 'name': 'name1', 'salary': 1}], 'delta': [{'id': 7, 'name': 'name1', 'salary': 2}]}.
See FlatMapTuple for more details.
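A minimal sketch of the corrected wiring, assuming process_id is rewritten as above:
result = (
    {'base': base_data, 'delta': delta_data}
    | beam.CoGroupByKey())
# FlatMapTuple unpacks each (id, grouped_dict) element into the two
# arguments that process_id expects.
joined = result | beam.FlatMapTuple(datalakecomparison.process_id)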
Upvotes: 1