Reputation: 8497
So I'm attempting to create copy-on-write map that uses an attempt at atomic reference counting on the read-side to not have locking.
Something isn't quite right. I see some references getting over-incremented and some are going down negative, so something isn't really atomic. In my tests I have 10 reader threads looping 100 times each doing a get() and 1 writer thread doing 100 writes.
It gets stuck in the writer because some of the references never go down to zero, even though they should.
I'm attempting to use the 128-bit DCAS technique laid explained by this blog.
Is there something blatantly wrong with this or is there an easier way to debugging this rather than playing with it in the debugger?
typedef std::unordered_map<std::string, std::string> StringMap;
static const int zero = 0; //provides an l-value for asm code
class NonBlockingReadMapCAS {
public:
class OctaWordMapWrapper {
public:
StringMap* fStringMap;
//std::atomic<int> fCounter;
int64_t fCounter;
OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }
OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }
~OctaWordMapWrapper() {
delete fStringMap;
}
/**
* Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap
* pointer and fCounter.
*/
static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) {
bool cas_result;
__asm__ __volatile__
(
"lock cmpxchg16b %0;" // cmpxchg16b sets ZF on success
"setz %3;" // if ZF set, set cas_result to 1
: "+m" (*target),
"+a" (compareMap), //compare target's stringmap pointer to compareMap
"+d" (compareCounter), //compare target's counter to compareCounter
"=q" (cas_result) //results
: "b" (swapMap), //swap target's stringmap pointer with swapMap
"c" (swapCounter) //swap target's counter with swapCounter
: "cc", "memory"
);
return cas_result;
}
OctaWordMapWrapper* atomicIncrementAndGetPointer()
{
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
return this;
else
return NULL;
}
OctaWordMapWrapper* atomicDecrement()
{
while(true) {
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
break;
}
return this;
}
bool atomicSwapWhenNotReferenced(StringMap* newMap)
{
return doubleCAS(this, this->fStringMap, zero, newMap, 0);
}
}
__attribute__((aligned(16)));
std::atomic<OctaWordMapWrapper*> fReadMapReference;
pthread_mutex_t fMutex;
NonBlockingReadMapCAS() {
fReadMapReference = new OctaWordMapWrapper();
}
~NonBlockingReadMapCAS() {
delete fReadMapReference;
}
bool contains(const char* key) {
std::string keyStr(key);
return contains(keyStr);
}
bool contains(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
bool result = map->fStringMap->count(key) != 0;
map->atomicDecrement();
return result;
}
std::string get(const char* key) {
std::string keyStr(key);
return get(keyStr);
}
std::string get(std::string &key) {
OctaWordMapWrapper *map;
do {
map = fReadMapReference.load()->atomicIncrementAndGetPointer();
} while (!map);
//std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
std::string value = map->fStringMap->at(key);
map->atomicDecrement();
return value;
}
void put(const char* key, const char* value) {
std::string keyStr(key);
std::string valueStr(value);
put(keyStr, valueStr);
}
void put(std::string &key, std::string &value) {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
std::pair<std::string, std::string> kvPair(key, value);
newWrapper->fStringMap->insert(kvPair);
fReadMapReference.store(newWrapper);
std::cout << oldWrapper->fCounter << "\n";
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
void clear() {
pthread_mutex_lock(&fMutex);
OctaWordMapWrapper *oldWrapper = fReadMapReference;
OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
fReadMapReference.store(newWrapper);
while (oldWrapper->fCounter > 0);
delete oldWrapper;
pthread_mutex_unlock(&fMutex);
}
};
Upvotes: 7
Views: 812
Reputation: 1918
Maybe not the answer but this looks suspicious to me:
while (oldWrapper->fCounter > 0);
delete oldWrapper;
You could have a reader thread just entering atomicIncrementAndGetPointer()
when the counter is 0 thus pulling the rug underneath the reader thread by deleting the wrapper.
Edit to sum up the comments below for potential solution:
The best implementation I'm aware of is to move fCounter
from OctaWordMapWrapper
to fReadMapReference
(You don't need the OctaWordMapWrapper
class at all actually). When the counter is zero swap the pointer in your writer. Because you can have high contention of reader threads which essentially blocks the writer indefinitely you can have highest bit of fCounter
allocated for reader lock, i.e. while this bit is set the readers spin until the bit is cleared. The writer sets this bit (__sync_fetch_and_or()
) when it's about to change the pointer, waits for the counter to fall down to zero (i.e. existing readers finish their work) and then swap the pointer and clears the bit.
This approach should be waterproof, though it's obviously blocking readers upon writes. I don't know if this is acceptable in your situation and ideally you would like this to be non-blocking.
The code would look something like this (not tested!):
class NonBlockingReadMapCAS
{
public:
NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}
private:
StringMap *acquire_read()
{
while(1)
{
uint32_t counter=atom_inc(m_counter);
if(!(counter&0x80000000))
return m_ptr;
atom_dec(m_counter);
while(m_counter&0x80000000);
}
return 0;
}
void release_read()
{
atom_dec(m_counter);
}
void acquire_write()
{
uint32_t counter=atom_or(m_counter, 0x80000000);
assert(!(counter&0x80000000));
while(m_counter&0x7fffffff);
}
void release_write()
{
atom_and(m_counter, uint32_t(0x7fffffff));
}
StringMap *volatile m_ptr;
volatile uint32_t m_counter;
};
Just call acquire/release_read/write() before & after accessing the pointer for read/write. Replace atom_inc/dec/or/and()
with __sync_fetch_and_add()
, __sync_fetch_and_sub()
, __sync_fetch_and_or()
and __sync_fetch_and_and()
respectively. You don't need doubleCAS()
for this actually.
As noted correctly by @Quuxplusone in a comment below this is single producer & multiple consumer implementation. I modified the code to assert properly to enforce this.
Upvotes: 3
Reputation: 2715
Another user has suggested a similar approach, but if you are compiling with gcc (and perhaps with clang), you could use the intrinsic __sync_add_and_fetch_4 which does something similar to what your assembly code does, and is likely much more portable. I have used it when I implemented refcounting in an Ada library (but the algorithm remains the same).
int __sync_add_and_fetch_4 (int* ptr, int value);
// increments the value pointed to by ptr by value, and returns the new value
Upvotes: 2
Reputation: 27350
Well, there are probably lots of problems, but here are the obvious two.
The most trivial bug is in atomicIncrementAndGetPointer
. You wrote:
if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
That is, you're attempting to increment this->fCounter
in a lock-free way. But it doesn't work, because you're fetching the old value twice with no guarantee that the same value is read each time. Consider the following sequence of events:
this->fCounter
(with value 0) and computes argument 5 as this->fCounter +1
= 1
.this->fCounter
(with value 1) and computes argument 3 as this->fCounter
= 1
.doubleCAS(this, this->fStringMap, 1, this->fStringMap, 1)
. It succeeds, of course, but we've lost the "increment" we were trying to do.What you wanted is more like
StringMap* oldMap = this->fStringMap;
int64_t oldCounter = this->fCounter;
if (doubleCAS(this, oldMap, oldValue, oldMap, oldValue+1))
...
The other obvious problem is that there's a data race between get
and put
. Consider the following sequence of events:
get
: it fetches fReadMapReference.load()
and prepares to execute atomicIncrementAndGetPointer
on that memory address.put
: it deletes that memory address. (It is within its rights to do so, because the wrapper's reference count is still at zero.)atomicIncrementAndGetPointer
on the deleted memory address. If you're lucky, you segfault, but of course in practice you probably won't.As explained in the blog post:
The garbage collection interface is omitted, but in real applications you would need to scan the hazard pointers before deleting a node.
Upvotes: 2
Reputation: 4490
Although I'm not sure how your reader threads work, I suspect your problem is that you are not catching and handling possible out_of_range
exceptions in your get()
method that might arise from this line: std::string value = map->fStringMap->at(key);
. Note that if key
is not found in the map, this will throw, and exit the function without decrementing the counter, which would lead to the condition you describe (of getting stuck in the while-loop within the writer thread while waiting for the counters to decrement).
In any event, whether this is the cause of the issues you're seeing or not, you definitely need to either handle this exception (and any others) or modify your code such that there's no risk of a throw. For the at()
method, I would probably just use find()
instead, and then check the iterator it returns. However, more generally, I would suggest using the RAII pattern to ensure that you don't let any unexpected exceptions escape without unlocking/decrementing. For example, you might check out boost::scoped_lock
to wrap your fMutex
and then write something simple like this for the OctaWordMapWrapper increment/decrement:
class ScopedAtomicMapReader
{
public:
explicit ScopedAtomicMapReader(std::atomic<OctaWordMapWrapper*>& map) : fMap(NULL) {
do {
fMap = map.load()->atomicIncrementAndGetPointer();
} while (NULL == fMap);
}
~ScopedAtomicMapReader() {
if (NULL != fMap)
fMap->atomicDecrement();
}
OctaWordMapWrapper* map(void) {
return fMap;
}
private:
OctaWordMapWrapper* fMap;
}; // class ScopedAtomicMapReader
With something like that, then for example, your contains()
and get()
methods would simplify to (and be immune to exceptions):
bool contains(std::string &key) {
ScopedAtomicMapReader mapWrapper(fReadMapReference);
return (mapWrapper.map()->fStringMap->count(key) != 0);
}
std::string get(std::string &key) {
ScopedAtomicMapReader mapWrapper(fReadMapReference);
return mapWrapper.map()->fStringMap->at(key); // Now it's fine if this throws...
}
Finally, although I don't think you should have to do this, you might also try declaring fCounter
as volatile
as well (given your access to it in the while-loop in the put()
method will be on a different thread than the writes to it on the reader threads.
Hope this helps!
By the way, one other minor thing: fReadMapReference
is leaking. I think you should delete this in your destructor.
Upvotes: 0