nicojs

Reputation: 2055

How to use RxJS to share a pool of resources between multiple consumers

How can we distribute the work of multiple consumers over a limited set of resources in RxJS?

I have a Pool class here (simplified):

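import { merge, Observable, Subject, zip } from 'rxjs';
import { mergeMap, shareReplay, tap } from 'rxjs/operators';

class Pool<TResource> {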

  private readonly resource$: Observable<TResource>;

  constructor(resource$: Observable<TResource>) {
    this.resource$ = resource$.pipe(
      // We use share replay here, so multiple calls to `schedule` will share the resources
      shareReplay()
    );
  }

  /**
   * Schedules a task to be executed on resources in the pool. Each input is paired with a resource, which allows async work to be done.
   * @param input$ The inputs to pair up with a resource.
   * @param task The task to execute on each resource
   */
  public schedule<TIn, TOut>(input$: Observable<TIn>, task: (resource: TResource, input: TIn) => Promise<TOut> | TOut): Observable<TOut> {
    const recycleBin = new Subject<TResource>();
    const resource$ = merge(recycleBin, this.resource$);

    return zip(resource$, input$).pipe(
      mergeMap(async ([resource, input]) => {
        const output = await task(resource, input);
        // Recycle the resource so it is re-emitted from the `resource$` observable.
        recycleBin.next(resource);
        return output;
      }),
      tap({ complete: () => recycleBin.complete() })
    );
  }
}

You can use it like this:

class CalculatorResource {
  expensiveCalculation(n: number) {
    return new Promise<number>(res => setTimeout(() => res(n*2), 1000));
  }
}

const pool = new Pool(of(new CalculatorResource(), new CalculatorResource()));
const input$ = of(1, 2, 3, 4);
const output$ = pool.schedule(input$, (calc, n) => calc.expensiveCalculation(n));
output$.subscribe(console.log)
// ...wait 1 sec
// Logs 2
// Logs 4
// ...wait 1 sec
// Logs 6
// Logs 8

This works as expected.

However, when schedule is called more than once at the same time, every call receives the full set of resources, so the same resource can end up running several tasks at once. That is a problem: the nature of the tasks means a resource can't handle more than one at a time, so the resources should be shared across all schedule calls.

const pool = new Pool(of(new CalculatorResource(), new CalculatorResource()));
const input$ = of(1, 2, 3, 4);
const parallelInput$ = of(5, 6, 7, 8);
pool.schedule(input$, (calc, n) =>
  calc.expensiveCalculation(n)
).subscribe(console.log);
pool.schedule(parallelInput$, (calc, n) =>
  calc.expensiveCalculation(n)
).subscribe(console.log);
// Actual output:

// ...wait 1 sec
// Logs 2
// Logs 4
// Logs 10
// Logs 12
// ...wait 1 sec
// Logs 6
// Logs 8
// Logs 14
// Logs 16

// What I would like to see:
// ...wait 1 sec
// Logs 2
// Logs 4
// ...wait 1 sec
// Logs 10
// Logs 12
// ...wait 1 sec
// Logs 6
// Logs 8
// ...wait 1 sec
// Logs 14
// Logs 16
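
To make the "can't be called in parallel" constraint observable, a resource can count how many calls are in flight at once. The following is a minimal sketch, not part of the original code: `GuardedCalculator`, `maxConcurrent`, and the `delayMs` parameter are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper (not part of the Pool above): a calculator that
// records the highest number of calls that were running at the same time.
class GuardedCalculator {
  private active = 0;
  public maxConcurrent = 0;

  async expensiveCalculation(n: number, delayMs = 1000): Promise<number> {
    // Count this call as in flight and remember the peak.
    this.active++;
    this.maxConcurrent = Math.max(this.maxConcurrent, this.active);
    try {
      await new Promise<void>((res) => setTimeout(res, delayMs));
      return n * 2;
    } finally {
      // Always decrement, even if the calculation were to throw.
      this.active--;
    }
  }
}
```

If a pool never hands the same resource to two tasks at once, `maxConcurrent` should stay at 1 for every resource; a value of 2 or more means the resource was used in parallel.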

Upvotes: 3

Views: 237

Answers (1)

Andrei Tătar

Reputation: 8295

The main thing is that you need to share the part that actually does the work (the dispatcher), not only the resources.

Here's my solution:

https://stackblitz.com/edit/rxjs-yyxjh2?devToolsHeight=100&file=index.ts

import { merge, Observable, Observer, of, Subject, zip } from 'rxjs';
import { ignoreElements, concatMap, mergeMap } from 'rxjs/operators';

class Pool<TResource> {
  // Emits a resource each time a task has finished with it.
  private readonly resourceFree$ = new Subject<TResource>();
  // Stream of work items pushed by every schedule() call.
  private readonly dispatcher$ = new Subject<{
    execute: (resource: TResource) => any;
    observer: Observer<any>;
  }>();
  private freeResources$ = merge(this.resource$, this.resourceFree$);
  // zip buffers work items until a resource is free, then pairs them up.
  readonly doWork$ = zip(this.freeResources$, this.dispatcher$).pipe(
    // mergeMap rather than switchMap: in-flight work should never be treated as cancelled.
    mergeMap(async ([resource, work]) => {
      try {
        const result = await work.execute(resource);
        work.observer.next(result);
        work.observer.complete();
      } catch (err) {
        work.observer.error(err);
      }
      // Hand the resource back whether the task succeeded or failed.
      this.resourceFree$.next(resource);
    }),
    ignoreElements()
  );

  constructor(private resource$: Observable<TResource>) {}

  public schedule<TIn, TOut>(
    input$: Observable<TIn>,
    task: (resource: TResource, input: TIn) => Promise<TOut> | TOut
  ): Observable<TOut> {
    return input$.pipe(
      // concatMap dispatches one input at a time per schedule() call.
      // mergeMap also works; it dispatches all inputs immediately and lets them wait in the pool.
      concatMap((input) => {
        const work = {
          execute: (r: TResource) => task(r, input),
          observer: new Subject<TOut>(),
        };
        this.dispatcher$.next(work);
        return work.observer;
      })
    );
  }
}

class CalculatorResource {
  expensiveCalculation(n: number) {
    return new Promise<number>((res) => setTimeout(() => res(n * 2), 1000));
  }
}

const pool = new Pool(of(new CalculatorResource(), new CalculatorResource()));
// Start the pool dispatcher; work dispatched before this subscription would be lost.
pool.doWork$.subscribe();

const input$ = of(1, 2, 3, 4);
const parallelInput$ = of(5, 6, 7, 8);
pool
  .schedule(input$, (calc, n) => calc.expensiveCalculation(n))
  .subscribe({ next: (v) => console.log(v), complete: () => console.log('1st done') });
pool
  .schedule(parallelInput$, (calc, n) => calc.expensiveCalculation(n))
  .subscribe({ next: (v) => console.log(v), complete: () => console.log('2nd done') });

setTimeout(() => {
  pool
    .schedule(parallelInput$, (calc, n) => calc.expensiveCalculation(n))
    .subscribe({ next: (v) => console.log(v), complete: () => console.log('3rd done') });
}, 5000);
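
The idea of sharing the part that does the work can also be sketched without RxJS, which may make the mechanics easier to follow. This is only an illustration under assumptions, not the code from the StackBlitz: `SimplePool`, `acquire`, `release`, and `run` are hypothetical names. There is one queue of waiting callers for the whole pool, so every caller competes for the same free resources.

```typescript
// Illustration only: a promise-based pool with ONE shared waiting queue.
// All names here are hypothetical and not part of RxJS.
class SimplePool<TResource> {
  private readonly free: TResource[];
  private readonly waiting: Array<(resource: TResource) => void> = [];

  constructor(resources: TResource[]) {
    this.free = [...resources];
  }

  // Resolves with a free resource, or queues the caller until one is released.
  private acquire(): Promise<TResource> {
    if (this.free.length > 0) {
      return Promise.resolve(this.free.shift() as TResource);
    }
    return new Promise<TResource>((resolve) => this.waiting.push(resolve));
  }

  // Hands the resource to the longest-waiting caller, or marks it as free.
  private release(resource: TResource): void {
    const next = this.waiting.shift();
    if (next) {
      next(resource);
    } else {
      this.free.push(resource);
    }
  }

  // Runs one task with exclusive use of a resource.
  // The resource is released even if the task throws.
  async run<TOut>(task: (resource: TResource) => Promise<TOut> | TOut): Promise<TOut> {
    const resource = await this.acquire();
    try {
      return await task(resource);
    } finally {
      this.release(resource);
    }
  }
}
```

Here `run` plays roughly the role of pushing a work item onto `dispatcher$`, and `release` the role of `resourceFree$.next(resource)`.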

Upvotes: 1
