user26132960

Quarkus Multi from EventBus consumer

Many clients establish gRPC connections to the server and wait for commands. The server itself receives these commands from various sources. How can I forward the commands from those incoming sources to the open gRPC channels as a stream?

I managed to do it without streaming, using Uni (example below). However, in this case the gRPC channel is closed after every command sent from the server to the client, and the client needs to re-establish the connection to the server. Is there a way to do something similar with Quarkus and SmallRye Mutiny's Multi, so that commands are streamed to clients as soon as they arrive from outside?

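// Imports assume Quarkus 3.x (jakarta namespace) and the Mutiny Vert.x event bus bindings.
// GrpcServiceExample, Request and Response come from my own gRPC definitions.
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import io.vertx.mutiny.core.eventbus.MessageConsumer;
import jakarta.inject.Inject;

@GrpcService
public class GrpcServiceExampleImpl implements GrpcServiceExample {

    @Inject
    EventBus eventBus;

    // Called by a client to wait for a command. The Uni completes after the
    // first command, which ends the call.
    public Uni<Response> registerClient(Request request) {
        return Uni.createFrom().<Request>emitter(emitter -> {
                    // One event bus address per client id.
                    MessageConsumer<Request> consumer = eventBus.localConsumer("command " + request.getId());
                    consumer.handler(m -> {
                        // Acknowledge to the sender, then pass the command to the waiting client.
                        m.reply(new Response("Command Acknowledged"));
                        emitter.complete(m.body());
                        consumer.unregisterAndForget();
                    });
                })
                .onItem().transform(req -> new Response(req.getMessage()));
    }

    // Called by an external source to send a command to the client with the given id.
    @Override
    public Uni<Response> sendCommand(Request request) {
        Uni<Message<Response>> uni = eventBus.<Response>request("command " + request.getId(), request);
        return uni.onItem().transform(Message::body);
    }
}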
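
For reference, the streaming variant I am looking for might look roughly like the sketch below. It is untested and rests on assumptions: the `registerClient` rpc would have to be declared in the proto file as `returns (stream Response)` so that the generated Mutiny interface expects a `Multi<Response>`, and the class name `GrpcStreamingExampleImpl` is only illustrative. The idea is to keep the event bus consumer registered for the lifetime of the stream and to unregister it only when the stream terminates (for example when the client cancels).

```java
// Sketch only: assumes Quarkus 3.x, the Mutiny Vert.x bindings, and that
// registerClient is declared as a server-streaming rpc in the proto file.
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import io.vertx.mutiny.core.eventbus.MessageConsumer;
import jakarta.inject.Inject;

@GrpcService
public class GrpcStreamingExampleImpl implements GrpcServiceExample { // hypothetical class name

    @Inject
    EventBus eventBus;

    // Returns a stream that stays open and emits every command sent to this client id.
    public Multi<Response> registerClient(Request request) {
        return Multi.createFrom().<Request>emitter(emitter -> {
                    MessageConsumer<Request> consumer =
                            eventBus.localConsumer("command " + request.getId());
                    consumer.handler(m -> {
                        // Acknowledge to the sender and push the command downstream
                        // without completing the stream.
                        m.reply(new Response("Command Acknowledged"));
                        emitter.emit(m.body());
                    });
                    // Unregister when the stream ends (cancellation, failure or completion).
                    emitter.onTermination(consumer::unregisterAndForget);
                })
                .onItem().transform(req -> new Response(req.getMessage()));
    }

    // Unchanged: forwards a command to the consumer registered for this client id.
    public Uni<Response> sendCommand(Request request) {
        Uni<Message<Response>> uni =
                eventBus.<Response>request("command " + request.getId(), request);
        return uni.onItem().transform(Message::body);
    }
}
```

Compared with the Uni version, the handler calls `emitter.emit(...)` instead of `emitter.complete(...)`, so the call is not ended after the first command. If commands can arrive faster than a client reads them, the emitter's back-pressure behaviour would also need to be considered.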
