Reputation: 51
I am trying to load data from GCS to a BigQuery table, and I want to create the table on the fly. Below is the content of the file:
abc,1234,2023-05-10,1,1.5
xyz,2345,2023-05-10,1,1.6
Below is the Dataflow code which I am trying to execute locally:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.internal.clients.bigquery import TableSchema
# Define the GCS file path
gcs_file_path = 'gs://gcs_path/table_v1.txt'
# Define the BigQuery table details
project_id = 'project_v1'
dataset_id = 'dataset_v1'
table_id = 'table_v1'
table_schema_str = 'emp_name:STRING,id:INTEGER,temp_date:DATE,contact:INTEGER,dept:FLOAT'
# Define the pipeline options
pipeline_options = PipelineOptions()
# Define the pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read the CSV file from GCS
    csv_lines = (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText(gcs_file_path)
    )

    # Parse the CSV lines and convert them to a dictionary
    table_data = (
        csv_lines
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(table_schema_str.split(','), line.split(','))))
    )

    # Write the table data to BigQuery
    table_spec = bigquery.TableReference(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id,
    )

    table_schema = TableSchema()
    for field in table_schema_str.split(','):
        name, type = field.split(':')
        table_schema.fields.append(bigquery.TableFieldSchema(name=name, type=type))

    (
        table_data
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
Below is the error. It occurs for every column:
WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will retry. Errors were [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'emp_name:STRING' message: 'no such field: emp_name:STRING.' reason: 'invalid'>] index: 0>]
Upvotes: 1
Views: 339
Reputation: 832
The error you are encountering in ErrorProto is because BigQuery receives rows with a field named emp_name:STRING, which does not exist in the table: in your 'Parse CSV' step the dictionary keys are taken from table_schema_str.split(','), so each key keeps its :TYPE suffix instead of being just the column name. Strip the type from the keys (see the sketch below), or check your CSV input data and overwrite it on GCS if the data itself is malformed.
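For example, a minimal sketch of an adjusted 'Parse CSV' step (reusing table_schema_str and csv_lines from your pipeline; the field_names variable is just for illustration) could zip only the column names with the values:

# Build the row keys from the column names only, dropping the :TYPE suffix,
# so they match the fields of the BigQuery table schema.
field_names = [f.split(':')[0] for f in table_schema_str.split(',')]

table_data = (
    csv_lines
    | 'Parse CSV' >> beam.Map(
        lambda line: dict(zip(field_names, line.strip().split(','))))
)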
Another workaround is to use insert_retry_strategy
in the WriteToBigQuery transform. With strategies like "RETRY_NEVER" and "RETRY_ON_TRANSIENT_ERROR", rows that hit permanent errors are tagged with "FailedRows" instead of being retried, and this data can be processed manually later through the tag:
res = pcoll | WriteToBigQuery(...)
failures = res['FailedRows']  # Process failed rows after the WriteToBigQuery step
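A fuller sketch of how this could look with the pipeline from your question (the RetryStrategy import and the 'Log failed rows' step are my assumptions, not part of your code):

from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# Capture rows that fail the streaming insert instead of retrying them forever.
result = (
    table_data
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
    )
)

# Rows rejected with permanent errors come back under the 'FailedRows' tag.
failed_rows = result['FailedRows']
failed_rows | 'Log failed rows' >> beam.Map(print)  # or write them to GCS for later inspection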
Upvotes: 2