Reputation: 11
I'm stuck right now and hope someone can help me :-)
Here is the scenario:
I have two streams that emit the same data type. The first stream provides the initial data (REST call) and completes afterwards. The second stream gets its data via websocket and does not complete.
const mockData: User[] = [
  { id: '1', status: 'active' },
  { id: '2', status: 'inactive' },
  { id: '3', status: 'active' }
];
const initial$: Observable<User[]> = of(mockData);
const sub$: BehaviorSubject<User[]> = new BehaviorSubject(mockData);
const dynamic$ = sub$.asObservable();
Now I want to merge both streams and create two new streams based on a filter criterion: one for the active users and one for the inactive users.
const merged$ = merge(initial$, dynamic$).pipe(
  distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
);
const grouped$ = merged$.pipe(
  mergeMap((users: User[]) => from(users)),
  tap(a => console.log('Before group by', a)),
  groupBy(item => item.status),
  tap(a => console.log('After group by', a.key)),
  mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
  tap(a => console.log('after merge map', a))
);
const activeByGrouped$ = grouped$.pipe(
  filter(([key]) => key === 'active'),
  map(([key, value]) => value)
);
activeByGrouped$.subscribe(a => console.log('final subscribe', a));
The whole thing works if both streams complete, but not if one of them stays open. I've attached a StackBlitz for better understanding:
https://stackblitz.com/edit/typescript-ixc3nu
Upvotes: 1
Views: 124
Reputation: 11
Sometimes you can't see the forest for the trees. It was easier than expected.
interface User {
  id: string;
  status: 'active' | 'inactive';
}
const mockData: User[] = [
  { id: '1', status: 'active' },
  { id: '2', status: 'inactive' },
  { id: '3', status: 'active' }
];
const initial$: Observable<User[]> = of(mockData);
const sub$: BehaviorSubject<User[]> = new BehaviorSubject(mockData);
const dynamic$ = sub$.asObservable();
setTimeout(() => sub$.next([{ id: '1', status: 'active' }]), 2000);
const merged$ = merge(initial$, dynamic$).pipe(
  distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
  tap(console.log),
  shareReplay({ bufferSize: 1, refCount: true }),
);
const activeGrouped$ = merged$.pipe(
  map((users: User[]) => users.filter((user: User) => user.status === 'active'))
);
const inactiveGrouped$ = merged$.pipe(
  map((users: User[]) =>
    users.filter((user: User) => user.status === 'inactive')
  )
);
activeGrouped$.subscribe(a => console.log('final subscribe - active:', a));
inactiveGrouped$.subscribe(a => console.log('final subscribe - inactive:', a));
working demo: stackblitz
Upvotes: 0
Reputation: 96909
My understanding is that you want a stream of only the active (or only the inactive) users. In that case toArray() is not a good choice, because it requires the source Observable to complete, which never happens when you're using Subjects (unless you call complete() yourself).
So it appears you could just restructure your chain. However, you can't collect all results and then emit one large array (because, again, the source doesn't complete). You could also emit all intermediate results using scan(), but it depends on what behavior you want:
const grouped$ = merged$.pipe(
  mergeMap((users: User[]) => from(users)),
  groupBy(item => item.status),
);
const activeByGrouped$ = grouped$.pipe(
  filter(group => group.key === 'active'),
  mergeMap(group => group)
);
Your updated demo: https://stackblitz.com/edit/typescript-evvehx
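If you'd rather go the scan() route mentioned above, here is a rough sketch of what the accumulator could look like. Note that upsertUser and runningResults are names I made up for illustration, and deduplicating by id is my assumption about the behavior you want. To keep the snippet runnable without RxJS, runningResults is a plain loop that stands in for scan() and returns every intermediate value.

```typescript
interface User {
  id: string;
  status: 'active' | 'inactive';
}

// Hypothetical accumulator (not part of the original code): keeps one entry
// per user id, replacing an existing entry or appending a new one.
function upsertUser(acc: User[], user: User): User[] {
  const index = acc.findIndex(u => u.id === user.id);
  if (index === -1) {
    return [...acc, user];
  }
  const next = acc.slice();
  next[index] = user;
  return next;
}

// Plain-loop stand-in for scan(): collects the accumulator after every item,
// the way scan() would emit it for every value of a group.
function runningResults(users: User[]): User[][] {
  const results: User[][] = [];
  let acc: User[] = [];
  for (const user of users) {
    acc = upsertUser(acc, user);
    results.push(acc);
  }
  return results;
}

// In the RxJS chain the same accumulator would be passed to scan(), e.g.:
//   mergeMap(group => group.pipe(scan(upsertUser, [] as User[])))
```

Keep in mind that with this approach a user who switches from active to inactive stays in the accumulated active list, because each group only ever sees the items that were routed to it. If you need that handled, filtering the whole array per emission is probably the simpler option.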
Upvotes: 0