kharandziuk
kharandziuk

Reputation: 12900

Highland.js and buffer(or queue)

So, I have a code bellow to model the issue:

const H = require('highland');
const Promise = require('bluebird');

let i = 0
const stream = H(function(push, next) {
  console.log('read', i)
  push(null, i)
  i++;
  Promise.delay(100).then(() => next())
})


stream
  .flatMap(function(x) {
    console.log('start writing', x)
    return H(Promise.delay(2000, 'y').tap(() => console.log('finish writing', x)))
  })
  .done()

which produces output like this:

read 0
start writing 0
finish writing 0
read 1
start writing 1

the problem: I want to have a buffer of some size where I will stack the data from producer. So, with the buffer of size 1, output should look like:

read 0
start writing 0
read 1
finish writing 0
start writing 1
read 2

so, I want to buffer value from producer if the "producer" is "busy". Is it possible to do such thing with highland?

Upvotes: 0

Views: 380

Answers (2)

kharandziuk
kharandziuk

Reputation: 12900

The answer is in my arcticle

The idea here – buffering is a part of stream api. Highland just manipulate streams

Upvotes: 0

Michał Karpacki
Michał Karpacki

Reputation: 2658

No. It's not possible (not by promises, however yes - with callbacks) and they did state that in this longish issue thread

One of the reasons why I decided to write scramjet.

A simple case like yours:

let i = 0;
const stream = new (require("scramjet").DataStream)({read() { this.push(i++); })

stream.map(
    (x) => Promise.delay(2000, 'y').tap(() => console.log('finish writing', x))
).accumulate(
    () => 0
).then(
    () => console.log("done")
)

It just works as you want it. :)

Upvotes: 0

Related Questions