Moonlit

Reputation: 5409

How to interrupt sending on a channel

I'm trying to implement a component that queries a database for events and notifies the user if a new event is available via a channel. It basically looks like this:

type Watcher struct {
  events chan Event
}

func (w *Watcher) Watch() <-chan Event {
  go w.fetch() // run in a goroutine so Watch can return the channel
 return w.events
}

func (w *Watcher) Stop() {
 // ???
}

func (w *Watcher) fetch() {
 //...
  for {
    //...
    e := fetchEvent()
    w.events <- e
    time.Sleep(10 * time.Second)
    //...
  }
}

The client side then looks like this:

events := NewWatcher().Watch()
e := <- events
//...

My question is: how can I correctly implement the Stop() method? In particular, how can I interrupt the sender if it is currently blocked waiting to send an event on the channel? I know that closing the channel from the receiver side is not an option. What's the best practice for this type of issue?

Upvotes: 2

Views: 911

Answers (3)

TehSphinX

Reputation: 7440

Alternatively, you can stop via a context. This is more robust than a dedicated stop channel: sending on a stop channel can block if the other side has already stopped or is blocked on something else, whereas cancelling a context never blocks.

type Watcher struct {
    events chan Event
    cancel context.CancelFunc
}

func (w *Watcher) Watch() <-chan Event {
    ctx, cancel := context.WithCancel(context.Background())
    w.cancel = cancel
    go w.fetch(ctx)
    return w.events
}

func (w *Watcher) Stop() {
    w.cancel()
}

func (w *Watcher) fetch(ctx context.Context) {
    for {
        e := fetchEvent()
        select {
        case <-ctx.Done():
            return
        case w.events <- e:
        }
    }
}

This example creates a Context in the Watch function (before fetching gets started) and registers the cancel function in the Watcher. The ctx is then passed on to fetch which checks for ctx.Done on every iteration. If the context is Done, it will stop fetching.


You might want to initialize the cancel function to be an empty function so it is never nil and Stop can be called without a panic even if the Watcher was never started.

func NewWatcher() *Watcher {
    return &Watcher{
        // ...
        cancel: func() {},
    }
}

If you additionally want to pass in a context from the outside, you can rewrite your Watch function to:

func (w *Watcher) Watch(ctx context.Context) <-chan Event {
    ctx, cancel := context.WithCancel(ctx)
    w.cancel = cancel
    go w.fetch(ctx)
    return w.events
}

Now the Watcher will stop fetching if the Stop function is called or the outside ctx is cancelled (e.g. if your application shuts down).
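
To make the parent/child relationship concrete, here is a minimal, self-contained sketch. The Event type and the counter that stands in for fetchEvent are placeholders I made up, not the asker's real code. It cancels the outside context and checks that the fetch goroutine exits and closes the channel.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a hypothetical stand-in for the asker's event type.
type Event struct{ ID int }

type Watcher struct {
	events chan Event
	cancel context.CancelFunc
}

func NewWatcher() *Watcher {
	return &Watcher{events: make(chan Event), cancel: func() {}}
}

func (w *Watcher) Watch(ctx context.Context) <-chan Event {
	ctx, cancel := context.WithCancel(ctx) // child of the caller's context
	w.cancel = cancel
	go w.fetch(ctx)
	return w.events
}

func (w *Watcher) Stop() { w.cancel() }

func (w *Watcher) fetch(ctx context.Context) {
	defer close(w.events) // only the sending side closes the channel
	for id := 1; ; id++ { // the counter replaces a real fetchEvent()
		select {
		case <-ctx.Done():
			return
		case w.events <- Event{ID: id}:
		}
	}
}

// stoppedByParent cancels the outside context and returns true once
// the events channel has been closed by the fetch goroutine.
func stoppedByParent() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	w := NewWatcher()
	events := w.Watch(parent)
	<-events       // receive one event
	cancelParent() // cancelling the parent also cancels the child
	for range events {
		// drain until fetch notices the cancellation and closes the channel
	}
	w.Stop() // still safe to call; releases the child context
	return true
}

func main() {
	fmt.Println(stoppedByParent())
}
```

Closing the channel in fetch is my addition here; it lets the receiver detect the stop with a plain range loop.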


If the Watcher.events channel is not used anywhere else, you can create it in the Watch function and pass it to fetch as well as return it. That way the Watch function can also be called multiple times and they will have separate channels, and not multiple readers reading from the same channel.

If you do this, you probably also want to have a way to cancel them individually, so you could return the cancel function from the Watch function and remove it from the Watcher struct. Also Stop is now obsolete.

func (w *Watcher) Watch() (<-chan Event, context.CancelFunc) {
    ctx, cancel := context.WithCancel(context.Background())
    
    // with small buffer for channel to smooth sending/receiving
    ch := make(chan Event, 5)
    
    go w.fetch(ctx, cancel, ch)
    return ch, cancel
}

func (w *Watcher) fetch(ctx context.Context, cancel context.CancelFunc, ch chan<- Event) {
    // make sure the context is cancelled always if `fetch` stops to avoid goroutine leak.
    defer cancel()

    for {
        select {
        case <-ctx.Done():
            // now you can close it without problems
            close(ch)
            return
        case ch <- fetchEvent():
        }
    }
}
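
On the client side, using the returned channel and cancel function could look roughly like the sketch below. It is self-contained, so Event and the counter-based fetchEvent method are hypothetical placeholders, and fetch is simplified to take no cancel parameter.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a hypothetical stand-in for the asker's event type.
type Event struct{ ID int }

type Watcher struct{ next int }

// fetchEvent is a placeholder for the real database query.
// It is only ever called from the fetch goroutine.
func (w *Watcher) fetchEvent() Event {
	w.next++
	return Event{ID: w.next}
}

func (w *Watcher) Watch() (<-chan Event, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan Event)
	go w.fetch(ctx, ch)
	return ch, cancel
}

func (w *Watcher) fetch(ctx context.Context, ch chan<- Event) {
	defer close(ch) // the sender owns the channel, so it closes it
	for {
		e := w.fetchEvent()
		select {
		case <-ctx.Done():
			return
		case ch <- e:
		}
	}
}

// consume reads n events, cancels the watch, and drains the channel.
func consume(n int) []int {
	w := &Watcher{}
	events, cancel := w.Watch()
	ids := make([]int, 0, n)
	for i := 0; i < n; i++ {
		ids = append(ids, (<-events).ID)
	}
	cancel()
	for range events {
		// discard anything sent before fetch saw the cancellation
	}
	return ids
}

func main() {
	fmt.Println(consume(3)) // prints [1 2 3]
}
```

The drain loop after cancel() is optional in this setup, since the select in fetch never blocks on the send once the context is done. It just shows that the channel really does get closed.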

Upvotes: 1

Elias Van Ootegem

Reputation: 76433

I'm going to make some assumptions: Watcher is going to be a type you're planning to pass around, or at least, you're happy having its functionality be hidden behind a clean interface. Though perhaps not immediately a concern, you want such a component to be safe for concurrent use.

type Watcher struct {
    ch   chan Event
    done chan struct{}
}

// New now starts fetching, leave out go w.fetch(ctx) if you don't want this to happen
func New(ctx context.Context) *Watcher {
    w := &Watcher{
        ch:   make(chan Event, 10), // some buffer if needed
        done: make(chan struct{}),  // no buffer needed
    }
    go w.fetch(ctx)
    return w
}

// If you only want to start fetching events when this is called, then uncomment the first line, and allow for context to be passed
func (w *Watcher) Watch() <-chan Event {
    // go w.fetch(ctx)
    return w.ch
}

func (w *Watcher) fetch(ctx context.Context) {
    // this runs in a routine
    // if we stop fetching, close channels
    defer func() {
        if w.done != nil {
            close(w.done)
        }
        w.done = nil
        close(w.ch)
    }()
    for {
        select {
        case <-ctx.Done():
            return // outside context was cancelled
        case <-w.done:
            w.done = nil // prevent second close on done channel
            return // Stop was called
        default:
            w.ch <- fetchEvent() // will block until channel buffer is available
        }
    }
}

func (w *Watcher) Stop() {
    if w.done == nil { // field is set to nil once fetch routine returns
        return
    }
    close(w.done) // rest is handled in fetch routine
}

That's pretty much it. I will say, though, that your use of a 10 second sleep until you fetch the next event is better handled using a ticker in the main fetch routine:

func (w *Watcher) fetch(ctx context.Context) {
    tick := time.NewTicker(10 * time.Second) // NewTicker takes a time.Duration; a bare 10 would mean 10ns
    // this runs in a routine
    // if we stop fetching, close channels
    defer func() {
        tick.Stop()
        if w.done != nil {
            close(w.done)
        }
        w.done = nil
        close(w.ch)
    }()
    for {
        select {
        case <-ctx.Done():
            return // outside context was cancelled
        case <-w.done:
            w.done = nil
            return // Stop was called
        case <-tick.C: // every 10 seconds, we'll fetch an event
            w.ch <- fetchEvent()
        }
    }
}
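
One caveat with the ticker version: the send inside the tick case can still block if nobody is receiving, which is the situation the question asks about. A possible variation (a sketch only, with a hypothetical Event type, a counter instead of fetchEvent, and a free function instead of a method) is to wrap the send in its own select:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event is a hypothetical stand-in for the asker's event type.
type Event struct{ ID int }

// fetch sends one event per tick and can be interrupted both while
// waiting for the tick and while waiting for a receiver.
func fetch(ctx context.Context, interval time.Duration, out chan<- Event) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	defer close(out)
	for id := 1; ; id++ {
		// Wait for the next tick, unless we are cancelled first.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}
		// Try to send, but give up if we are cancelled while blocked.
		select {
		case <-ctx.Done():
			return
		case out <- Event{ID: id}:
		}
	}
}

// collect receives n events, cancels, and waits for the channel to close.
func collect(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan Event)
	go fetch(ctx, time.Millisecond, out)
	got := 0
	for got < n {
		<-out
		got++
	}
	cancel()
	for range out {
		// drain until fetch returns and closes the channel
	}
	return got
}

func main() {
	fmt.Println(collect(2)) // prints 2
}
```

The millisecond interval is only there to keep the demo fast; in the real code it would be the 10 second interval.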

Using this type is pretty straightforward:

ctx, cfunc := context.WithCancel(context.Background())
watcher := New(ctx)
go func() {
    for e := range watcher.Watch() {
        // do something with the data
    }
}()
time.Sleep(1 * time.Minute)
// either:
watcher.Stop()
// or simply:
cfunc()
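
If Stop might be called more than once, or from several goroutines, the nil check on the done field is not enough, because the field is read and written without synchronisation. One possible alternative, sketched under the assumption that sync.Once is acceptable here and with the context case left out for brevity, is:

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for the asker's event type.
type Event struct{ ID int }

type Watcher struct {
	ch   chan Event
	done chan struct{}
	once sync.Once // guards close(done)
}

func New() *Watcher {
	w := &Watcher{ch: make(chan Event), done: make(chan struct{})}
	go w.fetch()
	return w
}

func (w *Watcher) Watch() <-chan Event { return w.ch }

func (w *Watcher) fetch() {
	defer close(w.ch)
	for id := 1; ; id++ { // the counter replaces a real fetchEvent()
		select {
		case <-w.done:
			return // Stop was called
		case w.ch <- Event{ID: id}:
		}
	}
}

// Stop can be called repeatedly and concurrently; done is closed once.
func (w *Watcher) Stop() {
	w.once.Do(func() { close(w.done) })
}

// stopTwice calls Stop twice and returns true if nothing panicked
// and the events channel was closed.
func stopTwice() bool {
	w := New()
	<-w.Watch()
	w.Stop()
	w.Stop() // second call is a no-op
	for range w.Watch() {
		// drain until fetch closes the channel
	}
	return true
}

func main() {
	fmt.Println(stopTwice()) // prints true
}
```

Here the done channel is only ever closed, never set to nil, so fetch and Stop do not share any mutable field.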

Upvotes: 1

meshkati

Reputation: 2403

Add another channel to your Watcher, such as doneChan, and listen on it using select in your publisher (i.e. the sender), like this:

// sender part
events, doneChan := NewWatcher()

select {
case <-doneChan:
    // cleanup and return
case events <- message:
}

Now you can implement your Stop() like this:

func (w *Watcher) Stop() {
    w.doneChan <- true // if doneChan is unbuffered, this blocks until the sender's select receives it
}

Your new struct would look like this:

type Watcher struct {
    events   chan Event
    doneChan chan bool
}

Upvotes: 0
