Christopher
Christopher

Reputation: 44284

How can I signal a channel sender to quit in golang?

I'm using channels in Go to process a data pipeline of sorts. The code looks something like this:

type Channels struct {
    inputs chan string
    errc   chan error
    quit   chan struct{}
}

func (c *Channels) doSomethingWithInput() {
    defer close(c.quit)
    defer close(c.errc)
    for input := range p.inputs {
        _, err := doSomethingThatSometimesErrors(input)
        if err != nil {
            c.errc <- err
            return
        }
    }
    doOneFinalThingThatCannotError()
    return
}

func (c *Channels) inputData(s string) {
        // This function implementation is my question
}

func StartProcessing(c *Channels, data ...string) error {
    go c.doSomethingWithInput()
    go func() {
        defer close(c.inputs)
        for _, i := range data {
            select {
            case <-c.quit:
                break
            default:
            }
            inputData(i)
            }
    }()
    // Block until the quit channel is closed.
    <-c.quit
    if err := <-c.errc; err != nil {
        return err
    }
    return nil
}

This seems like a reasonable way to communicate a quit signal between channel processors and is based on this blog post about concurrency patterns in Go.

The thing I struggle with using this pattern is the inputData function. Adding strings to the input channel needs to wait for doSomethingWithInput() to read the channel, but it also might error. inputData needs to try and feed the inputs channel but give up if told to quit. The best I could do was this:

func (c *Channels) inputData(s string) {
    for {
        select {
        case <-c.quit:
            return
        case c.inputs <- s:
            return
        }
    }
}

Essentially, "oscillate between your options until one of them sticks." To be clear, I don't think it's a bad design. It just feels... wasteful. Like I'm missing something clever. How can I tell a channel sender to quit in Go when a channel consumer errors?

Upvotes: 0

Views: 4171

Answers (1)

wldsvc
wldsvc

Reputation: 1272

Your inputData() is fine, that's the way to do it.

In your use case, your channel consumer, the receiver, aka doSomethingWithInput() is the one which should have control over the "quit" channel. As it is, if an error occurs, just return from doSomethingWithInput(), which will in turn close the quit channel and make the sender(s) quit (will trigger case <-quit:). That is in fact the clever bit.

Just watch out with your error channel that's not buffered and closed when doSomethingWithInput() exits. You cannot read it afterwards to collect errors. You need to close it in your main function and initialize it with some capacity (make(chan int, 10) for example), or create a consumer goroutine for it. You may also want to try reading it with a select statement: your error checking code, as it is, will block forever if there are no errors.

Upvotes: 3

Related Questions