Gaurav Shewale

Reputation: 33

How can I call the Google BigQuery delete and insert APIs synchronously?

I am maintaining a database of transaction records whose data changes periodically.

I have a cron running every half an hour that pulls the latest transactions from the main database and feeds them to my Express Node app (I am pretty new to Node). For each batch, I first delete the old transactions whose order numbers match the incoming transactions', then insert the latest ones into the BigQuery table.
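For illustration, an in-process version of that trigger could look like the sketch below; node-cron and the two helper names are placeholders, not my actual setup:

const cron = require('node-cron');

// Every 30 minutes: pull the latest transactions, then sync them to BigQuery.
cron.schedule('*/30 * * * *', async () => {
    const rows = await fetchLatestTransactions(); // placeholder for my fetch logic
    await syncToBigQuery(rows);                   // delete-then-insert, shown below
});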

After running the app for a day, I am getting duplicate transactions in my database. Even after checking the logs I don't see the delete API failing anywhere, so I have no idea how and where the duplicates are coming from.

I am using @google-cloud/bigquery: ^2.0.2, and I am deleting and inserting data into the BigQuery tables using the query API.

I have tried using streaming inserts, but they won't allow me to delete the recently inserted rows for up to 90 minutes, which won't work in my case.
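The streaming path I tried looked roughly like this (a minimal sketch; the dataset and table names are placeholders):

const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function streamRows(rows) {
    // Streaming insert: rows become queryable almost immediately, but they
    // sit in the streaming buffer, where DELETE/UPDATE statements are rejected.
    await bigquery
        .dataset('my_dataset')     // placeholder
        .table('transactions')     // placeholder
        .insert(rows);
}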

My index.js
let orderNumbers = '';

// Build a quoted, comma-separated list of the incoming order numbers.
rows.forEach(function (value) {
    orderNumbers += "'" + value.Order_Number + "',";
});
orderNumbers = orderNumbers.slice(0, -1); // drop the trailing comma

// Delete any existing rows that match the incoming order numbers.
await functions.deleteAllWhere('Order_Number', orderNumbers);

// Insert the fresh rows in chunks.
let chunkedRowsArray = _.chunk(rows, CONSTANTS.chunkSize);

let arrSize = chunkedRowsArray.length;
for (let i = 0; i < arrSize; i++) {
    let insertString = '';

    // Build the VALUES list for this chunk: ("a", 1, ...), ("b", 2, ...)
    chunkedRowsArray[i].forEach(element => {
        let values = '(';
        Object.keys(element).forEach(function (key) {
            if (typeof element[key] === 'string') {
                values += '"' + element[key] + '",';
            } else {
                values += element[key] + ',';
            }
        });
        values = values.slice(0, -1); // drop the trailing comma
        values += '),';
        insertString += values;
    });
    insertString = insertString.slice(0, -1); // drop the trailing comma

    let rs = await functions.bulkInsert(insertString, i);
}

The delete call and its implementation:

await functions.deleteAllWhere('Order_Number', orderNumbers);

module.exports.deleteAllWhere = async (conditionKey, params) => {
    const DELETEQUERY = `
        DELETE FROM \`${URI}\`
        WHERE ${conditionKey} IN (${params})`;

    const options = {
        query: DELETEQUERY,
        timeoutMs: 300000,
        useLegacySql: false, // Use standard SQL syntax for queries.
    };

    // Runs the query
    return await bigquery.query(options);
};

The insert function similarly builds the INSERT query from the values, in chunks of 200.
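That function is not shown here, but it mirrors deleteAllWhere; roughly (a sketch, with the column list abbreviated):

module.exports.bulkInsert = async (insertString, chunkIndex) => {
    // DML INSERT through the same query API as the delete.
    // The column list is abbreviated; it matches the key order used
    // when building the VALUES tuples in index.js.
    const INSERTQUERY = `
        INSERT INTO \`${URI}\` (Order_Number /* , ...other columns... */)
        VALUES ${insertString}`;

    const options = {
        query: INSERTQUERY,
        timeoutMs: 300000,
        useLegacySql: false,
    };

    return await bigquery.query(options);
};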

I need to write a synchronous Node program which deletes some rows first and, only after they have been successfully deleted, inserts the new ones.
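In other words, something with these semantics; the sketch below uses the pattern I believe the client supports (createQueryJob plus waiting on the job), with runAndWait and syncChunk as hypothetical helpers:

async function runAndWait(query) {
    // Create the job explicitly, then block until it finishes.
    const [job] = await bigquery.createQueryJob({ query, useLegacySql: false });
    // getQueryResults() resolves only once the job is done (or rejects on error).
    await job.getQueryResults();
}

async function syncChunk(orderNumbers, insertString) {
    // Sequential by construction: the INSERT job is not created until
    // the DELETE job has completed successfully.
    // (Column list omitted here for brevity.)
    await runAndWait(`DELETE FROM \`${URI}\` WHERE Order_Number IN (${orderNumbers})`);
    await runAndWait(`INSERT INTO \`${URI}\` VALUES ${insertString}`);
}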

I have no idea if this is caused by the async nature of the code, or if something is up with BigQuery, or if the stored procedure I am getting the data from is buggy.

Sorry for the long post; I am new to Node and Stack Overflow.

Any help is appreciated.

Upvotes: 0

Views: 1103

Answers (1)

Pentium10

Reputation: 208042

Regarding the BigQuery integration, you should architect your data flow in such a way that every new row is simply appended to the BigQuery table. Then have queries that return only the newest row, which is easy to do if you have a field you can order by to find the most recent row.

You can schedule BigQuery queries that maintain a materialized table of this cleaned-up data. So in the end you have two tables: one that you stream all rows into, and one that is materialized to retain only the newest.
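For example, a scheduled query along these lines can rebuild the materialized table, keeping only the newest row per order number (the table names and the updated_at ordering field are illustrative):

-- Keep only the most recent row per Order_Number from the append-only table.
CREATE OR REPLACE TABLE `project.dataset.transactions_latest` AS
SELECT * EXCEPT(rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY Order_Number ORDER BY updated_at DESC) AS rn
  FROM `project.dataset.transactions_raw`
)
WHERE rn = 1;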

Upvotes: 2
