Reputation: 10267
I'm wondering how to transform an observable similarly to switchMap, but with several (a limited number of) active streams instead of a single one.
The purpose is to have multiple tasks running concurrently up to some limit, and to admit new tasks with a FIFO eviction strategy: any newly arriving task starts immediately, and the oldest running task is canceled.
switchMap creates an Observable for each emission of the source and cancels the previously running Observable as soon as a new one is created. I want something similar that allows some level of concurrency (like flatMap): an Observable is created for each emission, they run concurrently up to a concurrency limit, and once that limit is reached, the oldest Observable is canceled and the new one is started.
This is also similar to flatMap with maxConcurrent, except that when maxConcurrent is reached, new Observables should not wait in a queue; instead, the oldest Observable is canceled and the new one starts immediately.
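To make the requested policy concrete, here is a minimal plain-Java sketch of the eviction rule only, without RxJava. The class name SlidingTaskWindow and its methods admit and activeCount are hypothetical names chosen for illustration, and the sketch assumes tasks never finish on their own (a real operator would also have to drop completed tasks from the window).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper, not part of RxJava: keeps at most `limit` tasks alive. */
class SlidingTaskWindow {
    private final int limit;
    // Cancel handles of the running tasks, oldest first.
    private final Deque<Runnable> active = new ArrayDeque<>();

    SlidingTaskWindow(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1");
        }
        this.limit = limit;
    }

    /** Registers a newly started task; cancels the oldest one if the window is full. */
    synchronized void admit(Runnable cancelHandle) {
        if (active.size() == limit) {
            active.pollFirst().run(); // evict the oldest task
        }
        active.addLast(cancelHandle);
    }

    synchronized int activeCount() {
        return active.size();
    }

    public static void main(String[] args) {
        List<Integer> cancelled = new ArrayList<>();
        SlidingTaskWindow window = new SlidingTaskWindow(2);
        for (int id = 0; id < 4; id++) {
            final int taskId = id;
            // The cancel handle just records which task was evicted.
            window.admit(() -> cancelled.add(taskId));
        }
        // Tasks 2 and 3 evict tasks 0 and 1 respectively.
        System.out.println("cancelled = " + cancelled + ", active = " + window.activeCount());
    }
}
```

With a limit of 1 this degenerates to the switchMap behavior (every new task cancels the previous one); with a larger limit the new task never waits, which is the difference from flatMap with maxConcurrent.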
Upvotes: 2
Views: 526
Reputation: 4820
There is no ready-made solution, but something like the following should help.
public static void main(String[] args) {
    Observable.create(subscriber -> {
        // Slow producer: block for i seconds, then emit i.
        for (int i = 0; i < 5; i++) {
            Observable.timer(i, TimeUnit.SECONDS).toBlocking().subscribe();
            subscriber.onNext(i);
        }
    })
    .switchMap(n -> {
        System.out.println("Main task emitted event - " + n);
        // Inner stream: one tick per second, n * 3 ticks in total.
        return Observable.interval(1, TimeUnit.SECONDS).take((int) n * 3)
                .doOnUnsubscribe(() -> System.out.println("Unsubscribed for main task event - " + n));
    })
    .subscribe(n2 -> System.out.println("\t" + n2));

    // Keep the JVM alive long enough for the remaining inner ticks.
    Observable.timer(20, TimeUnit.SECONDS).toBlocking().subscribe();
}
The Observable.create section creates a slow producer that emits 0, sleeps for 1s and emits 1, sleeps for 2s and emits 2, and so on.
switchMap creates an Observable for each element, and each of these emits a number every second. Note also that a line is printed every time the main Observable emits an element and every time an inner Observable is unsubscribed.
So in your case, you might want to stop the oldest task from doOnUnsubscribe. Hope it helps.
The pseudocode below might make this clearer.
getTaskObservable()
    .switchMap(task -> {
        System.out.println("Main task emitted event - " + task);
        return Observable.create(subscriber -> {
            initiateTaskAndNotify(task, subscriber);
        }).doOnUnsubscribe(() -> checkAndKillIfMaxConcurrentTasksReached(task));
    })
    .subscribe(value -> System.out.println("Done with task and got output " + value));
Upvotes: 0
Reputation: 69997
You could try with this transformer:
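import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public static <T, R> Observable.Transformer<T, R> switchFlatMap(
        int n, Func1<T, Observable<R>> mapper) {
    return f ->
        // defer gives every subscriber its own counter and cancel subject
        Observable.defer(() -> {
            final AtomicInteger ingress = new AtomicInteger();
            final Subject<Integer, Integer> cancel =
                    PublishSubject.<Integer>create().toSerialized();

            return f.flatMap(v -> {
                int id = ingress.getAndIncrement();
                // The inner stream runs until the source item with id + n arrives.
                Observable<R> o = mapper.call(v)
                        .takeUntil(cancel.filter(e -> e == id + n));
                // Announce this id; it stops the inner stream with id - n, if any.
                cancel.onNext(id);
                return o;
            });
        });
}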
The demonstration:
public static void main(String[] args) {
    PublishSubject<Integer> ps = PublishSubject.create();

    // One inner subject per source value, so each inner stream can be driven by hand.
    @SuppressWarnings("unchecked")
    PublishSubject<Integer>[] pss = new PublishSubject[3];
    for (int i = 0; i < pss.length; i++) {
        pss[i] = PublishSubject.create();
    }

    AssertableSubscriber<Integer> ts = ps
            .compose(switchFlatMap(2, v -> pss[v]))
            .test();

    // Two inner streams are active: pss[0] and pss[1].
    ps.onNext(0);
    ps.onNext(1);

    pss[0].onNext(1);
    pss[0].onNext(2);
    pss[0].onNext(3);

    pss[1].onNext(10);
    pss[1].onNext(11);
    pss[1].onNext(12);

    // Third item: the limit of 2 is exceeded, so the oldest stream (pss[0]) is cancelled.
    ps.onNext(2);

    pss[0].onNext(4); // dropped, pss[0] is no longer subscribed

    pss[2].onNext(20);
    pss[2].onNext(21);
    pss[2].onNext(22);

    pss[1].onCompleted();
    pss[2].onCompleted();
    ps.onCompleted();

    ts.assertResult(1, 2, 3, 10, 11, 12, 20, 21, 22);
}
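To see why the value 4 is dropped in the demonstration, the cancellation rule from the transformer (an inner stream with a given id stops once the source item with id + n arrives) can be traced in plain Java. This is only a sketch of the bookkeeping, not of the operator itself: the class CancelRuleTrace and the method activeAfter are hypothetical names, and it assumes no inner stream completes on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java trace of the cancellation rule; no RxJava involved. */
class CancelRuleTrace {
    /** Returns the ids of the inner streams still active after `arrivals` source items. */
    static List<Integer> activeAfter(int arrivals, int n) {
        TreeSet<Integer> active = new TreeSet<>();
        for (int id = 0; id < arrivals; id++) {
            // The new id is announced on the cancel subject; the inner stream
            // whose id satisfies old + n == id (that is, old == id - n) stops.
            active.remove(id - n);
            active.add(id);
        }
        return new ArrayList<>(active);
    }

    public static void main(String[] args) {
        System.out.println(activeAfter(2, 2)); // [0, 1]
        System.out.println(activeAfter(3, 2)); // [1, 2]
    }
}
```

With n = 2, the third source item (id 2) removes id 0 from the active set, which corresponds to pss[0] being unsubscribed right after ps.onNext(2).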
Upvotes: 3