nuvio

Reputation: 2625

Receiving items from reactive stream SubmissionPublisher

I was trying out some of the new features in Java 9, so I put together a test with a publisher that emits numbers at a given rate. I also implemented a Subscriber that listens to those publications and just prints them to the console.

However, I probably don't fully understand how to use this API, because the onNext() method is not printing anything and getLastItem() only returns 0.

The only part that seems to work is the onSubscribe() which correctly initialises the lastItem variable.

@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);

    assertTrue(publisher.hasSubscribers());

    //Publish items
    System.out.println("Publishing Items...");

    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());

    publisher.close();
}


private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {

    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }

    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }

    public int getLastItem(){
        return lastItem;
    }
}

Can someone please tell me what I am doing wrong in my test? I would expect the test to print those numbers and return 5 as the last item.

I should say that so far I have only used Observables and Subjects in Angular2, and they seem easier to understand.

Upvotes: 5

Views: 1202

Answers (2)

Nicolai Parlog

Reputation: 51040

The Flow API implements a feature called backpressure (explained here in the context of RxJava), which means the publisher should not be able to overwhelm the subscriber by publishing items faster than it can process them. The way JDK 9 implements that is by having the subscriber request items from the subscription.

For your test, the TestIntegerSubscriber should request items in onSubscribe, let's say 10, and keep track of how often onNext has been called, so it can request more once those 10 items have been pushed.
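A minimal sketch of that idea, not a definitive implementation. The names BatchRequestDemo, BatchingSubscriber and publishRange are made up for illustration, and the latch is only there so the caller can wait for onComplete before reading the result:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchRequestDemo {

    // Hypothetical subscriber: requests BATCH items up front and again after each full batch.
    static class BatchingSubscriber implements Flow.Subscriber<Integer> {
        static final int BATCH = 10;
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int receivedInBatch;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(BATCH); // without a request, onNext is never called
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (++receivedInBatch == BATCH) {
                receivedInBatch = 0;
                subscription.request(BATCH); // batch used up, ask for the next one
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> publishRange(int count) throws InterruptedException {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items have been delivered
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishRange(25).size()); // prints 25
    }
}
```

Waiting for onComplete matters in a test: items are delivered on another thread (the common ForkJoinPool by default), so asserting right after submit can run before onNext has been called.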

I wrote a section about the Flow API that goes into a little more detail. It also describes the interaction between publisher, subscriber, and subscription:

  1. Create a Publisher and a Subscriber.
  2. Subscribe the subscriber with Publisher::subscribe.
  3. The publisher creates a Subscription and calls Subscriber::onSubscribe with it so the subscriber can store the subscription.
  4. At some point the subscriber calls Subscription::request to request a number of items.
  5. The publisher starts handing items to the subscriber by calling Subscriber::onNext. It will never publish more than the requested number of items.
  6. The publisher might at some point be depleted or run into trouble and call Subscriber::onComplete or Subscriber::onError, respectively.
  7. The subscriber might either continue to request more items every now and then or cut the connection by calling Subscription::cancel.
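The steps above can be sketched roughly as follows. This is an illustration under my own assumptions, not code from the JDK docs: FlowStepsDemo, ThreeItemSubscriber and run are hypothetical names, and the subscriber deliberately requests only 3 of the 5 submitted items and then cancels.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowStepsDemo {

    // Hypothetical subscriber that wants exactly three items.
    static class ThreeItemSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // step 3: store the subscription
            subscription.request(3);          // step 4: request a number of items
        }

        @Override
        public void onNext(Integer item) {    // step 5: at most 3 calls, as requested
            received.add(item);
            if (received.size() == 3) {
                subscription.cancel();        // step 7: cut the connection
                finished.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) { // step 6: publisher ran into trouble
            finished.countDown();
        }

        @Override
        public void onComplete() {            // step 6: publisher is depleted
            finished.countDown();
        }
    }

    static List<Integer> run() throws InterruptedException {
        ThreeItemSubscriber subscriber = new ThreeItemSubscriber();  // step 1
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);                          // step 2
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // five items offered, only three requested
            }
            subscriber.finished.await(5, TimeUnit.SECONDS);
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```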

Upvotes: 4

nuvio

Reputation: 2625

I have done some more research and I can see the code can be made to work by adding a Subscription member in the Subscriber implementation as follows:

public class MySubscriber implements Flow.Subscriber<Integer> {
    // kept so that onNext can request further items
    private Flow.Subscription subscription;

...

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed");
        subscription.request(1); // ask for the first item
        lastItem = 0;
    }

...

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
        subscription.request(1); // ask for the next item
    }

I think subscription.request(1); means I am asking the publisher for 1 more item, but I am not sure. It would be good if someone could clarify this for me.
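For reference, here is a self-contained version of the one-item-at-a-time approach as I currently understand it. It is only a sketch: OneAtATimeDemo, LastItemSubscriber and lastItemOf are names I made up, and I am assuming that each request(1) adds one to the number of items the publisher is allowed to deliver.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneAtATimeDemo {

    // Hypothetical subscriber that remembers the last item it saw.
    static class LastItemSubscriber implements Flow.Subscriber<Integer> {
        final CountDownLatch completed = new CountDownLatch(1);
        private Flow.Subscription subscription;
        // written on the publisher's executor thread, read by the calling thread
        private volatile int lastItem;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // allow delivery of the first item
        }

        @Override
        public void onNext(Integer item) {
            lastItem = item;
            subscription.request(1); // allow delivery of one more item
        }

        @Override
        public void onError(Throwable throwable) {
            completed.countDown();
        }

        @Override
        public void onComplete() {
            completed.countDown();
        }

        int getLastItem() {
            return lastItem;
        }
    }

    // Publishes the given items and returns the last one the subscriber received.
    static int lastItemOf(int... items) throws InterruptedException {
        LastItemSubscriber subscriber = new LastItemSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int item : items) {
                publisher.submit(item);
            }
        } // closing the publisher leads to onComplete after the pending items
        subscriber.completed.await(5, TimeUnit.SECONDS);
        return subscriber.getLastItem();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lastItemOf(1, 2, 3, 4, 5)); // prints 5
    }
}
```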

Upvotes: 0
