Kirill

Reputation: 8311

Reactive Streams publisher from Vert.x WebClient response body

I'm trying to write a wrapper around the Vert.x Web client that loads a response body from a server and exposes it as a Publisher from Reactive Streams (org.reactivestreams):

import java.nio.ByteBuffer;

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;

interface Storage {
  Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
  private final WebClient client;

  public WebStorage(final WebClient client) {
    this.client = client;
  }

  @Override
  public Publisher<ByteBuffer> load(final String key) {
    return client.get(String.format("https://myhost/path?query=%s", key))
      .rxSend()
      .toFlowable()
      .map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
  }
}

This solution is not correct: the whole response body is buffered in memory and then copied in one go by the getBytes() call, instead of being delivered in chunks.

Is it possible to read the response from the Vert.x WebClient in chunks and convert it to a Publisher (or an Rx Flowable)?

Upvotes: 0

Views: 1530

Answers (2)

Sammers

Reputation: 1098

I guess you can use BodyCodec.pipe:

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

interface Storage {
    Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
    private final Vertx vertx = Vertx.vertx();
    private final WebClient client;

    public WebStorage(final WebClient client) {
        this.client = client;
    }

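    @Override
    public Publisher<ByteBuffer> load(final String key) {
        // ReactiveWriteStream is both a Vert.x WriteStream and a Reactive Streams Publisher.
        final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
        // Pipe the response body into the stream as it arrives.
        // Note: the request is sent as soon as load() is called, and errors are not handled here.
        client.get(String.format("https://myhost/path?query=%s", key))
            .as(BodyCodec.pipe(WriteStream.newInstance(stream)))
            .rxSend().subscribe();
        // Convert each Vert.x Buffer chunk into a ByteBuffer.
        return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
    }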
}

Upvotes: 1

tsegismont

Reputation: 9128

The Vert.x Web client is not designed to stream the response body. It buffers content by design.

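If you want to stream the content, you can use the underlying Vert.x core HTTP client (HttpClient), which is more flexible: its response is a ReadStream<Buffer>, so the body can be consumed chunk by chunk.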
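
To illustrate what chunk-wise delivery through a publisher looks like, here is a minimal JDK-only sketch. It is not Vert.x code and does not use the Vert.x HTTP client mentioned above: it swaps in the JDK 11+ java.net.http.HttpClient, whose BodyHandlers.ofPublisher() exposes the body as a Flow.Publisher<List<ByteBuffer>>. The class name StreamingBodyDemo, the methods countBytes and roundTrip, and the /data path are hypothetical names made up for the example, and the local test server only exists so the sketch can run without a real host. As far as I know, reactive-streams 1.0.3 and later ship org.reactivestreams.FlowAdapters for bridging a Flow.Publisher to an org.reactivestreams.Publisher, but treat that as an assumption to verify against your version.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical demo class; none of these names come from Vert.x.
final class StreamingBodyDemo {

    // Subscribes to the response body publisher and completes with the total
    // number of bytes once the last batch of chunks has arrived.
    static CompletableFuture<Long> countBytes(final HttpClient client, final URI uri) {
        final CompletableFuture<Long> total = new CompletableFuture<>();
        final HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        client.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher())
            .thenAccept(response -> response.body().subscribe(
                new Flow.Subscriber<List<ByteBuffer>>() {
                    private Flow.Subscription subscription;
                    private long bytes;

                    @Override
                    public void onSubscribe(final Flow.Subscription sub) {
                        this.subscription = sub;
                        sub.request(1); // back-pressure: ask for one batch at a time
                    }

                    @Override
                    public void onNext(final List<ByteBuffer> batch) {
                        for (final ByteBuffer chunk : batch) {
                            this.bytes += chunk.remaining();
                        }
                        this.subscription.request(1);
                    }

                    @Override
                    public void onError(final Throwable err) {
                        total.completeExceptionally(err);
                    }

                    @Override
                    public void onComplete() {
                        total.complete(this.bytes);
                    }
                }))
            .exceptionally(err -> {
                // Covers failures that happen before the body is available.
                total.completeExceptionally(err);
                return null;
            });
        return total;
    }

    // Starts a throwaway local server that sends `size` bytes with chunked
    // transfer encoding, streams them back, and returns the byte count.
    static long roundTrip(final int size) throws Exception {
        final byte[] payload = new byte[size];
        final HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/data", exchange -> {
            exchange.sendResponseHeaders(200, 0); // length 0 selects chunked encoding
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(payload);
            }
        });
        server.start();
        try {
            final URI uri = URI.create(
                "http://127.0.0.1:" + server.getAddress().getPort() + "/data");
            return countBytes(HttpClient.newHttpClient(), uri).get();
        } finally {
            server.stop(0);
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println("received " + roundTrip(64 * 1024) + " bytes");
    }
}
```

The subscriber requests one batch at a time, so the body is never held in memory as a whole. The same idea applies to a Vert.x ReadStream<Buffer>, although the API calls differ.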

Upvotes: 2
