Reputation: 400
I'm currently trying to implement a simple offline/online sync mechanism using observables.
Basically, I have two observables:
What I want to achieve is to combine the above observables so that:
false
, the combined observable shouldn't emit. In this case, the data observable should retain its statetrue
, the combined observable should emit every time there is data in the data observablefalse
to true
, it should emit for every value on the data observableA small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js
Unfortunately, this doesn't behave as intended at all.
Is there any operator to achieve the required behavior? As an alternative, I could maybe poll the connection status of course and emit every X seconds. But ideally, I'd like a clean combination of Observables, though I'm a bit lost with what operator makes most sense.
To clear the idea up: I need to sync all data, not just the latest. So the data observable should buffer the data.
Upvotes: 1
Views: 3763
Reputation: 400
After also searching for the term "gate", I found the following stack overflow question and post: Conditional emission delays with rxjs
Basically, the answer is using delayWhen to achieve the desired result.
I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357
The crucial part is:
const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();
const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));
dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
console.log("Syncing data", {
counter
});
syncedIndicator.innerHTML += `<li>${counter}</li>`;
});
Wrapped in a custom typescript operator:
import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';
export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
const gateTriggerFn = () => gateTrigger.pipe(
filter((v) => v)
);
return (source: Observable<T | null | undefined>) => source.pipe(
delayWhen(gateTriggerFn)
);
}
It seems so far that this solution is doing what I intend it to do.
Upvotes: 1
Reputation: 569
So when we are in offline
mode, I store all the emitted data in a buffer, and once we change from offline
to online
, I simply emit everything in the buffer:
let dataBuffer = [];
let isPreviouslyOnline = false;
combineLatest([dataSubject, offlineOnlineSubject])
.pipe(
filter(([data, isOnline]) => {
if (!isOnline) {
if (!isPreviouslyOnline) {
dataBuffer.push(data);
}
isPreviouslyOnline = false;
return false;
}
return true;
}),
switchMap(([data]) => {
isPreviouslyOnline = true;
if (dataBuffer.length > 0) {
const tempData = [...dataBuffer];
dataBuffer = [];
return from(tempData);
} else {
return of(data);
}
})
)
.subscribe((data) => {
console.log("Data is: ", data);
});
Code sandbox: https://codesandbox.io/s/offline-sync-forked-k38kc3?file=/src/index.js
I think it works but it doesn't feel great, was trying to use the native rxjs
buffer operator to achieve it but couldn't figure out how. Would love to see if anyone has a better/cleaner solution.
Upvotes: 2
Reputation: 96889
It looks like were on the right path and could do just the following:
combineLatest([
offlineOnlineSubject,
dataSubject
])
.pipe(
filter(([online, counter]) => online),
)
.subscribe(([online, counter]) => {
syncedIndicator.textContent = counter;
});
Upvotes: 2