Reputation: 5409
I'm trying to implement a component that queries a database for events and notifies the user if a new event is available via a channel. It basically looks like this:
type Watcher struct {
	events chan Event
}
func (w *Watcher) Watch() <-chan Event {
	go w.fetch() // run in a goroutine so Watch can return the channel
	return w.events
}
func (w *Watcher) Stop() {
	// ???
}
func (w *Watcher) fetch() {
	//...
	for {
		//...
		e := fetchEvent()
		w.events <- e
		time.Sleep(10 * time.Second)
		//...
	}
}
The client side then looks like this:
events := NewWatcher().Watch()
e := <-events
//...
My question is: how can I correctly implement the Stop() method?
In particular, how can I interrupt the sender while it is blocked waiting to send an event to the channel? I know that closing the channel from the receiver side is not an option. What's the best practice for this type of issue?
Upvotes: 2
Views: 911
Reputation: 7440
Alternatively, you can stop the watcher with a context. This is more robust than a dedicated stop channel, because a send on a stop channel can itself block if the other side has already stopped or is blocked on something else.
type Watcher struct {
	events chan Event
	cancel context.CancelFunc
}
func (w *Watcher) Watch() <-chan Event {
	ctx, cancel := context.WithCancel(context.Background())
	w.cancel = cancel
	go w.fetch(ctx)
	return w.events
}
func (w *Watcher) Stop() {
	w.cancel()
}
func (w *Watcher) fetch(ctx context.Context) {
	for {
		e := fetchEvent()
		select {
		case <-ctx.Done():
			return
		case w.events <- e:
		}
	}
}
This example creates a Context in the Watch function (before fetching starts) and stores the cancel function in the Watcher. The ctx is then passed on to fetch, which checks ctx.Done on every iteration. Once the context is done, it stops fetching.
You might want to initialize the cancel function to an empty function so it is never nil and Stop can be called without a panic even if the Watcher was never started.
func NewWatcher() *Watcher {
	return &Watcher{
		// ...
		cancel: func() {},
	}
}
If you additionally want to pass in a context from the outside, you can rewrite your Watch function to:
func (w *Watcher) Watch(ctx context.Context) <-chan Event {
	ctx, cancel := context.WithCancel(ctx)
	w.cancel = cancel
	go w.fetch(ctx) // must run in a goroutine, otherwise Watch never returns
	return w.events
}
Now the Watcher will stop fetching if the Stop function is called or the outside ctx is cancelled (e.g. if your application shuts down).
If the Watcher.events channel is not used anywhere else, you can create it in the Watch function, pass it to fetch, and return it. That way the Watch function can be called multiple times and each call gets its own channel, instead of multiple readers reading from the same channel.
If you do this, you probably also want a way to cancel each watch individually, so you can return the cancel function from the Watch function and remove it from the Watcher struct. Stop is then obsolete.
func (w *Watcher) Watch() (<-chan Event, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	// small buffer on the channel to smooth sending/receiving
	ch := make(chan Event, 5)
	go w.fetch(ctx, cancel, ch)
	return ch, cancel
}
func (w *Watcher) fetch(ctx context.Context, cancel context.CancelFunc, ch chan<- Event) {
	// make sure the context is always cancelled when `fetch` stops, to avoid leaking it.
	defer cancel()
	for {
		select {
		case <-ctx.Done():
			// now you can close it without problems
			close(ch)
			return
		case ch <- fetchEvent():
		}
	}
}
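On the client side, each call to Watch can then be consumed and cancelled independently. The following sketch assumes a hypothetical takeN helper, a placeholder Event type, and an ID counter in place of fetchEvent; it also closes the channel with a defer rather than inside the select.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a placeholder payload type (hypothetical).
type Event struct{ ID int }

type Watcher struct{}

// Watch returns a fresh channel and the cancel function that stops it.
func (w *Watcher) Watch() (<-chan Event, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan Event, 5)
	go w.fetch(ctx, cancel, ch)
	return ch, cancel
}

func (w *Watcher) fetch(ctx context.Context, cancel context.CancelFunc, ch chan<- Event) {
	defer cancel()
	defer close(ch) // the sender closes, so a range loop on the client ends
	for id := 1; ; id++ {
		select {
		case <-ctx.Done():
			return
		case ch <- Event{ID: id}:
		}
	}
}

// takeN collects the first n event IDs (n must be at least 1), cancels
// the watch, and keeps draining until fetch closes the channel.
func takeN(w *Watcher, n int) []int {
	ch, cancel := w.Watch()
	ids := make([]int, 0, n)
	for e := range ch {
		if len(ids) == n {
			continue // already cancelled; discard buffered leftovers
		}
		ids = append(ids, e.ID)
		if len(ids) == n {
			cancel()
		}
	}
	return ids
}

func main() {
	w := &Watcher{}
	fmt.Println(takeN(w, 3)) // prints [1 2 3]
	fmt.Println(takeN(w, 2)) // prints [1 2]: a separate channel with its own counter
}
```

Events that were already buffered when cancel was called may still be delivered, which is why the loop keeps reading until the channel is closed.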
Upvotes: 1
Reputation: 76433
I'm going to make some assumptions: Watcher is going to be a type you're planning to pass around, or at least, you're happy having its functionality hidden behind a clean interface. Though perhaps not immediately a concern, you want such a component to be safe for concurrent use.
type Watcher struct {
	ch   chan Event
	done chan struct{}
}
// New now starts fetching; leave out go w.fetch(ctx) if you don't want this to happen
func New(ctx context.Context) *Watcher {
	w := &Watcher{
		ch:   make(chan Event, 10), // some buffer if needed
		done: make(chan struct{}),  // no buffer needed
	}
	go w.fetch(ctx)
	return w
}
// If you only want to start fetching events when this is called, uncomment the first line and accept a context as a parameter
func (w *Watcher) Watch() <-chan Event {
	// go w.fetch(ctx)
	return w.ch
}
func (w *Watcher) fetch(ctx context.Context) {
	// this runs in a goroutine
	// if we stop fetching, close channels
	defer func() {
		if w.done != nil {
			close(w.done)
		}
		w.done = nil
		close(w.ch)
	}()
	for {
		select {
		case <-ctx.Done():
			return // outside context was cancelled
		case <-w.done:
			w.done = nil // prevent second close on done channel
			return       // Stop was called
		case w.ch <- fetchEvent(): // waits for buffer space, but a stop or cancel can still interrupt it
		}
	}
}
func (w *Watcher) Stop() {
	if w.done == nil { // field is set to nil once fetch routine returns
		return
	}
	close(w.done) // rest is handled in fetch routine
}
That's pretty much it. I will say, though, that the 10-second sleep before fetching the next event is better handled with a ticker in the main fetch routine:
func (w *Watcher) fetch(ctx context.Context) {
	tick := time.NewTicker(10 * time.Second) // a bare 10 would mean 10 nanoseconds
	// this runs in a goroutine
	// if we stop fetching, close channels
	defer func() {
		tick.Stop()
		if w.done != nil {
			close(w.done)
		}
		w.done = nil
		close(w.ch)
	}()
	for {
		select {
		case <-ctx.Done():
			return // outside context was cancelled
		case <-w.done:
			w.done = nil
			return // Stop was called
		case <-tick.C: // every 10 seconds, we'll fetch an event
			w.ch <- fetchEvent()
		}
	}
}
Using this type is pretty straightforward:
ctx, cfunc := context.WithCancel(context.Background())
watcher := New(ctx)
go func() {
	for e := range watcher.Watch() {
		// do something with the data
	}
}()
time.Sleep(1 * time.Minute)
// either:
watcher.Stop()
// or simply:
cfunc()
Upvotes: 1
Reputation: 2403
Add another channel to your Watcher, like doneChan, and listen to this channel using select in your publisher (aka sender) like this:
// sender part (inside the goroutine that publishes events)
select {
case <-w.doneChan:
	// cleanup and return
	return
case w.events <- message:
}
Now you can implement your Stop() like this:
func (w *Watcher) Stop() {
	w.doneChan <- true // blocks until the sender receives it
}
Your new struct would look like this:
type Watcher struct {
	events   chan Event
	doneChan chan bool
}
Upvotes: 0