Reputation: 24567
I'd like to build a wrapper class, that does something before and after each emitted value of an Observable.
Here's what I came up with:
class Wrapper<T> {
wrapped$: Observable<T>;
_dataSubject = new Subject<T>();
data$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE"),
//
// map( ??? )
//
);
constructor(wrapped$: Observable<T>) {
this.wrapped$ = wrapped$.pipe(
tap(_ => console.log("AFTER")
);
}
}
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")
The console output should be:
BEFORE
foo
AFTER
I can't figure out how to connect the $wrapped
Observable with the _dataSubject
.
But maybe I'm completely wrong and it needs a different approach.
Upvotes: 5
Views: 12826
Reputation: 99
Since rxjs 7, there is now more option on the tap operator using the new TapObeserver input. You can now surround your function without creating a wrapper class.
import { of } from 'rxjs';
import { delay, tap } from 'rxjs/operators';
import { MonoTypeOperatorFunction } from 'rxjs/interfaces';
function log<T>(): MonoTypeOperatorFunction<T> {
return tap<T>({
subscribe: () => console.log('Log before'),
finalize: () => console.log('Log after')
})
}
const load$ = of(42).pipe(delay(500));
load$.pipe(
log()
).subscribe((v) => console.log('Emitted', v));
// Output:
// Log before
// Emitted 42
// Log after
Upvotes: 0
Reputation: 361
If you wanted to do this without a class using RxJs 6:
return of(null)
.pipe(tap(_ => {
console.log('BEFORE');
}))
.pipe(switchMap(_ => wrappedObservable))
.pipe(tap(_ => {
console.log('AFTER');
}))
Upvotes: 0
Reputation: 24567
Thanks for all your input!
I now came up with my own solution. The AFTER
and DATA
output are in the wrong order, but I figured out that this isn't important as long as they come at the same time
The code still needs some refactoring, but at least it works for now!
class Wrapper {
_taskDoneSubject = new Subject<boolean>();
taskDone$ = this._taskDoneSubject.asObservable();
wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
return switchMap<T, R>(val => { // can also be done using mergeMap
this._taskDoneSubject.next(false);
return wrapped(val).pipe(
tap(_ => this._taskDoneSubject.next(true))
);
});
}
}
Usage:
test(valueEmittingObservable: Observable<string>) {
let wrapper = new Wrapper();
wrapper.taskDone$.subscribe(val => console.log("task done? ", val);
valueEmittingObservable.pipe(
// wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000)) // simulated
).subscribe(val => console.log("emitted value: ", val);
}
Output:
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
Or if it's emitting faster than 5 seconds:
task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
Upvotes: 0
Reputation: 18312
The best way (though complicated) is to create a new operator, similar to tap
but that does something before and after the value is emitted.
You can see it working in the example (it's in ES6, as SO code snippets don't accept TypeScript, but you'll get the idea)
function wrap(before, after) {
return function wrapOperatorFunction(source) {
return source.lift(new WrapOperator(before, after));
};
}
class WrapOperator {
constructor(before, after) {
this.before = before;
this.after = after;
}
call(subscriber, source) {
return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
}
}
class WrapSubscriber extends Rx.Subscriber {
constructor(destination, before, after) {
super(destination);
this.before = before;
this.after = after;
}
_next(value) {
this.before ? this.before(value) : null;
this.destination.next(value);
this.after ? this.after(value) : null;
}
}
// Now:
const observable = Rx.Observable.from([1, 2, 3, 4]);
observable.pipe(
wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
// For what you want:
// let's simulate that, for each value in the array, we'll fetch something from an external service:
// we want the before to be executed when we make the request, and the after to be executed when it finishes. In this // case, we just combine two wrap operators and flatMap, for example:
observable.pipe(
wrap(value => console.log('BEFORE REQUEST', value)),
Rx.operators.flatMap(value => {
const subject = new Rx.Subject();
setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
return subject;
}),
wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
<script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"></script>
As stated, maybe a bit complicated, but it integrates seamlessly with RxJS operators and it is always a good example to know how to create our own operators :-)
For what you say in your comment, you can check the last example. There, I combine two wrap
operators. The first one only uses the before
callback, so it only executes something before a value is been emitted. As you see, because the source observable is from an array, the four before
callbacks are executed immediately. Then, we apply flatMap
. To it, we apply a new wrap
, but this time with just the after
callback. So, this callback is only called after the observables returned by flatMap
yield their values.
Of course, if instead an observable from an array, you'd have one made from an event listener, you'd have:
before
callback is executed just before the event fired is pushed by the observable.after
callback is executed.This is where having operators pays off, as they're easily combined. Hope this suits you.
Upvotes: 3
Reputation: 1315
So, from what I understood, you can do something like this:
class Wrapper<T> {
_dataSubject: Subject<T>;
wrapped$: Observable<T>;
constructor(wrapped$: Subject<T>) {
this._dataSubject = wrapped$;
this.wrapped$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE")),
tap(data => console.log(data)),
tap(_ => console.log("AFTER"))
);
}
}
And then:
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")
Upvotes: 4
Reputation: 17762
What about something like this
import {Observable} from 'rxjs';
export class DoBeforeAfter<T> {
wrapped$: Observable<T>;
constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
this.wrapped$ = Observable.of(null)
.do(_ => console.log("BEFORE"))
.switchMap(_ => wrapped$)
.do(doFunction)
.do(_ => console.log('AFTER'));
}
}
to be consumed like this
const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));
beforeAfter.wrapped$.subscribe(
null,
error => console.error(error),
() => console.log('DONE')
)
It looks a bit cumbersome, but maybe it can help
Upvotes: 12