Y2Jepic

Reputation: 51

Error transferring data from GCS to BigQuery using Dataflow

I am trying to load data from GCS into a BigQuery table. I want to create the table on the fly. Below is the content of the file:

abc,1234,2023-05-10,1,1.5
xyz,2345,2023-05-10,1,1.6

Below is the Dataflow code which I am trying to execute locally.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.internal.clients.bigquery import TableSchema

# Define the GCS file path
gcs_file_path = 'gs://gcs_path/table_v1.txt'

# Define the BigQuery table details
project_id = 'project_v1'
dataset_id = 'dataset_v1'
table_id = 'table_v1'
table_schema_str = 'emp_name:STRING,id:INTEGER,temp_date:DATE,contact:INTEGER,dept:FLOAT'

# Define the pipeline options
pipeline_options = PipelineOptions()

# Define the pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read the CSV file from GCS
    csv_lines = (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText(gcs_file_path)
    )

    # Parse the CSV lines and convert them to a dictionary
    table_data = (
        csv_lines
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(table_schema_str.split(','), line.split(','))))
    )

    # Write the table data to BigQuery
    table_spec = bigquery.TableReference(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id,
    )
    table_schema = TableSchema()
    for field in table_schema_str.split(','):
        name, type = field.split(':')
        table_schema.fields.append(bigquery.TableFieldSchema(name=name, type=type))

    (
        table_data
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

Below is the error. It occurs for all the columns:

WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'emp_name:STRING' message: 'no such field: emp_name:STRING.' reason: 'invalid'>] index: 0>]

Upvotes: 1

Views: 339

Answers (1)

Poala Astrid

Reputation: 832

The error you are encountering in ErrorProto is caused by an invalid field name: the rows are being inserted with the key 'emp_name:STRING' instead of 'emp_name', so BigQuery reports that the field does not exist. Check the dictionaries produced by your Parse CSV step (and, if needed, the CSV data on GCS) and make sure the keys match the table's column names.
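If that is the cause here, a minimal sketch of a corrected Parse CSV step could look like the following (it reuses the csv_lines and table_schema_str names from your pipeline and simply strips the :TYPE suffix before building each row dictionary):

# Bare column names, without the :TYPE suffix, to use as dictionary keys
field_names = [f.split(':')[0] for f in table_schema_str.split(',')]

def parse_line(line):
    # Keys now match the BigQuery column names (emp_name, id, temp_date, ...)
    return dict(zip(field_names, line.split(',')))

table_data = (
    csv_lines
    | 'Parse CSV' >> beam.Map(parse_line)
)

With keys that match the schema, the inserts should no longer be rejected with "no such field".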

Another workaround is to use insert_retry_strategy in the WriteToBigQuery transform. With strategies like RETRY_NEVER or RETRY_ON_TRANSIENT_ERROR, rows that fail permanently are tagged with "FailedRows" instead of being retried forever. This data can then be processed manually through that tag:

res = pcoll | WriteToBigQuery(...)

# Process the failed rows after the WriteToBigQuery step
failures = res['FailedRows']
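As a rough sketch of how that can be wired together (this reuses table_data, table_spec, and table_schema from your pipeline; the retry strategy and the print-based handling are illustrative choices, not the only options):

from apache_beam.io.gcp.bigquery_tools import RetryStrategy

res = (
    table_data
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        # Do not retry rows that fail with permanent (non-transient) errors
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
    )
)

# Rows that could not be inserted come back under the 'FailedRows' tag
failed_rows = res['FailedRows']
_ = failed_rows | 'Log failed rows' >> beam.Map(print)

This keeps the pipeline from retrying the same bad rows indefinitely and gives you a PCollection of the rejected rows to inspect or write elsewhere.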

Upvotes: 2
