xakepp35

Reputation: 3242

How to solve the pub-sub problem in Go and gRPC?

In a Go gRPC service I have an event loop that receives from a channel, and this receiving side can detect that it wants the sender to stop. But the usual channel guidance says that a channel should be closed only on the sender side, never on the receiver side. How should this be handled?

The situation is as follows. Imagine a chat. The 1st client, the subscriber, receives messages, and its streaming handler runs in its own goroutine because of how gRPC works. The 2nd client, the publisher, sends a message to the chat, so that is another goroutine. A message has to be passed from the publisher to the subscribing client ONLY if the subscriber has not closed its connection (which seems to force closing the channel from the receiver side).

The problem in code:

//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error {
    ch := s.NewClientChannel()
    // this loop blocks the goroutine, forwarding messages to the stream until a send returns an error
    for {
        msg, ok := <-ch
        if !ok {
            return nil
        }
        err := stream.Send(msg) // if this fails, we need to close ch from the receiver side to "propagate" the closing signal
        if err != nil {
            return err
        }
    }
}

//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) {
    for i := range s.clientChannels {
        s.clientChannels[i] <- req
        // the only way to learn that a channel should be removed from the list is a panic on send (or a wrapper with recover)
    }
    return &emptypb.Empty{}, nil
}

Upvotes: 0

Views: 2046

Answers (1)

xakepp35

Reputation: 3242

I initially got a clue by searching, and the idea for the solution came from an answer here; thanks to that answer.

Here is sample code for the streaming solution. I guess it also works as an implementation for the generic pub-sub problem:

//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error {
    s.AddClientToBroadcastList(stream)
    select {
    case <-stream.Context().Done(): // per the linked answer, this fires when the client closes the stream
        return stream.Context().Err() // stream will be closed immediately after return
    case <-s.ctx.Done(): // program shutdown
        return s.ctx.Err()
    }
}

//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) {
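    for i := range s.clientStreams {
        err := s.clientStreams[i].Send(req) // call Send on the individual stream, not on the collection
        if err != nil {
            // Send failed, so this client is gone: drop it from the broadcast list
            s.RemoveClientFromBroadcastList(s.clientStreams[i])
        }
    }
    return &emptypb.Empty{}, nil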
}
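
One caveat with the sample above: grpc-go does not allow calling Send on the same stream from several goroutines at once, and the broadcast list is shared between handlers, so it needs a lock. Below is a minimal sketch of one way around both, not a definitive implementation. It assumes each subscriber gets its own buffered channel and that only the subscriber's handler writes to its stream. Broker, Subscribe, Publish and Watch are hypothetical names, messages are plain strings, and the send callback stands in for stream.Send so the sketch runs without gRPC. No channel is ever closed: the subscriber unregisters itself when its context ends, which sidesteps the "who closes the channel" question.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Broker fans messages out to subscribers (hypothetical type, not a gRPC API).
type Broker struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[chan string]struct{})}
}

// Subscribe registers a buffered channel and returns it with an unsubscribe
// function. The channel is never closed, so a late Publish cannot panic.
func (b *Broker) Subscribe(buf int) (<-chan string, func()) {
	ch := make(chan string, buf)
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch, func() {
		b.mu.Lock()
		delete(b.subs, ch)
		b.mu.Unlock()
	}
}

// Publish does a non-blocking send to every subscriber and returns how many
// accepted the message. A subscriber with a full buffer misses the message.
func (b *Broker) Publish(msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for ch := range b.subs {
		select {
		case ch <- msg:
			delivered++
		default: // slow subscriber: drop instead of blocking the publisher
		}
	}
	return delivered
}

// Watch plays the role of the server-streaming handler. It is the only
// goroutine that calls send, and it unsubscribes on every exit path.
func Watch(ctx context.Context, b *Broker, send func(string) error) error {
	ch, unsubscribe := b.Subscribe(16)
	defer unsubscribe()
	for {
		select {
		case <-ctx.Done(): // client went away or server is shutting down
			return ctx.Err()
		case msg := <-ch:
			if err := send(msg); err != nil {
				return err
			}
		}
	}
}

func main() {
	b := NewBroker()
	ctx, cancel := context.WithCancel(context.Background())
	received := make(chan string, 1)
	done := make(chan error, 1)
	go func() {
		done <- Watch(ctx, b, func(m string) error {
			received <- m
			return nil
		})
	}()
	// Retry until the handler has subscribed and accepted one message.
	for b.Publish("hello") == 0 {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("subscriber got:", <-received)
	cancel() // simulate the client closing its stream
	fmt.Println("handler returned:", <-done)
	fmt.Println("delivered after disconnect:", b.Publish("late"))
}
```

In a real handler, ctx would be stream.Context() and send would wrap stream.Send. Dropping messages for slow subscribers is a design choice made here for brevity; blocking with a timeout or disconnecting the slow client are alternatives.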

Upvotes: 0
