Reputation: 354
We are using GCP in our data pipeline, and all the data downstream is imported into BigQuery. We have a sales feed that delivers sales data on an hourly basis. The state of an order changes throughout its lifecycle during the day (e.g., the order status changes from created to completed, or Payment settled changes from No to Yes), so we end up getting 2-3 records for the same order. My goal is to store only the latest and greatest state of each order. The order number is a unique identifier.
How can I de-duplicate the records?
I saw a couple of pages in the documentation. Will that help? https://cloud.google.com/bigquery/streaming-data-into-bigquery#manually_removing_duplicates
Upvotes: 1
Views: 962
Reputation: 172944
Will that help? https://cloud.google.com/bigquery/streaming-data-into-bigquery#manually_removing_duplicates
The link you referenced in your question is about eliminating duplicate insertion of the same record while streaming into a table. It has nothing to do with de-duplicating data in the sense of removing all related (same-order) entries except the latest one.
In your case, I would leave all the data and just use Status to filter the records you need. Keeping all statuses gives you the ability to do more detailed analysis, since each status most likely carries some status-specific attribute values, so you would not lose them. A status filter could look like the sketch below.
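A minimal sketch of such a filter, assuming a Status column with values like 'created' and 'completed' (the column name and values are placeholders, not from your schema):

-- Pull only the rows for a given lifecycle state, keeping the full history intact
SELECT *
FROM YourTable
WHERE Status = 'completed'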
How can I de-duplicate the records?
If this is not needed or not acceptable for whatever reason, you can run a daily de-dup process with something like the query below:
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER(PARTITION BY Order_number ORDER BY timestamp DESC) AS dup
  FROM YourTable
)
WHERE dup = 1
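If you would rather materialize this as a scheduled daily step instead of running it ad hoc, here is a minimal sketch in standard SQL, assuming a dataset named mydataset and a timestamp column on the table (all names are placeholders):

-- Rebuild a "latest state per order" table; EXCEPT(dup) drops the helper column
CREATE OR REPLACE TABLE mydataset.YourTable_latest AS
SELECT * EXCEPT(dup)
FROM (
  SELECT *,
    ROW_NUMBER() OVER(PARTITION BY Order_number ORDER BY timestamp DESC) AS dup
  FROM mydataset.YourTable
)
WHERE dup = 1;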
Upvotes: 1
Reputation: 59165
WePay just wrote an article about doing exactly this. They automate MySQL exports every 15 minutes with Airflow, and then handle updates with a single BigQuery query:
They posted this example de-duplication for updates query:
SELECT *
FROM (
  -- Assign an incrementing row number to every duplicate row, descending by the last modify time
  SELECT *, ROW_NUMBER() OVER (PARTITION BY [id] ORDER BY [modify_time] DESC) etl_row_num
  FROM
    -- Get the last month's worth of daily YYYYMMDD tables
    TABLE_DATE_RANGE([project-1234:cluster_db.table],
                     DATE_ADD(USEC_TO_TIMESTAMP(UTC_USEC_TO_MONTH(CURRENT_TIMESTAMP())),
                              -1,
                              'MONTH'),
                     CURRENT_TIMESTAMP()),
    -- Get all remaining monthly tables prior to a month
    TABLE_QUERY([project-1234:cluster_db.table],
                "integer(regexp_extract(table_id, r'^table__monthly([0-9]+)'))
                 <
                 DATE_ADD(USEC_TO_TIMESTAMP(UTC_USEC_TO_MONTH(CURRENT_TIMESTAMP())), -1, 'MONTH')")
)
-- Grab the most recent row, which will always have a row number equal to 1
WHERE etl_row_num = 1;
In their words:
This query looks complicated, but it really does something pretty simple. It partitions all rows by their id field (their primary key). It then sorts within each partition according to the modify_time of the row, descending (so the most recent update is first). It then takes only the first row from each partition (the most recent row). This is how we deduplicate all of our tables.
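Note that TABLE_DATE_RANGE and TABLE_QUERY are legacy SQL functions. As a rough standard-SQL equivalent of the same pattern, here is a minimal sketch assuming the shards share the prefix table_ inside project-1234.cluster_db (names taken from the example above; adjust to your own sharding scheme):

-- Partition by the primary key, keep only the newest row per id
SELECT * EXCEPT(etl_row_num)
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY modify_time DESC) AS etl_row_num
  FROM `project-1234.cluster_db.table_*`  -- wildcard over all daily/monthly shards
)
WHERE etl_row_num = 1;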
Upvotes: 2