horak90
horak90

Reputation: 125

Vert.x - streaming HTTP request body with rxJava blocks

I am attempting to stream a large HTTP request between my Vert.x services. For this purpose, I am using io.vertx.reactivex.ext.web.client.WebClient, which takes a Flowable as input for the rxSendStream method. The Flowable in question is based on a ReplaySubject populated with buffers generated by another method.

However, I have encountered an issue where the other service does not receive all the buffers. Specifically:

  1. The HTTP request appears to emit a certain number of buffers and then blocks, leaving the request incomplete.

  2. On the consumer side, fewer buffers are received than expected, and the process becomes stuck without completing the request.

I have tested the behavior with Flowable directly (without using ReplaySubject), and the result appears to be the same.

Could this issue be related to how WebClient handles backpressure? Is it possible that the backpressure handling mechanism is causing the request to block?

thanks for any help

I am using the following snipets of the code :

var subject = ReplaySubject.<Buffer>create();
var flowableStream = subject.toFlowable(BackpressureStrategy.BUFFER);


...


var request = webClient.put(SERVICE_PATH)
                    .putHeader(CONTENT_TYPE.toString(), APPLICATION_JSON);
            setOptionalParameters(httpServerRequest, request);

            return request.as(BodyCodec.buffer())
                    .expect(RESPONSE_CREATED_PREDICATE)
                    .rxSendStream(flowableStream)                                             
                    .ignoreElement()
                    .doOnComplete(() -> LOG.info("executed successfully"));


...

subject.onNext(...)

Upvotes: 0

Views: 11

Answers (0)

Related Questions