Nhu Dao

Reputation: 39

Cannot sink to BigQuery using Dataflow Apache Beam

I have two CSV files, expeditions- 2010s.csv and peaks.csv, with the join key 'peakid'. I'm using a notebook with Apache Beam on Dataflow to join them. Here is my code:

import argparse
import logging

from apache_beam.options.pipeline_options import PipelineOptions


def read_csv_file(readable_file):
    import apache_beam as beam
    import csv
    import io
    import datetime
    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read it as csv, you can also use csv.reader
    csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))

    for row in csv_dict:
        yield (row)

def run(argv=None):
    import apache_beam as beam
    import io
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://bucket/folder/peaks.csv')
    
    parser.add_argument(
        '--input1',
        dest='input1',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://bucket/folder/expeditions- 2010s.csv')

     
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=pipeline_options)
    input_p1 = (
        p
         | 'Read From GCS input1' >> beam.Create([known_args.input1])
         | 'Parse csv file p1' >> beam.FlatMap(read_csv_file)
         | 'Tuple p1' >> beam.Map(lambda e: (e["peakid"], {'peakid': e["peakid"], 'bcdate': e["bcdate"], 'smtdate':e["smtdate"]}))
    )
    input_p2 = (
        p
         | 'Read From GCS input2' >> beam.Create([known_args.input])
         | 'Parse csv file p2' >> beam.FlatMap(read_csv_file)
         | 'Tuple p2' >> beam.Map(lambda e: (e["peakid"], {'peakid': e["peakid"], 'pkname': e["pkname"], 'heightm':e["heightm"]})) 
    )
    # CoGroupByKey: relational join of two or more keyed PCollections. It also accepts a dictionary of named PCollections.
    output = (
        (input_p1, input_p2)
        | 'Join' >> beam.CoGroupByKey()
        | 'Final Dict' >> beam.Map(lambda el: to_final_dict(el[1]))
        # | beam.Map(print)
        | 'Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
           table='project:dataset.expeditions',
           method='FILE_LOADS',
           custom_gcs_temp_location='gs://bucket/folder/temp',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)    
    )
    p.run().wait_until_finish()
    
def to_final_dict(list_tuple_of_tuple):
    # Merge the grouped dicts from both sides of the CoGroupByKey into one flat row.
    result = {}
    for list_tuple in list_tuple_of_tuple:
        for el in list_tuple:
            result.update(el)
    return result


# runner = DataflowRunner()
# runner.run_pipeline(p, options=options)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I get the expected joined result before the write to BigQuery.

But the write to BigQuery fails with the following error:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_602_215864ba592a2e01f0c4e2157cc60c47_51de5de53b58409da70f699c833c4db5 failed. Error Result: <ErrorProto location: 'gs://bucket/folder/temp/bq_load/4bbfc44d750c4af5ab376b2e3c3dedbd/project.dataset.expeditions/25905e46-db76-49f0-9b98-7d77131e3e0d' message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 3; errors: 1. Please look into the errors[] collection for more details. File: gs://bucket/folder/temp/bq_load/4bbfc44d750c4af5ab376b2e3c3dedbd/project.dataset.expeditions/25905e46-db76-49f0-9b98-7d77131e3e0d' reason: 'invalid'> [while running 'Write To BigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']

Upvotes: 1

Views: 226

Answers (1)

Mazlum Tosun

Reputation: 6572

I think the date format is not correct. Use the format YYYY-MM-DD (e.g. 2013-12-25) for your date fields, and that should solve your issue.
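For example, here is a minimal sketch of a reformatting step you could add just before the 'Write To BigQuery' step. The source pattern '%d/%m/%Y' is only an assumption about how the dates look in your CSV files; adjust it to match the actual data.

import datetime

def format_dates(record):
    # Reformat the date fields to the BigQuery-friendly YYYY-MM-DD form.
    # '%d/%m/%Y' is a guess at the CSV date layout; change it if needed.
    for field in ('bcdate', 'smtdate'):
        value = record.get(field)
        if value:
            record[field] = datetime.datetime.strptime(
                value, '%d/%m/%Y').strftime('%Y-%m-%d')
    return record

# In the pipeline:
# ...
# | 'Final Dict' >> beam.Map(lambda el: to_final_dict(el[1]))
# | 'Format dates' >> beam.Map(format_dates)
# | 'Write To BigQuery' >> ...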

Upvotes: 1
