peterc

Reputation: 7843

Rx: Buffer with variable back pressured buffer time

I have played a little with Rx, but still consider myself newish to that world. I have a problem, and I am wondering if I can solve it via Rx. My initial use case is in C#, but I may later want the same in JS (so if there are any code snippets in answers, any language or pseudocode is fine).

Currently I have a client app which simply buffers the data (transactions) it creates and sends it to the server every 5 seconds. This data can contain many individual transactions, e.g. the client has stored many while offline and has now come online, so we want to send perhaps thousands to the server. When sending many as just described, the 5 second delay and buffering is fine (the data is already delayed anyway). However, when this client is, for example connected, and we create a transaction in real time (i.e. just 1 or 2 single transactions), I would like to be able to send these immediately, i.e. not wait the 5 seconds.

So it looks similar to the Rx buffer, crossed with maybe the debounce? I have attempted to draw a marble diagram to help explain using an advanced graphic package (not) paint.

[Image: marble diagram]

So, to walk through this, the 1st red marble is received and forwarded straight away. Next the yellow marble is also forwarded straight away as it has been 1 second since the last red marble.

Now the light blue marble and a bunch of others arrive less than 1 second apart, so we want to buffer these, as we don't want to spam the network with possibly thousands of requests. What we will do here is buffer for, say, 5 seconds, and then send however many we have buffered, every 5 seconds, until this "spurt" has finished. After this, we want to return to sending any other "individual" requests as they come.
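To pin down the behaviour walked through above, here is a minimal sketch in plain JavaScript (no Rx) that works on timestamped items rather than real timers. The function name `planSends`, the parameter names, and the exact rules are assumptions made for illustration: an item that arrives at least `quietMs` after the previous one is sent on its own, and a closer arrival opens a `batchMs` window whose contents go out together. Note that the first item of a burst still goes out individually, because a burst cannot be recognised until its second item shows up.

```javascript
// Hypothetical helper: decide when each item would be sent.
// events: array of { t, value } sorted by t (milliseconds).
function planSends(events, quietMs = 1000, batchMs = 5000) {
  const sends = [];
  let buffer = [];
  let flushAt = null;      // non-null while a burst window is open
  let lastT = -Infinity;   // arrival time of the previous item

  for (const { t, value } of events) {
    // Close an open burst window whose deadline has passed.
    if (flushAt !== null && t >= flushAt) {
      sends.push({ at: flushAt, items: buffer });
      buffer = [];
      flushAt = null;
    }
    if (flushAt !== null) {
      buffer.push(value);                     // window open: keep buffering
    } else if (t - lastT >= quietMs) {
      sends.push({ at: t, items: [value] });  // isolated item: send now
    } else {
      buffer = [value];                       // burst detected: open a window
      flushAt = t + batchMs;
    }
    lastT = t;
  }
  // Whatever is still buffered goes out when its window ends.
  if (flushAt !== null) sends.push({ at: flushAt, items: buffer });
  return sends;
}

const timeline = [
  { t: 0, value: 'red' },
  { t: 1000, value: 'yellow' },
  { t: 3000, value: 'lightblue' },
  { t: 3100, value: 'green' },
  { t: 3200, value: 'blue' },
  { t: 4000, value: 'pink' },
];
console.log(JSON.stringify(planSends(timeline)));
```

With these assumed rules, red, yellow and lightblue are each sent as they arrive, and green, blue and pink go out together when the 5 second window that green opened runs out.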

It doesn't have to be exactly like the above, basically we want

I have found a number of other posts similar, but not the same as this.

I've come up with some clunky "manual" ways of doing this (using just standard lists, and various timers etc), but was wondering would this be possible using Rx to do some of this work, and hopefully less prone to bugs?

Thanks in advance for any help here!

Upvotes: 3

Views: 444

Answers (1)

Richard Matsen

Reputation: 23463

I think you picked it already: buffer, with debounce as the buffer trigger.

when this client is, for example connected

If you want to add a connection event, you could merge that into bufferTrigger as well.

console.clear()

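const source = new Rx.Subject();
// Emits once the source has been quiet for 500 ms.
const bufferTrigger = source.debounceTime(500);

source
  // Collect items and release them as an array each time the trigger fires.
  .buffer(bufferTrigger)
  .subscribe(console.log);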

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue'), 3200);
setTimeout(() => source.next('pink'), 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
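As a rough way to see which groups the snippet above should produce, here is a small plain JavaScript model of "buffer, closed by a debounce" that works on timestamps instead of timers. It is only a sketch, not RxJS itself: `debounceBuffers` is a made-up name, and it assumes the debounce only fires when the gap to the next item is strictly longer than the quiet period (an exact tie is treated as "not fired", which real timers may or may not do).

```javascript
// Hypothetical model of source.buffer(source.debounceTime(quietMs)).
// events: array of { t, value } sorted by t (milliseconds).
function debounceBuffers(events, quietMs) {
  const out = [];
  let buffer = [];
  let lastT = null;

  for (const { t, value } of events) {
    // The quiet period after the previous item elapsed before this one
    // arrived, so the buffer collected so far was already released.
    if (lastT !== null && t - lastT > quietMs) {
      out.push({ at: lastT + quietMs, items: buffer });
      buffer = [];
    }
    buffer.push(value);
    lastT = t;
  }
  // The source never completes here, so the last group is released
  // quietMs after the final item.
  if (lastT !== null) out.push({ at: lastT + quietMs, items: buffer });
  return out;
}

const marbles = [
  { t: 0, value: 'red' },
  { t: 1000, value: 'yellow' },
  { t: 3000, value: 'lightblue' },
  { t: 3100, value: 'green' },
  { t: 3200, value: 'blue' },
  { t: 4000, value: 'pink' },
];
for (const group of debounceBuffers(marbles, 500)) {
  console.log(group.at + 'ms', group.items.join(','));
}
```

Under this model red and yellow each come out alone, 500 ms after they arrive, lightblue, green and blue come out together at 3700 ms, and pink follows at 4500 ms. So single items are delayed by the debounce period rather than forwarded instantly.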

Example with an additional trigger 5 seconds after the last emit:

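const source = new Rx.Subject();
// Receives every buffer that is released (including empty ones).
const lastEmit = new Rx.Subject();
// Fires 5 seconds after each released buffer.
const maxDelayAfterLastEmit = lastEmit.delay(5000);
// Close the buffer on 500 ms of quiet or on the 5 second timer.
const bufferTrigger = source.debounceTime(500)
  .merge(maxDelayAfterLastEmit);

const emits = source
  .buffer(bufferTrigger)
  .do(x => lastEmit.next(x))
  .filter(x => x.length); // drop empty buffers

// Log each batch with the seconds elapsed since start.
const start = new Date().getTime();
emits.subscribe(x => console.log((new Date().getTime() - start)/1000  + "s " + x));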

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue1'), 3200);
setTimeout(() => source.next('blue2'), 3300);
setTimeout(() => source.next('blue3'), 3600);
setTimeout(() => source.next('pink1'), 4000);
setTimeout(() => source.next('pink2'), 4400);
setTimeout(() => source.next('pink3'), 4800);
setTimeout(() => source.next('pink4'), 5200);
setTimeout(() => source.next('pink5'), 5600);
setTimeout(() => source.next('pink6'), 6000);
setTimeout(() => source.next('pink7'), 6400);
setTimeout(() => source.next('pink8'), 6800);
setTimeout(() => source.next('pink9'), 7200);
setTimeout(() => source.next('pink10'), 7700);
setTimeout(() => source.next('pink11'), 8200);
setTimeout(() => source.next('pink12'), 8600);
setTimeout(() => source.next('pink13'), 9000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
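To reason about the second snippet without waiting on real timers, here is a rough virtual-time model in plain JavaScript. It is a sketch under assumptions, not RxJS: `simulateTriggers` and its parameters are made-up names, items win exact timing ties against triggers, and a time horizon is needed because every released buffer (even an empty one) schedules another trigger `maxDelayMs` later, so the triggers never stop on their own.

```javascript
// Hypothetical model: a buffer closed either by a debounce on the source
// or by a timer that fires maxDelayMs after each released buffer.
// events: array of { t, value } sorted by t (milliseconds).
function simulateTriggers(events, debounceMs, maxDelayMs, horizonMs) {
  const batches = [];      // non-empty buffers, like the .filter() step
  const delayed = [];      // pending max-delay trigger times (ascending)
  let buffer = [];
  let debounceAt = Infinity;
  let i = 0;

  for (;;) {
    const itemAt = i < events.length ? events[i].t : Infinity;
    const delayedAt = delayed.length ? delayed[0] : Infinity;
    const now = Math.min(itemAt, debounceAt, delayedAt);
    if (now === Infinity || now > horizonMs) break;

    if (now === itemAt) {
      // A new item arrives and restarts the debounce timer.
      buffer.push(events[i].value);
      i += 1;
      debounceAt = now + debounceMs;
      continue;
    }
    // Otherwise one of the two triggers fired: consume it.
    if (now === debounceAt) debounceAt = Infinity;
    else delayed.shift();
    // Release the buffer; empty releases are not reported.
    if (buffer.length > 0) batches.push({ at: now, items: buffer });
    buffer = [];
    // Every release, empty or not, schedules the next max-delay trigger.
    delayed.push(now + maxDelayMs);
  }
  return batches;
}

// One isolated item, then a steady stream every 400 ms (made-up data).
const stream = [{ t: 0, value: 'red' }];
for (let k = 0; k < 15; k += 1) {
  stream.push({ t: 1000 + 400 * k, value: 'p' + k });
}
const result = simulateTriggers(stream, 500, 5000, 8000);
console.log(result.map(b => b.at + 'ms: ' + b.items.length + ' item(s)'));
```

In this model the steady stream never leaves a 500 ms gap, so the debounce alone would hold everything back until the stream ends. The timer started by the first release (at 500 ms) cuts the stream at 5500 ms, and the remaining items are released by the debounce once the stream stops.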

Upvotes: 1
