Shiv Aggarwal

Reputation: 499

MongoDB aggregation subquery: MongoDB PHP adapter

I have the following collection:

| **S_SERVER** | **S_PORT** | **D_PORT** | **D_SERVER** | **MBYTES** |
|---|---|---|---|---|
| L0T410R84LDYL | 2481 | 139 | MRMCRUNCHAPP | 10 |
| MRMCASTLE | 1904 | 445 | MRMCRUNCHAPP | 25 |
| MRMXPSCRUNCH01 | 54769 | 445 | MRMCRUNCHAPP | 2 |
| MRMCASTLE | 2254 | 139 | MRMCRUNCHAPP | 4 |
| MRMCASTLE | 2253 | 445 | MRMCRUNCHAPP | 35 |
| MRMCASTLE | 987 | 445 | MRMCRUNCHAPP | 100 |
| MRMCASTLE | 2447 | 445 | MRMCRUNCHAPP | 12 |
| L0T410R84LDYL | 2481 | 139 | MRMCRUNCHAPP | 90 |
| MRMCRUNCHAPP | 61191 | 1640 | OEMGCPDB | 10 |

First, I need the top 30 S_SERVER values, ranked by the total MBYTES transferred from each S_SERVER. I am able to get this with the following query:

```php
$sourcePipeline = array(
    array(
        '$group' => array(
            '_id' => array('sourceServer' => '$S_SERVER'),
            'MBYTES' => array('$sum' => '$MBYTES')
        ),
    ),
    array(
        '$sort' => array("MBYTES" => -1),
    ),
    array(
        '$limit' => 30
    )
);
$sourceServers = $collection->aggregate($sourcePipeline);
```
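To make the first pipeline's result concrete, here is a minimal in-memory JavaScript sketch (plain JavaScript, not the PHP driver) that mimics `$group`, `$sort` and `$limit` on the S_SERVER and MBYTES columns of the sample rows above. The helper name `topSourceServers` is hypothetical.

```javascript
// Minimal in-memory sketch of what the first pipeline computes.
// `rows` mirrors the S_SERVER and MBYTES columns of the sample collection.
const rows = [
  { S_SERVER: "L0T410R84LDYL", MBYTES: 10 },
  { S_SERVER: "MRMCASTLE", MBYTES: 25 },
  { S_SERVER: "MRMXPSCRUNCH01", MBYTES: 2 },
  { S_SERVER: "MRMCASTLE", MBYTES: 4 },
  { S_SERVER: "MRMCASTLE", MBYTES: 35 },
  { S_SERVER: "MRMCASTLE", MBYTES: 100 },
  { S_SERVER: "MRMCASTLE", MBYTES: 12 },
  { S_SERVER: "L0T410R84LDYL", MBYTES: 90 },
  { S_SERVER: "MRMCRUNCHAPP", MBYTES: 10 },
];

// Hypothetical helper mirroring $group -> $sort -> $limit.
function topSourceServers(docs, limit) {
  // $group: { _id: { sourceServer: "$S_SERVER" }, MBYTES: { $sum: "$MBYTES" } }
  const totals = new Map();
  for (const doc of docs) {
    totals.set(doc.S_SERVER, (totals.get(doc.S_SERVER) || 0) + doc.MBYTES);
  }
  return [...totals]
    .map(([server, mbytes]) => ({ _id: { sourceServer: server }, MBYTES: mbytes }))
    .sort((a, b) => b.MBYTES - a.MBYTES) // $sort: { MBYTES: -1 }
    .slice(0, limit);                    // $limit
}

const top = topSourceServers(rows, 30);
console.log(top.map((d) => `${d._id.sourceServer}: ${d.MBYTES}`));
```

On the sample rows this ranks MRMCASTLE first with a total of 176, followed by L0T410R84LDYL with 100.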

I also need the top 30 D_PORT values, ranked by total MBYTES per D_PORT, for each individual S_SERVER. I am doing this by looping over the servers returned above and running the following aggregation individually, one by one, for each S_SERVER:

```php
// Run once per S_SERVER returned by the first query;
// $sourceServer holds that server's name on each loop iteration
$targetPortPipeline = array(
    array(
        '$project' => array('S_SERVER' => '$S_SERVER', 'D_PORT' => '$D_PORT', 'MBYTES' => '$MBYTES')
    ),
    array(
        '$match' => array('S_SERVER' => $sourceServer),
    ),
    array(
        '$group' => array(
            '_id' => array('D_PORT' => '$D_PORT'),
            'MBYTES' => array('$sum' => '$MBYTES')
        ),
    ),
    array(
        '$sort' => array("MBYTES" => -1),
    ),
    array(
        '$limit' => $limit
    )
);
$targetPorts = $collection->aggregate($targetPortPipeline);
```

But this process takes too much time. I need an efficient way to get the required results in the same query. I am using the MongoDB PHP adapter to accomplish this, but you can also give me the aggregation in JavaScript format; I will convert it into PHP.

Upvotes: 0

Views: 1094

Answers (1)

Neil Lunn

Reputation: 151092

Your problem here essentially seems to be that you are issuing 30 more queries for your initial 30 results. There is no simple solution to this, and a single query seems out of the question at the moment, but there are a few things you can consider.

As an additional note, you are not alone in this: it is a question I have seen before, and we can call it a "top N results problem". Essentially, what you really want is some way to combine the two result sets so that each grouping boundary (source server) has at most N results, while at the top level you also restrict those groups to the top N by value.

Your first aggregation query gives you the results for the top 30 "source servers" you want, and that is just fine. But rather than looping additional queries from this, you could build an array of just the "source server" values from this result and pass it to your second query using the $in operator instead:

```javascript
// sourceServers: array of the S_SERVER names returned by the first query
db.collection.aggregate([
    // Match should be first
    { "$match": { "S_SERVER": { "$in": sourceServers } } },

    // Group on both keys
    { "$group": {
        "_id": {
            "S_SERVER": "$S_SERVER",
            "D_PORT": "$D_PORT"
        },
        "value": { "$sum": "$MBYTES" }
    }},

    // Sort by source server, then largest "MBYTES" first
    // (after $group the server name lives under "_id")
    { "$sort": { "_id.S_SERVER": 1, "value": -1 } }
])
```

Note that you cannot "limit" here, as the result contains every "source server" from the initial match. Nor can you "limit" at the grouping boundary, which is essentially the piece missing from the aggregation framework that would otherwise make this a two-query result.

Because this contains every destination result, possibly well beyond the "top 30", you would process the result in code, skipping returned results once the "top 30" have been retrieved at each grouping (source server) level. Depending on how many results you have, this may or may not be the most practical solution.
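The two-query approach with client-side skipping can be sketched in memory as follows. This is plain JavaScript run on the question's sample rows, not driver code: `rows` stands in for the collection, `firstResult` stands in for the first pipeline's output with `$limit` set to 2, and the helper names `groupAndSort` and `topNPerServer` are hypothetical. N is 1 here only so that the trimming is visible on such a small sample; the real case would use 30.

```javascript
// Stand-in for the collection (sample rows from the question).
const rows = [
  { S_SERVER: "L0T410R84LDYL", D_PORT: 139, MBYTES: 10 },
  { S_SERVER: "MRMCASTLE", D_PORT: 445, MBYTES: 25 },
  { S_SERVER: "MRMXPSCRUNCH01", D_PORT: 445, MBYTES: 2 },
  { S_SERVER: "MRMCASTLE", D_PORT: 139, MBYTES: 4 },
  { S_SERVER: "MRMCASTLE", D_PORT: 445, MBYTES: 35 },
  { S_SERVER: "MRMCASTLE", D_PORT: 445, MBYTES: 100 },
  { S_SERVER: "MRMCASTLE", D_PORT: 445, MBYTES: 12 },
  { S_SERVER: "L0T410R84LDYL", D_PORT: 139, MBYTES: 90 },
  { S_SERVER: "MRMCRUNCHAPP", D_PORT: 1640, MBYTES: 10 },
];
// Stand-in for the first pipeline's output (top 2 source servers).
const firstResult = [
  { _id: { sourceServer: "MRMCASTLE" }, MBYTES: 176 },
  { _id: { sourceServer: "L0T410R84LDYL" }, MBYTES: 100 },
];

// Query 1 output -> plain array of names for "$in".
const sourceServers = firstResult.map((doc) => doc._id.sourceServer);

// Query 2: $match with $in, $group on (S_SERVER, D_PORT), then $sort.
function groupAndSort(docs, servers) {
  const wanted = new Set(servers);
  const sums = new Map();
  for (const doc of docs) {
    if (!wanted.has(doc.S_SERVER)) continue;
    const key = JSON.stringify([doc.S_SERVER, doc.D_PORT]);
    sums.set(key, (sums.get(key) || 0) + doc.MBYTES);
  }
  const grouped = [...sums].map(([key, value]) => {
    const [S_SERVER, D_PORT] = JSON.parse(key);
    return { _id: { S_SERVER, D_PORT }, value };
  });
  // Equivalent of { "$sort": { "_id.S_SERVER": 1, "value": -1 } }
  return grouped.sort((a, b) =>
    a._id.S_SERVER === b._id.S_SERVER
      ? b.value - a.value
      : a._id.S_SERVER < b._id.S_SERVER ? -1 : 1);
}

// Client side: skip everything after the first n entries of each source server.
function topNPerServer(sortedDocs, n) {
  const kept = [];
  let current = null;
  let counter = 0;
  for (const doc of sortedDocs) {
    if (doc._id.S_SERVER !== current) {
      current = doc._id.S_SERVER; // new grouping boundary: reset the count
      counter = 0;
    }
    if (counter < n) kept.push(doc);
    counter++;
  }
  return kept;
}

const trimmed = topNPerServer(groupAndSort(rows, sourceServers), 1);
console.log(trimmed);
```

The trimming only works because the results arrive sorted by server first, so a change of server name marks the start of a new group.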

Moving on to where this is not so practical, you are then pretty much stuck with writing that output to another collection as an interim step. If you have MongoDB 2.6 or above, this can be as simple as adding an $out stage at the end of the pipeline. For earlier versions, you can write the equivalent statement using mapReduce:

```javascript
db.collection.mapReduce(
    // Map: emit the compound key with this document's MBYTES
    function() {
        emit(
            {
                "S_SERVER": this["S_SERVER"],
                "D_PORT": this["D_PORT"]
            },
            this.MBYTES
        );
    },
    // Reduce: total the MBYTES emitted for each key
    function(key,values) {
        return Array.sum( values );
    },
    {
        "query": { "S_SERVER": { "$in": sourceServers } },
        "out": { "replace": "output" }
    }
)
```

That is essentially the same process as the previous aggregation statement, except that mapReduce does not sort the output. The sorting, and the cut-off at 30 per server, is covered by an additional mapReduce operation on the resulting collection:

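```javascript
db.output.mapReduce(
    function() {
        // New "source server": reset the per-server counter
        if ( cServer != this._id["S_SERVER"] ) {
            cServer = this._id["S_SERVER"];
            counter = 0;
        }

        // Only emit the first 30 (largest) entries for each server
        if ( counter < 30 )
            emit( this._id, this.value );

        counter++;
    },
    function(){},  // reducer is not actually called, as each key is emitted once
    {
        // The "sort" option requires an index on these fields,
        // e.g. { "_id.S_SERVER": 1, "value": -1 }
        "sort": { "_id.S_SERVER": 1, "value": -1 },
        "scope": { "cServer": "", "counter": 0 },
        "out": { "inline": 1 }  // mapReduce requires "out"; return results directly
    }
)
```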

The implementation here is the "server side" version of the "cursor skipping" that was mentioned earlier. So you are still processing every result, but the returned results over the wire are limited to the top 30 results under each "source server".

As stated, this is still pretty horrible, in that it must scan "programmatically" through the results to discard the ones you do not want. Depending again on your volume, you might be better off simply issuing a .find() for each "source server" value in these results, sorting and limiting the results:

```javascript
// sourceServers: array of the S_SERVER names from the first query
sourceServers.forEach(function(source) {
    var cur = db.output.find({ "_id.S_SERVER": source })
        .sort({ "value": -1 }).limit(30);
    // do something with those results
});
```

That is still 30 additional queries, but at least you are not "aggregating" every time, as that work is already done.

As a final note, and one with too much detail to cover in full, you could possibly approach this with a modified form of the initial aggregation query shown above. Treat this as a footnote: if you have read this far and none of the other approaches seems reasonable, this one is likely the worst fit, because of the memory constraints it is likely to hit.

The best way to introduce this is with the "ideal" case for a "top N results" aggregation, which does not actually exist, but ideally the end of the pipeline would look something like this:

```javascript
// Hypothetical syntax: "$push" does not accept a "$limit" option
{ "$group": {
    "_id": "$_id.S_SERVER",
    "results": {
        "$push": {
            "D_PORT": "$_id.D_PORT",
            "MBYTES": "$value"
        },
        "$limit": 30
    }
}}
```

So the "non-existing" factor here is the ability to "limit" the number of results "pushed" into the resulting array for each "source server" value. The world would certainly be a better place if this or similar functionality were implemented, as it would make solving problems such as this quite easy.
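The behaviour such a capped `$push` would have can be sketched in memory. This is not a MongoDB feature; `groupPushLimited` is a made-up name, and its input is assumed to be the sorted output of the two-key `$group` (keyed on `D_PORT`, as the question asks), here computed by hand from the sample rows for MRMCASTLE and L0T410R84LDYL.

```javascript
// Hypothetical semantics only: what a "$push" capped at n entries per group
// would produce, given input sorted by _id.S_SERVER ascending, value descending.
function groupPushLimited(sortedDocs, n) {
  const groups = new Map(); // S_SERVER -> { _id, results }
  for (const doc of sortedDocs) {
    const server = doc._id.S_SERVER;
    if (!groups.has(server)) groups.set(server, { _id: server, results: [] });
    const group = groups.get(server);
    // The missing feature: stop pushing once the array holds n entries.
    if (group.results.length < n) {
      group.results.push({ D_PORT: doc._id.D_PORT, MBYTES: doc.value });
    }
  }
  return [...groups.values()];
}

// Sorted two-key $group output for the sample data (two source servers).
const sorted = [
  { _id: { S_SERVER: "L0T410R84LDYL", D_PORT: 139 }, value: 100 },
  { _id: { S_SERVER: "MRMCASTLE", D_PORT: 445 }, value: 172 },
  { _id: { S_SERVER: "MRMCASTLE", D_PORT: 139 }, value: 4 },
];
const grouped = groupPushLimited(sorted, 1);
console.log(JSON.stringify(grouped));
```

Each source server ends up as one document holding at most n entries, which is exactly the shape the hypothetical stage would return.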

Since it does not exist, you are left resorting to other methods to get the same result, and end up with listings like this example, except a lot more complex in this case.

Following that code, you would be doing something along these lines:

  1. Group all the results back into an array per server.
  2. Group all of that back into a single document.
  3. Unwind the first server's results.
  4. Get the first entry and group back again.
  5. Unwind the results again.
  6. Project and match the found entry.
  7. Discard the matching result.
  8. Rinse and repeat steps 4 to 7, 30 times.
  9. Store back the first server's document of 30 results.
  10. Rinse and repeat steps 3 to 9 for each server, so 30 times.
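A rough count shows the scale of the generated pipeline. It assumes, as a simplification of my own, that each listed step is a single pipeline stage, with 30 servers and 30 results per server:

$$
\underbrace{2}_{\text{steps 1, 2}} + 30 \times \bigl(\underbrace{1}_{\text{step 3}} + \underbrace{30 \times 4}_{\text{steps 4 to 7}} + \underbrace{1}_{\text{step 9}}\bigr) = 2 + 30 \times 122 = 3662
$$

The real figure would be higher, since step 6 alone already needs both a `$project` and a `$match` stage.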

You would never write this out by hand; you would have to generate the pipeline stages in code. It is very likely to exceed the 16MB BSON document size limit, probably not in the pipeline document itself but in the documents it produces, since you are pushing everything into arrays.

Note also how easily this would break completely if your actual results did not contain at least 30 top values for each server.

The whole scenario comes down to a trade-off on which method suits your data and performance considerations the best:

  1. Live with 30 aggregation queries from your initial result.
  2. Reduce to 2 queries and discard unwanted results in client code.
  3. Output to a temporary collection and use a server cursor skip to discard results.
  4. Issue the 30 queries from a pre-aggregated collection.
  5. Actually go to the trouble of implementing a pipeline that produces the results.

In any case, due to the complexity of aggregating this in general, I would go for producing your result set periodically and storing it in its own collection rather than trying to do this in real time.

The data would not be the "latest" result; it is only as fresh as how often you update it. But actually retrieving those results for display becomes a simple query, returning at most 900 fairly compact results (30 for each of the 30 source servers), without the overhead of aggregating on every request.

Upvotes: 1
