Roman Vinogradov
Roman Vinogradov

Reputation: 145

Atomic transactions with Redis Object Mapper?

I've been using josiahcarlson's Redis Object Mapper (ROM) to work with my data in Redis. I need certain operations to be done as transactions, i.e. either all writes are done or none are done (auto rollback in case of any error or shutdown).

From what I could find online, Josiah had a conversation with the maintainer of Redis, who didn't agree to accept the PR for transactions back in 2015. What is the situation now? I couldn't find anything about this in the docs nor in Google.

How do I implement atomicity for my operations with ROM? Is there a way?

Upvotes: 1

Views: 215

Answers (2)

Josiah
Josiah

Reputation: 727

It was not previously built into rom (the earlier version). If you want to go fast and loose, we'll keep a temp transaction id in a common place (which you can verify afterwards, if you need to). If you want to go official, you will want a transaction history / ledger, which will take a little more work (you can use the ledger model for storing value, then mark it as done as part of the transaction).

If you want to use built-in functionality plus your own model data to transfer, you can use something like we have in the tests: https://github.com/josiahcarlson/rom/blob/1.0.0/test/test_rom.py#L1759

The function below will transfer a value, and refresh your local models. Returns (successful_transfer, optional_message).

def transfer(source, dest, column, value, txn_set, txn_id, timeout=1, column_indexed=False):
    source_key = source._pk
    dest_key = dest._pk
    # assume source and dest have same connection
    c = source._connection.pipeline(True)

    assert value > 0, value

    end = time.time() + timeout
    while time.time() < end:
        c.watch(source_key, dest_key, txn_set)
        if c.sismember(txn_set, txn_id):
            return False, "already sent"

        v = type(value)(c.hget(source, column) or 0)
        if v < value:
            c.discard()
            return False, "not enough credit"

        try:
            c.multi()
            c.hincrby(source_key, column, -value)
            c.hincrby(dest_key, column, value)
            c.sadd(txn_set, txn_id)
            r = c.execute()
            assert not r[-1], r
            source.refresh()
            dest.refresh()
            bd = []
            if column_indexed:
                try:
                    source.save(full=True)
                except:
                    bd.append("source index update failed")
                try:
                    dest.save(full=True)
                except:
                    bd.append("dest index update failed")
            if bd:
                bd.append("value transferred")
            
            return True, ", ".join(bd)

        except redis.exceptions.WatchError:
            continue

    return False, "timed out from data race condition"

Upvotes: 1

Gavin Wong
Gavin Wong

Reputation: 1260

There are multiple functions in ROM which uses atomicity. Here is a link with the documentation of all the functions in ROM:

https://readthedocs.org/projects/redis-py/downloads/pdf/latest/

As seen, functions such as:

getset()
pipeline()
rpoplpush()
smove()

All allow you to implement atomicity in the Redits Object Mapper.

Hope this helps!

Upvotes: 0

Related Questions