Reputation: 67248
This is an async function that runs a SQLite transaction:
async transaction<V>(done: (conn: this) => V): Promise<V> {
  this.depth += 1;
  await this.execute(`SAVEPOINT tt_${this.depth}`);
  try {
    return await done(this);
  } catch (err) {
    await this.execute(`ROLLBACK TO tt_${this.depth}`);
    throw err;
  } finally {
    await this.execute(`RELEASE tt_${this.depth}`);
    this.depth -= 1;
  }
}
It can be nested, meaning that inside done I can call transaction() again.
But calls on the same level cannot be made in parallel, e.g.
Promise.all([
  db.transaction(...),
  db.transaction(...),
]);
because that apparently messes up the SAVEPOINT/RELEASE sequence in SQLite.
The example above is simplified. In the real world, parallel calls happen when two or more requests reach the server at the same time, and all requests use the same db instance.
Is there any way to detect, inside the function, whether another call of the function is running on the same level at the same time?
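To make the failure mode concrete, here is a minimal sketch that reuses the transaction() logic above but replaces the real connection with a hypothetical FakeDb whose execute() only records the statement. FakeDb, its log array and demoInterleaved are names invented for this illustration and are not part of the real code.

```typescript
// Hypothetical stand-in for the real connection: execute() only records SQL.
class FakeDb {
  depth = 0;
  log: string[] = [];

  async execute(sql: string): Promise<void> {
    this.log.push(sql);
    await Promise.resolve(); // yield, as a real driver call would
  }

  // Same logic as the transaction() method shown above.
  async transaction<V>(done: (conn: this) => V): Promise<V> {
    this.depth += 1;
    await this.execute(`SAVEPOINT tt_${this.depth}`);
    try {
      return await done(this);
    } catch (err) {
      await this.execute(`ROLLBACK TO tt_${this.depth}`);
      throw err;
    } finally {
      await this.execute(`RELEASE tt_${this.depth}`);
      this.depth -= 1;
    }
  }
}

// Starts two transactions on the same level at the same time.
async function demoInterleaved(): Promise<string[]> {
  const db = new FakeDb();
  await Promise.all([
    db.transaction(async () => "A"),
    db.transaction(async () => "B"),
  ]);
  return db.log;
}

demoInterleaved().then((log) => console.log(log));
```

Both calls share one depth counter, so the second call creates tt_2 while the first is still open, and the first RELEASE that gets issued names tt_2 even though the call issuing it created tt_1.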
In case anyone is interested in the solution, I managed to do it with a "wrapper" class.
class DbInstance {
  private depth = 0;
  ....
  async transaction<V>(done: (conn: this) => V): Promise<V> {
    this.depth += 1;
    await this.execute(`SAVEPOINT tt_${this.depth}`);
    try {
      return await done(this);
    } catch (err) {
      await this.execute(`ROLLBACK TO tt_${this.depth}`);
      throw err;
    } finally {
      await this.execute(`RELEASE tt_${this.depth}`);
      this.depth -= 1;
    }
  }
}
class Db {
  private dbInstance: DbInstance;
  private activeTrans?: Promise<unknown>;

  constructor() {
    this.dbInstance = new DbInstance();
  }

  // Declared async because the body uses await.
  async transaction<T>(done: (conn: DbInstance) => Promise<T>): Promise<T> {
    // Wait for the running top-level transaction, if any. Its failure
    // belongs to its own caller, so it is swallowed here.
    while (this.activeTrans) {
      await this.activeTrans.catch(() => undefined);
    }
    const current = this.dbInstance.transaction(done);
    this.activeTrans = current;
    try {
      return await current;
    } finally {
      this.activeTrans = undefined;
    }
  }
}
export const db = new Db();
db is exposed everywhere, but dbInstance is only exposed in the transaction callback, so you can have nested transactions using dbInstance.
The downside is that both classes have to implement the same methods...
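For anyone who wants to check the behaviour without a database, here is a rough sketch of the same wrapper idea. FakeInstance, SerializedDb and demoSerialized are hypothetical names for this illustration; FakeInstance stands in for DbInstance and only records the statements it would execute.

```typescript
// Hypothetical stand-in for DbInstance: execute() only records statements.
class FakeInstance {
  depth = 0;
  log: string[] = [];

  async execute(sql: string): Promise<void> {
    this.log.push(sql);
    await Promise.resolve();
  }

  async transaction<V>(done: (conn: FakeInstance) => Promise<V>): Promise<V> {
    this.depth += 1;
    await this.execute(`SAVEPOINT tt_${this.depth}`);
    try {
      return await done(this);
    } catch (err) {
      await this.execute(`ROLLBACK TO tt_${this.depth}`);
      throw err;
    } finally {
      await this.execute(`RELEASE tt_${this.depth}`);
      this.depth -= 1;
    }
  }
}

// Wrapper that lets only one top-level transaction run at a time.
class SerializedDb {
  instance = new FakeInstance();
  private active?: Promise<unknown>;

  async transaction<T>(done: (conn: FakeInstance) => Promise<T>): Promise<T> {
    while (this.active) {
      // Ignore the other caller's failure; we only wait for it to finish.
      await this.active.catch(() => undefined);
    }
    const current = this.instance.transaction(done);
    this.active = current;
    try {
      return await current;
    } finally {
      if (this.active === current) this.active = undefined;
    }
  }
}

async function demoSerialized(): Promise<string[]> {
  const db = new SerializedDb();
  await Promise.all([
    db.transaction(async (conn) => {
      await conn.transaction(async () => "nested"); // nesting via the inner handle
      return "A";
    }),
    db.transaction(async () => "B"), // has to wait until A is finished
  ]);
  return db.instance.log;
}

demoSerialized().then((log) => console.log(log.join("\n")));
```

In this sketch the second top-level call only starts after the first one, including its nested savepoint, has been released.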
Upvotes: 4
Views: 319
Reputation: 17477
You ask only how a conflict can be detected, but not how it can be resolved. The following program illustrates possible answers to both questions.
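To have something executable without a database, I have replaced your problem with the following metaphor: instead of transactions there are workers on a ladder, each of whom has a task to carry out (method work). (The task itself is simulated by the wait method, which waits a random amount of time.) Instead of opening a nested transaction, a worker may step up one rung, attempt the task there, and step down again.
const rungs = " ".repeat(5).split(""); // one slot per rung, " " means free; the height of 5 is an arbitrary choice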
class Worker {
  constructor(label) {
    this.rung = 0;
    this.label = label;
  }
  async work() {
    console.log(rungs.join(""));
    if (this.rung < rungs.length - 1 &&
        Math.random() < 0.7) {
      await wait();
      await this.step(1);
      await this.work();
      await this.step(-1);
    } else
      await wait(); // Simulate the task
    console.log(rungs.join(""));
  }
  async step(dir) {
    if (rungs[this.rung + dir] !== " ") {
      console.log("Conflict on rung " + (this.rung + dir));
      await wait();
      await this.step(dir);
    } else {
      rungs[this.rung] = " ";
      this.rung += dir;
      if (this.rung > 0)
        rungs[this.rung] = this.label;
    }
  }
}
async function wait() {
  return new Promise(function(resolve, reject) {
    setTimeout(resolve, 100 + 100 * Math.random());
  });
}
new Worker("X").work();
new Worker("Y").work();
Conflicts are detected by maintaining the position of all workers in a global array rungs. In order to resolve such a conflict, the workers wait (a random amount of time) and try again.
This is a very rudimentary resolution strategy: If Y wants to step up from rung 1 to 2 and X wants to step down from 2 to 1, they will block each other forever. (And such a situation occurs whenever Y has to step higher than X.) But perhaps in your real problem such a situation does not occur, because the RELEASE of a SAVEPOINT is not comparable to a "step-down", but is rather as if workers jump off the ladder after completing their task. You could simulate this by making the following changes to the code:
--- } else
+++ } else {
await wait(); // Simulate the task
+++ this.jumpedOff = true;
+++ }
...
--- if (rungs[this.rung + dir] !== " ") {
+++ if (!this.jumpedOff && rungs[this.rung + dir] !== " ") {
Devising a good resolution strategy requires more knowledge about your program and the database.
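One small refinement that does not depend on such knowledge is to bound the wait-and-retry loop, so that a blocked worker eventually gives up instead of waiting forever. The following is only a sketch: tryWithRetries, maxTries and baseDelayMs are hypothetical names, and the caller still has to decide what giving up means (for a transaction, probably reporting an error).

```typescript
// Hypothetical helper: call `attempt` until it succeeds or the budget is used up.
// Returns true on success and false if every try failed.
async function tryWithRetries(
  attempt: () => boolean,
  maxTries = 5,
  baseDelayMs = 100,
): Promise<boolean> {
  for (let i = 0; i < maxTries; i++) {
    if (attempt()) return true;
    if (i < maxTries - 1) {
      // Random delay, as in wait(), so that two blocked parties do not retry in lockstep.
      const delay = baseDelayMs * (1 + Math.random());
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
  return false;
}

// Usage: the rung becomes free on the third look.
let looks = 0;
tryWithRetries(() => ++looks >= 3, 5, 10).then((ok) => {
  console.log(ok ? "stepped" : "gave up", "after", looks, "tries");
});
```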
Upvotes: 1
Reputation: 186
You could try a locking mechanism on the database instance to track parallel calls. In the following, depth tracks how deeply a transaction is nested: if transaction() is called again inside another transaction, depth increases. activeTopLevelTransactions tracks how many top-level transactions are active in parallel. The issue arises when multiple transactions start at depth 0, that is, as parallel calls; nested transactions should still be allowed.
private depth = 0; // tracks nested levels
private activeTopLevelTransactions = 0; // tracks only top-level parallel transactions

async transaction<V>(done: (conn: this) => Promise<V>): Promise<V> {
  // Note: depth is shared, so a call that starts while another transaction
  // is still awaiting sees depth > 0 and is treated as nested, not as parallel.
  const isTopLevel = this.depth === 0; // only first call at level 0 is "top-level"
  if (isTopLevel && this.activeTopLevelTransactions > 0) {
    throw new Error("Parallel transactions are not allowed!");
  }
  if (isTopLevel) {
    this.activeTopLevelTransactions += 1; // incremented only for top-level transactions
  }
  this.depth += 1; // always increase depth
  await this.execute(`SAVEPOINT tt_${this.depth}`);
  try {
    return await done(this);
  } catch (err) {
    await this.execute(`ROLLBACK TO tt_${this.depth}`);
    throw err;
  } finally {
    await this.execute(`RELEASE tt_${this.depth}`);
    this.depth -= 1;
    if (isTopLevel) {
      this.activeTopLevelTransactions -= 1; // only decrement for top-level transactions
    }
  }
}
Alternatively, you could try using a WeakMap to track concurrent instances or you could use the async-mutex library to do the same thing.
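To give an idea of what the mutex option amounts to, here is a minimal hand-rolled sketch. It is a stand-in written for this answer, not the async-mutex API; SimpleMutex and demoMutex are hypothetical names, and the idea is simply to chain each new task onto the end of the previous one.

```typescript
// Minimal hand-rolled mutex (a stand-in, not the async-mutex library).
class SimpleMutex {
  private tail: Promise<void> = Promise.resolve();

  // Runs `task` only after all previously queued tasks have settled.
  runExclusive<T>(task: () => Promise<T>): Promise<T> {
    const result = this.tail.then(task);
    // Keep the chain usable even if `task` rejects.
    this.tail = result.then(() => undefined, () => undefined);
    return result;
  }
}

async function demoMutex(): Promise<string[]> {
  const mutex = new SimpleMutex();
  const events: string[] = [];
  // Each job stands in for one top-level db.transaction(...) call.
  const job = (name: string) => async () => {
    events.push(`${name} start`);
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    events.push(`${name} end`);
  };
  await Promise.all([
    mutex.runExclusive(job("A")),
    mutex.runExclusive(job("B")),
  ]);
  return events;
}

demoMutex().then((events) => console.log(events.join(", ")));
```

Only top-level calls should go through such a mutex. A nested transaction() called inside done would otherwise queue behind the transaction it is part of and wait for itself.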
Upvotes: 1
Reputation: 2169
You can track concurrent transaction() calls using a Map keyed by depth, where the value is the set of transaction promises currently active at that depth. Add to the set on start, check it for existing transactions, and remove from it on completion.
class Database {
  private depth = 0;
  // depth -> transactions currently running at that depth
  private activeTransactions = new Map<number, Set<Promise<unknown>>>();

  async transaction<V>(done: (conn: this) => Promise<V>): Promise<V> {
    // Caveat: depth is shared state, so a call made while another
    // transaction is still pending is assigned the next depth.
    const depth = this.depth + 1;
    const activeAtDepth = this.activeTransactions.get(depth) ?? new Set<Promise<unknown>>();
    if (activeAtDepth.size > 0) {
      throw new Error(`Concurrent transactions at depth ${depth} are not allowed.`);
    }
    this.depth = depth; // commit the increment only after the check has passed

    let transactionPromise!: Promise<V>;
    transactionPromise = (async () => {
      try {
        await this.execute(`SAVEPOINT tt_${depth}`);
        return await done(this);
      } catch (err) {
        await this.execute(`ROLLBACK TO tt_${depth}`);
        throw err;
      } finally {
        await this.execute(`RELEASE tt_${depth}`);
        activeAtDepth.delete(transactionPromise);
        if (!activeAtDepth.size) this.activeTransactions.delete(depth);
        this.depth--;
      }
    })();
    activeAtDepth.add(transactionPromise);
    this.activeTransactions.set(depth, activeAtDepth);
    return transactionPromise;
  }

  async execute(query: string) {
    console.log("Executing:", query);
    // ...
  }
}
// test code
const db = new Database();
async function test() {
  await db.transaction(async (conn) => {
    console.log("Transaction 1 started");
    // Allowed: Nested transaction (same instance)
    await conn.transaction(async () => {
      console.log("Nested transaction");
    });
    console.log("Transaction 1 finished");
  });
  // Not Allowed: Parallel transactions at the same level
  await Promise.all([
    db.transaction(async () => console.log("Transaction A")),
    db.transaction(async () => console.log("Transaction B")),
  ]);
}
test().catch(console.error);
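A variation that may be worth considering avoids the shared depth counter altogether: every open transaction hands its callback a fresh handle, and each handle remembers whether it already has an open child. This is only a sketch under that assumption. TxHandle, its busy flag and demoHandles are hypothetical names, and execute() merely records the statement instead of talking to SQLite.

```typescript
// Hypothetical handle: one object per open transaction level.
class TxHandle {
  private busy = false; // true while a child transaction of this handle is open
  readonly log: string[];
  readonly depth: number;

  constructor(log: string[] = [], depth = 0) {
    this.log = log;
    this.depth = depth;
  }

  // Stand-in for the real driver call: records the statement only.
  async execute(sql: string): Promise<void> {
    this.log.push(sql);
    await Promise.resolve();
  }

  async transaction<V>(done: (tx: TxHandle) => Promise<V>): Promise<V> {
    if (this.busy) {
      // A second call on the same handle is a parallel call on the same level.
      throw new Error(`Concurrent transaction at depth ${this.depth + 1}`);
    }
    this.busy = true;
    const name = `tt_${this.depth + 1}`;
    try {
      await this.execute(`SAVEPOINT ${name}`);
      try {
        // Nested calls must go through the child handle passed to `done`.
        return await done(new TxHandle(this.log, this.depth + 1));
      } catch (err) {
        await this.execute(`ROLLBACK TO ${name}`);
        throw err;
      } finally {
        await this.execute(`RELEASE ${name}`);
      }
    } finally {
      this.busy = false;
    }
  }
}

async function demoHandles(): Promise<string> {
  const root = new TxHandle();
  const first = root.transaction(async (tx) => {
    await tx.transaction(async () => undefined); // nested: allowed
    return "first ok";
  });
  // Started while `first` is still open on the same handle: rejected.
  const second = root
    .transaction(async () => "second ok")
    .catch((err: Error) => err.message);
  return `${await first} / ${await second}`;
}

demoHandles().then((out) => console.log(out));
```

The trade-off is that nested transactions only work through the handle passed to the callback; calling transaction() on the outer handle from inside the callback is reported as a concurrent call.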
Upvotes: 1