Marcin Pevik
Marcin Pevik

Reputation: 173

Add streams dynamically to combined stream (eg forkJoin)

My function (lets call it myFunction) is getting an array of streams (myFunction(streams: Observable<number>[])). Each of those streams produces values from 1 to 100, which acts as a progress indicator. When it hits 100 it is done and completed. Now, when all of those observables are done I want to emit a value. I could do it this way:

public myFunction(streams: Observable<number>[]) {
  forkJoin(streams).subscribe(_values => this.done$.emit());
}

This works fine, but imagine following case:

  1. myFunction gets called with 2 streams
  2. one of those streams is done, second one is still progressing
  3. myFunction gets called (again) with 3 more streams (2nd one from previous call is still progressing)

I'd like to somehow add those new streams from 3rd bullet to the "queue", which would result in having 5 streams in forkJoin (1 completed, 4 progressing).

I've tried multiple approaches but can't get it working anyhow... My latest approach was this:

private currentProgressObs: Observable<any> | null = null;
private currentProgressSub: Subscription | null = null;

public myFunction(progressStreams: Observable<number>[]) {
    const isUploading = this.cumulativeUploadProgressSub && !this.cumulativeUploadProgressSub.closed;
    const currentConcatObs = this.currentProgressObs?.pipe(concatAll());
    const currentStream = isUploading && this.currentProgressObs ? this.currentProgressObs : of([100]);
        if (this.currentProgressSub) {
            this.currentProgressSub.unsubscribe();
            this.currentProgressSub = null;
        }
        this.currentProgressObs = forkJoin([currentStream, ...progressStreams]);
        this.currentProgressSub = this.currentProgressObs.subscribe(
            _lastProgresses => {
                this._isUploading$.next(false); // <----- this is the event I want to emit when all progress is completed
                this.currentProgressSub?.unsubscribe();
                this.currentProgressSub = null;
                this.currentProgressObs = null;
            },
        );
}

Above code only works for the first time. Second call to the myFunction will never emit the event.

I also tried other ways. I've tried recursion with one global stream array, in which I can add streams while the subscription is still avctive but... I failed. How can I achieve this? Which operator and in what oreder should I use? Why it will or won't work?

Upvotes: 0

Views: 290

Answers (2)

BizzyBob
BizzyBob

Reputation: 14740

Your problem is that each time your call your function, you are creating a new observable. Your life would be much easier if all calls of your function pushed all upload jobs through the same stream.

You can achieve this using a Subject.

I would suggest you push single "Upload Jobs" though a simple subject and design an observable that emits the state of all upload jobs whenever anything changes: A simple class that offers a createJob() method to submit jobs, and a jobs$ observable to reference the state:

class UploadService {
  private jobs = new Subject<UploadJob>();
  public jobs$ = this.jobs.pipe(
    mergeMap(job => this.processJob(job)),
    scan((collection, job) => collection.set(job.id, job), new Map<string, UploadJob>()),
    map(jobsMap => Array.from(jobsMap.values()))
  );

  constructor() {
    this.jobs$.subscribe();
  }

  public createJob(id: string) {
    this.jobs.next({ id, progress: 0 });
  }
  
  private processJob(job: UploadJob) {
    // do work and return observable that
    // emits updated status of UploadJob
  }
}

Let's break it down:

  • jobs is a simple subject, that we can push "jobs" through
  • createJob simply calls jobs.next() to push the new job through the stream
  • jobs$ is where all the magic happens. It receives each UploadJob and uses:
    • mergeMap to execute whatever function actually does the work (I called it processJob() for this example) and emits its values into the stream
    • scan is used to accumulate these UploadJob emissions into a Map (for ease of inserting or updating)
    • map is used to convert the map into an array (Map<string, UploadJob> => UploadJob[])
  • this.jobs$.subscribe() is called in the constructor of the class so that jobs will be processed

Now, we can easily derive your isUploading and cumulativeProgress from this jobs$ observable like so:

  public isUploading$ = this.jobs$.pipe(
    map(jobs => jobs.some(j => j.progress !== 100)),
    distinctUntilChanged()
  );

  public progress$ = this.jobs$.pipe(
    map(jobs => {
      const current = jobs.reduce((sum, j) => sum + j.progress, 0) / 100;
      const total   = jobs.length ?? current;
      
      return current / total;
    })
  );

Here's a working StackBlitz demo.

Upvotes: 1

Here is my suggestion for your issue.

We will have two subjects, one to count the number of request being processed (requestsInProgress) and one more to mange the requests that are being processed (requestMerger)

So the thing that will do is whenever we want to add new request we will pass it to the requestMerger Subject.

Whenever we receive new request for processing in the requestMerger stream we will first increment the requestInProgress counter and after that we will merge the request itself in the source observable. While merging the new request/observable to the source we will also add the finalize operator in order to track when the request has been completed (reached 100), and when we hit the completion criteria we will decrement the request counter with the decrementCounter function.

In order to emit result e.g. to notify someone else in the app for the state of the pending requests we can subscribe to the requestsInProgress Subject.

You can test it out either here or in this stackBlitz

let {
  interval,
  Subject,
  BehaviorSubject 
} = rxjs
let {
  mergeMap,
  map,
  takeWhile,
  finalize,
  first,
  distinctUntilChanged
} = rxjs.operators


// Imagine next lines as a service
// Subject responsible for managing strems
let requestMerger = new Subject();
// Subject responsible for tracking streams in progress
let requestsInProgress = new BehaviorSubject(0);

function incrementCounter() {
  requestsInProgress.pipe(first()).subscribe(x => {
    requestsInProgress.next(x + 1);
  });
}

function decrementCounter() {
  requestsInProgress.pipe(first()).subscribe(x => {
    requestsInProgress.next(x - 1);
  });
}
// Adds request to the request being processed
function addRequest(req) {
  // The take while is used to complete the request when we have `value === 100` , if you are dealing with http-request `takeWhile` might be redudant, because http request complete by themseves (e.g. the finalize method of the stream will be called even without the `takeWhile` which will decrement the requestInProgress counter)
  requestMerger.next(req.pipe(takeWhile(x => x < 100)));
}
// By subscribing to this stream you can determine if all request are processed or if there are any still pending
requestsInProgress
  .pipe(
    map(x => (x === 0 ? "Loaded" : "Loading")),
    distinctUntilChanged()
  )
  .subscribe(x => {
    console.log(x);
    document.getElementById("loadingState").innerHTML = x;
  });

// This Subject is taking care to store or request that are in progress
requestMerger
  .pipe(
    mergeMap(x => {
      // when new request is added (recieved from the requestMerger Subject) increment the requrest being processed counter
      incrementCounter();
      return x.pipe(
        finalize(() => {
          // when new request has been completed decrement the requrest being processed counter
          decrementCounter();
        })
      );
    })
  )
  .subscribe(x => {
    console.log(x);
  });
// End of fictional service

// Button that adds request to be processed
document.getElementById("add-stream").addEventListener("click", () => {
  addRequest(interval(1000).pipe(map(x => x * 25)));
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.6/rxjs.umd.min.js"></script>

<div style="display:flex">
<button id="add-stream">Add stream</button>

<h5>Loading State:  <span id="loadingState">false</span> </h5>
</div>

Upvotes: 1

Related Questions