rezamm

Reputation: 71

How to close all grpc server streams using gracefulStop?

I'm trying to disconnect all clients connected to a streaming server from the server side. I'm using the GracefulStop method to shut the server down gracefully.

I wait for an os.Interrupt signal on a channel and then perform a graceful stop of the gRPC server, but it gets stuck on server.GracefulStop() while a client is connected.

func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
    ctx := srv.Context()

    updateCh := make(chan *clientapi.Update, 100)
    stopCh := make(chan bool)
    defer func() {
        stopCh <- true
        close(updateCh)
    }()

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer func() {
            ticker.Stop()
            close(stopCh)
        }()
        for {
            select {
            case <-stopCh:
                return
            case <-ticker.C:
                updateCh <- &clientapi.Update{Name: "notification", Payload: "sample notification every 1 second"}
            }
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        case notif := <-updateCh:
            err := srv.Send(notif)
            if err == io.EOF {
                return nil
            }

            if err != nil {
                s.logger.Named("Subscribe").Error("error", zap.Error(err))
                continue
            }
        }
    }
}

I expected ctx.Done() to fire during the graceful stop and break the for loop, but it does not. How can I close all response streams like this one?

Upvotes: 3

Views: 12164

Answers (2)

Doug Fawley

Reputation: 1111

Typically clients need to assume that RPCs can terminate (e.g. due to connection errors or server power failure) at any moment. So what we do is GracefulStop, sleep for a short time period to allow in-flight RPCs an opportunity to complete naturally, then hard-Stop the server. If you do need to use this termination signal to end your RPCs, then the answer by @colminator is probably the best choice. But this situation should be unusual, and you may want to spend some time analyzing your design if you do find it is necessary to manually end streaming RPCs at server shutdown.

Upvotes: 1

colm.anseo

Reputation: 22057

Create a global context for your gRPC service. Walking through the various pieces:

  • Each gRPC service request uses this context (along with the client context) to fulfill that request
  • The os.Interrupt handler cancels the global context, thus canceling any currently running requests
  • Finally, call server.GracefulStop(), which waits for all active gRPC calls to finish (in case they haven't seen the cancellation immediately)

So for example, when setting up the gRPC service:

pctx := context.Background()
globalCtx, globalCancel := context.WithCancel(pctx)

// pointer, because the RPC methods below use a pointer receiver
mysrv := &MyService{
    gctx: globalCtx,
}

server := grpc.NewServer()
pb.RegisterMyService(server, mysrv)

The os.Interrupt handler initiates the shutdown and waits for it to complete:

globalCancel()
server.GracefulStop()

gRPC methods:

func (s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {

    // merge client and server contexts into one `mctx`
    // (client context will cancel if client disconnects)
    // (server context will cancel if service Ctrl-C'ed)

    mctx, mcancel := mergeContext(ctx, s.gctx)

    defer mcancel() // so we don't leak, if neither client nor server context cancels

    // RPC WORK GOES HERE
    // RPC WORK GOES HERE
    // RPC WORK GOES HERE

    // pass mctx to any blocking calls:
    // - http REST calls
    // - SQL queries etc.
    // - or if running a long loop; status check the context occasionally like so:

    // Example long request (10s)
    for i := 0; i < 10*1000; i++ {
        time.Sleep(1 * time.Millisecond)

        // poll merged context
        select {
        case <-mctx.Done():
            return fmt.Errorf("request canceled: %w", mctx.Err())
        default:
        }
    }

    return nil
}

And:

func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {
    mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels

    go func() {
        select {
        case <-mctx.Done(): // don't leak go-routine on clean gRPC run
        case <-b.Done():
            mcancel() // b canceled, so cancel mctx 
        }
    }()

    return mctx, mcancel
}

Upvotes: 8
