Reputation: 4977
I am using RxJava for maintenance tasks in the following way:
In a class that needs regular maintenance, I use the following static subscription. The Observable fires for the first time when the class is loaded into memory, and then at the specified interval.
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe();
Now I have a situation where I want to report the result of my maintenance to the UI.
Normally, I would use the following pattern:
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
// some code
subscriber.onNext(result);
subscriber.onCompleted(); }
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// write to the UI
}
});
But here the Observable is executed just once.
For an Observable that runs at regular intervals, I couldn't find a way to call the Action in the Subscriber so that I can pass the result using subscriber.onNext(). It looks like there is no suitable signature that takes the Long from timer() and at the same time allows subscribing with an Action. But knowing RxJava, I am sure there is a trick ;-)
I could make this work by using zip to combine a timer Observable with a one-time Observable (basically zipping the two versions together), but I would rather use the first structure because it behaves slightly differently.
--
I tried to merge both versions into one in the following manner:
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code // stays here to ensure there is no concurrency while executing
final String result = "result"; // I store the result in a final variable after some code has been finished
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result); // then I use it in a new Observable and emit it
subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
}
});
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) // observe on the main thread so the Action below can touch the UI
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// so I can finally consume the result on the UI thread
}
});
Instead of creating and emitting a "null" Observable, I create one which allows me to send a result to the subscriber.
Pretty messy, but this should work, right? Any simpler solution? What are your thoughts?
Upvotes: 0
Views: 5409
Reputation: 311
I know this answer comes late, but it may help someone else. As I understand it, you want to create an observable and run it periodically, for example every 3 seconds. Please see my example below, which uses the RxJava 2 types (you can return a ready-made observable such as Observable.just, or create your own with Observable.create, as in the commented-out part):
Observable.interval(3,TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Long aLong) throws Exception {
return Observable.just("Hello " + Thread.currentThread().getName()+aLong);
/*
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try{
String data = fetchData("http://www.yahoo.com");
emitter.onNext(data.length()+"");
emitter.onComplete();
}catch (Exception e){
emitter.onError(e);
}
}
});
*/
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("RxResult","-- " + Thread.currentThread().getName() + " -- " +s);
}
});
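public String fetchData(String url) throws IOException {
// try-with-resources closes the reader even if reading fails
try (BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()))) {
StringBuilder result = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
result.append(inputLine);
}
// return the body itself; the caller computes the length
return result.toString();
}
}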
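For comparison, here is a minimal sketch of the same "run every N seconds and hand the result to a consumer" idea using only the JDK's ScheduledExecutorService instead of RxJava. It is not part of the RxJava API; the names PeriodicTask, start and collect are made up for illustration, and the "work" is a dummy supplier rather than a real network call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not an RxJava class.
final class PeriodicTask {
    // Runs `work` repeatedly on one background thread and passes each result to `onResult`.
    // With a fixed delay, the next run starts only after the previous one has finished.
    static ScheduledExecutorService start(Supplier<String> work,
                                          Consumer<String> onResult,
                                          long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> onResult.accept(work.get()), 0, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    // Collects the first `count` results, then stops the executor.
    static List<String> collect(int count, long periodMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        long[] run = {0}; // only touched by the single executor thread
        ScheduledExecutorService executor = start(
                () -> "result " + run[0]++,
                r -> {
                    if (results.size() < count) {
                        results.add(r);
                        done.countDown();
                    }
                },
                periodMillis);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return results;
    }
}
```

Note that in this plain-Java version the consumer is called on the background thread. On Android you would still have to hop to the main thread yourself, which is what observeOn(AndroidSchedulers.mainThread()) takes care of in the RxJava example.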
Upvotes: 0
Reputation: 11515
I'm not sure I understand your problem.
Do you want to use the same subscription for your timer Observable and your other Observable?
Maybe you can use a Subject to act as a bridge between your Observables.
Subject<String, String> bridge = PublishSubject.create();
// periodic timer as in the question: first tick immediately, then every 5 seconds
Observable.timer(0, 5, SECONDS).flatMap(/* whatever */).mergeWith(bridge).subscribe(/* toDo */);
Observable.create(/* subscription */).subscribe(bridge);
Each item emitted by your "one-time Observable" will be pushed into the bridge, which in turn pushes it into the stream merged with the "timer" Observable.
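To make the bridge idea concrete without pulling in RxJava, here is a rough plain-Java sketch. The Bridge class is a hypothetical stand-in for a PublishSubject (it is not RxJava's implementation), and the timer ticks are simulated by direct calls instead of a real timer, so only the flow of items is illustrated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a PublishSubject: relays each item to all current listeners.
final class Bridge<T> {
    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    // A listener only receives items pushed after it has been registered.
    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    // Push one item to every registered listener.
    void onNext(T item) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(item);
        }
    }
}

final class BridgeDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Consumer<String> ui = received::add;   // the single downstream consumer

        Bridge<String> bridge = new Bridge<>();
        bridge.subscribe(ui);                  // plays the role of mergeWith(bridge)

        ui.accept("tick 0");                   // simulated timer emission
        bridge.onNext("one-time result");      // the one-time source pushes into the bridge
        ui.accept("tick 1");                   // simulated timer emission
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The consumer sees the timer items and the bridged item in one stream, in the order they were pushed, which is the effect mergeWith(bridge) is meant to give you.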
Is that what you're looking for?
Upvotes: 1