Reputation: 471
I'm having issues with Node.js streaming. In one of my streams I send a request using the request module (which is asynchronous), and by the time the request gets a response, the whole stream has already finished.
Here is the code:
var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
    var location = {
        street: data.street_no + ' ' + data.street_name,
        city: data.city_name,
        state: data.state_id
    };
    var url = MAP_QUEST_URL + JSON.stringify(location);
    // this request finishes after stream
    request({
        url: url,
        method: 'GET',
        json: location
    }, function (error, response, body) {
        if (error) {
            console.log(error);
        } else {
            data.lat = body.results[0].locations[0].latLng.lat;
            data.lng = body.results[0].locations[0].latLng.lng;
        }
        return data;
    });
};
var readFileStream = fs.createReadStream(inFile);
var writeFileStream = fs.createWriteStream(outFile);
var parseCsvStream = csv.parse({
    trim: true,
    headers: true,
    objectMode: false
});
var deserializeJSONStream = JSONStream.parse();
var streamingEvent = readFileStream
    .pipe(parseCsvStream)
    .transform(createLocationJSONStream)
    .pipe(deserializeJSONStream);
// eventually want to write out to csv
//.pipe(csv.createWriteStream({headers: true}))
//.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
streamingEvent.on('data', function (data) {
    console.log(JSON.stringify(data));
});
streamingEvent.on('end', function () {
    console.log('end');
});
Here is the output:
end
200
application/json; charset=utf-8
200
application/json; charset=utf-8
200
application/json; charset=utf-8
Here is the example csv:
region_id,region_name,street_no,street_name,sitetype_id,country_name,timezone_no,geocode_id,city_name,state_id,zip_code,county_name
3,West,350,ORCHARD AVE,N,USA,4,954824536,UKIAH,CA,95482-4536,MENDOCINO
1,South,1000,4TH AVE,N,USA,7,333041903,FORT LAUDERDALE,FL,33304-1903,BROWARD
0,Northeast,1370,HURFVILLE RD,N,USA,7,80963818,DEPTFORD,NJ,08096-3818,GLOUCESTER
I know I am handling the streams wrong, but I don't know the easiest way to fix it. Should I pipe to the request?
Upvotes: 0
Views: 202
Reputation: 708046
If you want the async request() operation to finish before you start your stream code, then you need to put the stream code INSIDE the completion callback for your request(). That way, you don't start your stream code until after you've received the data from your request() operation.
In addition, you can't return a value from an async operation the way you were trying to with return data;. Returning data from a callback just goes back into the callback infrastructure. The outer function has long since returned, and subsequent lines of code after the outer function have already executed. Instead, you must consume the result INSIDE the callback itself, or make a function call from within that callback and pass the data to that other function.
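For example, here's a minimal sketch of the difference (processData is a hypothetical function standing in for whatever consumes the result):
var request = require('request');
// WRONG: the callback's return value goes back into the callback
// infrastructure and is discarded; getDataWrong() itself returns undefined
function getDataWrong(url) {
    request(url, function (error, response, body) {
        return body; // discarded
    });
    // body is not available here; the response hasn't arrived yet
}
// RIGHT: consume the result inside the callback, or hand it to
// another function from inside the callback
function getData(url) {
    request(url, function (error, response, body) {
        if (error) {
            console.log(error);
        } else {
            processData(body);
        }
    });
}
// hypothetical consumer of the response body
function processData(body) {
    console.log(body);
}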
I don't follow exactly what you're trying to do with the streams, but here's where they would go if you want to make sure the request() operation is done before you run your streams:
var fs = require('fs');
var csv = require('fast-csv');
var path = require('path');
var request = require('request');
var JSONStream = require('JSONStream');
var locations = [];
var urls = [];
var MAP_QUEST_KEY = 'mykey';
var MAP_QUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?key=' + MAP_QUEST_KEY + '&inFormat=json&json=';
var CSV_FILE_NAME = 'smaller_stores.csv';
var inFile = path.join(__dirname, 'input', CSV_FILE_NAME);
var outFile = path.join(__dirname, 'output', CSV_FILE_NAME);
// create a location object
var createLocationJSONStream = function (data) {
    var location = {
        street: data.street_no + ' ' + data.street_name,
        city: data.city_name,
        state: data.state_id
    };
    var url = MAP_QUEST_URL + JSON.stringify(location);
    // the stream code below now runs inside this request's completion callback
    request({
        url: url,
        method: 'GET',
        json: location
    }, function (error, response, body) {
        if (error) {
            console.log(error);
        } else {
            // get the latLng position so we can use it in our streaming
            var position = body.results[0].locations[0].latLng;
            var readFileStream = fs.createReadStream(inFile);
            var writeFileStream = fs.createWriteStream(outFile);
            var parseCsvStream = csv.parse({
                trim: true,
                headers: true,
                objectMode: false
            });
            var deserializeJSONStream = JSONStream.parse();
            var streamingEvent = readFileStream
                .pipe(parseCsvStream)
                .transform(createLocationJSONStream)
                .pipe(deserializeJSONStream);
            // eventually want to write out to csv
            //.pipe(csv.createWriteStream({headers: true}))
            //.pipe(fs.createWriteStream(outFile, {encoding: 'utf8'}));
            streamingEvent.on('data', function (data) {
                console.log(JSON.stringify(data));
            });
            streamingEvent.on('end', function () {
                console.log('end');
            });
        }
    });
};
The latLng coordinates are now available in the position variable anywhere in your streaming code, and your streaming operation does not start until that data is ready.
FYI, I've found that any code involving multiple async operations is ultimately easier to write, easier to understand, and easier to do robust error handling in if you use promises instead of plain callbacks. This requires learning how promises work and often involves "promisifying" some operations (turning regular callback-based async operations into operations that return a promise). So, for the longer term, I'd suggest you start learning promises. I use the Bluebird promise library for my node.js development as it has lots of helpful features.
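For example, here's a minimal sketch of promisifying request() with Bluebird (the multiArgs option is a Bluebird 3 feature; the location and URL here just echo the MapQuest request from above and are illustrative):
var Promise = require('bluebird');
var request = require('request');
// promisify request(): with multiArgs the returned promise resolves
// with an array of the callback's success arguments: [response, body]
var requestAsync = Promise.promisify(request, { multiArgs: true });
var location = { street: '350 ORCHARD AVE', city: 'UKIAH', state: 'CA' };
var url = 'http://www.mapquestapi.com/geocoding/v1/address?key=mykey&inFormat=json&json=' +
    JSON.stringify(location);
requestAsync({ url: url, method: 'GET', json: true })
    .spread(function (response, body) {
        // consume the result here, just like inside a plain callback
        var position = body.results[0].locations[0].latLng;
        console.log(position);
    })
    .catch(function (error) {
        // any error from request() lands here
        console.log(error);
    });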
Upvotes: 1