Reputation: 911
I would like to retrieve CSV files from Google Cloud Storage (a bucket) and load the data from these files into a BigQuery table without having duplicate data. The goal is code that is optimal in both performance and cost. My current code is the following:
from google.cloud import bigquery

def load_data_in_BQT(self):
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("id", "INTEGER"),
            bigquery.SchemaField("name", "STRING"),
        ],
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # appends the data (duplicates are possible)
        # write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # replaces the table with the new data (old data is lost)
    )
    uri = "gs://mybucket/myfolder/myfile.csv"
    load_job = self.client.load_table_from_uri(
        uri, self.table_ref["object"], job_config=job_config,
    )
Currently my idea is to read the CSV file into a pandas dataframe, also load the data from the BigQuery table into a dataframe, run my processing on the combined data to remove the duplicates, and finally reinsert the whole cleaned dataset (with the Truncate option). However, I find this method wasteful with a huge dataset, since all of it would have to be reloaded for every new file that arrives in the bucket.
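Roughly, that idea would look like the sketch below (just a sketch, assuming "id" is the deduplication key and that gcsfs and pyarrow are installed so pandas and the BigQuery client can read gs:// paths and dataframes):

import pandas as pd
from google.cloud import bigquery

def dedup_with_pandas(self):
    uri = "gs://mybucket/myfolder/myfile.csv"
    new_df = pd.read_csv(uri)  # new file from the bucket
    old_df = self.client.list_rows(self.table_ref["object"]).to_dataframe()  # whole existing table
    merged = pd.concat([old_df, new_df]).drop_duplicates(subset=["id"])
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # replace the table with the cleaned data
    )
    self.client.load_table_from_dataframe(
        merged, self.table_ref["object"], job_config=job_config
    ).result()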
What could you suggest? Thank you in advance
Upvotes: 0
Views: 2842
Reputation: 6572
You can use a MERGE query with BigQuery:
https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax?hl=en#merge_statement
The idea behind it is:
1. Truncate the staging table (empty the table) before you execute your script.
2. Python script to ingest the data into the staging table.
3. MERGE query between the staging and final table: if the element doesn't exist in the final table, you insert it, otherwise you update it.
Example of merge query:
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
INSERT (product, quantity) VALUES(product, quantity)
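To give an idea, here is a minimal sketch of these 3 steps with the Python client (the project, the staging/final table names and the CSV path are placeholders, reuse yours):

from google.cloud import bigquery

def load_and_merge(client):
    staging = "myproject.dataset.NewArrivals"
    # Steps 1 + 2: WRITE_TRUNCATE empties the staging table and loads the new file in one job.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    uri = "gs://mybucket/myfolder/myfile.csv"
    client.load_table_from_uri(uri, staging, job_config=job_config).result()

    # Step 3: MERGE the staging table into the final table (insert new rows, update existing ones).
    merge_sql = """
        MERGE dataset.Inventory T
        USING dataset.NewArrivals S
        ON T.product = S.product
        WHEN MATCHED THEN
          UPDATE SET quantity = T.quantity + S.quantity
        WHEN NOT MATCHED THEN
          INSERT (product, quantity) VALUES(product, quantity)
    """
    client.query(merge_sql).result()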
An orchestrator like Airflow or Cloud Workflows can easily chain these steps, for example.
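A rough Airflow sketch of that chaining (assuming the apache-airflow-providers-google package; the bucket, file and table names are only placeholders):

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG("gcs_to_bq_merge", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Steps 1 + 2: load the CSV into the staging table, truncating it first.
    load_staging = GCSToBigQueryOperator(
        task_id="load_staging",
        bucket="mybucket",
        source_objects=["myfolder/myfile.csv"],
        destination_project_dataset_table="myproject.dataset.NewArrivals",
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )
    # Step 3: run the MERGE between the staging and final tables.
    merge_into_final = BigQueryInsertJobOperator(
        task_id="merge_into_final",
        configuration={
            "query": {
                "query": """
                    MERGE dataset.Inventory T
                    USING dataset.NewArrivals S
                    ON T.product = S.product
                    WHEN MATCHED THEN UPDATE SET quantity = T.quantity + S.quantity
                    WHEN NOT MATCHED THEN INSERT (product, quantity) VALUES(product, quantity)
                """,
                "useLegacySql": False,
            }
        },
    )
    load_staging >> merge_into_final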
Upvotes: 1