Michael Connor

Reputation: 4232

Dependency management in node.js with highland.js

I am getting huge value out of node.js and loving its stream processing model. I'm mostly using it for data enrichment and ETL-like jobs.

For enrichment, I may have a record like this...

{ "ip":"123.45.789.01", "productId": 12345 }

I would like to enrich it, perhaps by adding product details:

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }
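At its core, the enrichment is a keyed lookup plus a merge. A minimal plain-JavaScript sketch, assuming the description and price data have already been loaded into Maps keyed by productId (the `enrich` helper and the `descriptions`/`prices` tables are hypothetical, not part of highland):

```javascript
// Hypothetical lookup tables keyed by productId, matching the example record above.
const descriptions = new Map([[12345, { productId: 12345, description: "Coca-Cola 12Pk" }]]);
const prices = new Map([[12345, { productId: 12345, price: 4.0 }]]);

// Return a copy of the record with the matching description and price merged in.
// Unknown productIds yield undefined fields instead of throwing.
function enrich(record, descriptionIndex, priceIndex) {
  const d = descriptionIndex.get(record.productId);
  const p = priceIndex.get(record.productId);
  return {
    ...record,
    description: d ? d.description : undefined,
    price: p ? p.price : undefined,
  };
}

// Prints {"ip":"123.45.789.01","productId":12345,"description":"Coca-Cola 12Pk","price":4}
console.log(JSON.stringify(enrich({ ip: "123.45.789.01", productId: 12345 }, descriptions, prices)));
```

The hard part of the question is not this merge but making sure both lookup tables are complete before the main stream is processed.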

The description data and the price data each come from a separate stream. What is the best way to handle such dependencies in highland?

fs = require('fs')
H = require('highland')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

# the file is a 10 GB file with one JSON record per line
activityStream = H(fs.createReadStream('8-11-all.json', {flags: 'r', encoding: 'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k lines for testing
  .filter((line) -> line.trim().length > 0) # skip empty lines so JSON.parse doesn't fail on them
  .map(JSON.parse)
  .doto((v) ->
    # here I want to add the description from descriptionStream
    # and the price from priceStream.
    # to do that, this stream's execution has to depend on the
    # completion of the first two and the availability of their data.
    # this is easy with declarative programming but less intuitive
    # with functional programming
  )
  .toArray((results) ->
    # dump my results here
  )

Any thoughts?

Upvotes: 0

Views: 43

Answers (2)

Michael Connor

Reputation: 4232

Here is a stab at this. Is this the right approach?

H = require('highland')

# these values would come from some api/file
descriptionStream = H([{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v
    return memo
  )

# these values would come from some api/file
priceStream = H([{"productId":1,"price":4.00},{"productId":2,"price":1.25}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v
    return memo
  )

H([descriptionStream, priceStream])
  .series()
  .toArray((dependencies)->
    [descriptionIndex, priceIndex] = dependencies

    # these values would come from an api/file
    H([{productId:1},{productId:2}])
      .doto((v)-> v.description = descriptionIndex[v.productId].description)
      .doto((v)-> v.price = priceIndex[v.productId].price)
      .each((v)->
        console.log(JSON.stringify(v))
      )
  )

This gives me the right results, but I'm not sure it is the most elegant way to handle stream dependencies. I'm also assuming that if you needed the price or description stream more than once, you would fork it.

{"productId":1,"description":"Coca-Cola 12Pk","price":4}
{"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}
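For comparison, the pattern in this answer (reduce each dependency stream to an index, wait for both, then enrich the main stream) can be sketched without highland using async iteration. This is a swapped-in technique, not highland's API: it assumes each source is an async iterable of already-parsed objects (Node readable streams are async iterable, so a real file stream could be used once it is split into lines and parsed), and `indexBy`, `fromArray` and `run` are hypothetical helpers.

```javascript
// Drain an async iterable of records into a Map keyed by the given field.
async function indexBy(source, key) {
  const index = new Map();
  for await (const item of source) index.set(item[key], item);
  return index;
}

// Hypothetical stand-in for a real stream: yields array items asynchronously.
async function* fromArray(items) {
  for (const item of items) yield item;
}

async function run(descriptionSource, priceSource, activitySource) {
  // Both lookup sources are drained concurrently; the activity source
  // is not read until both indexes are complete.
  const [descriptionIndex, priceIndex] = await Promise.all([
    indexBy(descriptionSource, "productId"),
    indexBy(priceSource, "productId"),
  ]);

  const results = [];
  for await (const record of activitySource) {
    const d = descriptionIndex.get(record.productId);
    const p = priceIndex.get(record.productId);
    results.push({ ...record, description: d && d.description, price: p && p.price });
  }
  return results;
}

run(
  fromArray([{ productId: 1, description: "Coca-Cola 12Pk" }, { productId: 2, description: "Coca-Cola 20oz Bottle" }]),
  fromArray([{ productId: 1, price: 4.0 }, { productId: 2, price: 1.25 }]),
  fromArray([{ productId: 1 }, { productId: 2 }])
).then((results) => results.forEach((r) => console.log(JSON.stringify(r))));
```

Because each index is an ordinary Map once it has been built, it can be shared by any number of consumers without forking; forking only matters while the data is still an unconsumed stream.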

Upvotes: 0

Yuri Zarubin

Reputation: 11677

If you're using highland.js, you can call .map with a function that modifies and returns each item.

e.g.

var _ = require('highland');

var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
  x.productName = 'Coca-Cola 12 Pack';
  return x;
});
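The .map approach extends to the original question once the lookup data is available as an object: the mapping function can read from it instead of hard-coding the value. A minimal sketch in plain JavaScript, using Array.prototype.map as a stand-in for highland's .map (both apply a function to each item); the `productNames` table and `addProductName` helper are hypothetical:

```javascript
// Hypothetical lookup table, e.g. built by reducing a description stream.
const productNames = { 12345: "Coca-Cola 12 Pack" };

// Return a new object instead of mutating the input record.
const addProductName = (x) => ({ ...x, productName: productNames[x.productId] });

const enriched = [{ ip: "123.45.789.01", productId: 12345 }].map(addProductName);
console.log(enriched);
```

The same addProductName function could be passed to a highland stream's .map unchanged, as long as productNames is fully populated before the stream starts flowing.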

Upvotes: 0
