Jin Hoon Jeffrey Bang
Jin Hoon Jeffrey Bang

Reputation: 601

How to broadcast message using channel

I am new to go and I am trying to create a simple chat server where clients can broadcast messages to all connected clients.

In my server, I have a goroutine (infinite for loop) that accepts connection and all the connections are received by a channel.

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

Then, I start a handler (goroutine) for every connected client. Inside the handler, I try to broadcast to all connections by iterating through the channel.

for c := range ch {
    conn.Write(msg)
}

However, I cannot broadcast because (I think from reading the docs) the channel needs to be closed before iterating. I am not sure when I should close the channel because I want to continuously accept new connections and closing the channel won't let me do that. If anyone can help me, or provide a better way to broadcast messages to all connected clients, it would be appreciated.

Upvotes: 50

Views: 58573

Answers (7)

icza
icza

Reputation: 417472

A more elegant solution is a "broker", where clients may subscribe and unsubscribe to messages.

To also handle subscribing and unsubscribing elegantly, we may utilize channels for this, so the main loop of the broker which receives and distributes the messages can incorporate all these using a single select statement, and synchronization is given from the solution's nature.

Another trick is to store the subscribers in a map, mapping from the channel we use to distribute messages to them. So use the channel as the key in the map, and then adding and removing the clients is "dead" simple. This is made possible because channel values are comparable, and their comparison is very efficient as channel values are simple pointers to channel descriptors.

Without further ado, here's a simple broker implementation:

type Broker[T any] struct {
    stopCh    chan struct{}
    publishCh chan T
    subCh     chan chan T
    unsubCh   chan chan T
}

func NewBroker[T any]() *Broker[T] {
    return &Broker[T]{
        stopCh:    make(chan struct{}),
        publishCh: make(chan T, 1),
        subCh:     make(chan chan T, 1),
        unsubCh:   make(chan chan T, 1),
    }
}

func (b *Broker[T]) Start() {
    subs := map[chan T]struct{}{}
    for {
        select {
        case <-b.stopCh:
            return
        case msgCh := <-b.subCh:
            subs[msgCh] = struct{}{}
        case msgCh := <-b.unsubCh:
            delete(subs, msgCh)
        case msg := <-b.publishCh:
            for msgCh := range subs {
                // msgCh is buffered, use non-blocking send to protect the broker:
                select {
                case msgCh <- msg:
                default:
                }
            }
        }
    }
}

func (b *Broker[T]) Stop() {
    close(b.stopCh)
}

func (b *Broker[T]) Subscribe() chan T {
    msgCh := make(chan T, 5)
    b.subCh <- msgCh
    return msgCh
}

func (b *Broker[T]) Unsubscribe(msgCh chan T) {
    b.unsubCh <- msgCh
}

func (b *Broker[T]) Publish(msg T) {
    b.publishCh <- msg
}

Example using it:

