Reputation: 3242
In go grpc service I have a receiver(publisher) event loop, and publisher can detect that it wants sender to stop. But channel principles says that we should not close channels on receiver side, only on sender side. How should it be threated?
Situation is like following. Imagine a chat. 1st client - subscriber - receives message, and its streaming cannot be done without goroutine due to grpc limitations. And 2nd client - publisher is sending a message to chat, so its another goroutine. You have to pass a message from publisher to subscriber receiving client, ONLY if subscriber not closed its connection (forces closing a channel from receiver side)
The problem in code:
//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error {
ch := s.NewClientChannel()
// natively blocks goroutine with send to its stream, until send gets an error
for {
msg, ok := <-ch
if !ok {
return nil
}
err := stream.Send(msg) // if this fails - we need to close ch from receiver side to "propagate" closing signal
if err != nil {
return err
}
}
}
//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) {
for i := range s.clientChannels {
s.clientChannels[i] <- req
// no way other than panic, to know when to remove channel from list. or need to make a wrapper with recover..
}
return nil
}
Upvotes: 0
Views: 2046
Reputation: 3242
I've initially got a clue by searhing, and solution idea was provided in an answer here thanks to that answer.
Providing streaming solution sample code, i guess its an implementation for a generic pub-sub problem:
//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error {
s.AddClientToBroadcastList(stream)
select {
case <-stream.Context().Done(): // stackoverflow promised that it would signal when client closes stream
return stream.Context().Err() // stream will be closed immediately after return
case <-s.ctx.Done(): // program shutdown
return s.ctx.Err()
}
}
//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) {
for i := range s.clientStreams {
err := s.clientStreams.Send(req)
if err != nil {
s.RemoveClientFromBroadcastList(s.clientStreams[i])
}
}
return nil
}
Upvotes: 0