ScarletPumpernickel

Reputation: 678

Binding an API callback to an RxJava Observable

I'm trying to make a reactive application that listens to a network socket on a separate thread for prices, and I got a bit stumped on how exactly to construct the Observable. Many of the interfaces I have are constrained by the API I am using and therefore cannot be changed. I have distilled what I am trying to do into the test below, but I can't see how to fill in the body of the getPriceReactive() method so that the subscriber prints the prices to the console (see the comment in the code).

import java.util.Random;

import org.junit.Test;

import rx.Observable;
import rx.Subscriber;

public class PriceObservableTest {

   // This interface is defined externally and used by the API
   private interface ITickHandler {
       void priceReceived(double price);
   }

   // Stores the price (currently just one double for illustration)
   private class Tick {
       double price = Double.NaN;
   }

   // Implementation of handler called by API when it receives a price
   private class TickHandler implements ITickHandler {
       private final Tick tick;

       TickHandler() { this.tick = new Tick(); }

       @Override public void priceReceived(double x) { tick.price = x; }
   }

   // This class emulates the API delivering prices from the socket
   private class PriceSource {
      private final Thread thread;

      PriceSource(final ITickHandler handler) {
          thread = new Thread(new Runnable() {
              final Random r = new Random();
              @Override public void run() {
                  while (!Thread.currentThread().isInterrupted()) {
                      try {
                          Thread.sleep(100);
                          handler.priceReceived(r.nextDouble() * 100);
                      } catch (InterruptedException e) {
                          break;
                      }
                  }
                  System.out.println("Price thread closed");
              }
          });
      }

      void subscribe() { thread.start(); }

      void unsubscribe() { thread.interrupt(); }
   }

   @Test
   public void simpleTest() throws Exception {

      final ITickHandler handler = new TickHandler();

      // Simulate some prices received periodically from a socket
      PriceSource prices = new PriceSource(handler);

      Observable<Tick> reactive = getPriceReactive(handler);

      reactive.subscribe(new Subscriber<Tick>() {
          @Override public void onCompleted() { }
          @Override public void onError(Throwable e) { }
          @Override public void onNext(Tick tick) {
              System.out.println("Received price: " + tick.price);
          }});

      // Observe prices for 1 second. The subscriber should print them to console
      prices.subscribe();
      Thread.sleep(1000); 
      prices.unsubscribe();
   }

   // Returns an observable that reacts to price changes
   private Observable<Tick> getPriceReactive(ITickHandler handler) {
       return Observable.create(new Observable.OnSubscribe<Tick>() {
           @Override public void call(Subscriber<? super Tick> subscriber) {

              // How to call subscriber.onNext() whenever
              // priceReceived() is called with a new price?

           }
       });
   }
}

Somehow subscriber.onNext() needs to be called whenever the API calls priceReceived(), but I can't quite see how to achieve this. Of course I could store a reference to the subscriber in the TickHandler, but that kind of defeats the purpose of having an Observable, doesn't it?

Upvotes: 2

Views: 572

Answers (1)

supertopi

Reputation: 3488

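Make the transition to an Observable inside the ITickHandler implementation. You are not controlling the subscriber(s) but the publisher, so have the handler push each price into a PublishSubject and expose that subject as an Observable: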

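// Requires: import rx.Observable; import rx.subjects.PublishSubject; (RxJava 1.x)
private class TickHandler implements ITickHandler {
   // Note: the same mutable Tick instance is reused for every emission
   private final Tick tick;
   // The subject is the bridge: the API pushes into it, subscribers listen to it
   private final PublishSubject<Tick> priceSubject;

   TickHandler() {
       this.tick = new Tick();
       this.priceSubject = PublishSubject.create();
   }

   // Called by the API; forwards each price to all current subscribers
   @Override public void priceReceived(double x) {
       tick.price = x;
       priceSubject.onNext(tick);
   }

   // Read-only view, so callers cannot push values into the subject
   public Observable<Tick> priceReceivedObservable() {
       return priceSubject.asObservable();
   }
}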

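You can then use it in your test like this. Note that the handler has to be declared as TickHandler rather than ITickHandler, because priceReceivedObservable() is not part of the API interface:

final TickHandler handler = new TickHandler();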
PriceSource prices = new PriceSource(handler);

handler.priceReceivedObservable()
       .subscribe(new Subscriber<Tick>() {
          @Override public void onCompleted() { }
          @Override public void onError(Throwable e) { }
          @Override public void onNext(Tick tick) {
              System.out.println("Received price: " + tick.price);
          }});

I warn you, it's not tested since I don't do a lot of Java :)
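
If it helps to see what the subject is doing for you, here is a rough JDK-only sketch of the same bridge. It is not RxJava and not how PublishSubject is actually implemented; the class name TickBridgeSketch, the Consumer-based subscribe() method and the immutable Tick are all assumptions made up for illustration. The idea is just that the handler is both the API's callback and the thing subscribers register with, and that each callback emits a fresh Tick so earlier emissions keep their own price.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class TickBridgeSketch {

    // Same shape as the API-defined callback interface in the question.
    interface ITickHandler {
        void priceReceived(double price);
    }

    // Immutable snapshot (an assumption; the question's Tick is mutable).
    static final class Tick {
        final double price;
        Tick(double price) { this.price = price; }
    }

    // Hypothetical stand-in for a subject: it is the API's handler and
    // also the object that subscribers register with.
    static final class TickHandler implements ITickHandler {
        // Thread-safe list, since the API thread emits while others may subscribe.
        private final List<Consumer<Tick>> listeners = new CopyOnWriteArrayList<>();

        // Registers a listener; the returned Runnable removes it again.
        Runnable subscribe(Consumer<Tick> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        // Called by the API; forwards a new Tick to every current listener.
        @Override public void priceReceived(double price) {
            Tick tick = new Tick(price);
            for (Consumer<Tick> listener : listeners) {
                listener.accept(tick);
            }
        }
    }

    public static void main(String[] args) {
        TickHandler handler = new TickHandler();
        Runnable unsubscribe = handler.subscribe(
                t -> System.out.println("Received price: " + t.price));

        handler.priceReceived(101.5);  // delivered to the listener
        unsubscribe.run();
        handler.priceReceived(99.0);   // nobody is listening any more
    }
}
```

With RxJava you get this fan-out from the subject, plus operators, error and completion signals, and scheduling on top, which is why the answer above pushes into a PublishSubject instead of hand-rolling the listener list.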

Upvotes: 2
