Reputation: 1260
I'm currently working on a lock-free singly linked list in C++11, and I'm having an issue with my popFront()
function - or I should at least say I know that it will have problems in certain instances.
In any case, this is what I currently have:
std::shared_ptr<T> popFront(void)
{
    auto p = atomic_load(&head);
    while (p && !atomic_compare_exchange_weak(&head, &p, p->next))
    {}
    return p ? p->data : std::shared_ptr<T>();
}
Note that head is of type shared_ptr.
However, I anticipate a few issues. The first is the situation in which two threads are executing popFront(): they both read the same head, and one thread finishes first. Before the second thread finishes, the caller deletes the object that was being pointed to, so the second thread is now working with deleted memory. The second issue is the classic ABA problem.
The idea behind this linked list is to have it be lock-free, so I want to avoid imposing a lock within this function. Unfortunately, though, I'm not sure how to address these issues. Any suggestions would be appreciated.
Upvotes: 3
Views: 1656
Reputation: 5344
One thing that helps make threads a lot easier is not freeing memory. If you are working with a bunch of linked list nodes, you might consider a pool of them. Instead of freeing a node, you return it to the pool. That handles part of your problem.
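As a rough illustration, such a pool can itself be a Treiber-style stack of free nodes: released nodes are pushed back and handed out again instead of being freed. This is only a sketch (the names `PoolNode` and `NodePool` are mine), and in heavily contended multi-threaded use the pop side still wants the ABA counter described below:

```cpp
#include <atomic>
#include <cassert>

// Illustrative node pool: recycle nodes instead of freeing them.
struct PoolNode {
    PoolNode* next;
    int value; // payload slot
};

class NodePool {
    std::atomic<PoolNode*> free_{nullptr};
public:
    // Hand out a recycled node if one exists, otherwise allocate.
    PoolNode* acquire() {
        PoolNode* n = free_.load();
        while (n && !free_.compare_exchange_weak(n, n->next)) {}
        return n ? n : new PoolNode{nullptr, 0};
    }
    // Return a node to the pool rather than deleting it.
    void release(PoolNode* n) {
        n->next = free_.load();
        while (!free_.compare_exchange_weak(n->next, n)) {}
    }
};
```

Because nodes are never returned to the allocator, a concurrent reader can never dereference freed memory; it can at worst see a stale-but-valid node, which the ABA counter then catches.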
ABA is easy. You need to bump a counter each time you change the head pointer, and you need to write this counter atomically together with the pointer. If using 32-bit addressing, use the 64-bit compare-and-swap (CAS) and store the counter in the extra 32 bits. If using 64-bit addressing, avoid the 128-bit compare-and-swap because it can be slow (on our Xeon chips, anything up to 64 bits is fast). Since neither Windows nor Linux supports full 64-bit addressing, you can get away with using some of the 64 bits for the ABA counter. I use unions to do this for both 32- and 64-bit addressing modes.
You don't need to count every change; you just need to catch every change. Even when deliberately trying to cause ABA with lots of threads changing the head as fast as possible, it will still rarely happen, and in real life it almost never happens. In other words, you don't need to count very high. I usually use 4 bits and let the counter roll over. I might get in trouble for that; use more bits if you want.
In this example, I assumed 64 bits, and used CAS() for compare and swap, so you will have to substitute whatever your compiler uses for CAS:
typedef unsigned long long U8; // 64-bit unsigned (unsigned __int64 on MSVC)

struct TNode {
    TNode* m_pNext;
};

template<class T>
union THead {
    struct {
        U8 m_nABA  : 4,
           m_pNode : 60; // Windows only supports 44-bit addressing anyway.
    };
    U8 m_n64; // for CAS
    // this constructor will make an atomic copy on Intel
    THead(THead& r) { m_n64 = r.m_n64; }
    T* Node() { return (T*)m_pNode; }
    // changing Node bumps the ABA counter
    void Node(T* p) { m_nABA++; m_pNode = (U8)p; }
};
// pop pNode from head of list.
template<class T>
T* Pop(volatile THead<T>& Head) {
    while (1) { // race loop
        // Get an atomic copy of head and call it old.
        THead<T> Old(Head);
        if (!Old.Node())
            return NULL;
        // Copy old and call it new.
        THead<T> New(Old);
        // change New's Node, which bumps the internal ABA counter
        New.Node(Old.Node()->m_pNext);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            return Old.Node(); // success
        // race, try again
    }
}
// push pNode onto head of list.
template<class T>
void Push(volatile THead<T>& Head, T* pNode) {
    while (1) { // race loop
        // Get an atomic copy of head and call it old.
        // Copy old and call it new.
        THead<T> Old(Head), New(Old);
        // Wire node to head
        pNode->m_pNext = New.Node();
        // change New's head ptr, which bumps the internal ABA counter
        New.Node(pNode);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            break; // success
        // race, try again
    }
}
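For reference, the same pointer-plus-counter trick can be written with portable C++11 std::atomic instead of a compiler-specific CAS. This is a sketch under my own names, and it assumes user-space pointers fit in 48 bits (true on current x86-64), leaving 16 bits for the counter:

```cpp
#include <atomic>
#include <cstdint>

struct Node { Node* next; };

// Pack a 16-bit ABA counter into the top bits of a 64-bit word.
// Assumes pointers fit in the low 48 bits (current x86-64 user space).
static std::uint64_t pack(Node* p, std::uint16_t aba) {
    return (std::uint64_t(aba) << 48) |
           (reinterpret_cast<std::uintptr_t>(p) & ((1ull << 48) - 1));
}
static Node* node_of(std::uint64_t w) {
    return reinterpret_cast<Node*>(w & ((1ull << 48) - 1));
}
static std::uint16_t aba_of(std::uint64_t w) { return std::uint16_t(w >> 48); }

struct Stack {
    std::atomic<std::uint64_t> head{0};

    void push(Node* n) {
        std::uint64_t old = head.load();
        do {
            n->next = node_of(old); // wire node to current head
        } while (!head.compare_exchange_weak(old, pack(n, aba_of(old) + 1)));
    }
    Node* pop() {
        std::uint64_t old = head.load();
        Node* n;
        do {
            n = node_of(old);
            if (!n) return nullptr; // empty
        } while (!head.compare_exchange_weak(old, pack(n->next, aba_of(old) + 1)));
        return n;
    }
};
```

The counter is bumped on every successful CAS, so a head that has been popped and re-pushed between a reader's load and its CAS no longer compares equal, which is exactly what defeats ABA.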
Upvotes: 1
Reputation: 4410
There are a lot of solutions for designing a lock-free queue without the ABA problem.
This article should provide a couple of insights, and some general tools for solving these problems can be found here.
Now, on the described problems that you mentioned:
Before the second thread finishes, the caller deletes the object that was being pointed to, so the second thread is now working with deleted memory
Yep, it can happen, and one solution is to use tagged pointers: because of alignment, the low 2 (or more) bits of a pointer are unused on 32-bit architectures, and on 64-bit architectures we have at least 3 unused bits.
Thus we can mark a pointer as logically deleted, without physically deleting it, by setting some of those unused bits, like this:
__inline struct node* setTag(struct node* p, unsigned long TAG)
{
    return (struct node*) ((uintptr_t)p | TAG);
}

__inline bool isTagged(struct node* p, unsigned long TAG)
{
    return ((uintptr_t)p & TAG) != 0;
}

__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG)
{
    return (struct node*)((uintptr_t)p & ~(uintptr_t)TAG);
}
where TAG fits in the unused low bits: 2 bits (values up to 3) on 32-bit architectures and at least 3 bits (values up to 7) on 64-bit ones, depending on the computer architecture and word alignment.
Now when doing a CAS, we disregard tagged pointers, thus operating on valid pointers only.
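To make the bit-twiddling concrete, here is a small round-trip of the helpers (restated here in self-contained C++ form; the node type is just a stand-in, and TAG must be a bit left free by alignment, e.g. bit 0 for any type aligned to at least 2 bytes):

```cpp
#include <cstdint>

struct node { int data; };

// Set a tag bit in the unused low bits of the pointer.
static node* setTag(node* p, unsigned long TAG) {
    return reinterpret_cast<node*>(reinterpret_cast<std::uintptr_t>(p) | TAG);
}
// A pointer is tagged iff any of the tag bits are set.
static bool isTagged(node* p, unsigned long TAG) {
    return (reinterpret_cast<std::uintptr_t>(p) & TAG) != 0;
}
// Strip the tag bits to recover the real, dereferenceable address.
static node* getUntaggedAddress(node* p, unsigned long TAG) {
    return reinterpret_cast<node*>(
        reinterpret_cast<std::uintptr_t>(p) & ~static_cast<std::uintptr_t>(TAG));
}
```

Tagging only marks the pointer; the pointed-to node stays valid, so any thread still holding the old pointer can safely strip the tag and read it.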
When doing a dequeue on a queue we can perform as follows:
int* dequeue(qroot* root)
{
    qnode* oldHead;
    do
    {
        oldHead = root->head.load();
        if (isTagged(oldHead, TAG)) // disregard tagged addresses
            return NULL;
        oldHead = getUntaggedAddress(oldHead, TAG);
        // retry the CAS until the head is unchanged since we read it
    } while (!root->head.compare_exchange_strong(oldHead, oldHead->next,
                                                 std::memory_order_seq_cst));
    return &(oldHead->data);
}
given
typedef struct qnode
{
    std::atomic<qnode*> next;
    int data;
} qnode;

typedef struct qroot
{
    std::atomic<qnode*> head; // dequeue and peek are performed at the head
    std::atomic<qnode*> tail; // enqueue is performed at the tail
} qroot;
Upvotes: 1