Reputation: 100190
In one of my projects, I have the following code, and upon migrating to RxJS5, Rx.Observer seems to no longer be defined:
let index = 0;
let obsEnqueue = this.obsEnqueue = new Rx.Subject();
this.queueStream = Rx.Observable.create(obs => {
    var push = Rx.Observer.create(v => { // ! error
        if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
            obs.next(v);
        }
    });
    return obsEnqueue.subscribe(push);
});
this.push = (v) => {
    obsEnqueue.next(v);
    index++;
};
This no longer works because Rx.Observer is not defined.
The migration guide (https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md) says:
Observer is an interface now
However, that doesn't have to mean that Rx.Observer, even as an interface, can't have a "static" method called create.
Anyway, Rx.Observer doesn't seem to exist anymore.
I get this error:
TypeError: Cannot read property 'create' of undefined
How can I create an Observer somehow yielding similar results to my code above?
Upvotes: 2
Views: 384
Reputation: 2858
If I understand what this partial code is trying to do, then no, I don't see how you can make it "simpler".
What could perhaps be improved is making it more reusable: turn it into a module, or into an Rx operator if one doesn't exist yet.
This could be an attempt at that:
/*
"dependencies": {
    "rxjs": "^5.0.2"
}
*/
import {Observable, Observer, Subject, Subscriber} from "rxjs";

export interface ICircularQueue<T> extends Observable<T> {
    push(value: T): void;
}

/**
 * on every push use 'NEXT' observer/subscription,
 * in the order they've been subscribed,
 * cycling back to 1st subscription after last
 */
export function create<T>(): ICircularQueue<T> {
    let index = 0;
    let obsEnqueue = new Subject<T>();
    let queueStream = Observable.create((obs: Observer<T>) => {
        let push = Subscriber.create<T>(v => {
            // ! error ?
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        });
        return obsEnqueue.subscribe(push);
    });
    queueStream.push = (v: T) => {
        obsEnqueue.next(v);
        index++;
    };
    return queueStream;
}
Then a basic test:
import * as CircularQueue from "./CircularQueue";
import * as assert from "assert";

const $in = (array: any[], x: any) => {
    for (let $x of array) {
        if ($x === x) { return true; }
    }
    return false;
};

describe("CircularQueue", () => {
    it("works", () => {
        let queue = CircularQueue.create();
        let result: number[] = [];
        queue.subscribe(x => {
            assert.ok($in([0, 4, 8], x));
            result.push(0);
        });
        queue.subscribe(x => {
            assert.ok($in([1, 5, 9], x));
            result.push(1);
        });
        queue.subscribe(x => {
            assert.ok($in([2, 6, 10], x));
            result.push(2);
        });
        queue.subscribe(x => {
            assert.ok($in([3, 7, 11], x));
            result.push(3);
        });
        for (let i = 0; i < 12; i++) {
            queue.push(i);
        }
        assert.equal(result.join(), "0,1,2,3,0,1,2,3,0,1,2,3");
    });
});
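The round-robin rule that the test above checks can also be sketched without RxJS, which may make the intent of the `index % observers.length` comparison easier to see. This is only an illustration under my own assumptions: `RoundRobinQueue`, `Handler`, `rrQueue` and `seenBy` are hypothetical names, not part of RxJS or of the module above.

```typescript
// Hypothetical stand-in for the Subject-based queue; no RxJS involved.
type Handler<T> = (value: T) => void;

class RoundRobinQueue<T> {
  private handlers: Handler<T>[] = [];
  private index = 0;

  // Register a handler; returns a function that removes it again.
  subscribe(handler: Handler<T>): () => void {
    this.handlers.push(handler);
    return () => {
      const i = this.handlers.indexOf(handler);
      if (i !== -1) { this.handlers.splice(i, 1); }
    };
  }

  // Deliver the value to exactly one handler, chosen by the running counter,
  // then advance the counter (even if nobody is subscribed, as in the original).
  push(value: T): void {
    if (this.handlers.length > 0) {
      this.handlers[this.index % this.handlers.length](value);
    }
    this.index++;
  }
}

const rrQueue = new RoundRobinQueue<number>();
const seenBy: string[] = [];
rrQueue.subscribe(v => seenBy.push(`a${v}`));
rrQueue.subscribe(v => seenBy.push(`b${v}`));
for (let i = 0; i < 4; i++) { rrQueue.push(i); }
// seenBy is now ["a0", "b1", "a2", "b3"]
```

The RxJS version gets the same effect by letting every subscriber see every value and having each one check whether it is its turn; the sketch picks the handler directly instead.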
Upvotes: 1
Reputation: 12687
From the source:
export interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}
Observer<T> is an interface with the next, error and complete methods (the onNext, onError and onCompleted of RxJS 4). An interface is only a language construct: the TypeScript compiler uses it to type-check objects where an Observer<T> is required, and it is erased on compilation.
The class Subscriber<T> implements the interface Observer<T>. That means Subscriber<T> is the actual concrete class with the above methods.
So use var push = Rx.Subscriber.create(v => { [...] instead.
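To make the erasure point concrete, here is a minimal sketch that needs no RxJS at all. The local `Observer<T>` is a copy of the interface quoted above, and `createObserver` is a hypothetical helper of my own (not an RxJS API) that only illustrates the idea: any plain object with the three methods satisfies the interface, and nothing named `Observer` exists at runtime to hang a static `create` on.

```typescript
// Local copy of the interface shape quoted above; it exists only at compile time,
// so the emitted JavaScript contains no value called `Observer`.
interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

// Hypothetical helper, not part of RxJS: builds a plain object that satisfies
// the interface, filling in no-op handlers for anything omitted.
function createObserver<T>(
  next: (value: T) => void,
  error: (err: any) => void = () => {},
  complete: () => void = () => {}
): Observer<T> {
  return { next, error, complete };
}

const received: number[] = [];
const observer = createObserver<number>(v => received.push(v));

observer.next(1);
observer.next(2);
observer.complete(); // no-op default, nothing happens
```

A plain object like this is fine wherever only the interface is needed. The question's code, however, looks the observer up in `obsEnqueue.observers`, which is why a real `Subscriber` instance is the safer choice there.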
Note:
In the original Rx implementation, the interfaces were IObservable<T> and IObserver<T> and used extension methods to allow composition. When it came to JS, they had to have methods on the prototype of Observable / Observer itself to enable composition, so the class itself had the methods.
Upvotes: 1
Reputation: 96949
To be honest, I don't understand what your code does, but even though the Observer class doesn't exist any more, it was mostly replaced by the Subscriber class, which is used in almost the same way as Observer.
It has a static Subscriber.create method. See https://github.com/ReactiveX/rxjs/blob/master/src/Subscriber.ts#L32
This method returns a Subscriber object that you can later use, for example in obsEnqueue.subscribe(push);
Upvotes: 2
Reputation: 9425
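Why not just inline the next handler directly into the subscribe call?
this.queueStream = Rx.Observable.create(obs => {
    // Keep what subscribe() returns so the handler can still look itself up.
    // Assumption: in RxJS 5 this is the Subscriber that the Subject stores in
    // `observers`, which is an implementation detail rather than documented API.
    const push = obsEnqueue.subscribe(
        v => {
            if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
                obs.next(v);
            }
        }
    );
    return push;
});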
Upvotes: 1