Mazzy

Reputation: 14219

Promisify streams

I'm trying to promisify streams but it appears harder than I expected. Here is my attempt:

'use strict'

const Promise = require('bluebird')
const Twitter = require('twitter')

const TwitterStream = module.exports = function TwitterStream (config) {
  // init Twitter Streaming API for OAuth
  this.stream = new Twitter({
    consumer_key: config.get('/twitter/consumerKey'),
    consumer_secret: config.get('/twitter/consumerSecret'),
    access_token_key: config.get('/twitter/accessTokenKey'),
    access_token_secret: config.get('/twitter/accessTokenSecret')
  })
  .stream('statuses/filter', {
    track: config.get('/twitter/track')
  })
}

TwitterStream.prototype.receive = function () {
  return new Promise((resolve, reject) => {
    this.stream.on('data', resolve).on('error', reject)
  })
}

TwitterStream.prototype.destroy = function () {
  this.stream.destroy()
}

The main problem shows up when I create the object and call receive():

const stream = new TwitterStream(config)

stream.receive().then((data) => console.log(data))

When I run this, only one object is read. No further data is streamed.

TwitterStream.prototype.receive = function () {
  return new Promise((resolve, reject) => {
    this.stream
      .on('data', (data) => resolve(data))
      .on('error', (error) => reject(error))
  })
}

Upvotes: 2

Views: 4006

Answers (4)

Michał Karpacki

Reputation: 2658

You might want to take a look at scramjet, my library of already promisified streams.

For your Twitter example this code should work well:

const scramjet = require('scramjet')

const stream = new Twitter({
    consumer_key: config.get('/twitter/consumerKey'),
    consumer_secret: config.get('/twitter/consumerSecret'),
    access_token_key: config.get('/twitter/accessTokenKey'),
    access_token_secret: config.get('/twitter/accessTokenSecret')
})
.stream('statuses/filter', {
    track: config.get('/twitter/track')
})
.pipe(new scramjet.DataStream())

Then apply any transformations you like. For example, map each item in the stream and accumulate the results into an array once the stream ends.

stream.map(
    function (a) { return modifyTheTweetSomehow(a); } // a Promise can be returned here
).accumulate(
    function(a, i) { a.push(i); },
    []
) // this returns a Promise that will be resolved on stream end.

I hope you like it. :)

Upvotes: 0

t.niese

Reputation: 40872

Here is some untested and most likely still buggy code to illustrate how you could do it with promises:

function defer() {
  var resolve, reject;
  var promise = new Promise(function() {
    resolve = arguments[0];
    reject = arguments[1];
  });
  return {
    resolve: resolve,
    reject: reject,
    promise: promise
  };
}


TwitterStream.prototype.receive = function() {
  // create the buffer up front so getNextData() can run before the first 'data' event
  this.dataCache = this.dataCache || [];

  this.stream
    .on('data', data => {
      this.dataCache.push(data);
      this.tryToSendData()
    })
    .on('end', () => {
      this.finished = true;
      this.tryToSendData()
    })
    .on('error', err => {
      this.lastError = err;
      // error handling still missing
    })

  return this;
}

TwitterStream.prototype.tryToSendData = function() {
  if (this.defered) {
    let defered = this.defered;
    this.defered = null;

    // a consumer is waiting: pass it the first element of the buffer (or undefined once finished)
    defered.resolve(this.dataCache.shift())
  }
}

TwitterStream.prototype.getNextData = function() {
  if (this.dataCache.length > 0 || this.finished) {
    // if data is available or finished then pass the first element of buffer (or undefined)
    return Promise.resolve(this.dataCache.shift());
  } else {
    // otherwise we need a defered object; return its promise so the caller can wait on it
    this.defered = defer();
    return this.defered.promise;
  }
}

The usage could then look like this:

stream.receive().getNextData()
  .then(function processData(data) {
    if (data) {
      console.dir(data);

      // if data is available then continue requesting the data
      return stream.getNextData().then(processData);
    }
  })

This is one of the rare cases where using Deferreds makes sense.
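To try the pull-based idea without Twitter credentials, here is a minimal self-contained sketch of the same pattern. It assumes a plain Node.js EventEmitter as a stand-in for the Twitter stream; createPuller, source and received are hypothetical names used only for this illustration, and error handling is left out here as well.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper: buffers 'data' events and hands them out one promise at a time.
function createPuller (emitter) {
  const buffer = []
  let finished = false
  let waiting = null // resolve function of a pending next() call, if any

  emitter.on('data', (data) => {
    if (waiting) {
      // a consumer is already waiting, so hand the value over directly
      const resolve = waiting
      waiting = null
      resolve(data)
    } else {
      buffer.push(data)
    }
  })

  emitter.on('end', () => {
    finished = true
    if (waiting) {
      const resolve = waiting
      waiting = null
      resolve(undefined) // undefined signals the end of the stream
    }
  })

  return function next () {
    if (buffer.length > 0 || finished) {
      return Promise.resolve(buffer.shift())
    }
    return new Promise((resolve) => { waiting = resolve })
  }
}

// Stand-in source (assumption): any emitter with 'data' and 'end' events.
const source = new EventEmitter()
const next = createPuller(source)
const received = []

// Keep requesting values until the stream reports its end.
const done = (function processData () {
  return next().then((data) => {
    if (data === undefined) return received
    received.push(data)
    return processData()
  })
})()

source.emit('data', 'a')
source.emit('data', 'b')
source.emit('end')

done.then((all) => console.log(all))
```

Each call to next() yields a fresh promise, which is what lets a consumer see more than one value. Like the code above, this sketch supports only one pending consumer at a time.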

Upvotes: 0

olivarra1

Reputation: 3409

With RxJS (Reactive Extensions), it's pretty straightforward:

TwitterStream.prototype.receive = function () {
    return Rx.Observable.create((observer) => {
        this.stream
            .on('data', (data) => observer.onNext(data))
            .on('error', (err) => observer.onError(err));
    });
}

And then

const stream = new TwitterStream(config)

stream.receive().subscribe((data) => console.log(data));

Upvotes: 2

Bhargav

Reputation: 918

You need to create a promise inside the callback passed to stream.on, one per event. Right now, receive returns a single promise when it is called, and a promise settles only once: it resolves with the first value or rejects with the first error, and every later event is ignored.
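A small sketch of the difference, assuming a plain Node.js EventEmitter as a stand-in for the Twitter stream (source, viaPromise and viaHandler are illustrative names, not part of the original code):

```javascript
const { EventEmitter } = require('events')

// Stand-in for the Twitter stream (assumption): any emitter with 'data' events.
const source = new EventEmitter()

// Promise version, as in the question: it settles on the first event only.
const viaPromise = []
new Promise((resolve, reject) => {
  source.on('data', resolve).on('error', reject)
}).then((data) => viaPromise.push(data))

// Handler version: the callback runs again for every event.
const viaHandler = []
source.on('data', (data) => viaHandler.push(data))

source.emit('data', 'tweet 1')
source.emit('data', 'tweet 2')
source.emit('data', 'tweet 3')

// Log after the pending promise callbacks have run.
setImmediate(() => {
  console.log(viaPromise) // [ 'tweet 1' ]
  console.log(viaHandler) // [ 'tweet 1', 'tweet 2', 'tweet 3' ]
})
```

The later calls to resolve are silently ignored, which matches the behaviour described in the question: only the first object is ever delivered.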

Upvotes: 0
