Reputation: 5746
I have a case where I need to fan out receivers that send to the same channel:
func MessagesFromSQS(ctx context.Context, sqsClient sqsiface.SQSAPI) chan *sqs.Message {
	messages := make(chan *sqs.Message)
	go func() {
		defer close(messages)
		wg := sync.WaitGroup{}
		for i := 0; i < parallelSQSReaders; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for {
					select {
					case <-ctx.Done():
						return
					default:
						// ...
						for _, message := range result.Messages {
							messages <- message
						}
					}
				}
			}()
		}
		wg.Wait()
	}()
	return messages
}
To me this makes sense. However, the race detector complains that different goroutines are sending on and closing the channel. I realize that the goroutine that's responsible for sending should be the same one that closes, but what's the correct way to do this?
EDIT/SOLVED: Thanks for your responses. It turns out I wasn't reading the race detector stack trace correctly. I assumed that the code I changed introduced the bug, instead of uncovering a bug in the SQS mock. Once I correctly synchronized ReceiveMessage() it was fine.
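The mock itself isn't shown in the post, so the following is only a sketch of the kind of fix described: a mutex guarding the state that ReceiveMessage() touches, so several reader goroutines can call it at once. The `fakeQueue` type, its `pending` field, and the simplified `ReceiveMessage(limit int) []string` signature are all assumptions made for illustration; the real sqsiface.SQSAPI method takes and returns SDK types.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeQueue is a hypothetical stand-in for the SQS mock. It is not the
// asker's code and deliberately avoids the real AWS SDK types.
type fakeQueue struct {
	mu      sync.Mutex
	pending []string
}

// ReceiveMessage hands out up to limit pending messages. Holding the mutex
// for the whole read-and-trim step makes concurrent calls safe.
func (q *fakeQueue) ReceiveMessage(limit int) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	if limit > len(q.pending) {
		limit = len(q.pending)
	}
	batch := q.pending[:limit]
	q.pending = q.pending[limit:]
	return batch
}

func main() {
	q := &fakeQueue{pending: []string{"a", "b", "c", "d", "e"}}

	var wg sync.WaitGroup
	var totalMu sync.Mutex
	total := 0

	// Four concurrent readers, similar in spirit to parallelSQSReaders.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n := len(q.ReceiveMessage(2))
			totalMu.Lock()
			total += n
			totalMu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(total) // prints 5: every message is handed out exactly once
}
```

Running a test like this under `go test -race` (or `go run -race`) is a quick way to check whether the mock or the channel code is the source of a reported race.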
Upvotes: 1
Views: 1778
Reputation: 22117
Close the channel when you know there won't be any more writes, i.e. when all worker goroutines are complete.
So:
wg.Wait()
close(messages)
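As a self-contained illustration of that ordering, here is a minimal sketch with plain ints instead of SQS messages. The `fanIn` function and its parameters are made up for this example. The point is only that close runs after wg.Wait() has returned, so no sender can still be active.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn starts `workers` goroutines that each send `perWorker` values on a
// shared channel. The channel is closed only after every sender has returned.
func fanIn(workers, perWorker int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				out <- id*perWorker + i
			}
		}(w)
	}

	// A separate goroutine waits for all senders and then closes. By the
	// time close runs, wg.Wait has observed every Done, so no send can follow.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	count := 0
	for range fanIn(3, 4) {
		count++
	}
	fmt.Println(count) // prints 12
}
```

Whether close is written after wg.Wait() or deferred in the goroutine that calls wg.Wait(), the effect is the same: the close happens once, after all workers are done.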
P.S. I'd restructure your polling of context cancelation by combining it in a select with your channel writes, so that a goroutine blocked on a send still notices the cancelation, e.g.
for _, message := range result.Messages {
	select {
	case messages <- message:
	case <-ctx.Done():
		return
	}
}
Upvotes: 3