Joe Daley
Joe Daley

Reputation: 46466

Switch to new observable with latest from previous observable as input

I have a stream of request objects:

let request$ = new Subject();

And I have a function that takes a request object and the current app state object, and returns a stream of new state objects:

function stream(request, state) {
    // Return a cold Observable of new state objects
}

When the first request is emitted, I want to call stream and pass in that request and an initial state object, and start processing the states that it returns.

Some time after that, a second request is emitted. The first stream needs to stop and be replaced with a new stream that is created by calling stream again, passing in the second request and the latest state that was emitted from the first stream.

I feel like maybe scan with a higher-order stream can do it but I am stuck here:

request$.pipe(
    scan((state$, request) => {
        let previousState = ???
        return stream(request, previousState);
    }, of(initialState)),
    switchAll()
)

EDIT - Example

In real life, the stream function establishes a websockets connection to a server, sends parts of the request and state arguments in the request, and then emits new state objects as messages are received from the server.

To simplify for an example, let's say request and state are just numbers, and the stream function just repeatedly adds request to state, emitting one result per second:

function stream(request, state) {
    return interval(1000).pipe(
        map(i => state + (i+1) * request)
    );
}
// e.g. stream(2, 10) will emit 12, 14, 16, 18, ...

The desired output would be:

initialState = 10

request$       2------------5-----------------
stream(2, 10)  --12--14--16-|
stream(5, 16)               --21--26--31--36--
...
output         --12--14--16---21--26--31--36--

Notice how at the time the 5 request is emitted, both the 5 and the latest state 16 are required to build the next stream.

Upvotes: 2

Views: 1032

Answers (2)

cartant
cartant

Reputation: 58420

A simple way to do this would be to use defer to create some per-subscription state in which the most recent state could be stored:

const source = defer(() => {
    let lastState = initialState;
    return request$.pipe(
        switchMap(request => stream(request, lastState)),
        tap(state => lastState = state)
    );
});

Used this way, defer will ensure that each subscription to source will have its own lastState.

Upvotes: 3

ibenjelloun
ibenjelloun

Reputation: 7733

The scan operator takes a function with previous state and the current value, the initialState also should not be an Observable.

I think this should work better :

request$.pipe(
    scan((previousState, request) => {
        return stream(request, previousState);
    }, initialState),
    switchAll()
)

Upvotes: 1

Related Questions