Reputation: 455
I want to execute a pre-defined promise (promise 2) repeatedly after another promise (promise 1) resolves, so that I can detect when my connection to a peripheral device is lost and then do something else. I'd like to do this using observables. Here's the code I've come up with so far, and I have two questions:
1) Is there a better alternative to using concatMap twice in the reprovision() function?
2) Is there a better way to implement what I've described above?
Here's the code which uses observables to execute promise 1 (only once) and promise 2 recursively and then does something with the result:
reprovision(){
  this.bleMessage = this.prepareMessageService.prepareMessage(
    MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
  );
  from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
    concatMap(x => interval(1000)),
    concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
    takeWhile(x => x, true)).subscribe(
      resp => console.log('still connected to peripheral'),
      err => {
        console.log('disconnected from peripheral, do something else');
      }
    );
}
Here's the code for promise 2 (this.bleService.checkBleConnection1), which I want executed recursively until the peripheral is disconnected from my mobile device. The idea behind this promise is that it should resolve as long as my mobile device is still connected to a peripheral, and throw an error once that connection fails.
checkBleConnection1(deviceId: string){
  return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
    .then(resp => {
      if (!resp.some(elem => deviceId === elem.deviceId)){
        throw new Error('Peripheral connection failed');
      }
      return true;
    })
    .catch(error => {
      throw error;
    });
}
Upvotes: 2
Views: 498
Reputation: 888
From your post, what I understood is that you want to call this.bleService.checkBleConnection1(this.deviceId) repeatedly for as long as the device is found. For recursive calls like this, RxJS has an operator called expand.
To get the 1-second interval, add delay(1000) to the observable returned for each recursive call.
Here is my version, using expand in place of concatMap(x => interval(1000)). With expand there is no need for takeWhile, so I am removing it:
reprovision(){
  this.bleMessage = this.prepareMessageService.prepareMessage(
    MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
  );
  from(this.bleService.bleWrite('custom', this.bleMessage))
    .pipe(
      expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
    )
    .subscribe({
      next: (resp) => console.log('still connected to peripheral'),
      error: (err) =>
        console.error('disconnected from peripheral, do something else'),
      complete: () => console.log('Completed !'),
    });
}
Upvotes: 1
Reputation: 29315
I believe the goal here is to check the connection every second after the first promise has completed. (Side note: this is called polling, not recursion. It would be recursive if you called reprovision() from inside reprovision() or something similar. You can poll recursively, but you're not doing so here, and you generally don't want to unless you have to.)
You can't really get rid of the second concatMap, because you have to switch into the connection check on each interval tick. You can, however, separate and clean up the streams a little, like this:
const pollConnection$ = interval(1000).pipe(
  concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
  takeWhile(x => x, true)
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
  concatMap(x => pollConnection$),
).subscribe(
  resp => console.log('still connected to peripheral'),
  err => {
    console.log('disconnected from peripheral, do something else');
  }
);
But you need to be careful with an operator like concatMap, as it can build backpressure, which is bad for an app. If these requests can take longer than 1 second to respond, you'll build up a queue of requests, which can degrade your application's performance. Safer alternatives are switchMap and exhaustMap, depending on the desired behavior.
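As a rough sketch of the exhaustMap variant (not something from the original code): pollConnection is a hypothetical helper name, and taking the tick source as a parameter is my own assumption, made so the helper is easy to try out without waiting on real timers. It defaults to interval(1000), matching the polling above.

```typescript
import { Observable, ObservableInput, interval } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

// Hypothetical helper (name and signature are assumptions, not from the posts).
// Runs `check` on each tick, but ignores ticks that arrive while a previous
// check is still pending.
function pollConnection(
  check: () => ObservableInput<boolean>,
  ticks$: Observable<unknown> = interval(1000),
): Observable<boolean> {
  return ticks$.pipe(
    // exhaustMap drops new ticks until the current inner check completes,
    // so slow checks cannot pile up in a queue as they can with concatMap.
    exhaustMap(() => check()),
  );
}
```

You could then use pollConnection(() => this.bleService.checkBleConnection1(this.deviceId)) in place of pollConnection$ inside the outer concatMap. Since checkBleConnection1 rejects once the peripheral is gone, the rejection should surface in the error callback of subscribe, so takeWhile would not be needed. If you would rather cancel a slow check and start a fresh one on each tick, switchMap is the operator to swap in instead.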
Upvotes: 1