Reputation: 678
I'm trying make a reactive application that listens to a network socket on a separate thread for prices and got a bit stumped with how exactly to construct the Observable
. Much of the interfaces I have are constrained by the API I am using and therefore cannot change. I distilled what I am trying to do as a test below, but I can't see how to fill in the body of the getPriceReactive()
method such that the prices are printed on the console by the subscriber (see the comment in the code).
public class PriceObservableTest {
// This interface is defined externally and used by the API
private interface ITickHandler {
void priceReceived(double price);
}
// Stores the price (currently just one double for illustration)
private class Tick {
double price = Double.NaN;
}
// Implementation of handler called by API when it receives a price
private class TickHandler implements ITickHandler {
private final Tick tick;
TickHandler() { this.tick = new Tick(); }
@Override public void priceReceived(double x) { tick.price = x; }
}
// This class emulates the API delivering prices from the socket
private class PriceSource {
private final Thread thread;
PriceSource(final ITickHandler handler) {
thread = new Thread(new Runnable() {
final Random r = new Random();
@Override public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(100);
handler.priceReceived(r.nextDouble() * 100);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Price thread closed");
}
});
}
void subscribe() { thread.start(); }
void unsubscribe() { thread.interrupt(); }
}
@Test
public void simpleTest() throws Exception {
final ITickHandler handler = new TickHandler();
// Simulate some prices received periodically from a socket
PriceSource prices = new PriceSource(handler);
Observable<Tick> reactive = getPriceReactive(handler);
reactive.subscribe(new Subscriber<Tick>() {
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(Tick tick) {
System.out.println("Received price: " + tick.price);
}});
// Observe prices for 1 second. The subscriber should print them to console
prices.subscribe();
Thread.sleep(1000);
prices.unsubscribe();
}
// Returns an observable that reacts to price changes
private Observable<Tick> getPriceReactive(ITickHandler handler) {
return Observable.create(new Observable.OnSubscribe<Tick>() {
@Override public void call(Subscriber<? super Tick> subscriber) {
// How to call subscriber.onNext() whenever
// priceReceived() is called with a new price?
}
});
}
}
Somehow subscriber.onNext()
needs to be called whenever the API calls priceReceived()
, but I can't quite see how to achieve this. Of course I could store a reference to the subscriber in the TickHandler
but this kind of defeats the purpose of having an Observable
, doesn't it?
Upvotes: 2
Views: 572
Reputation: 3488
Transition to Observable
in ITickHandler
implementation. You are not controlling the subscriber(s) but the publisher
private class TickHandler implements ITickHandler {
private final Tick tick;
private final PublishSubject<Tick> priceSubject;
TickHandler() {
this.tick = new Tick();
this.priceSubject = PublishSubject.create();
}
@Override public void priceReceived(double x)
{
tick.price = x;
priceSubject.onNext(tick);
}
public Observable<Tick> priceReceivedObservable()
{
return priceSubject.asObservable();
}
}
And you can use it in your tests like:
final ITickHandler handler = new TickHandler();
PriceSource prices = new PriceSource(handler);
handler.priceReceivedObservable()
.subscribe(new Subscriber<Tick>() {
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(Tick tick) {
System.out.println("Received price: " + tick.price);
}});
I warn you, it's not tested since I don't do a lot of Java :)
Upvotes: 2