Reputation: 145
I've been using josiahcarlson's Redis Object Mapper (ROM) to work with my data in Redis. I need certain operations to be done as transactions, i.e. either all writes are done or none are done (auto rollback in case of any error or shutdown).
From what I could find online, Josiah had a conversation with the maintainer of Redis, who declined to accept the PR for transactions back in 2015. What is the situation now? I couldn't find anything about this in the docs or on Google.
How do I implement atomicity for my operations with ROM? Is there a way?
Upvotes: 1
Views: 215
Reputation: 727
Transactions were not built into earlier versions of rom. If you want to go fast and loose, you can keep a temporary transaction id in a common place (which you can verify afterwards, if you need to). If you want to go official, you will want a transaction history / ledger, which takes a little more work (you can use the ledger model for storing the value, then mark it as done as part of the transaction).
If you want to use built-in functionality plus your own model data to transfer, you can use something like we have in the tests: https://github.com/josiahcarlson/rom/blob/1.0.0/test/test_rom.py#L1759
The function below transfers a value and refreshes your local models. It returns (successful_transfer, optional_message).
    import time

    import redis

    def transfer(source, dest, column, value, txn_set, txn_id, timeout=1, column_indexed=False):
        source_key = source._pk
        dest_key = dest._pk
        # assume source and dest have same connection
        c = source._connection.pipeline(True)
        assert value > 0, value
        end = time.time() + timeout
        while time.time() < end:
            # WATCH makes the later EXEC fail if any of these keys changes first
            c.watch(source_key, dest_key, txn_set)
            if c.sismember(txn_set, txn_id):
                return False, "already sent"
            # read the current balance from the source hash (by key, not by model)
            v = type(value)(c.hget(source_key, column) or 0)
            if v < value:
                c.discard()
                return False, "not enough credit"
            try:
                # everything queued after multi() is sent as one MULTI/EXEC block
                c.multi()
                c.hincrby(source_key, column, -value)
                c.hincrby(dest_key, column, value)
                c.sadd(txn_set, txn_id)
                r = c.execute()
                # SADD returns 1 when the id is newly added, which is expected here
                assert r[-1], r
                source.refresh()
                dest.refresh()
                bd = []
                if column_indexed:
                    # re-save so that rom updates the indexes for the changed column
                    try:
                        source.save(full=True)
                    except Exception:
                        bd.append("source index update failed")
                    try:
                        dest.save(full=True)
                    except Exception:
                        bd.append("dest index update failed")
                if bd:
                    bd.append("value transferred")
                return True, ", ".join(bd)
            except redis.exceptions.WatchError:
                # a watched key changed before EXEC; retry until the timeout
                continue
        return False, "timed out from data race condition"
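To make the control flow of the function above easier to follow, here is a minimal in-memory sketch of the same optimistic-locking idea: snapshot the keys, check the preconditions, commit only if nothing changed, and record the transaction id so a repeat is rejected. It does not talk to Redis or rom at all; `VersionedStore`, `ConflictError`, and `transfer_sketch` are hypothetical names invented for this illustration, and the single-threaded `commit` only imitates what WATCH/MULTI/EXEC provide on a real server.

```python
class ConflictError(Exception):
    """Raised when a watched key changed before commit (stand-in for WatchError)."""


class VersionedStore:
    """Hypothetical in-memory stand-in for Redis; not part of rom or redis-py."""

    def __init__(self):
        self.values = {}    # key -> integer balance
        self.versions = {}  # key -> number of writes applied to that key
        self.done = set()   # transaction ids that were already applied

    def incr(self, key, amount):
        self.values[key] = self.values.get(key, 0) + amount
        self.versions[key] = self.versions.get(key, 0) + 1

    def watch(self, *keys):
        # Snapshot the versions, loosely like WATCH remembering key state.
        return {k: self.versions.get(k, 0) for k in keys}

    def commit(self, snapshot, writes, txn_id):
        # Loosely like EXEC: apply every write only if no watched key changed.
        for key, version in snapshot.items():
            if self.versions.get(key, 0) != version:
                raise ConflictError(key)
        for key, amount in writes:
            self.incr(key, amount)
        self.done.add(txn_id)


def transfer_sketch(store, source, dest, value, txn_id, retries=5):
    """Return (successful_transfer, optional_message), like the function above."""
    for _ in range(retries):
        if txn_id in store.done:
            return False, "already sent"
        snapshot = store.watch(source, dest)
        if store.values.get(source, 0) < value:
            return False, "not enough credit"
        try:
            store.commit(snapshot, [(source, -value), (dest, value)], txn_id)
            return True, ""
        except ConflictError:
            continue  # another writer got there first; re-read and retry
    return False, "timed out from data race condition"


store = VersionedStore()
store.incr("alice", 10)
first = transfer_sketch(store, "alice", "bob", 4, "txn-1")
repeat = transfer_sketch(store, "alice", "bob", 4, "txn-1")
too_much = transfer_sketch(store, "alice", "bob", 100, "txn-2")
```

In this sketch the first call moves the value, the repeated call with the same transaction id is refused, and the oversized transfer is rejected before anything is written.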
Upvotes: 1
Reputation: 1260
Several functions in redis-py, the client library that ROM uses, are atomic. Here is a link to the redis-py documentation, which lists all of them:
https://readthedocs.org/projects/redis-py/downloads/pdf/latest/
As seen, functions such as:
getset()
pipeline()
rpoplpush()
smove()
All of these allow you to implement atomicity in the Redis Object Mapper.
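As a rough sketch of how `pipeline()` from that list can group several writes, the helper below queues two `hincrby` calls and one `sadd` and sends them as a single MULTI/EXEC block. The function name `move_points` and all of its parameters are made up for this example, and `conn` is assumed to be a `redis.Redis` connection (for instance the one your rom models already use).

```python
def move_points(conn, source_key, dest_key, field, amount, log_key, txn_id):
    """Queue three writes and send them to Redis as one MULTI/EXEC block.

    `conn` is assumed to be a redis.Redis instance; every other name here
    is hypothetical and only serves the illustration.
    """
    # transaction=True wraps the queued commands in MULTI/EXEC
    pipe = conn.pipeline(transaction=True)
    pipe.hincrby(source_key, field, -amount)  # take from the source hash
    pipe.hincrby(dest_key, field, amount)     # give to the destination hash
    pipe.sadd(log_key, txn_id)                # remember that this id was applied
    # Nothing is sent until execute(); the result is one reply per command.
    return pipe.execute()
```

Note that MULTI/EXEC runs the queued commands without other clients interleaving, but Redis does not roll back commands that already ran if a later one fails at execution time. That is why the `transfer` function in the other answer checks its preconditions under `watch()` before queuing anything.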
Hope this helps!
Upvotes: 0