bunny

Reputation: 354

How to preserve the state of newest records in BigQuery?

We are using GCP in our downstream data pipeline, and all the data is imported into BQ. We have a sales feed and we get sales data on an hourly basis. The state of an order changes throughout the day as it moves through the order lifecycle (e.g. order status changes from Created to Completed, or Payment Settled changes from No to Yes), so we end up getting 2-3 records for the same order. My goal is to store the latest and greatest state of each order. The order number is a unique number.

How can I dedup the records?

I saw a couple of pages in the documentation - will that help? https://cloud.google.com/bigquery/streaming-data-into-bigquery#manually_removing_duplicates

Upvotes: 1

Views: 962

Answers (2)

Mikhail Berlyant

Reputation: 172944

Will that help? https://cloud.google.com/bigquery/streaming-data-into-bigquery#manually_removing_duplicates

The link you referenced in your question is for eliminating duplicate insertion of the same record while streaming into the table. It has nothing to do with de-duping data in the sense of removing all related (same-order) entries except the latest one.

In your case, I would leave all the data and just use Status to filter for the records you need. Keeping all statuses leaves you the ability to do more detailed analysis, since each status most likely has some specific attribute values, so you will not lose them.
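As a minimal sketch of that approach, assuming the table has a Status column (the table and column names and the 'completed' value below are placeholders, not your actual schema):

-- Keep all rows in the table; filter to the status you need at query time
-- 'completed' is just an illustrative status value
SELECT *
FROM YourTable
WHERE Status = 'completed'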

If this is not needed or not acceptable for whatever reason - you can run a daily de-dup process - something like below

How can I dedup the records?

SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER(PARTITION BY Order_number ORDER BY timestamp DESC) as dup
  FROM YourTable
)
WHERE dup = 1
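
If you want to materialize the result of that daily run back into a table, one hedged way is a query like the following (Standard SQL syntax; the dataset, table and column names are placeholders):

-- Rewrite the table so it holds only the latest row per order;
-- EXCEPT(dup) drops the helper column from the output
CREATE OR REPLACE TABLE your_dataset.YourTable_dedup AS
SELECT * EXCEPT(dup)
FROM (
  SELECT *,
    ROW_NUMBER() OVER(PARTITION BY Order_number ORDER BY timestamp DESC) AS dup
  FROM your_dataset.YourTable
)
WHERE dup = 1;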

Upvotes: 1

Felipe Hoffa

Reputation: 59165

WePay just wrote an article about doing exactly this. They automate MySQL exports every 15 minutes with Airflow, and then deal with updates with one BigQuery query:

They posted this example de-duplication for updates query:

SELECT *
FROM (
    -- Assign an incrementing row number to every duplicate row, descending by the last modify time
    SELECT *, ROW_NUMBER() OVER (PARTITION BY [id] ORDER BY [modify_time] DESC) etl_row_num
    FROM
        -- Get the last month's worth of daily YYYYMMDD tables
        TABLE_DATE_RANGE([project-1234:cluster_db.table],
                         DATE_ADD(USEC_TO_TIMESTAMP(UTC_USEC_TO_MONTH(CURRENT_TIMESTAMP())),
                                                    -1,
                                                    'MONTH'),
                         CURRENT_TIMESTAMP()),
        -- Get all remaining monthly tables prior to a month
        TABLE_QUERY([project-1234:cluster_db.table],
                    "integer(regexp_extract(table_id, r'^table__monthly([0-9]+)'))
                    <
                    DATE_ADD(USEC_TO_TIMESTAMP(UTC_USEC_TO_MONTH(CURRENT_TIMESTAMP())), -1, 'MONTH')") )
-- Grab the most recent row, which will always have a row number equal to 1
WHERE etl_row_num = 1;

In their words:

This query looks complicated, but it really does something pretty simple. It partitions all rows by their id field (their primary key). It then sorts within each partition according to the modify_time of the row, descending (so the most recent update is first). It then takes only the first row from each partition (the most recent row). This is how we deduplicate all of our tables.
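
The WePay query above uses Legacy SQL table functions (TABLE_DATE_RANGE, TABLE_QUERY). A roughly equivalent sketch of the same idea in Standard SQL over date-sharded tables might look like this (the wildcard table name and columns are assumptions based on their example, not a drop-in replacement):

-- Same idea in Standard SQL: partition by primary key, keep the newest row
-- `project-1234.cluster_db.table_*` is a placeholder wildcard over the shards
SELECT * EXCEPT(etl_row_num)
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY modify_time DESC) AS etl_row_num
  FROM `project-1234.cluster_db.table_*`
)
WHERE etl_row_num = 1;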

Upvotes: 2
