Ankit Kumar

Reputation: 1463

gRPC How to wait for the headers from StreamingServer on client side

For a simple gRPC service defined by proto file:

service HelloService {
    rpc sayHello() returns (stream string)
}

How can I make the client wait for the response headers to arrive before it starts processing the response from the server? I tried using a ClientInterceptor and overriding onHeaders(), but it is called only after the call to sayHello() has already returned. How can I check for a specific header inside the sayHello client method and proceed with the call based on whether that header is valid?

public class SomeHeaderInterceptor implements ClientInterceptor {
private static final String FULL_METHOD_NAME = "sayHello";
public static CallOptions.Key<String> someHeader = CallOptions.Key.of("some_header_active", "false");

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
                                                           CallOptions callOptions, Channel channel) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {

        @Override
        public void start(Listener<RespT> responseListener, Metadata headers) {
            super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                @Override
                public void onHeaders(Metadata headers) {
                    Metadata.Key<String> SAYHELLO_ACTIVE_HEADER = Metadata.Key.of("some_header_active",
                            Metadata.ASCII_STRING_MARSHALLER);
                    if (methodDescriptor.getFullMethodName().equals(FULL_METHOD_NAME)) {
                        if (!headers.containsKey(SAYHELLO_ACTIVE_HEADER)) {
                            LOGGER.logError("some_header activation missing from header: " + headers);
                        } else {
                            callOptions.withOption(someHeader, "true");
                            Context.current().withValue(Context.key("test"), "testvalue");
                        }
                    }

                    super.onHeaders(headers);
                }
            }, headers);
        }
    };
}

And the code for sayHello is as follows:

public Iterator<String> sayHello() {
Iterator<String> stream = blockingStub.sayHello();

// wait for the sayhello active header 
boolean isActive = Boolean.parseBoolean(blockingStub.getCallOptions().getOption(SomeHeaderInterceptor.someHeader));
System.out.println("the some_header header value is: " + isActive);
System.out.println("the context key : " + Context.key("test").get(Context.current()));

return stream;
}

In the sayHello() code above, nothing waits for the headers to arrive or for the context to be set in onHeaders(). How can I do that? I only want to return the stream to the caller after I have validated that some_header is present in the headers sent by the server.

Upvotes: 3

Views: 2123

Answers (1)

Eric Anderson

Reputation: 26464

The headers are sent by the server before the first message, so the easy way would be to call stream.hasNext(), which will block waiting on a message. In many RPCs the first message comes pretty soon after the response headers, so this would work reasonably well.
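The ordering argument above can be illustrated without gRPC. The following is a minimal simulation, assuming a hypothetical SimulatedStream class that stands in for the blocking iterator (it is not gRPC's own iterator): a background thread publishes the headers and then the messages, and the caller uses hasNext() purely as a way to wait until the headers must have arrived.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class HeadersBeforeFirstMessageDemo {
    private static final String END = "<end-of-stream>"; // sentinel marking stream completion

    /** Hypothetical stand-in for a blocking response stream; not gRPC's iterator. */
    static final class SimulatedStream implements Iterator<String> {
        final AtomicReference<String> headers = new AtomicReference<>();
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private String next;

        /** "Server" side: publish headers first, then messages, from another thread. */
        void startServer(String headerValue, String... messages) {
            Thread server = new Thread(() -> {
                headers.set(headerValue); // headers always precede the first message
                for (String m : messages) {
                    queue.add(m);
                }
                queue.add(END);
            });
            server.setDaemon(true);
            server.start();
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks until a message or END arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            return !END.equals(next);
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String result = next;
            next = null;
            return result;
        }
    }

    public static void main(String[] args) {
        SimulatedStream stream = new SimulatedStream();
        stream.startServer("some_header_active", "hello", "world");
        stream.hasNext();                         // wait for the first message
        System.out.println(stream.headers.get()); // headers are visible by now
        while (stream.hasNext()) {
            System.out.println(stream.next());
        }
    }
}
```

With the real blocking stub, hasNext() also returns (with false) when the stream ends without any message and throws if the call fails, so the header check after it should tolerate the header never having been seen.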

As an aside, I noticed you experimenting with CallOptions and Context:

callOptions.withOption(someHeader, "true");
Context.current().withValue(Context.key("test"), "testvalue");

Neither of those lines really does anything because both objects are immutable. The with* calls create a new instance, so you have to use the return value for the line to do anything. Also, CallOptions and Context predominantly pass information in the opposite direction, like from the client application to interceptors. To "reverse" the direction, the application needs to set up a value that is mutable, like AtomicReference or a callback function, and then the interceptor could interact with that value.
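To make the immutability point concrete, here is a small JDK-only sketch. The Options class is a hypothetical stand-in for an immutable options object (it is not gRPC's CallOptions), and interceptor() plays the role of the interceptor. It shows that a discarded with* call changes nothing, while a mutable AtomicReference supplied by the caller does carry a value back.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ImmutableOptionsDemo {
    /** Hypothetical stand-in for an immutable options object; not gRPC's CallOptions. */
    static final class Options {
        private final Map<String, Object> values;

        Options(Map<String, Object> values) {
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }

        /** Returns a NEW instance; the receiver is left unchanged. */
        Options withOption(String key, Object value) {
            Map<String, Object> copy = new HashMap<>(values);
            copy.put(key, value);
            return new Options(copy);
        }

        Object getOption(String key) {
            return values.get(key);
        }
    }

    /** Plays the role of the interceptor: it can only report back through a mutable holder. */
    static void interceptor(Options options) {
        options.withOption("active", "true"); // return value discarded: no effect
        @SuppressWarnings("unchecked")
        AtomicReference<String> holder = (AtomicReference<String>) options.getOption("holder");
        holder.set("true"); // visible to whoever created the holder
    }

    public static void main(String[] args) {
        AtomicReference<String> holder = new AtomicReference<>("false");
        Options options = new Options(new HashMap<>()).withOption("holder", holder);
        interceptor(options);
        System.out.println(options.getOption("active")); // prints "null"
        System.out.println(holder.get());                // prints "true"
    }
}
```

The same shape applies to the real API: the application creates the mutable value, passes it down through the options, and reads it after the interceptor has written to it.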

If there may be a noticeable delay between when the server responds with the headers and the first message, then things get more complex. The code calling sayHello() passes a future to the interceptor through the call options and then blocks on it:

CompletableFuture<Boolean> future = new CompletableFuture<>();
Iterator<String> stream = blockingStub
    .withOption(SomeHeaderInterceptor.SOME_HEADER, future)
    .sayHello();
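// Wait for the sayhello active header. get() throws ExecutionException if the
// interceptor completed the future exceptionally (header missing or call failed).
boolean isActive = future.get();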

And then in the interceptor:

private static final String FULL_METHOD_NAME =
    //"helloworld.Greeter/SayHello";
    GreeterGrpc.getSayHelloMethod().getFullMethodName();
public static final CallOptions.Key<CompletableFuture<Boolean>> SOME_HEADER =
    CallOptions.Key.createWithDefault("some_header_active", CompletableFuture.completedFuture(false));
private static final Metadata.Key<String> SAYHELLO_ACTIVE_HEADER =
    Metadata.Key.of("some_header_active", Metadata.ASCII_STRING_MARSHALLER);

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
                                                           CallOptions callOptions, Channel channel) {
    CompletableFuture<Boolean> future = callOptions.getOption(SOME_HEADER);
    if (!methodDescriptor.getFullMethodName().equals(FULL_METHOD_NAME)) {
        future.complete(false);
        return channel.newCall(methodDescriptor, callOptions);
    }
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {

        @Override
        public void start(Listener<RespT> responseListener, Metadata headers) {
            super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                @Override
                public void onHeaders(Metadata headers) {
                    if (!headers.containsKey(SAYHELLO_ACTIVE_HEADER)) {
                        // Could also be future.complete(false)
                        future.completeExceptionally(new Exception("some_header activation missing from header: " + headers));
                    } else {
                        future.complete(true);
                    }
                    super.onHeaders(headers);
                }

                @Override
                public void onClose(Status status, Metadata trailers) {
                    // onHeaders() might not have been called, especially if there was an error
                    if (!future.isDone()) {
                        future.completeExceptionally(status.asRuntimeException(trailers));
                    }
                    super.onClose(status, trailers);
                }
            }, headers);
        }
    };
}
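The caller side of this handoff can be sketched with the JDK alone. In the simulation below, simulateHeaders() is a hypothetical stand-in for the interceptor's listener thread, and the five-second timeout is an assumption added here so the caller cannot block forever; the code above uses a plain get(). It shows how a missing header surfaces as an ExecutionException whose cause carries the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HeaderFutureDemo {
    /** Simulates the listener thread: completes the future when "headers" arrive. */
    static void simulateHeaders(CompletableFuture<Boolean> future, boolean headerPresent) {
        Thread networkThread = new Thread(() -> {
            if (headerPresent) {
                future.complete(true);
            } else {
                future.completeExceptionally(
                        new IllegalStateException("some_header activation missing"));
            }
        });
        networkThread.setDaemon(true);
        networkThread.start();
    }

    /** Caller side: block until the header check finishes, but not forever. */
    static String awaitHeader(CompletableFuture<Boolean> future) {
        try {
            return "active=" + future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // The exception passed to completeExceptionally() is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> ok = new CompletableFuture<>();
        simulateHeaders(ok, true);
        System.out.println(awaitHeader(ok));  // prints "active=true"

        CompletableFuture<Boolean> bad = new CompletableFuture<>();
        simulateHeaders(bad, false);
        System.out.println(awaitHeader(bad)); // prints "failed: some_header activation missing"
    }
}
```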

If you only need to validate the header, and the caller does not need to block until it arrives, then you can skip the waiting entirely and let the interceptor fail the call when validation fails:

private static final String FULL_METHOD_NAME =
    //"helloworld.Greeter/SayHello";
    GreeterGrpc.getSayHelloMethod().getFullMethodName();
private static final Metadata.Key<String> SAYHELLO_ACTIVE_HEADER =
    Metadata.Key.of("some_header_active", Metadata.ASCII_STRING_MARSHALLER);

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
                                                           CallOptions callOptions, Channel channel) {
    if (!methodDescriptor.getFullMethodName().equals(FULL_METHOD_NAME)) {
        return channel.newCall(methodDescriptor, callOptions);
    }
    // We use context to cancel since it is thread-safe, whereas ClientCall.cancel is not
    CancellableContext context = Context.current().withCancellation();
    class ValidatingListener extends ForwardingClientCallListener<RespT> {
        private Listener<RespT> responseListener;

        public ValidatingListener(Listener<RespT> responseListener) {
            this.responseListener = responseListener;
        }

        @Override
        protected Listener<RespT> delegate() {
            return responseListener;
        }

        @Override
        public void onHeaders(Metadata headers) {
            if (!headers.containsKey(SAYHELLO_ACTIVE_HEADER)) {
                Listener<RespT> saved = responseListener;
                responseListener = new Listener<RespT>() {}; // noop listener; throw away future events
                Status status = Status.UNKNOWN.withDescription(
                        "some_header activation missing from header: " + headers);
                context.cancel(status.asRuntimeException());
                saved.onClose(status, new Metadata());
                return;
            }
            // Validation successful
            super.onHeaders(headers);
        }

        @Override
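        public void onClose(Status status, Metadata trailers) {
            // Release the cancellable context, then forward the close to the current delegate
            // (the no-op listener if validation already failed and closed the real one).
            context.close();
            super.onClose(status, trailers);
        }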
    }

    Context toRestore = context.attach();
    try {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                super.start(new ValidatingListener(responseListener), headers);
            }
        };
    } finally {
        context.detach(toRestore);
    }
}

Upvotes: 5
