Reputation: 20450
Here is my code:
package main
import (
"sync/atomic"
"unsafe"
"sync"
"fmt"
"time"
)
const (
MAX_DATA_SIZE = 100
)
// lock free queue
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// one node in queue
type Node struct {
val interface{}
next unsafe.Pointer
}
// queue functions
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail,next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue){
break
}
}
}
func (self *Queue) deQueue() (val interface{}, success bool){
var head,tail,next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
}else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
}else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val,ok = queue.deQueue()
for !ok {
val,ok = queue.deQueue()
}
fmt.Println("deq = ",val)
}
}()
}
wg.Wait()
}
The problem is, sometimes the code runs ok, but sometimes it fails and just gets stuck with no response.
Is there any problem in my code?
Upvotes: 2
Views: 3335
Reputation: 54089
Here is the above re-written with channels as @mkb suggested (bar the infinite queue size).
It doesn't lock up.
I'd suggest you use channels unless you have a really good reason not to as the Go team have spend a great deal of effort making them reliable, high performance and easy to use.
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const (
MAX_DATA_SIZE = 100
)
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
wg.Add(20)
queue := make(chan time.Time, 10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue <- t
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val := <-queue
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
Upvotes: 3
Reputation: 7732
There is a lot of active waiting in this code, and I strongly recommend a clean use of channel just like Nick's nice code.
However, here is my answer to the exact original question "Why is it stuck?" : there is no garantee of when each goroutine will yield to let the others execute, and most probably it will never yield when inside an infinite loop.
You can fix this by using runtime.Gosched() inside each possibly-infinite for loop :
Gosched yields the processor, allowing other goroutines to run. It does not suspend the current goroutine, so execution resumes automatically.
This enhanced code runs almost as fast as the original, but never hangs :
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
)
const (
MAX_DATA_SIZE = 100
)
// lock free queue
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// one node in queue
type Node struct {
val interface{}
next unsafe.Pointer
}
// queue functions
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail, next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
} else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue) {
break
}
runtime.Gosched()
}
}
func (self *Queue) deQueue() (val interface{}, success bool) {
var head, tail, next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
} else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
} else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
runtime.Gosched()
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val, ok = queue.deQueue()
for !ok {
val, ok = queue.deQueue()
runtime.Gosched()
}
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
Upvotes: 5