Reputation: 642
I have a CSV stored in GCS that I want to load into a BigQuery table, but I need to do some pre-processing first, so I load it into a DataFrame and later load that into the BigQuery table.
import pandas as pd
import json
from google.cloud import bigquery
cols_name_list = [...]  # column names, in order
uri = "gs://<bucket>/<path>/<csv_file>"
df = pd.read_csv(uri, dtype="string")
df = df.reindex(columns=cols_name_list)
client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    ...  # schema fields added according to the table column types
)
job = client.load_table_from_dataframe(
    df, "<bq_table_id>", job_config=job_config
)
job.result()
In the code above, I reorder the DataFrame columns to match the column order in the BigQuery table (not sure if this matters or not) and convert all columns to string type.
I got the error shown below:
pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int
I also ran it without forcing the dtypes to string, and got a different error:
pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
The code and data look normal, so I tried downgrading numpy and pyarrow, but that still caused the same error.
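Reproducing the cast directly in pyarrow (a minimal sketch of my own, not the library's internals) raises the same message, which makes me think the error comes from pyarrow casting a string column to an int64 field rather than from any library version:
import pyarrow as pa

# Minimal repro sketch: casting a string to an int64 Arrow array fails
# the same way the load job does.
pa.array(["47803"], type=pa.int64())
# pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int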
Update:
I updated the code to force string dtype only on the string column:
df = pd.read_csv(uri, dtype={"B": "string"})
This is the example CSV data that I'm working with:
A,B,C,D,E,F,G
47803,000000000020030263,629,,2021-01-12 23:26:37,,
where the column types of the BQ table should be like this:
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("A", "INTEGER"),
        bigquery.SchemaField("B", "STRING"),
        bigquery.SchemaField("C", "INTEGER"),
        bigquery.SchemaField("D", "INTEGER"),
        bigquery.SchemaField("E", "DATETIME"),
        bigquery.SchemaField("F", "INTEGER"),
        bigquery.SchemaField("G", "DATETIME")
    ]
)
Now, when I try to load the data with load_table_from_dataframe() using these configs, I get this error:
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
So I printed the dtypes:
A int64
B string
C int64
D float64
E object
F float64
G float64
dtype: object
Which column is the issue right now, and how can I fix it? The error is not very useful for debugging, since the columns that are supposed to be int are already int, and the only string-typed column shouldn't need to be converted to int at all, yet the error is thrown anyway.
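For anyone debugging the same thing, a quick way to line the declared BigQuery types up against the actual pandas dtypes (just a sketch):
# Diagnostic sketch: print each declared BigQuery type next to the
# actual pandas dtype so any mismatch stands out.
for field in job_config.schema:
    print(field.name, field.field_type, df[field.name].dtype)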
Upvotes: 7
Views: 23912
Reputation: 77
I'm facing the same issue, but I'm using pandas_gbq.to_gbq.
This is my code:
# Transform new_data into a pandas DataFrame; new_data is JSON
new_data = pd.DataFrame(new_data, columns=['FechaHora', 'Fecha', 'Hora', 'Entrada', 'Grupo1', 'HourlyCountOne', 'HourlyCountTwo', 'Variacion', 'sumToAforo', 'DiaSemana', 'Tipo', 'NDiaSemana'])
print("new_data.dtypes: ", new_data.dtypes)
if new_data['FechaHora'].dtype == 'object':
    new_data['FechaHora'] = pd.to_datetime(new_data['FechaHora'], format='%Y-%m-%dT%H:%M:%S')
new_data['HourlyCountOne'] = pd.to_numeric(new_data['HourlyCountOne'], errors='coerce').fillna(0).astype(int)
new_data['HourlyCountTwo'] = pd.to_numeric(new_data['HourlyCountTwo'], errors='coerce').fillna(0).astype(int)
new_data['Variacion'] = pd.to_numeric(new_data['Variacion'], errors='coerce').fillna(0).astype(int)
new_data['NDiaSemana'] = pd.to_numeric(new_data['NDiaSemana'], errors='coerce').fillna(0).astype(int)
print("new_data.dtypes: ", new_data.dtypes)
# Print new_data row by row
for index, row in new_data.iterrows():
    # Print the row as a single string
    print(f"{index}={row.to_string()}")
pandas_gbq.to_gbq(new_data, 'project_id.dataset.table_id', project_id='project_id', if_exists='append')
print("Loaded {} rows into {}.".format(len(new_data), 'project-id.dataset.tableid'))
return "Loaded {} rows into {}.".format(len(new_data), 'project-id.dataset.tableid')
When I deploy and run the function, I get this error:
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
I'm unable to find which column or value is causing the issue.
This is the BigQuery table schema:
FechaHora:DATETIME,
Fecha:DATE,
Hora:TIME,
Entrada:STRING,
Grupo1:STRING,
HourlyCountOne:INTEGER,
HourlyCountTwo:INTEGER,
Variacion:INTEGER,
sumToAforo:BOOLEAN,
DiaSemana:STRING,
Tipo:STRING,
NDiaSemana:INTEGER
These are the printed dtypes of new_data:
FechaHora datetime64[ns]
Fecha object
Hora object
Entrada object
Grupo1 object
HourlyCountOne int64
HourlyCountTwo int64
Variacion int64
sumToAforo bool
DiaSemana object
Tipo object
NDiaSemana int64
This is the output when I print the first row of the pandas dataframe:
0=FechaHora 2023-09-27 04:00:00
Fecha 2023-09-27
Hora 04:00:00
Entrada Entrada Motos
Grupo1 Entrada Motos
HourlyCountOne 9
HourlyCountTwo 0
Variacion 9
sumToAforo False
DiaSemana Miércoles
Tipo Moto
NDiaSemana 4
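One thing I plan to try (a sketch of my own, untested) is converting the remaining object columns into real date/time values for the DATE and TIME fields, so pyarrow doesn't have to guess:
# Sketch (untested): give the DATE and TIME columns real
# datetime.date / datetime.time values instead of plain strings.
new_data['Fecha'] = pd.to_datetime(new_data['Fecha'], format='%Y-%m-%d').dt.date
new_data['Hora'] = pd.to_datetime(new_data['Hora'], format='%H:%M:%S').dt.time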
Upvotes: 1
Reputation: 691
I had the same issue with load_table_from_dataframe. I suspect there is an underlying issue that needs to be checked, since it only works when all three are passed: schema, autodetect set to False, and source_format set to bigquery.SourceFormat.CSV. I didn't want to replicate the schema, since I want to maintain it in only one place, in my Terraform.
I changed my code as shown below to read the existing BigQuery schema and pass it back during the write.
# Drop index from the pandas df
write_df.reset_index(drop=True, inplace=True)
# Get the dataset and table ref
dataset_ref = client.dataset("DATASET")
table_ref = dataset_ref.table("TABLE")
# Set the job config
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.schema = client.get_table(table_ref).schema
job_config.autodetect = False
job_config.source_format = bigquery.SourceFormat.CSV
# Write to BQ
load_job = client.load_table_from_dataframe(
    write_df, table_ref,
    job_config=job_config
)
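As a side note, newer google-cloud-bigquery releases deprecate Client.dataset(); if you're on one of those, the same schema lookup works with a plain table ID string (a sketch; the ID below is a placeholder):
# Equivalent schema lookup using a string table ID instead of the
# deprecated dataset/table ref helpers. Table ID is a placeholder.
table = client.get_table("my-project.DATASET.TABLE")
job_config.schema = table.schema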
Upvotes: 0
Reputation: 33
It may not be a solution for your issue specifically, but I've faced the same problem and realized that .to_gbq() retrieves the original table schema. Deleting the previously created table apparently solves this issue for any dtype.
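If you go that route, a minimal sketch of the cleanup (the table ID is a placeholder) could look like this:
from google.cloud import bigquery

# Sketch: drop the stale table so the next to_gbq() call recreates it
# with a schema inferred from the DataFrame being written.
client = bigquery.Client()
client.delete_table("project.dataset.table", not_found_ok=True)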
Upvotes: 0
Reputation: 1820
You have to set the source_format to the format of the source data inside your LoadJobConfig. In this case you can also set autodetect=False, since you have explicitly specified the schema of the table. Below is a sample according to which you can adjust your code and try to execute it.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("A", "INTEGER"),
        bigquery.SchemaField("B", "STRING"),
        bigquery.SchemaField("C", "INTEGER"),
        bigquery.SchemaField("D", "INTEGER"),
        bigquery.SchemaField("E", "DATETIME"),
        bigquery.SchemaField("F", "INTEGER"),
        bigquery.SchemaField("G", "DATETIME")
    ],
    autodetect=False,
    source_format=bigquery.SourceFormat.CSV
)
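Setting source_format to CSV makes load_table_from_dataframe serialize the DataFrame to CSV instead of the default Parquet, so BigQuery parses the values against your declared schema server-side rather than pyarrow casting them client-side. The load call itself stays the same (sketch; the table ID is a placeholder):
client = bigquery.Client()
job = client.load_table_from_dataframe(
    df, "<project>.<dataset>.<table>",  # placeholder table ID
    job_config=job_config
)
job.result()  # wait for the load job to complete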
CSV Data:
A,B,C,D,E,F,G
47803,000000000020030263,629,785,2021-01-12 23:26:37,986,2022-01-12 23:26:37
Output: (screenshot of the successfully loaded table omitted)
Upvotes: 16