Reputation: 320
I'm using NGRX and Effects for basic stuff in an app. The app does Bluetooth LE communication and constantly refreshes/writes various parameters. However, sometimes it's necessary to pause this refreshing.
I'm having trouble pausing the execution of an NGRX Effect that uses concatMap to process actions one at a time. The queued actions should still be processed after some sort of "continue" signal has been given, or when the bluetoothService.pauseCommunication property becomes false again. There can be many ReadFromDevice actions queued in the concatMap.
The Bluetooth service has a boolean property, this.bluetoothService.pauseCommunication, but I don't know how to integrate it into the Effect. I have tried various (probably stupid) things but have failed so far. Unfortunately, I currently cannot change the bluetoothService code.
I know I can cancel the complete concatMap by throwing an error, but that's not what I need. I just need to pause the processing until the boolean flag becomes false.
This is a simplified example of the Effect I'm using:
@Effect()
readParameterFromDevice$: Observable<Action> = this.actions$.pipe(
  ofType<ReadFromDevice>(CommunicationActionTypes.ReadFromDevice),
  map(action => action.payload),
  // concatMap waits for each returned promise before starting the next request
  concatMap(async request => {
    try {
      const result = await this.bluetoothService.readFromDevice(request);
      return new ReadSuccess({ result });
    } catch (error) {
      // `result` is not in scope here, so pass the caught error instead
      return new ReadError({ result: error });
    }
  })
);
Would be great if anybody could point me into the right direction.
Upvotes: 1
Views: 550
Reputation: 15505
I think you're looking for the buffer operator.
Buffers the source Observable values until closingNotifier emits.
See the docs.
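import { fromEvent, interval } from 'rxjs';
import { buffer } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
// Named intervalEvents so it does not shadow the imported interval function
const intervalEvents = interval(1000);
// Collect interval values and emit them as an array on each click
const buffered = intervalEvents.pipe(buffer(clicks));
buffered.subscribe(x => console.log(x));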
Or the bufferToggle operator.
Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.
See the docs.
import { fromEvent, interval, empty } from 'rxjs';
import { bufferToggle } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);
// Every other second, open a buffer that collects clicks for 500 ms
const buffered = clicks.pipe(bufferToggle(openings, i =>
  i % 2 ? interval(500) : empty()
));
buffered.subscribe(x => console.log(x));
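If the pause state is only available as a plain boolean and the service cannot be changed, another option is to wait inside the concatMap callback itself instead of buffering. The following is a minimal sketch of that idea, not a tested NGRX solution: waitWhilePaused, fakeService, and readWhenResumed are hypothetical names introduced for illustration, fakeService merely stands in for the real bluetoothService, and polling (with an arbitrary default interval of 100 ms) is assumed because nothing emits when the flag changes.

```typescript
// Hypothetical helper: resolves once isPaused() returns false.
// It polls because the flag is a plain boolean with no change notification.
function waitWhilePaused(
  isPaused: () => boolean,
  pollMs: number = 100
): Promise<void> {
  return new Promise<void>(resolve => {
    const check = (): void => {
      if (!isPaused()) {
        resolve();
      } else {
        setTimeout(check, pollMs);
      }
    };
    check();
  });
}

// Stand-in for the real bluetoothService (assumption, for illustration only).
const fakeService = {
  pauseCommunication: true,
  async readFromDevice(request: string): Promise<string> {
    return `value of ${request}`;
  },
};

// Mirrors the body of the async concatMap callback:
// first wait until communication is resumed, then perform the read.
async function readWhenResumed(request: string): Promise<string> {
  await waitWhilePaused(() => fakeService.pauseCommunication, 10);
  return fakeService.readFromDevice(request);
}
```

In the Effect, the equivalent would be to add `await waitWhilePaused(() => this.bluetoothService.pauseCommunication);` as the first statement of the async concatMap callback. Since concatMap does not start the next inner promise until the current one has settled, later ReadFromDevice actions should stay queued while the flag is true and be processed in order once it becomes false.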
Upvotes: 1