Reputation: 39
I have two CSV files, expeditions- 2010s.csv and peaks.csv, with the join key 'peak_id'. I'm using an Apache Beam notebook on Dataflow to join them. Here is my code:
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def read_csv_file(readable_file):
    import apache_beam as beam
    import csv
    import io

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
    # Read it as CSV; csv.reader would also work
    csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))
    for row in csv_dict:
        yield row
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Cloud Storage bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://bucket/folder/peaks.csv')
    parser.add_argument(
        '--input1',
        dest='input1',
        required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Cloud Storage bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://bucket/folder/expeditions- 2010s.csv')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)
    input_p1 = (
        p
        | 'Read From GCS input1' >> beam.Create([known_args.input1])
        | 'Parse csv file p1' >> beam.FlatMap(read_csv_file)
        | 'Tuple p1' >> beam.Map(lambda e: (e["peakid"], {'peakid': e["peakid"], 'bcdate': e["bcdate"], 'smtdate': e["smtdate"]}))
    )

    input_p2 = (
        p
        | 'Read From GCS input2' >> beam.Create([known_args.input])
        | 'Parse csv file p2' >> beam.FlatMap(read_csv_file)
        | 'Tuple p2' >> beam.Map(lambda e: (e["peakid"], {'peakid': e["peakid"], 'pkname': e["pkname"], 'heightm': e["heightm"]}))
    )
    # CoGroupByKey: relational join of two or more keyed PCollections.
    # It also accepts a dict of {tag: PCollection}.
    output = (
        (input_p1, input_p2)
        | 'Join' >> beam.CoGroupByKey()
        | 'Final Dict' >> beam.Map(lambda el: to_final_dict(el[1]))
        # | beam.Map(print)
        | 'Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table='project:dataset.expeditions',
            method='FILE_LOADS',
            custom_gcs_temp_location='gs://bucket/folder/temp',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

    p.run().wait_until_finish()
def to_final_dict(list_tuple_of_tuple):
    # Merge the per-key dicts from both sides of the CoGroupByKey into one output row.
    result = {}
    for list_tuple in list_tuple_of_tuple:
        for el in list_tuple:
            result.update(el)
    return result

# runner = DataflowRunner()
# runner.run_pipeline(p, options=options)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
The join produces the expected result before the write step, but writing to BigQuery fails with this error: RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_602_215864ba592a2e01f0c4e2157cc60c47_51de5de53b58409da70f699c833c4db5 failed. Error Result: <ErrorProto location: 'gs://bucket/folder/temp/bq_load/4bbfc44d750c4af5ab376b2e3c3dedbd/project.dataset.expeditions/25905e46-db76-49f0-9b98-7d77131e3e0d' message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 3; errors: 1. Please look into the errors[] collection for more details. File: gs://bucket/folder/temp/bq_load/4bbfc44d750c4af5ab376b2e3c3dedbd/project.dataset.expeditions/25905e46-db76-49f0-9b98-7d77131e3e0d' reason: 'invalid'> [while running 'Write To BigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']
Upvotes: 1
Views: 226
Reputation: 6572
I think the date format is not correct. Use the YYYY-MM-DD format for your date fields (e.g. 2013-12-25), and that should solve your issue.
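For example, you could add a normalization step right before 'Write To BigQuery'. A minimal sketch, assuming your CSV stores dates like '12/25/13' (the '%m/%d/%y' pattern is an assumption; adjust it to whatever format your file actually uses):

from datetime import datetime

def normalize_dates(row):
    # Rewrite bcdate/smtdate into the ISO YYYY-MM-DD form that BigQuery DATE columns expect.
    # Assumption: the CSV stores dates as MM/DD/YY; change the strptime pattern if yours differs.
    for field in ('bcdate', 'smtdate'):
        value = row.get(field)
        if value:
            row[field] = datetime.strptime(value, '%m/%d/%y').strftime('%Y-%m-%d')
        else:
            row[field] = None  # an empty string is not a valid DATE value either
    return row

Then wire it into the pipeline after the join, e.g. | 'Normalize dates' >> beam.Map(normalize_dates), just before the 'Write To BigQuery' step.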
Upvotes: 1