Reputation: 6872
I am using standard SQL MERGE to update at regular target table based on an source external table that is a set of CVS files in a bucket. Here is a simplified input file:
$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id" "PortfolioCode" "ValuationDate" "load_checksum"
"1" "CIMDI000TT" "2020-03-28" "checksum1"
The MERGE statement is:
MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv'), T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'
If I wipe the target table and rerun the MERGE I get an row modified count of 1:
bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1
This successfully results in the target table updating:
$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00
If I return the MERGE again I get row modified count of 0:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0
That results no changes to the target table. So everything is working expected.
The problem is that when I run the code on a more complex example with many input files to insert into an empty target table I end up with rows that have the same id
where count(id)
is not equal to count(distinct id)
:
$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
| 11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
| 5722 |
+----------------+
This surprises me as my expectation is that the underlying logic would step through each line in each underlying file and only insert on the first id
then on any subsequent id
it would update. So my expectation is that you cannot have more rows than unique id
s in the input bucket.
If I then try to run the MERGE again it fails telling me that there is more than one row in the target table with the same id:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row
I was expecting that the there would be no two rows with the same "id" when the MERGE statement does it's inserts.
All the tables and queries used are generated from from a file that lists the "business columns". So the simple demo example above is identical to the full scale queries in terms of the login and joins in the MERGE statement.
Why would the MERGE query above result in rows with duplicated "id" and how do I fix this?
Upvotes: 0
Views: 1770
Reputation: 6872
The problem is very easily repeatable by wiping the target table and duplicating a relatively large input as the input:
AAAA_20200805_20200814200000.tsv
AAAA_clone_20200805_20200814200000.tsv
I believe that what is at the heart of this is parallelism. A single large MERGE of many files can spawn many worker threads in parallel. It would be very slow for any two worker threads running in parallel loading different files to immediately "see" each others inserts. Rather I expect that they would run independently and not "see" each others writes into separate buffers. When the buffers are finally combined it leads to multiple inserts with the same id
.
To fix this I am using some CTEs to pick the latest record for any id
based on extract_timestamp by using using ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC)
. We can then filter by the lowest value to pick the latest version of the record. The full query is:
MERGE xx_producer_conformed.demo T
USING (
WITH cteExtractTimestamp AS (
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv') AS extract_timestamp
FROM
xx_producer_raw.demo_raw
),
cteRanked AS (
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, extract_timestamp
, ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
FROM
cteExtractTimestamp
)
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, extract_timestamp
, row_num
, "{{ task_instance.xcom_pull(task_ids='get_run_id') }}" AS wf_id
FROM cteRanked
WHERE row_num = 1
) S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, extract_timestamp, wf_id)
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = S.extract_timestamp, T.wf_id = S.wf_id
This means that cloning a file and not changing the extract_timestamp in the filename will pick one of the two rows at random. In normal running we would expect subsequent extracts that have updated data to be a source file with a new extract_timetamp. The above query will then pick the newest record to merge into the target table.
Upvotes: 0