marcmagransdeabril
marcmagransdeabril

Reputation: 1494

How to implement double-word compare and swap in C/Linux?

I was reading the paper "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms" and I realized that they assume that the computer implements the following pseudo-code atomically:

CAS(Q->Tail,tail,<next.ptr,next.count+1>)

Where Q->Tail and tail are a pointer and an instance of a struct containing a pointer and a counter.

I know that GCC provides a couple of built-ins for single-word compare and swap in C. However, is it possible to implement a non-blocking atomic double-word compare and swap from a single-word compare and swap in C (using Linux)? Is this the right approach to implement the pseudo-code of the referenced paper?

Upvotes: 4

Views: 2926

Answers (3)

Edison von Myosotis
Edison von Myosotis

Reputation: 619

Take this class for DWCAS:

#pragma once
#include <cstdint>
#include <utility>
#include <atomic>
#include <type_traits>
#include <cstddef>
#include <bit>

#if defined(_MSC_VER)
    #pragma warning(push)
    #pragma warning(disable: 26495)
#endif
#if defined(__llvm__)
    #pragma clang diagnostic push
    #pragma clang diagnostic ignored "-Watomic-alignment"
#endif

/// Atomic cell holding two `size_t` words that are compared and exchanged as one unit.
///
/// The storage is a union: the same bytes are viewed either as two separate
/// `std::atomic<size_t>` halves, as a `std::atomic<pair_t>` (used when
/// `SUBSTITITABLE` is true), or as the raw double-width operand handed to the
/// platform compare-and-swap primitive.
struct dcas_atomic
{
    static_assert(sizeof(size_t) == 8 || sizeof(size_t) == 4, "must be 64 or 32 bit architecture");
    /// The two words that are exchanged together.
    struct pair_t { size_t first, second; };
    /// Leaves the stored pair uninitialised (the union constructor below writes nothing).
    dcas_atomic() = default;
    /// Initialises the cell with `desired`.
    dcas_atomic( pair_t const &desired );
    /// Reads only the first word.
    size_t first();
    /// Reads only the second word.
    size_t second();
    /// Reads both words and returns them as a pair.
    operator pair_t();
    /// Double-word compare-and-swap; `Release` is a compile-time ordering tag.
    template<bool Release>
    bool compare_exchange( pair_t &expected, pair_t const &desired, std::bool_constant<Release> );
    /// Reads the whole pair. `Release` defaults to false.
    // The default template argument is required: a defaulted function parameter is a
    // non-deduced context, so without it a plain `load()` could not deduce `Release`.
    template<bool Release = false>
    pair_t load( std::bool_constant<Release> = {} );
    /// Overwrites the whole pair with `niu`. `Release` defaults to true.
    // Same reasoning as for load(): the default template argument makes `store( p )` callable.
    template<bool Release = true>
    void store( pair_t const &niu, std::bool_constant<Release> = {} );
private:
#if defined(__GNUC__) || defined(__clang__)
    // Unsigned integer twice as wide as size_t; operand type of the __atomic builtins.
    using dword_t = std::conditional_t<sizeof(size_t) == 8, unsigned __int128, unsigned long long>;
#endif
    // True when std::atomic<pair_t> is always lock-free or has exactly the size of pair_t;
    // the member functions then go through m_subsitute instead of the platform primitives.
    static constexpr bool SUBSTITITABLE = std::atomic<pair_t>::is_always_lock_free || sizeof(std::atomic<pair_t>) == sizeof(pair_t);
    // Aligned to the full double-word size.
    union alignas(2 * sizeof(size_t)) U
    {
        // Intentionally empty: no union member is initialised here.
        U() {}
        static_assert(sizeof(std::atomic<size_t>) == sizeof(size_t), "sizeof(atomic<size_t>) must be == sizeof(size_t)");
        // Per-word view used by the constructor, first() and second().
        std::atomic<size_t> m_atomics[2];
        // Whole-pair std::atomic view; collapses to a char placeholder when not SUBSTITITABLE.
        std::conditional_t<SUBSTITITABLE, std::atomic<pair_t>, char> m_subsitute;
        // Raw double-width view passed to the platform compare-and-swap primitive.
#if defined(_MSC_VER)
    #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
        __int64 volatile m_firstAndSecond[2];
    #elif defined(_M_IX86)
        __int64 volatile m_firstAndSecond;
    #else
        #error unknown architecture
    #endif
#elif defined(__GNUC__) || defined(__clang__)
        dword_t volatile m_firstAndSecond;
#endif
    } u;
};

