Pascal
Pascal

Reputation: 2164

RXJS Observable stretch

I have a Rx.Observable.webSocket Subject. My server endpoint can not handle messages receiving the same time (<25ms). Now I need a way to stretch the next() calls of my websocket subject.

I have created another Subject requestSubject and subscribe to this. Then calling next of the websocket inside the subscription.

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

Using delay shifts each next call the same delay time, then all next calls emit the same time later ... thats not what I want.

I tried delay, throttle, debounce but it does not fit.

The following should illustrate my problem

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-

Upvotes: 5

Views: 278

Answers (3)

Tom Daniel
Tom Daniel

Reputation: 323

Mark van Straten's solution didn't work completely accurately for me. I found a much more simple and accurate solution based from here.

const source = from([100,500,1500,1501,1502,1503]).pipe(
    mergeMap(i => of(i).pipe(delay(i)))
);

const delayMs = 500;
const stretchedSource = source.pipe(
  concatMap(e => concat(of(e), EMPTY.pipe(delay(delayMs))))
);

Upvotes: 0

Olaf Horstmann
Olaf Horstmann

Reputation: 16892

Here are two solutions using a custom stream and using only rxjs-operators - since it looks quite complicated I would not advice you to use this solution, but to use a custom stream (see 1st example below):

Custom stream (MUCH easier to read and maintain, probably with better performance as well):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

const spreadDelay = 1000;
let prevEmitTime = 0;

click$
  .concatMap(i => {  // in this case you could also use "flatMap" or "mergeMap" instead of "concatMap"
    const now = Date.now();
    if (now - prevEmitTime > spreadDelay) {
      prevEmitTime = now;
      return Rx.Observable.of(i); // emit immediately
    } else {
      prevEmitTime += spreadDelay;
      return Rx.Observable.of(i).delay(prevEmitTime - now); // emit somewhere in the future
    }
  })
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>


Using only RxJS Operators (contains issues, probably shouldn't use):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

click$
  // window will create a new substream whenever no click happened for 1001ms (with the spread out 
  .window(click$
    .concatMap(i => Rx.Observable.of(i).delay(1000))
    .debounceTime(1001)
  )
  .mergeMap(win$ => Rx.Observable.merge(
    win$.take(1).merge(), // emitting the "first" click immediately
    win$.skip(1)
      .merge()
      .concatMap(i => Rx.Observable.of(i).delay(1000)) // each emission after the "first" one will be spread out to 1 seconds
  ))
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>

Upvotes: 1

Mark van Straten
Mark van Straten

Reputation: 9425

Had to tinker a bit, its not as easy as it looks:

//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

Basically we need to calculate the needed delay between emissions so we can space them. We do this using timestamp() of original emissions and the mergeMap overload with a concurrency of 1 to only subscribe to the next delayed value when the previous is emitted. This is a pure Rx solution without further side effects.

Upvotes: 2

Related Questions