Reputation: 4600
Is there a simple program which demonstrates how queues work in Go. I just need something like add number 1 to 10 in queue and pull those from the queue in parallel using another thread.
Upvotes: 2
Views: 1315
Reputation: 418435
A queue that is safe for concurrent use is basically a language construct: channel.
A channel–by design–is safe for concurrent send and receive. This is detaild here: If I am using channels properly should I need to use mutexes? Values sent on it are received in the order they were sent.
You can read more about channels here: What are golang channels used for?
A very simple example:
c := make(chan int, 10) // buffer for 10 elements
// Producer: send elements in a new goroutine
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()
// Consumer: receive all elements sent on it before it was closed:
for v := range c {
fmt.Println("Received:", v)
}
Output (try it on the Go Playground):
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Note that the channel buffer (10 in this example) has nothing to do with the number of elements you want to send "through" it. The buffer tells how many elements the channel may "store", or in other words, how many elements you may send on it without blocking when there are nobody is receiving from it. When the channel's buffer is full, further sends will block until someone starts receiving values from it.
Upvotes: 7
Reputation: 8446
Another option is to create and implement a queue interface
, with a backing type of a channel for concurrency. For convenience, I've made a gist.
Here's how you can use it.
queue := GetIntConcurrentQueue()
defer queue.Close()
// queue.Enqueue(1)
// myInt, errQueueClosed := queue.DequeueBlocking()
// myInt, errIfNoInt := queue.DequeueNonBlocking()
Longer example here - https://play.golang.org/p/npb2Uj9hGn1
Full implementation below, and again here's the gist of it.
// Can be any backing type, even 'interface{}' if desired.
// See stackoverflow.com/q/11403050/3960399 for type conversion instructions.
type IntConcurrentQueue interface {
// Inserts the int into the queue
Enqueue(int)
// Will return error if there is nothing in the queue or if Close() was already called
DequeueNonBlocking() (int, error)
// Will block until there is a value in the queue to return.
// Will error if Close() was already called.
DequeueBlocking() (int, error)
// Close should be called with defer after initializing
Close()
}
func GetIntConcurrentQueue() IntConcurrentQueue {
return &intChannelQueue{c: make(chan int)}
}
type intChannelQueue struct {
c chan int
}
func (q *intChannelQueue) Enqueue(i int) {
q.c <- i
}
func (q *intChannelQueue) DequeueNonBlocking() (int, error) {
select {
case i, ok := <-q.c:
if ok {
return i, nil
} else {
return 0, fmt.Errorf("queue was closed")
}
default:
return 0, fmt.Errorf("queue has no value")
}
}
func (q *intChannelQueue) DequeueBlocking() (int, error) {
i, ok := <-q.c
if ok {
return i, nil
}
return 0, fmt.Errorf("queue was closed")
}
func (q *intChannelQueue) Close() {
close(q.c)
}
Upvotes: 0
Reputation: 1151
You could use channel(safe for concurrent use) and wait group to read from queue concurrently
package main
import (
"fmt"
"sync"
)
func main() {
queue := make(chan int)
wg := new(sync.WaitGroup)
wg.Add(1)
defer wg.Wait()
go func(wg *sync.WaitGroup) {
for {
r, ok := <-queue
if !ok {
wg.Done()
return
}
fmt.Println(r)
}
}(wg)
for i := 1; i <= 10; i++ {
queue <- i
}
close(queue)
}
Playground link: https://play.golang.org/p/A_Amqcf2gwU
Upvotes: 4