ambikanair

Reputation: 4600

Simple queue model example

Is there a simple program that demonstrates how queues work in Go? I just need something that adds the numbers 1 to 10 to a queue and pulls them from the queue in parallel using another thread.

Upvotes: 2

Views: 1315

Answers (3)

icza

Reputation: 418435

A queue that is safe for concurrent use is built into the language: the channel.

A channel is, by design, safe for concurrent sends and receives. This is detailed here: If I am using channels properly should I need to use mutexes? Values sent on a channel are received in the order they were sent.

You can read more about channels here: What are golang channels used for?

A very simple example:

c := make(chan int, 10) // buffer for 10 elements

// Producer: send elements in a new goroutine
go func() {
    for i := 0; i < 10; i++ {
        c <- i
    }
    close(c)
}()

// Consumer: receive all elements sent on it before it was closed:
for v := range c {
    fmt.Println("Received:", v)
}

Output (try it on the Go Playground):

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9

Note that the channel buffer size (10 in this example) has nothing to do with the number of elements you want to send "through" it. The buffer size tells how many elements the channel may "store", or in other words, how many elements you may send on it without blocking when nobody is receiving from it. When the channel's buffer is full, further sends will block until someone receives a value from it.
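To make the buffer behavior concrete, here is a minimal sketch that fills a small buffered channel using non-blocking sends. The helper names trySend and fillUntilFull and the capacity of 3 are illustrative assumptions, not part of the example above.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was accepted.
// (Hypothetical helper, introduced only for this illustration.)
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		// The buffer is full and no receiver is ready, so a plain send would block.
		return false
	}
}

// fillUntilFull sends 0, 1, 2, ... until a send would block
// and returns how many values were accepted.
func fillUntilFull(c chan int) int {
	n := 0
	for trySend(c, n) {
		n++
	}
	return n
}

func main() {
	c := make(chan int, 3) // buffer for 3 elements

	accepted := fillUntilFull(c)
	fmt.Println("accepted:", accepted, "len:", len(c), "cap:", cap(c))

	<-c // receiving one value frees one slot in the buffer
	fmt.Println("send after receive:", trySend(c, 99))
}
```

With a capacity of 3, exactly three sends succeed without a receiver; the fourth would block until a value is received.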

Upvotes: 7

Frank Bryce

Reputation: 8446

Another option is to define a queue interface and implement it with a channel as the backing type, which takes care of the concurrency. For convenience, I've made a gist.

Here's how you can use it.

queue := GetIntConcurrentQueue()
defer queue.Close()

// queue.Enqueue(1)
// myInt, errQueueClosed := queue.DequeueBlocking()
// myInt, errIfNoInt := queue.DequeueNonBlocking()

Longer example here - https://play.golang.org/p/npb2Uj9hGn1

Full implementation below, and again here's the gist of it.

// Can be any backing type, even 'interface{}' if desired.
// See stackoverflow.com/q/11403050/3960399 for type conversion instructions.
type IntConcurrentQueue interface {
    // Inserts the int into the queue
    Enqueue(int)
    // Will return error if there is nothing in the queue or if Close() was already called
    DequeueNonBlocking() (int, error)
    // Will block until there is a value in the queue to return.
    // Will error if Close() was already called.
    DequeueBlocking() (int, error)
    // Close should be called with defer after initializing
    Close()
}

func GetIntConcurrentQueue() IntConcurrentQueue {
    // The channel is unbuffered, so Enqueue blocks until another goroutine dequeues.
    return &intChannelQueue{c: make(chan int)}
}

type intChannelQueue struct {
    c chan int
}

func (q *intChannelQueue) Enqueue(i int) {
    q.c <- i
}

func (q *intChannelQueue) DequeueNonBlocking() (int, error) {
    select {
    case i, ok := <-q.c:
        if !ok {
            return 0, fmt.Errorf("queue was closed")
        }
        return i, nil
    default:
        return 0, fmt.Errorf("queue has no value")
    }
}

func (q *intChannelQueue) DequeueBlocking() (int, error) {
    i, ok := <-q.c
    if ok {
        return i, nil
    }
    return 0, fmt.Errorf("queue was closed")
}

func (q *intChannelQueue) Close() {
    close(q.c)
}
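Because GetIntConcurrentQueue uses an unbuffered channel, Enqueue blocks until some other goroutine dequeues. If that is not what you want, one possible variation is to back the queue with a buffered channel. The following is only a sketch: bufferedIntQueue and newBufferedIntQueue are hypothetical names that are not part of the gist, and the capacity is an arbitrary choice.

```go
package main

import (
	"errors"
	"fmt"
)

// bufferedIntQueue is a hypothetical variant of intChannelQueue
// that is backed by a buffered channel.
type bufferedIntQueue struct {
	c chan int
}

// newBufferedIntQueue is an assumed constructor name, not taken from the gist.
func newBufferedIntQueue(capacity int) *bufferedIntQueue {
	return &bufferedIntQueue{c: make(chan int, capacity)}
}

// Enqueue blocks only when the buffer is full.
func (q *bufferedIntQueue) Enqueue(i int) { q.c <- i }

// DequeueNonBlocking returns immediately, with an error if nothing
// is queued or if the queue was closed and drained.
func (q *bufferedIntQueue) DequeueNonBlocking() (int, error) {
	select {
	case i, ok := <-q.c:
		if !ok {
			return 0, errors.New("queue was closed")
		}
		return i, nil
	default:
		return 0, errors.New("queue has no value")
	}
}

// Close closes the backing channel; calling Enqueue afterwards would panic.
func (q *bufferedIntQueue) Close() { close(q.c) }

func main() {
	q := newBufferedIntQueue(10)
	defer q.Close()

	for i := 1; i <= 3; i++ {
		q.Enqueue(i) // does not block: the buffer has room
	}

	for {
		v, err := q.DequeueNonBlocking()
		if err != nil {
			fmt.Println("stopped:", err)
			return
		}
		fmt.Println("dequeued:", v)
	}
}
```

The trade-off is that a buffered queue has a fixed capacity: once it is full, Enqueue blocks just like the unbuffered version.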

Upvotes: 0

vedhavyas

Reputation: 1151

You could use a channel (safe for concurrent use) and a wait group to read from the queue concurrently:

package main

import (
    "fmt"
    "sync"
)

func main() {
    queue := make(chan int)

    wg := new(sync.WaitGroup)
    wg.Add(1)
    defer wg.Wait()

    // Consumer: print values until the queue is closed and drained.
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        for r := range queue {
            fmt.Println(r)
        }
    }(wg)

    for i := 1; i <= 10; i++ {
        queue <- i
    }

    close(queue)
}

Playground link: https://play.golang.org/p/A_Amqcf2gwU
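The same pattern extends to several consumers reading from one channel in parallel. Here is a rough sketch under a few assumptions: the consume helper, the worker count of 3, and the running sum (used only to show that every value is received exactly once) are illustrative and not part of the program above.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts the given number of worker goroutines that drain queue.
// It returns the sum of all received values once the queue is closed and empty.
// (Hypothetical helper, introduced only for this illustration.)
func consume(queue <-chan int, workers int) int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex // guards sum, which is shared by all workers
		sum int
	)

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each value is delivered to exactly one worker.
			for v := range queue {
				mu.Lock()
				sum += v
				mu.Unlock()
				fmt.Printf("worker %d received %d\n", id, v)
			}
		}(w)
	}

	wg.Wait() // all workers return after the queue is closed and drained
	return sum
}

func main() {
	queue := make(chan int)

	// Producer: send 1 to 10, then close the queue so the workers can stop.
	go func() {
		for i := 1; i <= 10; i++ {
			queue <- i
		}
		close(queue)
	}()

	fmt.Println("sum:", consume(queue, 3))
}
```

Which worker receives which value varies between runs, so the order of the printed lines is not deterministic.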

Upvotes: 4
