Reputation: 313
I am trying to find a way to pause an observable. After reading blesh's post on GitHub (https://github.com/ReactiveX/rxjs/issues/1542), I think I am on the right track. But for some reason the takeWhile() on my observable is ignored once I switchMap() to it from my pauser Subject.
This ends correctly:
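// Assumes RxJS 5, where importing from "rxjs/Rx" adds every operator to Observable.
import { Observable, Subject } from "rxjs/Rx";

export class CompositionService {
  cursor = -1;
  pauser = new Subject();
  interval;
  init = (slides) => {
    let waitUntil = 0;
    return this.interval = Observable
      .range(0, slides.length)
      // emit each index after the accumulated duration of the slides before it
      .mergeMap((i) => {
        let next = Observable.of(i).delay(waitUntil);
        waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0;
        return next;
      })
      // advance the cursor, or reset it to -1 when there is no next slide
      .scan((cursor) => {
        return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1;
      }, this.cursor)
      .map(cursor => slides[cursor])
      // complete as soon as the cursor points past the last slide
      .takeWhile((slide) => {
        return !!slide;
      });
  };
  // these methods are not called for this sample
  play = () => {
    this.pauser.next(false);
  };
  pause = () => {
    this.pauser.next(true);
  };
};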
This works when called in this way:
it("should subscribe to init", (done) => {
  slides.forEach((slide, i) => {
    if (slide.duration) {
      slide.duration = slide.duration / 100;
    }
  });
  composition.init(slides).subscribe(
    (slide) => {
      console.log(slide);
    },
    (err) => {
      console.log("Error: " + err);
    },
    () => {
      done();
    });
});
While the previous example works as advertised, the interval Observable never ends when I add some "magic":
export class CompositionService2 {
  cursor = -1;
  pauser = new Subject();
  interval;
  init = (slides) => {
    let waitUntil = 0;
    this.interval = Observable
      .range(0, slides.length)
      .mergeMap((i) => {
        let next = Observable.of(i).delay(waitUntil);
        waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0;
        return next;
      })
      .scan((cursor) => {
        return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1;
      }, this.cursor)
      .map(cursor => slides[cursor])
      .takeWhile((slide) => {
        return !!slide;
      });
    return this.pauser
      // leaving commented for clarity of the end game
      // .switchMap( paused => paused ? Observable.never() : this.interval );
      // however, not even a straightforward switchMap is yielding the expected results
      .switchMap( paused => this.interval );
  };
  play = () => {
    this.pauser.next(false);
  };
  pause = () => {
    this.pauser.next(true);
  };
};
Called in this way:
it("should subscribe to init", (done) => {
  slides.forEach((slide, i) => {
    if (slide.duration) {
      slide.duration = slide.duration / 100;
    }
  });
  composition.init(slides).subscribe(
    (slide) => {
      console.log(slide);
    },
    (err) => {
      console.log("Error: " + err);
    },
    () => {
      //I never get here!!!!!
      done();
    });
  // kickstart my heart!
  composition.play();
});
Anyone have any ideas what I am doing wrong here?
Upvotes: 1
Views: 665
Reputation: 18663
You aren't completing the outer stream. In the first version, the stream completes when takeWhile completes it. However, once you nest that inside a switchMap, you only ever complete the inner stream, since the outer one (a Subject) never completes. The flattened result completes only after the outer stream has completed as well, so to the subscriber it looks like a never-ending stream.
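To make that completion rule concrete, here is a minimal sketch. It assumes RxJS 6 or later and uses pipeable operators rather than the chained style in the question; `outer` stands in for `pauser`, and the synchronous `inner` stands in for `this.interval`. Both names are made up for illustration.

```typescript
import { Subject, of } from "rxjs";
import { switchMap } from "rxjs/operators";

// Records every notification so the ordering can be inspected afterwards.
const events: string[] = [];

// Outer stream: a Subject, like `pauser` in the question.
const outer = new Subject<boolean>();

// Inner stream: finite and synchronous, standing in for `this.interval`.
const inner = of("a", "b");

outer.pipe(switchMap(() => inner)).subscribe({
  next: (value) => events.push(value),
  complete: () => events.push("complete"),
});

// The inner stream runs to completion, but the flattened stream stays open.
outer.next(false);
const beforeOuterCompletes = events.slice();

// Only once the outer stream completes does the subscriber see completion.
outer.complete();
const afterOuterCompletes = events.slice();
```

In this sketch `beforeOuterCompletes` holds only the two values, and the completion notification shows up only in `afterOuterCompletes`.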
If you want to make it complete you will need to terminate the stream at some point, for instance:
composition.init(slides)
  .take(3)
  .subscribe(
    (slide) => {
      console.log(slide);
    },
    (err) => {
      console.log("Error: " + err);
    },
    () => {
      // reached once take(3) has passed three slides through and completed the stream
      done();
    });
I'm not convinced that Rx is the right tool here. Streams are not really designed to be "paused", because you can't actually stop an Observable from continuing to propagate. You have probably noticed the number of hoops you are jumping through to store state between pauses, so it might make sense to consider using generators or another library such as IxJS. But I digress.
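For what the generator route might look like, here is a rough sketch with no dependencies. Everything in it is hypothetical (`Slide`, `slideCursor`, `SlidePlayer`, and the `name` field); only the per-slide `duration` comes from the question, and it is assumed to be in milliseconds. The generator keeps the cursor, so pausing is just a matter of not asking for the next slide.

```typescript
// Hypothetical slide shape; only `duration` (assumed milliseconds) comes from the question.
interface Slide {
  name: string;
  duration?: number;
}

// The generator holds the cursor: it advances only when next() is called.
function* slideCursor(slides: Slide[]): IterableIterator<Slide> {
  for (const slide of slides) {
    yield slide;
  }
}

class SlidePlayer {
  private readonly cursor: IterableIterator<Slide>;
  private readonly show: (slide: Slide) => void;
  private timer: ReturnType<typeof setTimeout> | undefined;
  private finished = false;

  constructor(slides: Slide[], show: (slide: Slide) => void) {
    this.cursor = slideCursor(slides);
    this.show = show;
  }

  // Show the next slide now and schedule the one after it.
  play(): void {
    if (this.finished || this.timer !== undefined) return; // done or already playing
    this.step();
  }

  // Cancel the pending advance; the generator keeps its position.
  pause(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
  }

  private step(): void {
    this.timer = undefined;
    const result = this.cursor.next();
    if (result.done) {
      this.finished = true;
      return;
    }
    const slide = result.value;
    this.show(slide);
    this.timer = setTimeout(() => this.step(), slide.duration ?? 0);
  }
}
```

Usage would be along the lines of `const player = new SlidePlayer(slides, (slide) => console.log(slide)); player.play();`, then `player.pause()` and `player.play()` as needed. One simplification to be aware of: resuming moves straight to the next slide rather than waiting out the remaining time of the slide that was paused.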
Upvotes: 1