Dave Stein

Reputation: 9344

How to pause a buffer from one source in RxJS?

I have a stream of events coming through via fromEventPattern like so:

fromEventPattern<IPsEvent>(addEventHandler).subscribe(ps$);

Due to business quirks, I expect an exception to be thrown occasionally. When that happens, I want to queue up incoming events and re-fire them once the error state is resolved.

I've been trying the solution from "Pausable buffer with RxJS" to no avail. I suspect that's because that solution toggles the buffer through a separate observable, whereas here the stream is effectively asking to pause itself midstream. Where the linked example uses autoSave$, I have blockingCallsAllowed$. Here is my latest attempt:

const source$ = new Subject<IPsEvent>();

const blockingCallsAllowed$ = new BehaviorSubject(true);
const on$ = blockingCallsAllowed$.pipe(filter((v) => v));
const off$ = blockingCallsAllowed$.pipe(filter((v) => !v));

source$
  .pipe(
    map(() => {
      try {
        // line will throw exception at certain times
        myFunction();
        return true;
      } catch (e) {
        const i = setInterval(() => {
          try {
            myFunction();
            console.log('good again');
            blockingCallsAllowed$.next(true);
            clearInterval(i);
          } catch (er) {
            // still in flux
          }
        }, 50);
        return false;
      }
    }),
  )
  .subscribe(blockingCallsAllowed$);

const output$ = merge(
  source$.pipe(bufferToggle(off$, () => on$)),
  source$.pipe(windowToggle(on$, () => off$)),
).pipe(concatMap(from));

output$.subscribe((evt) => {
  console.log('After buffers', evt);
});

// Add events from the Ps API to the event stream
fromEventPattern(addEventHandler).subscribe(source$);

Everything fires fine until the first exception. After that, the buffered events are never emitted, even though 'good again' is logged to the console.

I suspect a timing issue between source$.pipe running in the same execution and the interval calling .next later, but I haven't been able to pin it down after trying many permutations of this code.

Upvotes: 3

Views: 164

Answers (1)

Mrk Sef

Reputation: 8062

It's not clear to me what you're trying to implement. But if you want to keep retrying myFunction() every 50ms until it succeeds, and hold off processing other events while that happens, concatMap does essentially all of that for you.

concatMap buffers emissions from the source while it waits for the current inner observable to complete, then processes them in order.

So what you're after might look like this:

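import { of } from 'rxjs';
import { concatMap, delay, retryWhen, tap } from 'rxjs/operators';

source$.pipe(
  // concatMap queues later events until the current inner observable completes
  concatMap(evt => of(evt).pipe(
    // Throws while the error state persists
    tap(() => myFunction()),
    // On error, wait 50ms and resubscribe, which runs myFunction() again
    retryWhen(errors => errors.pipe(
      delay(50)
    ))
  ))
).subscribe();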
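
To see the queueing idea in isolation, here is a rough, dependency-free TypeScript model of "process one event at a time and hold the rest until the current one succeeds". It is only a sketch and not how RxJS implements concatMap. SerialRetryQueue, push, drain and size are hypothetical names invented for this illustration, and the retry is triggered by calling drain() by hand rather than by a 50ms timer.

```typescript
// Hypothetical helper, not part of RxJS: a synchronous model of
// "handle events in order, keep the rest queued while the handler fails".
class SerialRetryQueue<T> {
  private readonly pending: T[] = [];
  private readonly handler: (item: T) => void;

  // `handler` is expected to throw while the error state persists.
  constructor(handler: (item: T) => void) {
    this.handler = handler;
  }

  // Enqueue an event, then process as much of the queue as currently possible.
  push(item: T): void {
    this.pending.push(item);
    this.drain();
  }

  // Process from the head of the queue and stop at the first failure,
  // so ordering is preserved. Call again later to retry.
  drain(): void {
    while (this.pending.length > 0) {
      try {
        this.handler(this.pending[0]);
      } catch {
        return; // still failing: this item and everything behind it stay queued
      }
      this.pending.shift();
    }
  }

  get size(): number {
    return this.pending.length;
  }
}

// Usage: the handler fails while `blocked` is true (stand-in for myFunction()).
let blocked = false;
const processed: number[] = [];
const queue = new SerialRetryQueue<number>((n) => {
  if (blocked) throw new Error('still in flux');
  processed.push(n);
});

queue.push(1);   // handled immediately
blocked = true;
queue.push(2);   // handler throws, so 2 stays queued
queue.push(3);   // queued behind 2
blocked = false;
queue.drain();   // retry: 2 and 3 are handled in their original order
```

In a real application the drain() call could be driven by a timer, which is roughly the role that retryWhen with delay(50) plays in the RxJS version.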

Upvotes: 2
