Reputation: 33
i'm kinda new to rxjs and can't get my head around this problem:
I have two streams:
From the incoming objects stream make a stream of the list of objects (with scan operator)
incoming: ----a--------b-------c----------d----------------\>
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]---------\>
When a list object is selected (n), start a new stream
incoming: ----a--------b-------c----------d--------------------e-------->
list: -------[a]----[a,b]----[a,b,c]----[a,b,c,d]--------->
selected object: ---------------------------------c------->
new stream of list: ------[c,d]-----[c,d,e]--->
i can't get the last value of the list stream when the object is selected,,,
made a marble diagram for better understanding,
selectedObject$ = new BehaviorSubject(0);
incomingObjects$ = new Subject();
list$ = incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
newList$ = selectedObject$.pipe(
withLastFrom(list$),
switchMap(([index,list])=> incomingObjects$.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, list.slice(index))
))
)
Upvotes: 3
Views: 536
Reputation: 20534
A common pattern I use along with the scan operator is passing reducer functions instead of values to scan so that the current value can be used in the update operation. In this case you can link the two observables with a merge operator and map their values to functions that are appropriate - either adding to a list, or slicing the list after a selection.
// these are just timers for demonstration, any observable should be fine.
const incoming$ = timer(1000, 1000).pipe(map(x => String.fromCharCode(x + 65)), take(10));
const selected$ = timer(3000, 3000).pipe(map(x => String.fromCharCode(x * 2 + 66)), take(2));
merge(
incoming$.pipe(map(x => (s) => [...s, x])), // append to list
selected$.pipe(map(x => (s) => { // slice list starting from selection
const index = s.indexOf(x);
return (index !== -1) ? s.slice(index) : s;
}))
).pipe(
scan((list, reducer) => reducer(list), []) // run reducer
).subscribe(x => console.log(x)); // display list state as demonstration.
Upvotes: 2
Reputation: 17762
If I understand the problem right, you could follow the following approach.
The key point is to recognize that the list
Observable (i.e. the Observable obtained with the use of scan
) should be an hot Observable, i.e. an Observable that notifies independent on whether or not it is subscribed. The reason is that each new stream you want to create should have always the same source Observable as its upstream.
Then, as you already hint, the act of selecting a value should be modeled with a BehaviorSubject.
As soon as the select BehaviorSubject notifies a value selected, the previous stream has to complete
and a new one has to be subscribed. This is the job of switchMap
.
The rest is to slice
the arrays of numbers in the right way.
This is the complete code of this approach
const selectedObject$ = new BehaviorSubject(1);
const incomingObjects$ = interval(1000).pipe(take(10));
const incomingObjectsHot$ = new ReplaySubject<number[]>(1);
incomingObjects$
.pipe(
scan((acc, val) => {
acc.push(val);
return acc;
}, [])
)
.subscribe(incomingObjectsHot$);
selectedObject$
.pipe(
switchMap((selected) =>
incomingObjectsHot$.pipe(
map((nums) => {
const selIndex = nums.indexOf(selected);
if (selIndex > 0) {
return nums.slice(selIndex);
}
})
)
),
filter(v => !!v)
)
.subscribe(console.log);
An example can be seen in this stackblitz.
Upvotes: 0