Reputation: 47
I am trying to load a CSV file from my Google Storage bucket into BigQuery. There is no extra processing, just a simple load operation, but it's failing.
Below is the code snippet:
def bigquery_commit(element):
    from google.cloud import bigquery
    PROJECT = 'emerald-990'
    source = contact_options.output.get()
    client = bigquery.Client(project=PROJECT)
    dataset_ref = client.dataset('snow')
    table_ref = dataset_ref.table(source)
    table = client.get_table(table_ref)
    errors = client.insert_rows(table, element)
    print("Errors occurred:", errors)
Error:
IndexError: string index out of range [while running 'FlatMap(bigquery_commit)']
Main function:
options = PipelineOptions()
p = beam.Pipeline(options=options)
(p
 | 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
 | beam.FlatMap(bigquery_commit))
p.run().wait_until_finish()
Now, when I pass a test record directly, it works. Example:
rows_to_insert = [{u'id': 101, u'name': 'tom', u'salary': 99899}]
errors = client.insert_rows(table, rows_to_insert)
Any idea what I am missing?
Upvotes: 1
Views: 1645
Reputation: 3462
I have experienced the same error message, but in a slightly different context. I could not find an answer for my issue, so I am leaving a hint here.
The traceback did not help much:
File "/user_code/main.py", line 198, in execute
errors = client.insert_rows(table, row)
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2113, in insert_rows
json_rows = [_record_field_to_json(schema, row) for row in rows]
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2113, in <listcomp>
json_rows = [_record_field_to_json(schema, row) for row in rows]
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/_helpers.py", line 415, in _record_field_to_json
record[subname] = _field_to_json(subfield, subvalue)
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/_helpers.py", line 444, in _field_to_json
return _repeated_field_to_json(field, row_value)
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/_helpers.py", line 386, in _repeated_field_to_json
values.append(_field_to_json(item_field, item))
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/_helpers.py", line 447, in _field_to_json
return _record_field_to_json(field.fields, row_value)
File "/env/local/lib/python3.7/site-packages/google/cloud/bigquery/_helpers.py", line 414, in _record_field_to_json
subvalue = row_value[subindex]
IndexError: string index out of range
I was running a script that inserted a row into BigQuery with the correct column order, values, and data types. The script selected the appropriate dataset and table, and if the table did not exist it would create it first, and this turned out to be the problem.
When the table was first created, I provided a certain number of columns in its schema, but shortly afterwards I added two more columns to the schema in my code instead of altering the existing table directly. As a result, the column order was correct and the inserted rows matched the data types, but the table had not been created with all the columns of the inserted row:
IndexError: string index out of range
The solution was deleting and recreating the table with the desired schema. It worked like a charm!
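For reference, a minimal sketch of that delete-and-recreate fix with a reasonably recent google-cloud-bigquery client could look like the following. The project, table, and column names here are placeholders for illustration, not the ones from my actual script:

from google.cloud import bigquery

client = bigquery.Client(project='my-project')   # placeholder project
table_id = 'my-project.my_dataset.my_table'      # placeholder table

# Drop the stale table that was created with the old, shorter schema
client.delete_table(table_id, not_found_ok=True)

# Recreate it with the full schema, including the columns added later
schema = [
    bigquery.SchemaField('id', 'INTEGER'),
    bigquery.SchemaField('name', 'STRING'),
    bigquery.SchemaField('salary', 'INTEGER'),
    # ...plus whatever columns were later added to the schema in code
]
client.create_table(bigquery.Table(table_id, schema=schema))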
Upvotes: 0
Reputation: 11041
An issue that I notice quickly here is that your data coming from ReadFromText has type str, while client.insert_rows takes a list of row elements (e.g. dicts).
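As a rough illustration of why that mismatch shows up as IndexError: string index out of range (this is my reading of it, not documented behaviour): insert_rows iterates over whatever you pass as rows, and iterating a str yields single characters, so every "row" becomes a one-character string instead of a dict:

# One line read by ReadFromText from the CSV: a plain str, not a row dict
line = '101,tom,99899'

for row in line[:3]:
    print(repr(row))  # '1', '0', '1' -- single characters, which cannot be mapped to the table schema

# What insert_rows actually expects: a list of row dicts (or tuples)
rows_to_insert = [{u'id': 101, u'name': 'tom', u'salary': 99899}]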
You should consider rewriting your code to use native Apache Beam transforms, like so:
(p
 | 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
 | beam.Map(lambda x: json.loads(x))  # Parse your JSON strings
 | beam.io.gcp.bigquery.WriteToBigQuery(table=table_ref))
Now, I do not recommend the following approach, but if you really need to fix your code, you'd do:
def bigquery_commit(element):
    import json
    from google.cloud import bigquery
    PROJECT = 'emerald-990'
    source = contact_options.output.get()
    client = bigquery.Client(project=PROJECT)
    dataset_ref = client.dataset('snow')
    table_ref = dataset_ref.table(source)
    table = client.get_table(table_ref)
    parsed_element = json.loads(element)
    errors = client.insert_rows(table, [parsed_element])
    print("Errors occurred:", errors)
Upvotes: 2