coder101
coder101

Reputation: 455

Better alternative to using concatMap twice within pipe?

I want to execute a pre-defined promise (promise 2) recursively after another promise (promise 1) resolves to see when my connection to a peripheral device returns false, following which I can do something else. I'd like to be able to do so using observables. Here's the code I've come up with so far and have 2 questions:

1) Is there a better alternative to using concatMap twice in my code in the reprovision() function,

and

2) Is there a better way to implement my code to do what I've described above?

Here's the code which uses observables to execute promise 1 (only once) and promise 2 recursively and then does something with the result:

reprovision(){
    this.bleMessage = this.prepareMessageService.prepareMessage(
      MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
    );

    from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
      concatMap(x => interval(1000)),
      concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
      takeWhile(x => x, true)).subscribe(
        resp => console.log('still connected to peripheral'),
        err =>  {
          console.log('disconnected from peripheral, do something else');
        }
      );
}

Here's the code for promise 2 (this.bleService.checkBleConnection1), which I want executed recursively until the peripheral is disconnected from my mobile device. The idea behind this promise is that it should resolve as long as my mobile device is still connected to a peripheral, and throw an error once that connection fails.

checkBleConnection1(deviceId: string){
  return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
  .then(resp => {
    if (!resp.some(elem => deviceId === elem.deviceId)){
      throw new Error('Peripheral connection failed');
    }
    return true;
  })
  .catch(error => {
    throw error;
  });
}

Upvotes: 2

Views: 498

Answers (2)

Saptarsi
Saptarsi

Reputation: 888

From your post what I understood is

  • You want to check recursively with 1sec interval by calling this.bleService.checkBleConnection1(this.deviceId) infinitely if device is found
  • You don't want check more if device is not found

So for recursive call there is an operator in Rxjs called expand link

To achieve 1 sec interval adding delay(1000) while returning for recursive call

SO here is my version using expand in place of concatMap(x => interval(1000))

using expand there is no need of using takeWhile,so am removing it

reprovision(){
  this.bleMessage = this.prepareMessageService.prepareMessage(
    MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
  );

  from(this.bleService.bleWrite('custom', this.bleMessage))
    .pipe(
      expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
    )
    .subscribe({
      next: (resp) => console.log('still connected to peripheral'),
      error: (err) =>
        console.error('disconnected from peripheral, do something else'),
      complete: () => console.log('Completed !'),
    });
}

Upvotes: 1

bryan60
bryan60

Reputation: 29315

I believe the goal here is to check the connection every 1 second after you've completed the first promise (side note: this is called polling, not recursion, it'd be recursive if you called reprovision() from inside reprovision() or something similar, you can poll recursively but you're not here and generally don't want to unless you have to).

You can't really get rid of the second concatMap because you have to switch into it based on the interval, you can separate and clean up the streams a little bit like this:

const pollConnection$ = interval(1000).pipe(
  concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
  takeWhile(x => x, true)
);

from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
  concatMap(x => pollConnection$),
).subscribe(
    resp => console.log('still connected to peripheral'),
    err =>  {
      console.log('disconnected from peripheral, do something else');
    }
  );

but you need to be careful with an operator like concatMap as it can build backpressure which is bad for an app. if these requests can take longer than 1 second to respond, then you'll be building a queue of requests which can deteriorate your application performance. safer alternatives are switchMap and exhaustMap depending on desired behavior

Upvotes: 1

Related Questions