deamon

Reputation: 92437

How to connect a Subscriber with a reactor.core.publisher.Flux?

I want to connect a Subscriber with a Reactor Flux. However, my little example doesn't produce any output:

public static void main(String[] args) throws InterruptedException {
    Flux.just("a", "b", "c")
            .subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription ignored) {
                }

                @Override
                protected void hookOnNext(String value) {
                    System.out.print(value);
                }
            });
    Thread.sleep(1000);
}

I tried the same with an org.reactivestreams.Subscriber:

    Flux.just("a", "b", "c")
            .subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                }

                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });

But again, no output.

If I try the simplest form:

Flux.just("a", "b", "c").subscribe(System.out::println);

... it produces the expected output:

a
b
c

How can I use a Subscriber to receive the values from the Flux?

Upvotes: 0

Views: 2347

Answers (1)

deamon

Reputation: 92437

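The key is to request values from the source: a Subscriber has to signal demand by calling request(n) on its Subscription, and until that call nothing is emitted. In the question, the empty hookOnSubscribe override replaced BaseSubscriber's default behavior (which requests an unbounded amount), and the plain Subscriber never called request at all. The lambda form subscribe(System.out::println) works because it requests an unbounded amount for you.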

Example with BaseSubscriber:

Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(1); // <-- here
    }

    @Override
    protected void hookOnNext(String value) {
        request(1); // <-- here
        System.out.println(value);
    }
});

And the same with a standard Subscriber:

Flux<String> source = Flux.just("a", "b", "c");
source.subscribe(new Subscriber<String>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // <-- here
    }

    @Override
    public void onNext(String s) {
        subscription.request(1); // <-- here
        System.out.println(s);
    }

    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
});
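
To make it easier to see why nothing is printed without request(n), here is a rough sketch of the demand protocol. It deliberately uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror org.reactivestreams, so that it runs without Reactor on the classpath. ArrayPublisher, DemandDemo and collect are hypothetical names made up for this illustration; this is a simplified model of a publisher, not how Flux.just is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandDemo {

    /** Hypothetical, simplified publisher: emits items only while there is outstanding demand. */
    static final class ArrayPublisher implements Flow.Publisher<String> {
        private final String[] items;

        ArrayPublisher(String... items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Simplification: a spec-compliant publisher signals onError for n <= 0.
                    if (done || n <= 0) {
                        return;
                    }
                    // Add demand, capping at Long.MAX_VALUE to avoid overflow.
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // re-entrant call from onNext; the loop below picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && index < items.length && !done) {
                        demand--;
                        subscriber.onNext(items[index++]);
                    }
                    emitting = false;
                    if (index == items.length && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes and returns the received items; requestDemand toggles the initial request(1). */
    public static List<String> collect(boolean requestDemand) {
        List<String> received = new ArrayList<>();
        new ArrayPublisher("a", "b", "c").subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                if (requestDemand) {
                    subscription.request(1); // without this call, onNext is never invoked
                }
            }

            @Override
            public void onNext(String item) {
                received.add(item);
                subscription.request(1); // ask for the next item
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                // nothing to do in this sketch
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // prints []
        System.out.println(collect(true));  // prints [a, b, c]
    }
}
```

The first call corresponds to the subscribers in the question (no request, so no items), the second to the fixed versions above.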

Upvotes: 3