/// Initialises the cell from `desired`.
///
/// Each word is written through its own per-word atomic with relaxed ordering,
/// so the two writes are separate stores rather than one double-word store.
inline dcas_atomic::dcas_atomic( pair_t const &desired )
{
    constexpr std::memory_order relaxed = std::memory_order_relaxed;
    auto &halves = u.m_atomics;
    halves[0].store( desired.first, relaxed );
    halves[1].store( desired.second, relaxed );
}

/// Returns the first word of the pair, read with relaxed ordering.
inline size_t dcas_atomic::first()
{
    std::atomic<size_t> &slot = u.m_atomics[0];
    size_t const value = slot.load( std::memory_order_relaxed );
    return value;
}

/// Returns the second word of the pair, read with relaxed ordering.
inline size_t dcas_atomic::second()
{
    std::atomic<size_t> &slot = u.m_atomics[1];
    size_t const value = slot.load( std::memory_order_relaxed );
    return value;
}

/// Reads both words and returns them as a `pair_t`.
///
/// The words are fetched by two separate calls (`first()` then `second()`), so
/// the result is not a single atomic read of the pair. load() and store() use
/// it only as the initial guess for their compare-exchange loops.
inline dcas_atomic::operator pair_t()
{
    // Brace initialisation: pair_t is an aggregate, and the parenthesised form
    // pair_t( a, b ) is only accepted by compilers implementing C++20 P0960.
    return pair_t{ first(), second() };
}

// Double-word compare-and-swap.
//
// Replaces the stored pair with `desired` if it currently equals `expected`.
// Returns true when the exchange happened. Otherwise returns false and, in the
// platform branches below, overwrites `expected` with the value that was found
// (the std::atomic branch relies on compare_exchange_weak doing the same).
//
// `Release` picks the success ordering where the platform lets us choose:
// release when true, acquire when false; the failure ordering is relaxed.
// The GCC/Clang and std::atomic paths use the weak form of the operation, so
// callers are expected to retry in a loop, as load() and store() do.
template<bool Release>
inline bool dcas_atomic::compare_exchange( pair_t &expected, pair_t const &desired, std::bool_constant<Release> )
{
    using namespace std;
    // Platform-specific path, taken only when std::atomic<pair_t> is not used as the backing store.
    if constexpr( !SUBSTITITABLE )
    {
#if defined(_MSC_VER)
    #if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
        // The 128-bit intrinsic takes the comparand as an in/out two-element array.
        __int64 expectedA[2];
        expectedA[0] = (__int64)expected.first;
        expectedA[1] = (__int64)expected.second;
        char fRet;
        #if defined(_M_X64)
        // x64: a single intrinsic is used regardless of Release.
        // The exchange value is passed as (high, low), hence second before first.
        fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, desired.second, desired.first, expectedA );
        #else
        // ARM64: pick the release or acquire flavour of the intrinsic.
        if constexpr( Release )
            fRet = _InterlockedCompareExchange128_rel( u.m_firstAndSecond, desired.second, desired.first, expectedA );
        else
            fRet = _InterlockedCompareExchange128_acq( u.m_firstAndSecond, desired.second, desired.first, expectedA );
        #endif
        if( fRet )
            return true;
        // Failure: hand the observed value back to the caller.
        expected.first = expectedA[0];
        expected.second = expectedA[1];
        return false;
    #elif defined(_M_IX86)
        // 32-bit x86: pack both 32-bit words into one 64-bit operand, first in the low half.
        auto compose = []( pair_t const &p ) -> uint64_t { return p.first | (uint64_t)p.second << 32; };
        uint64_t
            cDesired = compose( desired ),
            cExpected = compose( expected ),
            cResult = _InterlockedCompareExchange64( &u.m_firstAndSecond, cDesired, cExpected );
        // The intrinsic returns the previous value; equality with the comparand means success.
        if( cResult == cExpected ) [[likely]]
            return true;
        // Failure: unpack the observed value back into expected.
        expected.first = (uint32_t)cResult;
        expected.second = (uint32_t)(cResult >> 32);
        return false;
    #else
        #error unspupported Windows-platform
    #endif
#elif defined(__GNUC__) || defined(__clang__)
        // Choose which member lands in the low bits of the double-width integer:
        // `first` on little-endian targets, `second` otherwise.
        constexpr auto
            pair_t::*FIRST = std::endian::native == std::endian::little ? &pair_t::first : &pair_t::second,
            pair_t::*SECOND = std::endian::native == std::endian::little ? &pair_t::second : &pair_t::first;
        constexpr unsigned SZT_BITS = sizeof(size_t) * 8;
        // Packs a pair into one dword_t: FIRST in the low bits, SECOND shifted into the high bits.
        auto compose = []( pair_t const &p ) -> dword_t { return (dword_t)(p.*FIRST) | (dword_t)(p.*SECOND) << SZT_BITS; };
        dword_t
            dwExpected = compose( expected ),
            dwDesired = compose( desired );
        bool ret;
        // Fourth argument `true` requests the weak variant of the builtin.
        if constexpr( Release )
            ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED );
        else
            ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED );
        if( ret ) [[likely]]
            return true;
        // Failure: dwExpected now holds the observed value; unpack it into expected.
        expected.*FIRST = (size_t)dwExpected;
        expected.*SECOND = (size_t)(dwExpected >> SZT_BITS);
        return false;
