LeniM
LeniM

Reputation: 21

RxJs merging streams

I am looking forward to merge streams and refreshing all subscribers.

For functional programming purposes concat do not update the current observable and I do not get what is the right way to do it.

This how I would like things to happen :

var observable = Rx.Observable.from(['stream1']);
var subscription = observable.subscribe(
    function(x) {
        console.log('Next: ' + x);
    },
    function(err) {
        console.log('Error: ' + err);
    },
    function() {
        console.log('Completed');
    });
observable.concat(Rx.Observable.from(['stream2']));

This code only reads the first stream then goes to completed. When concat it creates an new Observable that I dont really want as I already subscribed to the first one.

What is the right way to do it as I can not even push into the first observable ?

Thanks !

Upvotes: 1

Views: 978

Answers (1)

Olaf Horstmann
Olaf Horstmann

Reputation: 16892

What you are trying to do is not exactly possible like that, but you basically have two options to go, both involve the usage of a Subject:

1) Manual emitting of data

const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();

Rx.Observable.merge(obj$, subj$)
    .subscribe(
        x => console.log('Next: ' + x),
        x => console.log('Error: ' + x),
        () => console.log('Complete')
    );

subj$.next("stream2");
subj$.next("stream3");

However: In this scenario, the complete will never be called, because a Subject never completes on its own - so if you need your complete-handler to be triggered you would have to add a manual subj$.complete(); to the end.


2) Multicasting through a subject

const obs$ = Rx.Observable.of("stream1");
const subj$ = new Rx.Subject();

subj$.subscribe(
    x => console.log('Next: ' + x),
    x => console.log('Error: ' + x),
    () => console.log('Complete')
);

obs$.subscribe(x => subj$.next(x));
const obs2$ = Rx.Observable.of("stream2");
obs2$.subscribe(x => subj$.next(x));

In this scenario the Subject will basically act as a "proxy" that will only propagate data, but no error- or complete-triggers.

Both solutions are not really "nice" - but maybe you can outline your use-case a little better, I'm sure there's a proper solutions for it that does not involve any complicated workaround.


In case you just want to have a way to continuously provide data form a perpetual Observable, you should use a BehaviorSubject - it works in a way, that you can emit data on it and subscribe to it at the same time:

class Service {
    public data$ = new BehaviorSubject(someInitialDataOrNull);

    public getData() {
        makeSomeHttpCall()
            .subscribe(data => data$.next(data));
    }
}

class Component {
    constructor() {
        theService.data$.subscribe(data => console.log(data));
    }
}

Here is a link to the old docs of the BehaviorSubject (it basically still works the same way expect for onNext being next now, ect...)

Upvotes: 2

Related Questions