Reputation: 821
import { combineLatest } from 'rxjs';
const observable_1 = get_first_observable();
const observable_2 = get_second_observable();
console.log('first log', observable_1, observable_2);
observable_1.subscribe(e => console.log('second log', e));
observable_2.subscribe(e => console.log('third log', e));
const combined = combineLatest(observable_1, observable_2);
console.log('fourth log', combined);
combined.subscribe(e => console.log('fifth log', e)); // throws TypeError warning, doesn't work
I have a portion of code that behaved just as above, it was working fine until yesterday, when I made some changes to what I thought was unrelated code.
The first log
confirms that both of these variables are indeed Observables
.
The second log
and third log
confirm that each of observable is emitting at least one value.
The fourth log
confirms that combineLatest()
is returning an Observable
type.
The fifth log
never fires, and commenting that line out removes the warning. Anywhere I attempt to do combined.subscribe()
it throws the following TypeError warning.
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
What is going on here?? I have tried everything I can think of to no avail. It seems to me that if I am passing two valid Observables to combineLatest()
, and they each emit a value, then it should work as expected.
As requested, the sources of the two observables are as follows:
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
const get_first_observable = () => {
const subject = new BehaviorSubject<string>(null);
// I am using subject.next(...) elsewhere
return subject.asObservable().pipe(distinctUntilChanged());
};
const get_second_observable = () => {
// where store is a redux Store
const store_subject = new BehaviorSubject(store.getState());
store.subscribe(() => {
store_subject.next(store.getState());
});
const stream = store_subject.asObservable();
const my_observable = stream.pipe(
map(state => {
return format_data(state) || [];
})
);
return my_observable;
};
Alright, new update.
This is really strange to me and I don't understand it at all, but I added the line of code below inside get_second_observable()
before it returns my_observable
, and then magically all the code works. Now if I comment that line back out it doesn't work anymore.. How does that make any sense? I don't save the new observable to a variable or use it at all. Just combining them seems to make it work down the chain.
combineLatest(my_observable, of(true));
Upvotes: 3
Views: 1210
Reputation: 31
Changing:
import { combineLatest } from 'rxjs/internal/observable/combineLatest';
to:
import { combineLatest } from 'rxjs';
Fixed the error for me:
TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
While you already have the proper import, it's possible there is an improper import in code you're not showing. As others have stated, there is no obvious problem with the code you have presented.
Upvotes: 1