Jim Green
Jim Green

Reputation: 1118

How to close grpc stream for server

From the grpc , the client could invoke CloseSend to close the stream to the server, But it seems that the server could not cut off the connection to the client.

Upvotes: 2

Views: 14946

Answers (3)

brettinternet
brettinternet

Reputation: 575

If you're trying to close the stream from the receiver then you can cancel the context passed to the stream.

ctx, cancel := context.WithCancel(ctx)
stream, err := client.StreamingRPC(ctx)
// do something on stream
cancel()

Sources:

Upvotes: 0

Jon Watte
Jon Watte

Reputation: 7228

The best way to solve this is to use three goroutines.

The problem is that the bidi handler will be blocked inside Recv(), and thus can't easily "return to the caller" when it wants to complete the stream.

Another problem is that Send() could block if network conditions are bad, or the other end is uncooperative (which is a very common reason why you'd want to terminate the connection in the first place!)

In general, you will also want to send messages to the sender from outside the receiver loop -- the typical case for using bidi streaming is to actually communicate with the rest of the world. For more insulated request/response patterns, the other modes are generally simpler to use. Allowing someone outside the synchronous send/receive loop to queue messages throws additional synchronization into the mix.

Thus, you can structure your code inside the Bidi handler as such:

  1. Create some waitable primitive -- an empty chan, a WaitGroup, or just a context with cancellation
  2. Start a goroutine that calls Recv() in a loop, and dispatches incoming requests
  3. Create a chan for outgoing messages, with some maximum depth
  4. Start another goroutine that dequeues from the outgoing chan, and calls Send()
  5. Block the incoming Bidi request handler on the waitable primitive

Now, when you want try to send to the sender chan, do so inside a select{}, which will go to default if the outgoing chan is full. You can use this as a signal that the other end should be disconnected, or just discard the message to avoid infinite buffering, depending on needs.

When wanting to disconnect the client from server side, you should unblock the waitable primitive that the bidi handler is blocking on. This will end up making the Send() and Recv() functions return with error, after which you can safely declare the connection "done."

There's also the regular case of the remote client closing, and the Recv() call returning error in which case you also need to shut this down.

If you want to actually close the chans involved, and support context cancellation in addition to disconnection of unresponsive clients, you will end up having to synchronize the chans with additional mutexes, to avoid accidentally sending on a closed chan -- there's no way to safely do that otherwise, because the Send()/Recv() functions don't unblock until the bidi handler has already returned. (If there are no external senders, all enqueuing to the sender happens from the receiving goroutine, this is not a problem. That's only true for very simple servers, though.)

Somewhat pseudo-code-ish:

func (m *MyThing) MyBidiServer(stream somepb.Thing_ThingServer) {
    can := make(chan struct{})
    thingRunner := &ThingRunner{
        cancel: can,
        send: make(chan *somepb.OutgoingMessage, 100),
    }
    m.addRunner(thingRunner) // assume this is mutexed
    defer m.removeRunner(thingRunner)
    go thingRunner.recvLoop(stream)
    go thingRunner.sendLoop(stream)
    // could be select here if you also have a context
    <-can
}

func (t *ThingRunner) recvLoop(stream somepb.Thing_ThingServer) {
    for {
        msg, err := stream.Recv()
        if err != nil {
            break
        }
    }
    t.cancelSafely() // in case client disconnected
}

func (t *ThingRunner) sendLoop(stream somepb.Thing_ThingServer) {
    for msg := range t.send {
        if err := stream.Send(msg); err != nil {
            break
        }
    }
    t.cancelSafely() // in case of network error
}

func (t *ThingRunner) SendMessage(ctx context.Context, msg *somepb.Message) error {
    if t.isCanceled() {
        return ThingClosedError
    }
    select {
        case t.send <- msg:
            // all is well
        case <-ctx.Done():
            return ctx.Err()
        default:
            // the client is too slow -- disconnect it
            t.cancelSafely()
            return ClientTooSlowError
    }
    return nil
}

There's a number of little management bits you'll need to take care of as well, but this is the gist of it. cancelSafely() needs to unblock the cancel chan, but it needs to also not cause future writes on t.send to panic, because sending on a closed chan will unconditionally panic.

Upvotes: 3

Piotr Kowalczuk
Piotr Kowalczuk

Reputation: 399

First of all, you should not think about gRPC in terms of connections. ServerStream will stop to send data as soon as handler returns an error. There is no need on the server-side to have a function like CloseSend.

If you want to execute some logic after return, „just” spawn a goroutine.

Upvotes: 1

Related Questions