#else
    #error unspupported platform
#endif
    }
    else
        // std::atomic<pair_t> path: weak compare-exchange with the selected success ordering.
        if constexpr( Release )
            return u.m_subsitute.compare_exchange_weak( expected, desired, memory_order_release, memory_order_relaxed );
        else
            return u.m_subsitute.compare_exchange_weak( expected, desired, memory_order_acquire, memory_order_relaxed );
}

template<bool Release>
inline typename dcas_atomic::pair_t dcas_atomic::load( std::bool_constant<Release> )
{
    if constexpr( !SUBSTITITABLE )
    {
        pair_t cmp( *this );
        while( !this->compare_exchange( cmp, cmp, std::bool_constant<Release>() ) );
        return cmp;
    }
    else
        return u.m_subsitute.load( !Release ? std::memory_order_acquire : std::memory_order_relaxed );
}

/// Unconditionally replaces the stored pair with `niu`.
///
/// `Release == true` publishes the new value with release ordering;
/// `Release == false` uses relaxed ordering on the std::atomic path.
/// Without a usable `std::atomic<pair_t>`, the store is emulated by a
/// compare-exchange loop that retries until it installs `niu`.
template<bool Release>
inline void dcas_atomic::store( pair_t const &niu, std::bool_constant<Release> )
{
    if constexpr( !SUBSTITITABLE )
    {
        // Start from a best-effort snapshot; every failed attempt refreshes cmp
        // with the observed value, so the loop ends once no other writer intervenes.
        pair_t cmp( *this );
        while( !this->compare_exchange( cmp, niu, std::bool_constant<Release>() ) );
    }
    else
        // A store may only be relaxed, release or seq_cst. The previous expression
        // passed acquire for Release == false and relaxed for Release == true.
        u.m_subsitute.store( niu, Release ? std::memory_order_release : std::memory_order_relaxed );
}

#if defined(_MSC_VER)
    #pragma warning(pop)
#endif
#if defined(__llvm__)
    #pragma clang diagnostic pop
#endif

Upvotes: 0

uppi
uppi

Reputation: 300

GCC also provides built-ins for double-word CAS: you can use __sync_bool_compare_and_swap if sizeof(*Q->Tail) == sizeof(tail) == sizeof(third_arg) == 8. So everything works fine if you can increment "next.count" before performing the CAS.

Upvotes: 0

Ioan
Ioan

Reputation: 2412

I don't know of any CAS operations that work on two distinct memory locations. However, it's possible the paper is using a struct for the pointer and counter as a workaround to treat both fields as a larger type, and therefore, change both atomically.

Assuming you have a struct of two fields, a pointer and a counter: pointer size of 4 bytes, counter size of 4 bytes; properly aligned without padding to a struct size of 8 bytes. Assuming you also have a CAS operation that handles values of 8 bytes (such as a pointer to a 64-bit integer). You can prepare the struct values separately, and use the CAS operation on the struct as a whole to change two values at once.

Upvotes: 1

Related Questions