slippedon
slippedon

Reputation: 33

Add allowDiskUse(true) to an aggregation

I am getting a 'sort exceeded memory limit...' error whose message says to use allowDiskUse(true) in my aggregation. My problem is that I can't figure out where to add that to my code. I have tried adding it as an object within the pipeline and as a property in the aggregate() method call, and I get a runtime error both ways.

Code is below:

// Route handler from the question: for each departure/arrival airport pair,
// returns the minimum and average fare plus the first matching flight document.
server.get('/cheap-flight-by-route', function (req, res, next) {

    // NOTE(review): the pipeline stages are passed as separate arguments (not
    // an array) and a callback is passed last. With a callback, the aggregation
    // is run immediately instead of returning a chainable Aggregate object,
    // which is presumably why chaining .allowDiskUse(true) on the result fails
    // (see the comment on the closing line) — confirm against the Mongoose
    // version in use.
    Flights.aggregate(
        // Sort by fare first so that $first in the $group stage below picks
        // the document with the lowest fare in each group.
        {$sort: {'fare.total_price': 1}},
        // Join each flight to its route; 'routes' is the array of matches.
        {$lookup: {
            from: 'travelroutes',
            localField: 'route',
            foreignField: '_id',
            as: 'routes'
        }},
        // Drop flights whose route was not found.
        {$match: {
            'routes._id': {$exists: true}
        }},
        {$group: {
                // 'routes' is an array, so these group keys are arrays too;
                // $project unwraps them with $arrayElemAt.
                _id: {
                    departureAirport: '$routes.departureAirport',
                    arrivalAirport: '$routes.arrivalAirport',
                },
                total_price: {$min: '$fare.total_price'},
                avg_price: {$avg: '$fare.total_price'},
                created: {$first: '$created'},
                doc: {$first: '$$ROOT'}
            }
        },
        {$project: {
            departureAirport: {$arrayElemAt: ['$_id.departureAirport', 0]},
            arrivalAirport: {$arrayElemAt: ['$_id.arrivalAirport', 0]},
            created : '$created',
            price: '$total_price',
            averagePrice: '$avg_price',
            'doc': 1,
            '_id': 0
        }},
        // Final ordering of the grouped results.
        {$sort: {
            'created': 1,
            'departureAirport': 1,
            'arrivalAirport': 1
            },
        },
        function(err, cheapFlights){
            if (err) {
                log.error(err)
                // NOTE(review): assumes err has err.errors.name (a validation-
                // style error). A server error such as the sort memory-limit
                // error may lack err.errors, making this line throw — confirm.
                return next(new errors.InvalidContentError(err.errors.name.message))
            }
            res.send(cheapFlights)
            next()
        }
    )  // <-- if I add a .allowDiskUse(true) here it throws a 'bad property' error
})

Upvotes: 2

Views: 23301

Answers (3)

jqIndy
jqIndy

Reputation: 444

// Runs an aggregation with the native driver, enabling allowDiskUse and
// consuming the results as a stream of documents.
MongoClient.connect("mongodb://localhost:27017/test", function(err, db) {
    // If the connection failed, db is not usable; report and stop.
    if (err) {
        console.error(err);
        return;
    }

    // Get an aggregation cursor. allowDiskUse lets the server use temporary
    // disk storage for memory-heavy stages such as $sort instead of failing
    // at the in-memory limit.
    const cursor = db.collection('data').aggregate([
            {$match: {}}
        ], {
        allowDiskUse: true
      , cursor: {batchSize: 1000}
        });

    // Use cursor as stream
    cursor.on('data', function(data) {
        console.dir(data);
    });

    // Without an 'error' listener a failed aggregation is an unhandled
    // 'error' event, and the connection would never be closed.
    cursor.on('error', function(cursorErr) {
        console.error(cursorErr);
        db.close();
    });

    cursor.on('end', function() {
        db.close();
    });
});

Upvotes: 2

Yuriy Chihray
Yuriy Chihray

Reputation: 56

I made some changes to your code; try this:

