Reputation: 1118
With gRPC, the client can call CloseSend to close its side of the stream to the server, but it seems that the server has no way to cut off the connection to the client.
Upvotes: 2
Views: 14946
Reputation: 575
If you're trying to close the stream from the receiver then you can cancel the context passed to the stream.
ctx, cancel := context.WithCancel(ctx)
stream, err := client.StreamingRPC(ctx)
// do something on stream
cancel()
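To see why cancelling the context is enough, here is a minimal stdlib-only sketch. fakeStream is a hypothetical stand-in for the generated client stream (it is not a gRPC type); the only thing it models is that a blocked Recv() wakes up when the context is cancelled. With a real grpc-go stream I would expect the error to be a status error with code Canceled rather than the bare context error.

```go
package main

import (
	"context"
	"fmt"
)

// fakeStream is a hypothetical stand-in for a generated gRPC client stream.
type fakeStream struct {
	ctx  context.Context
	msgs chan string
}

// Recv blocks until a message arrives or the stream's context is cancelled.
func (s *fakeStream) Recv() (string, error) {
	select {
	case m := <-s.msgs:
		return m, nil
	case <-s.ctx.Done():
		return "", s.ctx.Err()
	}
}

// recvLoop reads until Recv reports an error and returns that error.
func recvLoop(s *fakeStream) error {
	for {
		if _, err := s.Recv(); err != nil {
			return err
		}
	}
}

// runAndCancel starts a receiver, cancels the context, and returns
// the error that unblocked the receiver.
func runAndCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := &fakeStream{ctx: ctx, msgs: make(chan string)}
	done := make(chan error, 1)
	go func() { done <- recvLoop(s) }()
	cancel() // nothing is ever sent, so only this can unblock Recv
	return <-done
}

func main() {
	fmt.Println(runAndCancel())
}
```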
Upvotes: 0
Reputation: 7228
The best way to solve this is to use three goroutines.
The problem is that the bidi handler will be blocked inside Recv(), and thus can't easily "return to the caller" when it wants to complete the stream.
Another problem is that Send() could block if network conditions are bad, or the other end is uncooperative (which is a very common reason why you'd want to terminate the connection in the first place!)
In general, you will also want to send messages to the sender from outside the receiver loop -- the typical case for using bidi streaming is to actually communicate with the rest of the world. For more insulated request/response patterns, the other modes are generally simpler to use. Allowing someone outside the synchronous send/receive loop to queue messages throws additional synchronization into the mix.
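Thus, you can structure your code inside the bidi handler as such: one goroutine loops on Recv(), a second drains a buffered outgoing chan and calls Send(), and the handler's own goroutine blocks on a waitable primitive (such as a cancel chan) until the stream should end.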
Now, when you want to send to the sender chan, do so inside a select{} with a default case, which will run if the outgoing chan is full. You can use this as a signal that the other end should be disconnected, or just discard the message to avoid infinite buffering, depending on your needs.
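A small sketch of that non-blocking send, using only the standard library. trySend and the string payload are names I made up for illustration; in a real handler the chan would carry your outgoing protobuf messages.

```go
package main

import "fmt"

// trySend enqueues msg without blocking.
// It reports false when the buffer is full, i.e. the consumer is too slow.
func trySend(out chan<- string, msg string) bool {
	select {
	case out <- msg:
		return true
	default:
		return false
	}
}

func main() {
	out := make(chan string, 2) // tiny buffer so the overflow is easy to see
	for i := 0; i < 3; i++ {
		ok := trySend(out, fmt.Sprintf("msg-%d", i))
		fmt.Println(i, ok) // the third attempt reports false
	}
}
```

Whether a false result means "drop this message" or "disconnect this client" is a policy decision for the caller.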
When you want to disconnect the client from the server side, unblock the waitable primitive that the bidi handler is blocking on, so that the handler returns. Once it has returned, the Send() and Recv() calls in the other goroutines return with an error, after which you can safely declare the connection "done."
There's also the regular case of the remote client closing and the Recv() call returning an error, in which case you also need to shut everything down.
If you want to actually close the chans involved, and support context cancellation in addition to disconnection of unresponsive clients, you will end up having to synchronize the chans with additional mutexes, to avoid accidentally sending on a closed chan -- there's no way to safely do that otherwise, because the Send()/Recv() functions don't unblock until the bidi handler has already returned. (If there are no external senders and all enqueuing to the sender happens from the receiving goroutine, this is not a problem. That's only true for very simple servers, though.)
Somewhat pseudo-code-ish:
func (m *MyThing) MyBidiServer(stream somepb.Thing_ThingServer) error {
	can := make(chan struct{})
	thingRunner := &ThingRunner{
		cancel: can,
		send:   make(chan *somepb.OutgoingMessage, 100),
	}
	m.addRunner(thingRunner) // assume this is mutexed
	defer m.removeRunner(thingRunner)
	go thingRunner.recvLoop(stream)
	go thingRunner.sendLoop(stream)
	// could be a select here if you also have a context, e.g. stream.Context()
	<-can
	// returning ends the stream; Recv() and Send() in the loops then fail
	return nil
}

func (t *ThingRunner) recvLoop(stream somepb.Thing_ThingServer) {
	for {
		msg, err := stream.Recv()
		if err != nil {
			break // io.EOF when the client closed cleanly, otherwise a real error
		}
		_ = msg // process the incoming message here
	}
	t.cancelSafely() // in case client disconnected
}

func (t *ThingRunner) sendLoop(stream somepb.Thing_ThingServer) {
	for msg := range t.send {
		if err := stream.Send(msg); err != nil {
			break
		}
	}
	t.cancelSafely() // in case of network error
}

func (t *ThingRunner) SendMessage(ctx context.Context, msg *somepb.OutgoingMessage) error {
	if t.isCanceled() {
		return ThingClosedError
	}
	select {
	case t.send <- msg:
		// all is well
	case <-ctx.Done():
		return ctx.Err()
	default:
		// the client is too slow -- disconnect it
		t.cancelSafely()
		return ClientTooSlowError
	}
	return nil
}
There are a number of little management bits you'll need to take care of as well, but this is the gist of it. cancelSafely() needs to unblock the cancel chan, but it also must not cause future writes on t.send to panic, because sending on a closed chan always panics.
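One possible shape for that bookkeeping, as a stdlib-only sketch and not the only way to do it. The runner type, enqueue, cancelLocked and the two error values are hypothetical names of mine; the idea is that a single mutex guards both the "canceled" flag and every send, so nobody can send after the chan has been closed. Holding the mutex across the send is only acceptable because the send is non-blocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errClosed  = errors.New("runner closed")
	errTooSlow = errors.New("client too slow")
)

// runner is a hypothetical, trimmed-down ThingRunner.
type runner struct {
	mu       sync.Mutex
	canceled bool
	cancel   chan struct{} // closed to wake up the bidi handler
	send     chan string   // drained by the send loop
}

func newRunner(buf int) *runner {
	return &runner{cancel: make(chan struct{}), send: make(chan string, buf)}
}

// cancelSafely may be called any number of times from any goroutine.
func (r *runner) cancelSafely() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cancelLocked()
}

// cancelLocked closes both chans exactly once; the caller must hold r.mu.
func (r *runner) cancelLocked() {
	if r.canceled {
		return
	}
	r.canceled = true
	close(r.cancel)
	close(r.send) // lets a "for msg := range send" loop finish
}

// enqueue never blocks and never sends on a closed chan.
func (r *runner) enqueue(msg string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.canceled {
		return errClosed
	}
	select {
	case r.send <- msg:
		return nil
	default:
		r.cancelLocked() // buffer full: treat the client as too slow
		return errTooSlow
	}
}

func main() {
	r := newRunner(1)
	fmt.Println(r.enqueue("a")) // fits in the buffer
	fmt.Println(r.enqueue("b")) // buffer full, runner gets canceled
	fmt.Println(r.enqueue("c")) // already canceled, returns an error instead of panicking
	r.cancelSafely()            // a repeated cancel is a no-op
}
```

Messages that were already buffered when the chans are closed can still be read by the send loop; whether you want to flush or drop them is up to you.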
Upvotes: 3
Reputation: 399
First of all, you should not think about gRPC in terms of connections.
ServerStream stops sending data as soon as the handler returns, whether with nil or with an error. There is no need for a server-side function like CloseSend.
If you want to execute some logic after the return, "just" spawn a goroutine.
Upvotes: 1