Reputation: 46466
I have a stream of request objects:
let request$ = new Subject();
And I have a function that takes a request object and the current app state object, and returns a stream of new state objects:
function stream(request, state) {
  // Return a cold Observable of new state objects
}
When the first request is emitted, I want to call stream, pass in that request and an initial state object, and start processing the states that it returns.
Some time after that, a second request is emitted. The first stream needs to stop and be replaced with a new stream, created by calling stream again with the second request and the latest state that was emitted from the first stream.
I feel like scan with a higher-order stream might be able to do it, but I am stuck here:
request$.pipe(
  scan((state$, request) => {
    let previousState = ???
    return stream(request, previousState);
  }, of(initialState)),
  switchAll()
)
In real life, the stream function establishes a WebSocket connection to a server, sends parts of the request and state arguments to that server, and then emits new state objects as messages are received from it.
To simplify for an example, let's say request and state are just numbers, and the stream function just repeatedly adds request to state, emitting one result per second:
function stream(request, state) {
  return interval(1000).pipe(
    map(i => state + (i + 1) * request)
  );
}
// e.g. stream(2, 10) will emit 12, 14, 16, 18, ...
The desired output would be:
initialState = 10

request$       2------------5-----------------
stream(2, 10)  --12--14--16-|
stream(5, 16)               --21--26--31--36--
...
output         --12--14--16---21--26--31--36--
Notice how, at the time the 5 request is emitted, both the 5 and the latest state 16 are required to build the next stream.
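The arithmetic in the diagram can be checked without RxJS or timers. Below is a minimal plain-JavaScript sketch; firstValues is a hypothetical helper (not part of the question's code) that computes synchronously the first few values the simplified stream function would emit. The counts 3 and 4 are simply the number of values shown for each stream in the diagram.

```javascript
// Hypothetical helper: the first `count` values that the simplified
// stream(request, state) would emit, computed synchronously.
function firstValues(request, state, count) {
  return Array.from({ length: count }, (_, i) => state + (i + 1) * request);
}

const initialState = 10;

// Request 2 arrives first; three values are emitted before the next request.
const first = firstValues(2, initialState, 3);
const latest = first[first.length - 1]; // latest state when request 5 arrives

// Request 5 starts from the latest state of the first stream.
const second = firstValues(5, latest, 4);

const output = [...first, ...second];
console.log(output.join(" ")); // 12 14 16 21 26 31 36
```

The second call only works because latest (16) is carried over from the first stream, which is exactly the piece that the scan attempt above cannot get at.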
Upvotes: 2
Views: 1032
Reputation: 58420
A simple way to do this would be to use defer to create some per-subscription state in which the most recent state can be stored:
const source = defer(() => {
  let lastState = initialState;
  return request$.pipe(
    switchMap(request => stream(request, lastState)),
    tap(state => lastState = state)
  );
});
Used this way, defer ensures that each subscription to source has its own lastState.
Upvotes: 3
Reputation: 7733
The function passed to the scan operator receives the previously accumulated value and the current value, and the seed (initialState) should not be an Observable.
I think this should work better:
request$.pipe(
  scan((previousState, request) => {
    return stream(request, previousState);
  }, initialState),
  switchAll()
)
Upvotes: 1