Benjamin M

Reputation: 24567

Wrap an Observable. Do something before and after each emitted value

I'd like to build a wrapper class that does something before and after each emitted value of an Observable.

Here's what I came up with:

class Wrapper<T> {
    wrapped$: Observable<T>;

    _dataSubject = new Subject<T>();
    data$ = this._dataSubject.pipe(
        tap(_ => console.log("BEFORE")),
        //
        // map( ??? )
        //
    );

    constructor(wrapped$: Observable<T>) {
        this.wrapped$ = wrapped$.pipe(
            tap(_ => console.log("AFTER"))
        );
    }
}

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")

The console output should be:

BEFORE
foo
AFTER

I can't figure out how to connect the wrapped$ Observable with the _dataSubject.

But maybe I'm completely wrong and it needs a different approach.

Upvotes: 5

Views: 12826

Answers (6)

Alexis Michel

Reputation: 99

Since RxJS 7, the tap operator accepts more options through the new TapObserver input. You can now surround your function with before and after hooks without creating a wrapper class.

import { of } from 'rxjs';
import { delay, tap } from 'rxjs/operators';
import { MonoTypeOperatorFunction } from 'rxjs';

function log<T>(): MonoTypeOperatorFunction<T> {
  return tap<T>({
    subscribe: () => console.log('Log before'),
    finalize: () => console.log('Log after')
  })
}

const load$ = of(42).pipe(delay(500));

load$.pipe(
  log()
).subscribe((v) => console.log('Emitted', v));


// Output:
// Log before
// Emitted 42
// Log after
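
Note that subscribe and finalize fire once per subscription, not around each emitted value. A minimal sketch of that behaviour, assuming an RxJS 7 release whose tap accepts these handlers (tapLog is a hypothetical array used only to record the order):

```typescript
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical array that records the order of events.
const tapLog: string[] = [];

of(1, 2, 3).pipe(
  tap({
    // Runs once, when the subscription starts.
    subscribe: () => tapLog.push('Log before'),
    // Runs once, when the subscription ends (complete, error or unsubscribe).
    finalize: () => tapLog.push('Log after'),
  }),
).subscribe((v) => tapLog.push(`Emitted ${v}`));

console.log(tapLog);
// ['Log before', 'Emitted 1', 'Emitted 2', 'Emitted 3', 'Log after']
```

With three values, "Log before" appears once at the start and "Log after" once at the end, so this fits a single request better than a long-lived stream.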

Upvotes: 0

kenset

Reputation: 361

If you want to do this without a class, using RxJS 6:

return of(null)
    .pipe(tap(_ => {
        console.log('BEFORE');
    }))
    .pipe(switchMap(_ => wrappedObservable))
    .pipe(tap(_ => {
        console.log('AFTER');
    }))
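
To see the ordering this chain produces, here is a minimal runnable sketch, assuming RxJS 6 or later. The helper name withBeforeAfter and the out array are hypothetical, added only for illustration:

```typescript
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

// Hypothetical helper: same chain as above, recording into `out` instead of the console.
function withBeforeAfter<T>(wrapped$: Observable<T>, out: string[]): Observable<T> {
  return of(null).pipe(
    tap(() => out.push('BEFORE')),   // once, when the chain is subscribed
    switchMap(() => wrapped$),       // switch to the wrapped observable
    tap(() => out.push('AFTER')),    // once per value, before the subscriber sees it
  );
}

const chainLog: string[] = [];
withBeforeAfter(of('a', 'b'), chainLog).subscribe((v) => chainLog.push(v));

console.log(chainLog);
// ['BEFORE', 'AFTER', 'a', 'AFTER', 'b']
```

BEFORE is recorded once at subscription. Because tap runs before the value reaches the subscriber, each AFTER is recorded just ahead of the value it belongs to.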

Upvotes: 0

Benjamin M

Reputation: 24567

Thanks for all your input!

I've now come up with my own solution. The AFTER and DATA output are in the wrong order, but I figured out that this isn't important as long as they come at the same time.

The code still needs some refactoring, but at least it works for now!

class Wrapper {
    _taskDoneSubject = new Subject<boolean>();
    taskDone$ = this._taskDoneSubject.asObservable();

    wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
        return switchMap<T, R>(val => { // can also be done using mergeMap
            this._taskDoneSubject.next(false);
            return wrapped(val).pipe(
                tap(_ => this._taskDoneSubject.next(true))
            );
        });
    }
}

Usage:

test(valueEmittingObservable: Observable<string>) {

    let wrapper = new Wrapper();
    wrapper.taskDone$.subscribe(val => console.log("task done? ", val));

    valueEmittingObservable.pipe(
        // wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
        wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000))) // simulated
    ).subscribe(val => console.log("emitted value: ", val));

}

Output:

task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

Or if it's emitting faster than 5 seconds:

task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

Upvotes: 0

Oscar Paz

Reputation: 18312

The best way (though complicated) is to create a new operator, similar to tap, that does something before and after the value is emitted.

You can see it working in the example (it's in ES6, as SO code snippets don't accept TypeScript, but you'll get the idea).

function wrap(before, after) {
    return function wrapOperatorFunction(source) {
        return source.lift(new WrapOperator(before, after));
    };
}
class WrapOperator {
    constructor(before, after) {
        this.before = before;
        this.after = after;
    }
    call(subscriber, source) {
        return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
    }
}

class WrapSubscriber extends Rx.Subscriber {
    constructor(destination, before, after) {
        super(destination);
        this.before = before;
        this.after = after;
    }
    _next(value) {
        if (this.before) this.before(value);
        this.destination.next(value);
        if (this.after) this.after(value);
    }
}

// Now:

const observable = Rx.Observable.from([1, 2, 3, 4]);

observable.pipe(
   wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));


// For what you want:
// Let's simulate that, for each value in the array, we fetch something from an external service.
// We want the before callback to run when we make the request, and the after callback to run when it finishes.
// In this case, we just combine two wrap operators and flatMap, for example:

observable.pipe(
  wrap(value => console.log('BEFORE REQUEST', value)),
  Rx.operators.flatMap(value => {
    const subject = new Rx.Subject();
    setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
    return subject;
  }),
  wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
// The snippet loads RxJS 5.5.0 from https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js

As stated, maybe a bit complicated, but it integrates seamlessly with RxJS operators and it is always a good example to know how to create our own operators :-)

Regarding your comment, check the last example. There, I combine two wrap operators. The first one uses only the before callback, so it only executes something before a value is emitted. As you can see, because the source observable is created from an array, the four before callbacks are executed immediately. Then we apply flatMap, followed by a second wrap, this time with just the after callback. So this callback is only called after the observables returned by flatMap yield their values.

Of course, if instead of an observable created from an array you had one created from an event listener, you'd have:

  1. The before callback is executed just before the fired event is pushed by the observable.
  2. The asynchronous observable executes.
  3. The asynchronous observable yields values after a time t.
  4. The after callback is executed.

This is where having operators pays off, as they're easily combined. Hope this suits you.
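
As a typed variant, the same idea can be sketched in TypeScript without lift (which RxJS 7 marks as deprecated) by subscribing to the source inside a new Observable. This is a minimal sketch assuming RxJS 6 or later, not a drop-in replacement for the snippet above; wrapLog is a hypothetical array used only to record the order:

```typescript
import { MonoTypeOperatorFunction, Observable, of } from 'rxjs';

// Runs `before`, forwards the value downstream, then runs `after`.
function wrap<T>(
  before?: (value: T) => void,
  after?: (value: T) => void,
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    new Observable<T>((subscriber) =>
      // The returned Subscription is the teardown, so unsubscribing propagates upstream.
      source.subscribe({
        next: (value: T) => {
          if (before) before(value);
          subscriber.next(value);
          if (after) after(value);
        },
        error: (err: unknown) => subscriber.error(err),
        complete: () => subscriber.complete(),
      }),
    );
}

// Hypothetical array that records the order of events.
const wrapLog: string[] = [];

of('foo').pipe(
  wrap<string>(
    () => wrapLog.push('BEFORE'),
    () => wrapLog.push('AFTER'),
  ),
).subscribe((value) => wrapLog.push(value));

console.log(wrapLog);
// ['BEFORE', 'foo', 'AFTER']
```

Since the after callback runs only once subscriber.next has returned, the subscriber sees the value between the two callbacks, which matches the output asked for in the question.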

Upvotes: 3

Quentin Fonck

Reputation: 1315

So, from what I understood, you can do something like this:

class Wrapper<T> {

    _dataSubject: Subject<T>;
    wrapped$: Observable<T>;

    constructor(wrapped$: Subject<T>) {
        this._dataSubject = wrapped$;
        this.wrapped$ = this._dataSubject.pipe(
          tap(_ => console.log("BEFORE")),
          tap(data => console.log(data)),
          tap(_ => console.log("AFTER"))
        );
    }
}

And then:

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")

Upvotes: 4

Picci

Reputation: 17762

What about something like this:

import {Observable} from 'rxjs';

export class DoBeforeAfter<T> {
    wrapped$: Observable<T>;


    constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
        this.wrapped$ = Observable.of(null)
            .do(_ => console.log("BEFORE"))
            .switchMap(_ => wrapped$)
            .do(doFunction)
            .do(_ => console.log('AFTER'));
    }

}

To be consumed like this:

const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));

beforeAfter.wrapped$.subscribe(
        null,
        error => console.error(error),
        () => console.log('DONE')

)

It looks a bit cumbersome, but maybe it can help.

Upvotes: 12
