Reputation: 21
I would like to merge streams and have all existing subscribers receive the new values.
In keeping with functional programming, concat does not modify the current observable, and I do not understand what the right way to do this is.
This is how I would like things to work:
var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
function(x) {
console.log('Next: ' + x);
},
function(err) {
console.log('Error: ' + err);
},
function() {
console.log('Completed');
});
observable.concat(Rx.Observable.from(['stream2']));
This code only reads the first stream and then completes. Calling concat creates a new Observable, which I don't really want because I have already subscribed to the first one.
What is the right way to do this, given that I cannot even push into the first observable?
Thanks !
Upvotes: 1
Views: 978
Reputation: 16892
What you are trying to do is not exactly possible that way, but you basically have two options, both of which involve a Subject:
1) Manual emitting of data
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
Rx.Observable.merge(obs$, subj$)
.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
subj$.next("stream2");
subj$.next("stream3");
However, in this scenario the complete handler will never be called, because a Subject never completes on its own. If you need your complete handler to be triggered, you have to call subj$.complete(); manually at the end.
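To make the completion behaviour concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: MiniSubject and mergeAll are hypothetical stand-ins written only for illustration, and they assume the simplified rule that a merged stream completes once every one of its sources has completed.

```javascript
// Hypothetical stand-in for a Subject; NOT the RxJS implementation.
class MiniSubject {
  constructor() {
    this.observers = [];
    this.done = false;
  }
  subscribe(observer) {
    // A late subscriber to a finished subject is only told about completion.
    if (this.done) {
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.done) this.observers.forEach(o => o.next(value));
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.observers.forEach(o => o.complete());
  }
}

// Hypothetical helper: forwards every value, completes after ALL sources complete.
function mergeAll(sources) {
  const out = new MiniSubject();
  let remaining = sources.length;
  sources.forEach(src => src.subscribe({
    next: v => out.next(v),
    complete: () => {
      remaining -= 1;
      if (remaining === 0) out.complete();
    }
  }));
  return out;
}

const log = [];
const first$ = new MiniSubject();
const second$ = new MiniSubject();

mergeAll([first$, second$]).subscribe({
  next: v => log.push('Next: ' + v),
  complete: () => log.push('Complete')
});

first$.next('stream1');
first$.complete();            // one source is done, the merged stream stays open
second$.next('stream2');
const completedBeforeAll = log.includes('Complete'); // false at this point

second$.next('stream3');
second$.complete();           // last source done, so the merged stream completes

console.log(log);
```

The point of the sketch is the counter in mergeAll: as long as one source (here the subject) stays open, the subscriber keeps receiving values and never sees a completion.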
2) Multicasting through a subject
const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();
subj$.subscribe(
x => console.log('Next: ' + x),
x => console.log('Error: ' + x),
() => console.log('Complete')
);
obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));
In this scenario the Subject will basically act as a "proxy" that only propagates data, but no error or complete notifications.
Neither solution is really "nice", but maybe you can outline your use case a little better; I'm sure there is a proper solution for it that does not involve a complicated workaround.
In case you just want a way to continuously provide data from a perpetual Observable, you should use a BehaviorSubject. It lets you emit data on it and subscribe to it at the same time:
class Service {
public data$ = new BehaviorSubject(someInitialDataOrNull);
public getData() {
makeSomeHttpCall()
.subscribe(data => this.data$.next(data));
}
}
class Component {
constructor() {
theService.data$.subscribe(data => console.log(data));
}
}
Here is a link to the old docs of the BehaviorSubject (it basically still works the same way, except that onNext is now next, etc.)
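For readers who have not used one, the idea behind a BehaviorSubject can be sketched in a few lines of plain JavaScript. MiniBehaviorSubject below is a hypothetical stand-in, not the RxJS class; it only models the two properties relevant here, namely that the latest value is stored and that every new subscriber receives it immediately.

```javascript
// Hypothetical stand-in for a BehaviorSubject; NOT the RxJS implementation.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;   // always holds the most recent value
    this.listeners = [];
  }
  subscribe(listener) {
    this.listeners.push(listener);
    listener(this.value);        // new subscribers get the current value right away
  }
  next(value) {
    this.value = value;
    this.listeners.forEach(fn => fn(value));
  }
}

const data$ = new MiniBehaviorSubject(null);
const seenEarly = [];
const seenLate = [];

data$.subscribe(v => seenEarly.push(v)); // receives the initial null
data$.next('response1');

data$.subscribe(v => seenLate.push(v));  // subscribes late, still gets 'response1'
data$.next('response2');

console.log(seenEarly); // [ null, 'response1', 'response2' ]
console.log(seenLate);  // [ 'response1', 'response2' ]
```

This is why the pattern suits a service that keeps pushing fresh data: components can subscribe at any time and still start from the latest known value.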
Upvotes: 2