nunya07

Reputation: 65

Does 'SELECT FOR UPDATE' work in pg-promise task?

I want to use SELECT .. FOR UPDATE to lock a row in a table so it can be updated atomically, since concurrent requests of the same type can happen.

I've been doing some testing, but it's not clear whether FOR UPDATE works in a pg-promise.task. I'm trying to avoid using a pg-promise.tx, as that would require more logic and possibly recursion, both of which I want to avoid because the use case will be high throughput.

Update: After more research and testing, I've found that using a task with SELECT .. FOR UPDATE is giving me an unexpected result. Explanation of code below.

submitUserOnline:(pgdb, u_uuid, socketConnectionList, user_websock) =>{
  return new Promise((resolve,reject) =>{
// pgdb.tx({mode}, t => {
   pgdb.task( t => {
    return t.one('SELECT * FROM users WHERE user_uuid = $1', [u_uuid])
    .then(user =>{
// nothing happens here right now, but may in future, including for completeness. May cancel request based off some comparisons.
     return t.any('SELECT * FROM users_online WHERE user_uuid = $1 FOR UPDATE',[u_uuid])
     .then(result =>{
      if(result.length > 0){
        console.error('duplicate connection found ' + user_websock.uuid);
        if(socketConnectionList[result[0].web_sock_uuid] !== undefined){
          console.error('drop connection' + result[0].web_sock_uuid);
          socketConnectionList[result[0].web_sock_uuid].wsc.close(4020, 'USER_RECONN');
        }
        console.error('duplicate connection found - update next ' + user_websock.uuid);
        return t.none('UPDATE users_online SET web_sock_uuid = $1 WHERE user_uuid = $2', [user_websock, u_uuid])
        .then(res =>{
          console.error('UPDATE res: ' + res);
        })
        .catch(err =>{
          console.error('UPDATE err: ' + err);});

       }else{
         // not reached in test case
         console.error('no duplicate found ' + user_websock.uuid);
         return t.none(INSERT INTO  users_online.....ect ect
       }
      });
    });
   })
   .then(res =>{
     console.error('>>>task/tx res: ' + res);
     resolve({msg: "OK"});
   })
   .catch(err =>{
     console.error('>>>task/tx err: ' + err);
     if(err.code === '40001'){// recursion for when called as 'tx'
       console.error('>>>task/tx err - call recurse');
       module.exports.submitUserOnline(pgdb, u_uuid, socketConnectionList, user_websock)
       .then(res =>{
         console.error('>>>task/tx err - call recurse - res ' + res);
         resolve({msg: "OK"});
        })
        .catch( err =>{
          console.error('>>>task/tx err - call recurse - err: ' + err);
          reject({msg:"FAILED"});
        });
     }
    });
  });
}
const mode = new TransactionMode({
    tiLevel: isolationLevel.serializable,
    readOnly: false,
    deferrable: true
});

submitUserOnline is called by a websocket handler. In my test case I have a 10-element array (all the same user_uuid) that triggers all client connections in a for loop. Each client connects to the server websocket, and that socket checks the users_online table for the specific user; if the user is already in the table, it kills the stale connection and updates the web_sock_uuid in the table. This is where the issue comes in (sometimes it works, sometimes not; one run of 10 concurrent connections is enough to show it).

When the user row's web_sock_uuid is UPDATEd, the other concurrent connections seem to block properly on the SELECT .. FOR UPDATE (SLFU). However, when the UPDATE executes, its then() doesn't always run before the next SLFU is released. This presents itself as the pending SLFU returning the old web_sock_uuid of the row, from before the prior UPDATE. In one instance the same stale web_sock_uuid was returned 4 times in a row.

If I switch from the task method to the tx method, which requires the recursive calling, the above code works as expected, although it does require many recursions.

Upvotes: 1

Views: 3845

Answers (2)

nunya07

Reputation: 65

I've long since figured this out, but I forgot to post back at the time. The solution for me was to use the tx method, with no mode options provided. This gives the desired operation. Would explain it more, but the exact details have faded from my mind.
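
For anyone landing here later, a minimal sketch of what that might look like. It assumes `db` is an already-configured pg-promise database object and reuses the table and column names from the question; the function name `replaceOnlineSocket` and its return shape are made up for illustration, and the INSERT branch is left out because the question elides it. With no mode passed, the transaction runs at the server's default isolation level (READ COMMITTED unless configured otherwise). At that level a blocked SELECT .. FOR UPDATE re-reads the row once the lock holder commits, so it should see the new web_sock_uuid, and the 40001 retry should not be needed.

```javascript
// Sketch only: `db` is assumed to be a pg-promise database object.
// Function name and return value are hypothetical; the SQL follows the question.
async function replaceOnlineSocket(db, userUuid, newSockUuid) {
  // tx() with no mode wraps the callback in BEGIN ... COMMIT (ROLLBACK on error)
  return db.tx(async t => {
    // The row lock taken here is held until the transaction ends
    const rows = await t.any(
      'SELECT * FROM users_online WHERE user_uuid = $1 FOR UPDATE', [userUuid]);
    if (rows.length === 0) {
      // No existing row: the INSERT from the question would go here (elided there too)
      return {previousSocket: null};
    }
    await t.none(
      'UPDATE users_online SET web_sock_uuid = $1 WHERE user_uuid = $2',
      [newSockUuid, userUuid]);
    // The caller can close the stale websocket identified by this value
    return {previousSocket: rows[0].web_sock_uuid};
  });
}
```

Note that when no row exists yet there is nothing for FOR UPDATE to lock, so concurrent first connections for the same user would still need something like a unique constraint on user_uuid.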

Upvotes: 1

boromisp

Reputation: 21

The pg-promise tasks are simply shared connections. They do not create transactions implicitly.

To use locks in Postgres, you need to create transactions, since all locks created in a transaction are released at the end of the transaction. If you do not create transactions explicitly, each query will be its own separate transaction.

Of course, you do not have to use the tx method. You could use task, and manage the transaction yourself.
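
A rough sketch of the manual approach, assuming `db` is a pg-promise database object; `withManualTransaction` and `work` are hypothetical names, not part of the library. It relies only on the fact that every query issued through the task's `t` goes over the same connection, so BEGIN, the locking SELECT, the UPDATE and COMMIT all belong to one transaction.

```javascript
// Sketch only: `db` is assumed to be a pg-promise database object.
// `withManualTransaction` and `work` are illustrative names.
async function withManualTransaction(db, work) {
  return db.task(async t => {
    // All queries on `t` share one connection, so this BEGIN covers them
    await t.none('BEGIN');
    try {
      const result = await work(t);
      // Row locks taken inside `work` are released here
      await t.none('COMMIT');
      return result;
    } catch (err) {
      // Undo any changes, release the locks, then surface the original error
      await t.none('ROLLBACK');
      throw err;
    }
  });
}
```

The callback passed as `work` would run the SELECT .. FOR UPDATE and the UPDATE on `t`. In most cases tx is the simpler choice, since it handles the commit and rollback itself.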

Upvotes: 0
