Manuel Martinez
Manuel Martinez

Reputation: 828

Implementing a lock-free unbounded queue with new atomic.Pointer types

I am trying to implement this non-blocking queue from Michael and Scott.

I am trying to use the new atomic.Pointer types introduced in Go 1.19 but I am getting a data race in my application.

Here is my implementation:

package queue

import (
    "errors"
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    n := &node[T]{}
    head.Store(n)
    tail.Store(n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return head.value, errors.New("queue is empty")
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v, nil
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
}

I found a different implementation here which works in my application without a data race, but I can't seem to figure out what is exactly different between the two.

Any help or feedback is appreciated!

Upvotes: 1

Views: 1085

Answers (1)

Manuel Martinez
Manuel Martinez

Reputation: 828

It turned out that changing some things fixed the issue.

First change:

var n = node[T]{}
head.Store(&n)
tail.Store(&n)

Second change was changing the Dequeue() return signature.

The final file looks like this:

package queue

import (
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    var n = node[T]{}
    head.Store(&n)
    tail.Store(&n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
    var t T
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return t
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
    return q.head.Load() == q.tail.Load()
}

Upvotes: 1

Related Questions