Reputation: 65
I want to do SELECT .. FOR UPDATE
to lock a row in a table so it can be updated in a atomic way, as concurrent request of the same type could happen.
I've been doing some testing but its not clear FOR UPDATE
works in a pg-promise.task
. I'm trying to avoid using a pg-promise.tx
as that would require more logic and possibly recursion, both of which I want to avoid as the use case will be high throughput.
Update:
After more research and testing, I've found that using a task
with SELECT .. FOR UPDATE
is giving me an unexpected result. Explanation of code below.
submitUserOnline:(pgdb, u_uuid, socketConnectionList, user_websock) =>{
return new Promise((resolve,reject) =>{
// pgdb.tx({mode} t => {
pgdb.task( t => {
return t.one('SELECT * FROM users WHERE user_uuid = $1', [u_uuid]
.then(user =>{
// nothing happens here right now, but may in future, including for completeness. May cancel request based off some comparisons.
return t.any('SELECT * FROM users_online WHERE user_uuid = $1 FOR UPDATE',[u_uuid]
.then(result =>{
if(result.length > 0){
console.error('duplicate connection found ' + user_websock.uuid);
if(socketConnectionList[result[0].web_sock_uuid] !== undefined){
console.error('drop connection' + result[0].web_sock_uuid);
socketConnectionList[result[0].web_sock_uuid].wsc.close(4020, 'USER_RECONN');
}
console.error('duplicate connection found - update next ' + user_websock.uuid);
return t.none('UPDATE users_online SET web_sock_uuid = $1 WHERE user_uuid = $2', [user_websock, u_uuid])
.then(res =>{
console.error('UPDATE res: ' + res);
})
.catch(err =>{
console.error('UPDATE err: ' + err);});
}else{
// not reached in test case
console.error('no duplicate found ' + user_websock.uuid);
return t.none(INSERT INTO users_online.....ect ect
}
});
})
.then(res =>{
console.error('>>>task/tx res: ' + res);
resolve({msg: "OK"});
})
.catch(err =>{
console.error('>>>task/tx err: ' + err);
if(err.code ==== '40001'){// recursion for when called as 'tx'
console.error('>>>task/tx err - call recurse');
module.exports.submitUserOnline(pgdb, u_uuid, socketConnectionList, user_websock)
.then(res =>{
console.error('>>>task/tx err - call recurse - res ' + res);
resolve({msg: "OK"});
})
.catch( err =>{
console.error('>>>task/tx err - call recurse - err: ' + err);
reject({msg:"FAILED"});
});
}
});
});
}
const mode = new TransactionMode({
tiLevel: isolationLevel.serializable,
readOnly: false,
deferrable: true
});
submitUserOnline
is called by a websocket handler. In my test case I have an 10 element array(same user_uuid) which triggers all client connections in a for loop. Basically its making the connections to the server websocket, that socket checks the users_online
table for the specific user, if that user is already in the table, kill the stale connection and update the web_sock_uuid
in the table, this is where the issues comes in(sometimes it works sometimes not, one run of 10 concurrent connections shows the issue). When the user row web_sock_uuid
is UPDATE
'ed, the other concurrent connections seem to block properly on the SELECT .. FOR UPDATE(SLFU)
, when the UPDATE
executes, then()
doesnt always run before the next SLFU
is released. This seems to present it self in the form of the pending SLFU
returning the old web_sock_uuid
of the row before the prior UPDATE
. In one instance the same stale 'web_sock_uuid' was returned 4 times in a row.
If I switch from the task
method to the tx
method, which requires the recursive calling, the above code works as expected, although it does require many recersions.
Upvotes: 1
Views: 3845
Reputation: 65
I've long since figured this out, but I forgot to post back at the time. The solution for me was to use the tx
method, with no mode options provided. This gives the desired operation. Would explain it more, but the exact details have faded from my mind.
Upvotes: 1
Reputation: 21
The pg-promise
tasks
are simply shared connections. They do not create transactions implicitly.
To use locks in Postgres, you need to create transactions, since all locks created in a transaction are released at the end of the transaction. If you do not create transactions explicitly, each query will be its own separate transaction.
Of course, you do not have to use the tx
method. You could use task
, and manage the transaction yourself.
Upvotes: 0