Elliot Chance

Reputation: 5746

How to close a channel with multiple senders?

I have a case where I need to fan out receivers that send to the same channel:

func MessagesFromSQS(ctx context.Context, sqsClient sqsiface.SQSAPI) chan *sqs.Message {
    messages := make(chan *sqs.Message)

    go func() {
        defer close(messages)
        wg := sync.WaitGroup{}

        for i := 0; i < parallelSQSReaders; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()

                for {
                    select {
                    case <-ctx.Done():
                        return

                    default:
                        // ...

                        for _, message := range result.Messages {
                            messages <- message
                        }
                    }
                }
            }()
        }

        wg.Wait()
    }()

    return messages
}

To me this makes sense. However, the race detector complains that different goroutines are sending on and closing the channel. I realize that the goroutine that's responsible for sending should be the same one that closes, but what's the correct way to do this?

EDIT/SOLVED: Thanks for your responses. It turns out I wasn't reading the race detector stack trace correctly. I assumed that the code I changed introduced the bug, instead of uncovering a bug in the SQS mock. Once I correctly synchronized ReceiveMessage() it was fine.

Upvotes: 1

Views: 1778

Answers (1)

colm.anseo

Reputation: 22117

Close the channel when you know there won't be any more writes, i.e. when all worker goroutines have finished.

So:

wg.Wait()
close(messages)

P.S. I'd restructure your polling of context cancellation by combining it in a select with your channel writes, so a worker blocked on a send can still exit when the context is canceled, e.g.:

for _, message := range result.Messages {
    select {
    case messages <- message:
    case <-ctx.Done():
        return
    }
}
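Putting both points together, here is a minimal self-contained sketch rather than your actual SQS code. The names produce and countAll are hypothetical, and plain ints stand in for *sqs.Message. Several goroutines send on one channel, each send is paired with ctx.Done() in a select, and a single goroutine closes the channel only after wg.Wait() returns:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// produce is a hypothetical stand-in for MessagesFromSQS: it starts
// `workers` goroutines that each send `perWorker` ints on one shared channel.
func produce(ctx context.Context, workers, perWorker int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				select {
				case out <- id*perWorker + i:
				case <-ctx.Done():
					// Give up instead of blocking on a send nobody will receive.
					return
				}
			}
		}(w)
	}

	// Exactly one goroutine closes the channel, and only after every
	// sender has returned, so no send can happen after the close.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// countAll (also hypothetical) drains the channel and reports how many
// values arrived before it was closed.
func countAll(workers, perWorker int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	n := 0
	for range produce(ctx, workers, perWorker) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countAll(3, 4)) // prints 12
}
```

The receiver's range loop ends once the channel is closed, which only happens after all senders have finished or seen the cancellation. Running this under go run -race should not report the send/close pair as a race, since wg.Wait() orders every send before the close.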

Upvotes: 3