// Returns, for each departure/arrival airport pair, the minimum and average
// fare plus the first matching flight, with disk use enabled for the sort.
server.get('/cheap-flight-by-route', function (req, res, next) {
    // Builder form: called without a callback, aggregate() returns an
    // Aggregate object, so allowDiskUse(true) can be chained before exec().
    // This works whether or not the installed Mongoose version accepts an
    // options argument to aggregate().
    // NOTE: Mongoose 7+ no longer accepts callbacks; use exec().then(...) there.
    Flights.aggregate([
        {$sort: {
            'fare.total_price': 1
        } },
        {$lookup: {
            from: 'travelroutes',
            localField: 'route',
            foreignField: '_id',
            as: 'routes'
        } },
        {$match: {
            'routes._id': {$exists: true}
        } },
        {$group: {
            _id: {
                departureAirport: '$routes.departureAirport',
                arrivalAirport: '$routes.arrivalAirport',
            },
            total_price: {$min: '$fare.total_price'},
            avg_price: {$avg: '$fare.total_price'},
            created: {$first: '$created'},
            doc: {$first: '$$ROOT'}
        } },
        {$project: {
            departureAirport: {$arrayElemAt: ['$_id.departureAirport', 0]},
            arrivalAirport: {$arrayElemAt: ['$_id.arrivalAirport', 0]},
            created : '$created',
            price: '$total_price',
            averagePrice: '$avg_price',
            'doc': 1,
            '_id': 0
        } },
        {$sort: {
            'created': 1,
            'departureAirport': 1,
            'arrivalAirport': 1
        } }
    ])
    .allowDiskUse(true)
    .exec(function (err, cheapFlights) {
        if (err) {
            log.error(err);
            // err.errors is only populated on some Mongoose errors (e.g.
            // validation errors). Fall back to err.message so server errors,
            // such as the sort memory-limit error, don't throw a TypeError here.
            const message = (err.errors && err.errors.name && err.errors.name.message) ||
                err.message;
            return next(new errors.InvalidContentError(message));
        }
        res.send(cheapFlights);
        next();
    });
});

Or you can stream the results to the response through a pipe:

const JSONStream = require('JSONStream');
// Streams the per-route fare summary to the client as a JSON array instead of
// buffering all results in memory first.
server.get('/cheap-flight-by-route', function (req, res) {
    // allowDiskUse(true) is still required: streaming only changes how results
    // are delivered, not how much memory the server-side $sort needs.
    const stream = Flights.aggregate([
        {$sort: {
            'fare.total_price': 1
        } },
        {$lookup: {
            from: 'travelroutes',
            localField: 'route',
            foreignField: '_id',
            as: 'routes'
        } },
        {$match: {
            'routes._id': {$exists: true}
        } },
        {$group: {
            _id: {
                departureAirport: '$routes.departureAirport',
                arrivalAirport: '$routes.arrivalAirport',
            },
            total_price: {$min: '$fare.total_price'},
            avg_price: {$avg: '$fare.total_price'},
            created: {$first: '$created'},
            doc: {$first: '$$ROOT'}
        } },
        {$project: {
            departureAirport: {$arrayElemAt: ['$_id.departureAirport', 0]},
            arrivalAirport: {$arrayElemAt: ['$_id.arrivalAirport', 0]},
            created : '$created',
            price: '$total_price',
            averagePrice: '$avg_price',
            'doc': 1,
            '_id': 0
        } },
        {$sort: {
            'created': 1,
            'departureAirport': 1,
            'arrivalAirport': 1
        } }
    ])
    .allowDiskUse(true)
    .cursor()
    .exec();

    res.set('Content-Type', 'application/json');

    // Without an 'error' listener a failed aggregation (for example the sort
    // memory-limit error) is an unhandled 'error' event, which crashes Node.
    stream.on('error', function (err) {
        console.error(err);
        if (res.headersSent) {
            // Part of the JSON array was already sent; the body cannot be
            // repaired, so abort the connection.
            res.destroy();
            return;
        }
        res.statusCode = 500;
        res.end(JSON.stringify({ message: 'Aggregation failed' }));
    });

    stream.pipe(JSONStream.stringify()).pipe(res);
});

Upvotes: 4

Yuriy Chihray
Yuriy Chihray

Reputation: 56

Try passing it after your callback function, as an extra argument to aggregate():

`
    function(err, cheapFlights){
        if (err) {
            log.error(err)
            return next(new errors.InvalidContentError(err.errors.name.message));
        }
        res.send(cheapFlights);
        next();
     }, 
     { allowDiskUse: true }
)
`

Upvotes: 0

Related Questions