Reputation: 7521
I'm using the job-collection package to download a large file of webpage metadata, split it into records with the event-stream package, and look each record up in a collection.
The file is too large to buffer, so streaming is required. Here is a small file with a few examples of the metadata if you wish to try this.
Each job from the job-collection package is already inside an async function:
var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

function (job, callback) { // worker function handed to processJobs
  // This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {
    if (error) return console.error('Error downloading File');
    if (response.statusCode !== 200) return console.error(response.statusCode, 'Status not 200');
    var responseEncoding = response.headers['content-type'];
    console.log('response encoding is %s', responseEncoding);
    // Each accepted value needs its own comparison
    if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
      console.log('Received binary/octet-stream');
      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip())
        .pipe(EventStream.split(regexSplit))
        .pipe(EventStream.map(function (webpageMetaData) {
          /* Need to parse the metaData or pass each webpageMetaData to a function.
           * This next function could block if it had to */
          searchWebPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous
        }));
    } else {
      console.error('Wrong encoding');
    }
  });
}

function searchWebPageMetaData(metaData) {
  // Parse JSON and search collection for match
}
Where to put Meteor.bindEnvironment? Do I bind the environment each time I pass to searchWebPageMetaData()? Do I need to expressly use fibers here?
The stream stops when running this if I run it to process.stdout. Am I supposed to put the stream into one of Meteor's wrap
I'm aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (think I'm answering this yes as I type)
Will the stream slow to compensate for the slowness of the DB calls? My guess is no but how do I deal with that?
I've spent quite a while learning about Meteor's wrapAsync and bindEnvironment, but I'm having trouble bringing it all together and understanding where to use them.
SUPPLEMENT 1
Just to clarify, the steps are:
I was trying to do something like this, except the core code I need help with was in a function in a different file. The following code has most of @electric-jesus' answer in there.
processJobs('parseWatFile', {
  concurrency: 1,
  cargo: 1,
  pollInterval: 1000,
  prefetch: 1
}, function (job, callback) {
  if (job.data.watZipFileLink) {
    queue.pause();
    console.log('queue should be paused now');
    var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';
    function searchPageMetaData(webpageMetaData, callback) {
      console.log(webpageMetaData); // Would be nice to just get this function logging each webPageMetaData
      future.return(callback(webpageMetaData)); //I don't need this to return any value - do I have to return something?
    }
    if (!watFileUrl)
      console.error('No watFile passed to downloadAndSearchWatFileForEntity ');
    var future = new Future(); // Doc Brown would be proud.
    if (typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');
    request({url: watFileUrl, encoding: null}, function (error, response, body) {
      if (error) return future.throw('Error Downloading File');
      if (response.statusCode !== 200) return future.throw('Expected status 200, got ' + response.statusCode + '.');
      var responseEncoding = response.headers['content-type'];
      if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
        var regexSplit = /WARC\/1\./;
        // Pipe the response into gunzip first, then chain the remaining stages
        response.pipe(zlib.createGunzip())
          .pipe(EventStream.split(regexSplit))
          .pipe(EventStream.map(function (webpageMetaData) {
            searchPageMetaData(webpageMetaData, callback);
          }));
      } else {
        future.throw('Wrong encoding');
      }
    });
    return future.wait();
  } else {
    console.log('No watZipFileLink for this job');
    job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
  }
  queue.resume();
  job.done();
  callback();
});
Upvotes: 6
Views: 2129
Reputation: 7030
This is quite tricky if you want to handle all errors correctly. You have to ask yourself what should happen if your code throws an exception, or if the error event handler is called. You want errors to propagate correctly, that is, to be thrown as exceptions in the fiber that called the streaming code. I implemented something like this for one of our job-collection tasks, for extracting tar files.
First you need some helper functions:
Future = Npm.require 'fibers/future'

bindWithFuture = (futures, mainFuture, fun, self) ->
  wrapped = (args...) ->
    future = new Future()

    if mainFuture
      future.resolve (error, value) ->
        # To resolve mainFuture early when an exception occurs
        mainFuture.throw error if error and not mainFuture.isResolved()
        # We ignore the value

    args.push future.resolver()
    try
      futures.list.push future
      fun.apply (self or @), args
    catch error
      future.throw error

    # This waiting does not really do much because we are
    # probably in a new fiber created by Meteor.bindEnvironment,
    # but we can still try to wait
    Future.wait future

  Meteor.bindEnvironment wrapped, null, self

wait = (futures) ->
  while futures.list.length
    Future.wait futures.list
    # Some elements could be added in meantime to futures,
    # so let's remove resolved ones and retry
    futures.list = _.reject futures.list, (f) ->
      if f.isResolved()
        # We get to throw an exception if there was an exception.
        # This should not really be needed because exception should
        # be already thrown through mainFuture and we should not even
        # get here, but let's check for every case.
        f.get()
        true # And to remove resolved
And then you can run something like:
mainFuture = new Future()

# To be able to override list with a new value in wait we wrap it in an object
futures =
  list: []

bindWithOnException = (f) =>
  Meteor.bindEnvironment f, (error) =>
    mainFuture.throw error unless mainFuture.isResolved()

onWebpageMetaData = (metaData, callback) =>
  return callback null if mainFuture.isResolved()

  # Do whatever you want here.
  # Call callback(null) when you finish.
  # Call callback(error) if there is an error.
  # If you want to call into a Meteor code inside some other callback for async code you use,
  # use bindWithOnException to wrap a function and stay inside a Meteor environment and fiber.
  MeteorCollection.insert
    metaData: metaData

  callback null

requestFuture = new Future()
request
  url: job.fileURL
  encoding: null
,
  (error, response, body) ->
    return requestFuture.throw error if error
    return requestFuture.throw new Error "Expected status 200, got #{ response.statusCode }." unless response.statusCode is 200
    requestFuture.return response

response = requestFuture.wait()
responseEncoding = response.headers['content-type']

throw new Error "Wrong encoding" unless responseEncoding in ['application/octet-stream', 'binary/octet-stream']

regexSplit = /WARC\/1\./

response.pipe(
  zlib.createGunzip()
).pipe(
  EventStream.split regexSplit
).pipe(
  EventStream.map bindWithFuture futures, mainFuture, onWebpageMetaData
).on('end', =>
  # It could already be resolved by an exception from bindWithFuture or bindWithOnException
  mainFuture.return() unless mainFuture.isResolved()
).on('error', (error) =>
  # It could already be resolved by an exception from bindWithFuture or bindWithOnException
  mainFuture.throw error unless mainFuture.isResolved()
)

mainFuture.wait()
wait futures
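For comparison, here is a minimal sketch of the same error-propagation idea in plain Node.js, without fibers or Meteor. It assumes a Node.js version that ships stream.pipeline (10 or later), which is newer than what Meteor bundled when this was written, and the helper name gunzipEach is hypothetical. It only shows that a failure in any stage, including a throwing handler, ends up in one callback; it does not keep you inside a Meteor environment.

```javascript
var stream = require('stream');
var zlib = require('zlib');

// Hypothetical helper: gunzip `source`, pass each decompressed chunk to
// `onChunk`, and call `callback` once with the first error (or with none).
function gunzipEach(source, onChunk, callback) {
  var sink = new stream.Writable({
    write: function (chunk, encoding, done) {
      try {
        onChunk(chunk.toString());
        done();
      } catch (error) {
        done(error); // a throwing handler fails the whole pipeline
      }
    }
  });
  // pipeline() forwards an error from any stage and destroys the others.
  stream.pipeline(source, zlib.createGunzip(), sink, callback);
}

// Usage: a valid gzip payload, then a corrupt one.
var good = stream.Readable.from([zlib.gzipSync('WARC/1.0 example')]);
gunzipEach(good, function (text) { console.log(text); }, function (error) {
  console.log('good stream finished, error:', error);
});

var bad = stream.Readable.from([Buffer.from('this is not gzip data')]);
gunzipEach(bad, function (text) { console.log(text); }, function (error) {
  console.log('bad stream failed:', error && error.message);
});
```

Chained .pipe() calls, by contrast, do not forward errors between stages, which is why the code above attaches its own error handler and guards every resolution of mainFuture.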
Upvotes: 3
Reputation: 4486
Interesting, looks alright. I've never worked with job-collection, but it seems to be just a Mongo-driven task queue, so I am assuming it works like a regular queue. For anything callback-based I almost always reach for the Future pattern, e.g.:
var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');
var Future = Npm.require('fibers/future');

var searchWebPageMetaData = function (metaData) {
  // Parse JSON and search collection for match
  // make it return something
  var result = /droids/ig.test(metaData);
  return result;
}

var processJob = function (job, callback) {
  var future = new Future(); // Doc Brown would be proud.
  if (typeof callback !== 'function') future.throw("Oops, you forgot that callbacks are supposed to be functions.. not undefined or whatever.");
  // This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {
    if (error) return future.throw("Error Downloading File");
    if (response.statusCode !== 200) return future.throw("Expected status 200, got " + response.statusCode + ".");
    var responseEncoding = response.headers['content-type'];
    // Each accepted value needs its own comparison
    if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip())
        .pipe(EventStream.split(regexSplit))
        .pipe(EventStream.map(function (webpageMetaData) {
          /* Need to parse the metaData or pass each webpageMetaData to a function.
           * This next function could block if it had to */
          // pass each metadatum to this function to update a collection - this function can be synchronous
          // NOTE: a Future resolves only once, so a second record would make return() throw.
          future.return(callback(webpageMetaData)); // this way, processJob returns whatever we find in the completed webpage, via callback.
        }));
    } else {
      future.throw('Wrong encoding');
    }
  });
  return future.wait();
}
So whenever you assign a variable like this:
var currentJob = processJob(myjob, searchWebPageMetaData);
the assignment reads like synchronous code, yet the async work is done and its result delivered before the next line runs.
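A side note on the content-type check: a test written as x === 'a' || 'b' is always truthy, because the right-hand string is truthy on its own, so each accepted value needs its own comparison. A small sketch of the difference, where both function names are hypothetical:

```javascript
// Hypothetical helper: true only for the two binary content types
// mentioned in the question.
function isBinaryContentType(contentType) {
  var accepted = ['application/octet-stream', 'binary/octet-stream'];
  return accepted.indexOf(contentType) !== -1;
}

// The loose form: the right-hand operand is a non-empty string, which is
// truthy, so the whole expression is truthy whatever contentType is.
function looseCheck(contentType) {
  return Boolean(contentType === 'application/octet-stream' || 'binary/octet-stream');
}

console.log(looseCheck('text/html'));          // true
console.log(isBinaryContentType('text/html')); // false
```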
To answer your questions,
Where to put Meteor.bindEnvironment? Do I bind the environment each time I pass to searchWebPageMetaData()? Do I need to expressly use fibers here?
Not really, I believe the explicit use of fibers/future already takes care of this.
The stream stops when running this if I run it to process.stdout. Am I supposed to put the stream into one of Meteor's wrap
How do you mean? I vaguely remember that process.stdout is blocking, which might be a cause. Again, wrapping the result in a future should take care of this.
I'm aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (think I'm answering this yes as I type)
Take a look at the Meteor.wrapAsync helper code. It's basically a future resolution applied for you. Of course you can use it, but you can also use fibers/future explicitly on its own with no problem.
Will the stream slow to compensate for the slowness of the DB calls? My guess is no but how do I deal with that?
Not really sure what you mean here, but since we're using asynchronous fibers, my guess is no as well. I've yet to see any slowness from using fibers. You will probably only hit a performance issue, mainly in memory usage, if multiple jobs are launched and running concurrently. Keep the queue's concurrency low, as fibers make it easy to have a lot of work in flight at once. You only have one core to process it all: a single Node process runs your JavaScript on one thread :(
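On the "will the stream slow down" question, here is a rough sketch of how it can be made to slow down, using only Node's built-in stream module rather than event-stream. Everything named here (slowLookup, createRecordStream, run) is hypothetical, and it assumes a Node.js version with stream.pipeline and Readable.from. The idea: a writable stage that does not call its write callback until the slow lookup has finished stops accepting input, and the piped source is paused once the buffer fills.

```javascript
var stream = require('stream');

// Hypothetical stand-in for a slow database call.
function slowLookup(record, done) {
  setTimeout(function () { done(null); }, 5);
}

// Splits incoming text on `separator` and hands each non-empty record to
// `handler`. The next chunk is not accepted until every record of the
// current chunk has been handled, which is what applies backpressure.
// Assumes multi-byte characters are not split across chunks.
function createRecordStream(separator, handler) {
  var buffered = '';
  function handleAll(records, done) {
    if (records.length === 0) return done();
    handler(records.shift(), function (error) {
      if (error) return done(error);
      handleAll(records, done);
    });
  }
  return new stream.Writable({
    write: function (chunk, encoding, done) {
      var parts = (buffered + chunk.toString()).split(separator);
      buffered = parts.pop(); // the last piece may be an incomplete record
      handleAll(parts.filter(Boolean), done);
    },
    final: function (done) {
      handleAll(buffered ? [buffered] : [], done);
    }
  });
}

// Usage: feed two chunks, with a separator split across the chunk boundary.
function run(chunks, callback) {
  var seen = [];
  var sink = createRecordStream(/WARC\/1\./, function (record, done) {
    slowLookup(record, function (error) {
      seen.push(record.trim());
      done(error);
    });
  });
  stream.pipeline(stream.Readable.from(chunks), sink, function (error) {
    callback(error, seen);
  });
}

run(['WARC/1.0 first WAR', 'C/1.0 second'], function (error, seen) {
  console.log(error || seen);
});
```

Whether the download itself slows down depends on every stage in between honouring backpressure, so treat this as an illustration of the mechanism, not a drop-in replacement for the job code.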
Upvotes: 3