Reputation: 6988
I am trying to figure out the RxJS pattern for declaring my "subscribes" such that they execute in order.
Here's the situation I'm in:
var flag = false;
// Stream that non-deterministically emits `true` when flag is true
var random$ = Rx.Observable.interval(1000)
.map(() => setFlagToRandomTrueOrFalse())
.filter(x => x)
.first();
// How can I declare my "subscribes" so that the console.logs happen in order?
var sub1 = random$.subscribe(() => console.log(1), console.log, resetFlagToFalse);
var sub2 = random$.subscribe(() => console.log(2), console.log, resetFlagToFalse);
function setFlagToRandomTrueOrFalse() {
flag = flag || !!Math.floor(Math.random() * 2);
return flag;
}
function resetFlagToFalse() { flag = false; }
This currently prints 1
and 2
in random order due to the async nature of .subscribe
.
Also, what is the proper name for these "subscribes"? Total RxJS noob here.
Upvotes: 5
Views: 1735
Reputation: 18665
eeerr I am not sure the order you observe is because of the async nature of subscribe
.
First of all, as currently written, random$
is a cold source, i.e. a producer which has not been subscribed yet. That producer will start producing every time you subscribe to it. That means here, you are starting the producer twice, so you will have two different random values emitted for each, so the log will be shown at different times, given that you have not subscribed to the same sequence of values, even when it looks very much like so.
A more clear description of the distinction between cold and hot streams is Hot and Cold observables : are there 'hot' and 'cold' operators?. Spend as much time as necessary to understand this because this is very important before you start to do anything fancy with Rxjs.
If you want to subscribe to the same sequence of values, i.e. if you want the producer to multicast its values to all subscribers (hot behavior), instead of restarting for each subscriber (cold behavior), you need to use a variation of the multicast
operator, as explained in the abovementioned link. That means in your case
var random$ = Rx.Observable.interval(1000)
.map(() => setFlagToRandomTrueOrFalse())
.filter(x => x)
.share();
Going back to your question about data propagation order, it is indeed mostly deterministic, which does not mean always simple. In a simple case as yours, the first subscribe will indeed be executed first. That is because under the hood, multicast
-like operators are using subjects, which emit immediately (hot source) the value they receive to all their subscribers, in order of subscription. For information about subjects, have a look at What are the semantics of different RxJS subjects?.
Last thing, it helps a lot to reason about your streams if you compose pure functions. Having a closure variable such as flag
makes thing really more complex very fast.
I recommend you to have a look here before delving much further into Rxjs : The introduction to Reactive Programming you've been missing
Upvotes: 4
Reputation: 11581
You can use publish() and connect()
to perform the actions in sequence like this
var flag = false;
// Stream that non-deterministically emits `true` when flag is true
var random$ = Rx.Observable.interval(1000)
.map(() => setFlagToRandomTrueOrFalse())
.filter(x => x)
.first().publish();
// How can I declare my "subscribes" so that the console.logs happen in order?
var sub1 = random$.subscribe(() => console.log(1), console.log, resetFlagToFalse);
var sub2 = random$.subscribe(() => console.log(2), console.log, resetFlagToFalse);
random$.connect();
function setFlagToRandomTrueOrFalse() {
flag = flag || !!Math.floor(Math.random() * 2);
return flag;
}
function resetFlagToFalse() { flag = false; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
Upvotes: 2