user997112
user997112

Reputation: 30615

Two threads deleting from a lock-free stack

I am referring to page 185 and 186 in C++ Concurrency in Action. They give the following code as a method for a lock-free stack:

void push(T const& data){
    node* const new_node = new node(data);
    new_node->next=head.load();
    while(!head.compare_exchange_weak(new_node->next, new_node));
}

and on P186 it says the following:

Of course, now that you have a means of adding data to the stack, you need a way of getting it off again. On the face of it, this is quite simple:

  1. Read the current value of head
  2. Read head->next
  3. Set head to head->next
  4. Return the data from the retrieved node
  5. Delete the retrieved node

However, in the presence of multiple threads, this isn't so simple. If there are two threads removing items from the stack they both might read the same value of head at step 1. If one thread then proceeds all the way through to step 5 before the other gets to step 2, the second thread will be dereferencing a dangling pointer.

I thought that compare_exchange_weak() could be used to essentially complete step 2 and 3 atomically and the second thread could see that head->next is no longer valid?

I am surprised we cannot use CAS to solve the above problem??

Upvotes: 0

Views: 365

Answers (1)

johnnycrash
johnnycrash

Reputation: 5344

When popping an item off, you don't change item->next, you change head to point to item->next. You are not allowed to change an item till you pop it off the list. If another thread changes item->next, it means they have popped it off the list, which means the ABA version will be different (i'm assuming you use an ABA check in linked lists like this) and probably (which is why you use ABA - because of the "probably" here) head will be different, so your CAS will fail.

Notes about ABA, if you never put an item back on the list once it's popped, and you never free any items (which would allow an item's memory to be reused and stuck back on the list), then you don't need the ABA check. However, it only takes a few bits to implement ABA, because you only need as many versions as could possibly happen in the time it takes to pop an item off the list. I like at least 16, but that might be cutting it close. Every time you add an item change the ABA version. Every time you remove an item, change the ABA version.

Other important notes. Items in an atomic list like this are not meant to be traversed using the head/next that are being manipulated atomically. This is because you never know if someone popped the item off and changed next. I have found that if you change next to something else in the list, say to skip over an item, this can work in some situations. Items in an atomic list like this are never meant to be added or removed anywhere other than the head of the list. You cannot make this work (easily...never say never) with atomic operations.

Here is an example that works on msvc 2010. This uses Microsoft's version of what volatile means. You could use std::atomic if your compiler was iso. NOTE the use of union. union CAtomicLinkList is how we atomically set more than one field at a time.

template <typename T> class CAtomicListItem : public T {
  typedef CAtomicListItem<T> CItem;
public:
  CItem*      m_pNext;

  CItem()  : m_pNext(NULL)                               { }
  CItem(T &r) : T(r),  m_pNext(NULL)                     { }
};

template <typename T> union CAtomicLinkList {
  typedef CAtomicLinkList<T> CList;
  typedef CAtomicListItem<T> CItem;
public:
  struct {
    QWORD     m_pHead : 44,    // Addr of first item.  Windows only uses 8tb
                      :  4,    // extra space.
              m_nABA  : 16;    // Version to prevent ABA.  We don't need this 
                               // many bits so there is room for expansion.
  };
  QWORD        m_n64;          // atomically update this struct by CAS on this field.

  CList() : m_n64(0)                                     { }
  // These constructors are for making copies.  These cannot threadsafe 
  // update a shared instance.
  CList(volatile const CList& r) : m_n64(r.m_n64)        { }

  void Push(CItem *pItem) volatile {
    while (1) {
      CList Old(*this), New(Old);
      New.m_pHead = UINT_PTR(pItem);
      New.m_nABA++;  // whenever you change the list, change the version
      pItem->m_pNext = (CItem*)Old.m_pHead;
      if (CAS(&m_n64, Old.m_n64, New.m_n64))
        return; // success
    }
  }

  CItem* Pop() volatile {
    while (1) {
      CList Old(*this);
      if (!Old.m_pHead)
        return NULL;
      CList New(Old);
      CItem* pItem = (CItem*)Old.m_pHead;
      New.m_pHead = UINT_PTR(pItem->m_pNext);
      New.m_nABA++; // whenever you change the list, change the version
      if (CAS(&m_n64, Old.m_n64, New.m_n64))
        return pItem; // success
    }
  }
};

inline bool CAS(volatile WORD* p, const WORD nOld, const WORD nNew) {
  Assert(IsAlign16(p)); 
  return WORD(_InterlockedCompareExchange16((short*)p, nNew, nOld)) == nOld; 
}
inline bool CAS(volatile DWORD* p, const DWORD nOld, const DWORD nNew) {
  Assert(IsAlign32(p)); 
  return DWORD(InterlockedCompareExchange((long*)p, nNew, nOld)) == nOld; 
}
inline bool CAS(volatile QWORD* p, const QWORD nOld, const QWORD nNew) { 
  Assert(IsAlign64(p)); 
  return QWORD(InterlockedCompareExchange64((LONGLONG*)p, nNew, nOld)) == nOld; 
}
inline bool CAS(volatile PVOID* pp, const void *pOld, const void *pNew) {
  Assert(IsAlign64(pp)); 
  return PVOID(InterlockedCompareExchangePointer(pp, (LPVOID)pNew, (LPVOID)pOld)) == pOld; 
}

Upvotes: 0

Related Questions