func main() {
    // Create and start a broker:
    b := NewBroker[string]()
    go b.Start()

    // Create and subscribe 3 clients:
    clientFunc := func(id int) {
        msgCh := b.Subscribe()
        for {
            fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
        }
    }
    for i := 0; i < 3; i++ {
        go clientFunc(i)
    }

    // Start publishing messages:
    go func() {
        for msgId := 0; ; msgId++ {
            b.Publish(fmt.Sprintf("msg#%d", msgId))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    time.Sleep(time.Second)
}

Output of the above will be (try it on the Go Playground):

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

Improvements

You may consider the following improvements. These may or may not be useful depending on how / to what you use the broker.

You may choose to close the message channel after unsubscribing, signalling that no more messages will be sent on it. You can do it in Broker.Start() in the msgCh := <-b.unsubCh case:

    case msgCh := <-b.unsubCh:
        delete(subs, msgCh)
        close(msgCh)

This would allow clients to range over the message channel, like this:

msgCh := b.Subscribe()
for msg := range msgCh {
    fmt.Printf("Client %d got message: %v\n", id, msg)
}

Then if someone unsubscribes this msgCh like this:

b.Unsubscribe(msgCh)

The above range loop will terminate after processing all messages that were sent before the call to Unsubscribe().

If you want your clients to rely on the message channel being closed, and the broker's lifetime is narrower than your app's lifetime, then you could also close all subscribed clients when the broker is stopped, in the Start() method like this:

case <-b.stopCh:
    for msgCh := range subs {
        close(msgCh)
    }
    return

Upvotes: 82

Sandy Cash
Sandy Cash

Reputation: 406

The canonical (and idiomatic go) way to do this is via a slice of channels, as recommended above by Nevets and icza.

You should specifically not use a slice of callbacks. In some languages, you do typically register observers by passing a callback, but in those cases, you have to wrap their invocation in a fair amount of defensive code to protect the sender, and ideally you should have the generator of the message (the "Subject" in classic Observer pattern discussion) segregated from the observers by an intermediate message transport layer. This is where you typically use a pub-sub mesh (JMS brokers, gnats, MQ, whatever) when you're crossing process boundaries, but you should adhere to the same pattern if both subject and observers are internal to the same process (and most languages have available implementations of such mechanisms, so you shouldn't need to roll your own).

The reasons not to use callbacks include:

  1. Unless you build in your own message transport layer, your subject is no longer both naive (it doesn't know the nature or cardinality of the observers) and disinterested (it doesn't care what they do with the message, only that it is made available to any interested parties);
  2. If you want true broadcasting, then you need to act as if the order of receipt does not matter - ideally, everyone can see the message at the same time, even though in practice sending is iterative, even when using channels. But sending to recipient n+1 should absolutely not depend on confirmation of receipt by recipient n. That isn't broadcasting, it's serialized assignment. I say assignment because, if you are asking for a callback, then in executing the callback, you are enforcing (even if only minimally) some behavior to be taken by the recipient. You've basically turned your sender into an orchestrator, which is a very different sort of pattern with a different set of use cases.
  3. Absent a defensive boundary (wrapping each callback invocation in a separate goroutine with a timeout context, e.g.), you are vulnerable to being blocked by a recipient - this is antithetical to broadcasting. Receipt (and optionally, taking any action at all based on) a broadcast message must be entirely asynchronous with respect to the original sending.

Is it doable to provide pseudo-broadcasting by using callbacks in go? Sure, but you have to invest in so much additional complexity to keep things clean - and why would you do that when go provides an easy and rather robust way to do it? The examples of channel-driven broadcasting above are good ones and how you should do it pretty much every time.

The specific exception when you absolutely should use callbacks is when you are not disinterested - you really do care that, on the basis of the sent message, the recipients take some action (and usually something specified by contract). For example, "I am about to unmount this filesystem, so flush and close your filehandles, let me know once you're done." (I know that's a pretty old-fashioned example, but it's the first one that comes to mind.)

Upvotes: 1

Mikhail
Mikhail

Reputation: 41

another one simple example: https://play.golang.org

    
type Broadcaster struct {
    mu      sync.Mutex
    clients map[int64]chan struct{}
}

func NewBroadcaster() *Broadcaster {
    return &Broadcaster{
        clients: make(map[int64]chan struct{}),
    }
}

func (b *Broadcaster) Subscribe(id int64) (<-chan struct{}, error) {
    defer b.mu.Unlock()
    b.mu.Lock()
    s := make(chan struct{}, 1)

    if _, ok := b.clients[id]; ok {
        return nil, fmt.Errorf("signal %d already exist", id)
    }

    b.clients[id] = s

    return b.clients[id], nil
}

func (b *Broadcaster) Unsubscribe(id int64) {
    defer b.mu.Unlock()
    b.mu.Lock()
    if _, ok := b.clients[id]; ok {
        close(b.clients[id])
    }

    delete(b.clients, id)
}

func (b *Broadcaster) broadcast() {
    defer b.mu.Unlock()
    b.mu.Lock()
    for k := range b.clients {
        if len(b.clients[k]) == 0 {
            b.clients[k] <- struct{}{}
        }
    }
}

type testClient struct {
    name     string
    signal   <-chan struct{}
    signalID int64
    brd      *Broadcaster
}

func (c *testClient) doWork() {
    i := 0
    for range c.signal {
        fmt.Println(c.name, "do work", i)
        if i > 2 {
            c.brd.Unsubscribe(c.signalID)
            fmt.Println(c.name, "unsubscribed")
        }
        i++
    }
    fmt.Println(c.name, "done")
}

func main() {
    var err error
    brd := NewBroadcaster()

    clients := make([]*testClient, 0)

    for i := 0; i < 3; i++ {
        c := &testClient{
            name:     fmt.Sprint("client:", i),
            signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
            brd:      brd,
        }
        c.signal, err = brd.Subscribe(c.signalID)
        if err != nil {
            log.Fatal(err)
        }

        clients = append(clients, c)
    }

    for i := 0; i < len(clients); i++ {
        go clients[i].doWork()
    }

    for i := 0; i < 6; i++ {
        brd.broadcast()
        time.Sleep(time.Second)
    }
}

output:

client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done

Upvotes: 4

Davood
Davood

Reputation: 1566

This is a late answer but I think it may appease some curious readers.

Go channels are widely welcomed to be used when it comes to concurrency.

Go community is rigid to follow this saying:

Do not communicate by sharing memory; instead, share memory by communicating.

I am completely neutral toward this and I think other options rather than well-defined channels should be considered when it comes to broadcasting.

Here is my take: Cond from sync packages are widely overlooked. Implementing braodcaster as suggested by Bronze man in very same context worths noting.

I was delighted witch icza suggestion to use channels and broadcast messages over them. I follow the same methods and use sync's conditional variable:

// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct {
    cond        *sync.Cond
    subscribers map[interface{}]func(interface{})
    message     interface{}
    running     bool
}

this is the main struct that our whole broadcasting concept relies on.

