Gernot R. Bauer
Gernot R. Bauer

Reputation: 400

RxJS: Combine two observables so the observable only emits if the value of the first observable is true

I'm currently trying to implement a simple offline/online sync mechanism using observables.

Basically, I have two observables:

  1. Connection observable: The first observable gives me the information whether or not there is an internet connections. It emits when the network state changes
  2. Data observable: The second observable has the data that needs to synced. It emits when there is new data to be synced

What I want to achieve is to combine the above observables so that:

A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js

Unfortunately, this doesn't behave as intended at all.

Is there any operator to achieve the required behavior? As an alternative, I could maybe poll the connection status of course and emit every X seconds. But ideally, I'd like a clean combination of Observables, though I'm a bit lost with what operator makes most sense.

To clear the idea up: I need to sync all data, not just the latest. So the data observable should buffer the data.

Upvotes: 1

Views: 3763

Answers (3)

Gernot R. Bauer
Gernot R. Bauer

Reputation: 400

After also searching for the term "gate", I found the following stack overflow question and post: Conditional emission delays with rxjs

Basically, the answer is using delayWhen to achieve the desired result.

I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

The crucial part is:

const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", {
    counter
  });

  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});

Wrapped in a custom typescript operator:

import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  return (source: Observable<T | null | undefined>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}

It seems so far that this solution is doing what I intend it to do.

Upvotes: 1

Nam
Nam

Reputation: 569

So when we are in offline mode, I store all the emitted data in a buffer, and once we change from offline to online, I simply emit everything in the buffer:

let dataBuffer = [];
let isPreviouslyOnline = false;

combineLatest([dataSubject, offlineOnlineSubject])
  .pipe(
    filter(([data, isOnline]) => {
      if (!isOnline) {
        if (!isPreviouslyOnline) {
          dataBuffer.push(data);
        }

        isPreviouslyOnline = false;
        return false;
      }

      return true;
    }),
    switchMap(([data]) => {
      isPreviouslyOnline = true;
      if (dataBuffer.length > 0) {
        const tempData = [...dataBuffer];
        dataBuffer = [];

        return from(tempData);
      } else {
        return of(data);
      }
    })
  )
  .subscribe((data) => {
    console.log("Data is: ", data);
  });

Code sandbox: https://codesandbox.io/s/offline-sync-forked-k38kc3?file=/src/index.js

I think it works but it doesn't feel great, was trying to use the native rxjs buffer operator to achieve it but couldn't figure out how. Would love to see if anyone has a better/cleaner solution.

Upvotes: 2

martin
martin

Reputation: 96889

It looks like were on the right path and could do just the following:

combineLatest([
  offlineOnlineSubject,
  dataSubject
])
  .pipe(
    filter(([online, counter]) => online),
  )
  .subscribe(([online, counter]) => {
    syncedIndicator.textContent = counter;
  });

Upvotes: 2

Related Questions