Reputation: 148
I am currently querying my MongoDB for an array of URLs in one collection, which returns an array. I then want to use that array to go through another collection and find the matching elements for each element in the previous query's returned array. Is it proper to use forEach on the array and do individual queries? My code looks like the following; the first function getUrls works great. The current error I get is:
(node:10754) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): TypeError: Cannot read property 'limit' of undefined
(node:10754) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
async function useUrls () {
let domains = await getUrls()
let db = await mongo.connect("mongodb://35.185.206.31:80/lc_data")
let results = []
domains.forEach( domain =>{
let query = {"$match":
{"email_domain": domain}
}
let cursor = db.collection('circleback')
.aggregate([query], (err, data) =>{
if(err)
throw err;
console.log("cb", data)
}).limit(1100)
  })
}
Upvotes: 1
Views: 1192
Reputation: 151072
As noted, the code in the question has a few problems, most of which can be addressed by looking at the full sample listing supplied at the end of this response. What you are essentially asking for here is a variation on the "Top-N results" problem, for which there are a couple of ways to "practically" handle it.
So somewhat ranking from "worst" to "best":
So rather than "looping" the results of your function, you can alternately supply all the results to a query using $in. That alleviates the need to "loop inputs", but the other thing needed here is the "top-N per output".

There really is no "stable" mechanism in MongoDB for this as yet, but "if" it is plausible given the size of the collections in question, then you can in fact simply $group on your "distinct" keys matching the provided $in arguments, then $push all documents into an array and $slice the results:
let results = await db.collection('circleback').aggregate([
{ "$match": { "email_domain": { "$in": domains } } },
{ "$group": {
"_id": "$email_domain",
"docs": { "$push": "$$ROOT" }
}},
{ "$sort": { "_id": 1 } },
{ "$addFields": { "docs": { "$slice": [ "$docs", 0, 1100 ] } } }
]).toArray();
The "wider" issue here is that MongoDB has no way of "limiting" the array content on the initial $push
. And this in fact is awaiting a long outstanding issue. SERVER-9377.
So whilst we can do this sort of operation "in theory", it often is not practical at all since the 16MB BSON Limit often restricts that "initial" array size, even if the $slice
result would indeed stay under that cap.
Your code shows you are running under an async/await environment, so I suggest you actually use it. Simply await on each loop iteration from the source:
let results = [];
for ( let domain of domains ) {
results = results.concat(
await db.collection('circleback').find({ "email_domain": domain })
.limit(1100).toArray()
);
}
Simple functions allow you to do this, such as returning the standard cursor result of .find() as an array via .toArray() and then using .concat() to join with the previous arrays of results.

It's simple and effective, but we can probably do a little better.
So instead of using a "loop" and await on each called async function, you can execute them all ( or at least "most" of them ) concurrently. This is in fact part of the problem you presently have as presented in the question, because nothing actually "waits" for the loop iteration.

We could use Promise.all() to effectively do this; however, if it is actually a "very large" number of promises that would be running concurrently, this would run into the same problem as experienced, where the call stack is exceeded.
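For illustration only, a minimal sketch of that Promise.all() form ( using the same db and domains as in the loop above ) would be:

// Sketch only: every query is issued at once, with no limit on concurrency
let results = [].concat.apply([],
  await Promise.all(domains.map(domain =>
    db.collection('circleback').find({ "email_domain": domain })
      .limit(1100).toArray()
  ))
);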
To avoid this, yet still have the benefits, we can use Bluebird promises with Promise.map(). This has a "concurrency" limiter option that allows only a specified number of operations to act simultaneously:
let results = [].concat.apply([],
await Promise.map(domains, domain =>
db.collection('circleback').find({ "email_domain": domain })
.limit(1100).toArray()
,{ concurrency: 10 })
);
In fact you should even be able to use a library such as Bluebird to "plug in" the .map() functionality into anything else that returns a Promise, such as your "source" function returning the list of "domains". Then you could "chain", just as is shown in the later examples and sketched directly below.
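As a rough sketch only ( getUrls() here stands in for your own "source" function and is assumed to resolve to the array of domain strings ), that chaining could look like:

// Sketch only: Promise.map() also accepts a promise of an iterable,
// so the promise returned by getUrls() can be passed in directly
let results = [].concat.apply([],
  await Promise.map(getUrls(), domain =>
    db.collection('circleback').find({ "email_domain": domain })
      .limit(1100).toArray(),
  { concurrency: 10 })
);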
Future releases of MongoDB ( from MongoDB 3.6 ) actually have a new "non-correlated" form of $lookup that allows a special case here. So going back to the original aggregation example, we can get the "distinct" values for each matching key, and then $lookup with a "pipeline" argument, which would then allow a $limit to be applied to the results.
let results = await db.collection('circleback').aggregate([
{ "$match": { "email_domain": { "$in": domains } } },
{ "$group": { "_id": "$email_domain" }},
{ "$sort": { "_id": 1 } },
{ "$lookup": {
"from": "circleback",
"let": {
"domain": "$_id"
},
"pipeline": [
{ "$redact": {
"$cond": {
"if": { "$eq": [ "$email_domain", "$$domain" ] },
"then": "$$KEEP",
"else": "$$PRUNE"
}
}},
{ "$limit": 1100 }
],
"as": "docs"
}}
]).toArray();
This would then always stay under the 16MB BSON limit, presuming of course that the argument to $in allowed that to be the case.
Here is a full example listing you can run and generally play with; the default data set creation is intentionally quite large. It demonstrates all the techniques described above as well as some general usage patterns to follow.
const mongodb = require('mongodb'),
Promise = require('bluebird'),
MongoClient = mongodb.MongoClient,
Logger = mongodb.Logger;
const uri = 'mongodb://localhost/bigpara';
function log(data) {
console.log(JSON.stringify(data,undefined,2))
}
(async function() {
let db;
try {
db = await MongoClient.connect(uri,{ promiseLibrary: Promise });
Logger.setLevel('info');
let source = db.collection('source');
let data = db.collection('data');
// Clean collections
await Promise.all(
[source,data].map( coll => coll.remove({}) )
);
// Create some data to work with
await source.insertMany(
Array.apply([],Array(500)).map((e,i) => ({ item: i+1 }))
);
let ops = [];
for (let i=1; i <= 10000; i++) {
ops.push({
item: Math.floor(Math.random() * 500) + 1,
index: i,
amount: Math.floor(Math.random() * (200 - 100 + 1)) + 100
});
if ( i % 1000 === 0 ) {
await data.insertMany(ops,{ ordered: false });
ops = [];
}
}
/* Fetch 5 and 5 example
*
* Note that the async method to supply to $in is a simulation
* of any real source that is returning an array
*
* Not the best since it means ALL documents go into the array
* for the selection. Then you $slice off only what you need.
*/
console.log('\nAggregate $in Example');
await (async function(source,data) {
let results = await data.aggregate([
{ "$match": {
"item": {
"$in": (await source.find().limit(5).toArray()).map(d => d.item)
}
}},
{ "$group": {
"_id": "$item",
"docs": { "$push": "$$ROOT" }
}},
{ "$addFields": {
"docs": { "$slice": [ "$docs", 0, 5 ] }
}},
{ "$sort": { "_id": 1 } }
]).toArray();
log(results);
})(source,data);
/*
* Fetch 10 by 2 example
*
* Much better usage of concurrent processes and only gets
* what is needed. But it is actually 1 request per item
*/
console.log('\nPromise.map concurrency example');
await (async function(source,data) {
let results = [].concat.apply([],
await source.find().limit(10).toArray().map(d =>
data.find({ item: d.item }).limit(2).toArray()
,{ concurrency: 5 })
);
log(results);
})(source,data);
/*
* Plain loop async/await serial example
*
* Still one request per item, requests are serial
* and therefore take longer to complete than concurrent
*/
console.log('\nasync/await serial loop');
await (async function(source,data) {
let items = (await source.find().limit(10).toArray());
let results = [];
for ( let item of items ) {
results = results.concat(
await data.find({ item: item.item }).limit(2).toArray()
);
}
log(results);
})(source,data);
/*
* Non-Correlated $lookup example
*
* Uses aggregate to get the "distinct" matching results and then does
* a $lookup operation to retrieve the matching documents to the
* specified $limit
*
* Typically not as efficient as the concurrent example, but does
* actually run completely on the server, and does not require
* additional connections.
*
*/
let version = (await db.db('admin').command({'buildinfo': 1})).version;
if ( version >= "3.5" ) {
console.log('\nNon-Correlated $lookup example $limit')
await (async function(source,data) {
let items = (await source.find().limit(5).toArray()).map(d => d.item);
let results = await data.aggregate([
{ "$match": { "item": { "$in": items } } },
{ "$group": { "_id": "$item" } },
{ "$sort": { "_id": 1 } },
{ "$lookup": {
"from": "data",
"let": {
"itemId": "$_id",
},
"pipeline": [
{ "$redact": {
"$cond": {
"if": { "$eq": [ "$item", "$$itemId" ] },
"then": "$$KEEP",
"else": "$$PRUNE"
}
}},
{ "$limit": 5 }
],
"as": "docs"
}}
]).toArray();
log(results);
})(source,data);
} else {
console.log('\nSkipped Non-Correlated $lookup demo');
}
} catch(e) {
console.error(e);
} finally {
if (db) db.close();
}
})();
Upvotes: 1