Below, I define some behaviours for this struct. In a nutshell, subscribers should be able to be added, removed and whole the process should be revokable.

    // SetupBroadcaster gives the broadcaster object to be used further in messaging
    func SetupBroadcaster() *Broadcaster {
    
        return &Broadcaster{
            cond:        sync.NewCond(&sync.RWMutex{}),
            subscribers: map[interface{}]func(interface{}){},
        }
    }
    
    // Subscribe let others enroll in broadcast event!
    func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
    
        b.subscribers[id] = f
    }
    
    // Unsubscribe stop receiving broadcasting
    func (b *Broadcaster) Unsubscribe(id interface{}) {
        b.cond.L.Lock()
        delete(b.subscribers, id)
        b.cond.L.Unlock()
    }
    
    // Publish publishes the message
    func (b *Broadcaster) Publish(message interface{}) {
        go func() {
            b.cond.L.Lock()
    
            b.message = message
            b.cond.Broadcast()
            b.cond.L.Unlock()
        }()
    }
    
    // Start the main broadcasting event
    func (b *Broadcaster) Start() {
        b.running = true
        for b.running {
            b.cond.L.Lock()
            b.cond.Wait()
            go func() {
                for _, f := range b.subscribers {
                    f(b.message) // publishes the message
                }
            }()
            b.cond.L.Unlock()
        }
    
    }
    
    // Stop broadcasting event
    func (b *Broadcaster) Stop() {
        b.running = false
    }

Next, I can use it quite easily:

    messageToaster := func(message interface{}) {
        fmt.Printf("[New Message]: %v\n", message)
    }
    unwillingReceiver := func(message interface{}) {
        fmt.Println("Do not disturb!")
    }
    broadcaster := SetupBroadcaster()
    broadcaster.Subscribe(1, messageToaster)
    broadcaster.Subscribe(2, messageToaster)
    broadcaster.Subscribe(3, unwillingReceiver)

    go broadcaster.Start()

    broadcaster.Publish("Hello!")

    time.Sleep(time.Second)
    broadcaster.Unsubscribe(3)
    broadcaster.Publish("Goodbye!")

It should print something like this in any order:

[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!

See this on go playground

Upvotes: 4

nevets
nevets

Reputation: 4818

What you are doing is a fan out pattern, that is to say, multiple endpoints are listening to a single input source. The result of this pattern is, only one of these listeners will be able to get the message whenever there's a message in the input source. The only exception is a close of channel. This close will be recognized by all of the listeners, and thus a "broadcast".

But what you want to do is broadcasting a message read from connection, so we could do something like this:

When the number of listeners is known

Let each worker listen to dedicated broadcast channel, and dispatch the message from the main channel to each dedicated broadcast channel.

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

And then we could have a bunch of workers:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

Then start our listener:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

And a dispatcher:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

When the number of listeners is not known

In this case, the solution given above still works. The only difference is, whenever you need a new worker, you need to create a new worker, start it up, and then push it into workers slice. But this method requires a thread-safe slice, which need a lock around it. One of the implementation may look like as follows:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

Whenever you want to start a worker:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

And your dispatcher will be changed to:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

Last words: never leave a dangling goroutine

One of the good practices is: never leave a dangling goroutine. So when you finished listening, you need to close all of the goroutines you fired. This will be done via quit channel in worker:

First we need to create a global quit signalling channel:

globalQuit := make(chan struct{})

And whenever we create a worker, we assign the globalQuit channel to it as its quit signal:

worker.quit = globalQuit

Then when we want to shutdown all workers, we simply do:

close(globalQuit)

Since close will be recognized by all listening goroutines (this is the point you understood), all goroutines will be returned. Remember to close your dispatcher routine as well, but I will leave it to you :)

Upvotes: 83

bronze man
bronze man

Reputation: 1607

Broadcast to a slice of channel and use sync.Mutex to manage channel add and remove may be the easiest way in your case.

Here is what you can do to broadcast in golang:

  • You can broadcast a share status change with sync.Cond. This way do not have any alloc once setup, but you can not add timeout functional or work with another channel.
  • You can broadcast a share status change with a close old channel and create new channel and sync.Mutex. This way have one alloc per status change, but you can add timeout functional and work with another channel.
  • You can broadcast to a slice of function callback and use sync.Mutex to manage them. The caller can do channel stuff. This way have more than one alloc per caller, and work with another channel.
  • You can broadcast to a slice of channel and use sync.Mutex to manage them. This way have more than one alloc per caller, and work with another channel.
  • You can broadcast to a slice of sync.WaitGroup and use sync.Mutex to manage them.

Upvotes: 5

Rick-777
Rick-777

Reputation: 10238

Because Go channels follow the Communicating Sequential Processes (CSP) pattern, channels are a point-to-point communication entity. There is always one writer and one reader involved in each exchange.

However, each channel end can be shared amongst multiple goroutines. This is safe to do - there is no dangerous race condition.

So there can be multiple writers sharing the writing end. And/or there can be multiple readers sharing the reading end. I wrote more on this in a different answer, which includes examples.

If you really need a broadcast, you cannot do this directly, but it is not hard to implement an intermediate goroutine that copies a value out to each of a group of output channels.

Upvotes: 1

Related Questions