Kr0e

Reputation: 2239

Aggregate and Reduce Nested Documents and Arrays

EDIT: Our use case: We get continuous reports from servers about visitors. We pre-aggregate the data on the servers for a few seconds and then insert these "reports" into MongoDB.

In our dashboard we would like to query the different browsers, OSes, geolocation (country etc.) based on time ranges.

So like: Within the last 7 days, there were 1000 visitors using Chrome, 500 from Germany, 200 from England and so on.

I'm pretty stuck with a MongoDB query we need for our dashboard.

We have the following report entries:

{
    "_id" : ObjectId("59b9d08e402025326e1a0f30"),
    "channel_perm_id" : "c361049fb4144b0e81b71c0b6cfdc296",
    "source_id" : "insomnia",
    "start_timestamp" : ISODate("2017-09-14T00:42:54.510Z"),
    "end_timestamp" : ISODate("2017-09-14T00:42:54.510Z"),
    "timestamp" : ISODate("2017-09-14T00:42:54.510Z"),
    "resource_uri" : "b755d62a-8c0a-4e8a-945f-41782c13535b",
    "sources_info" : {
        "browsers" : [
            {
                "name" : "Chrome",
                "count" : NumberLong(2)
            }
        ],
        "operating_systems" : [
            {
                "name" : "Mac OS X",
                "count" : NumberLong(2)
            }
        ],
        "continent_ids" : [
            {
                "name" : "EU",
                "count" : NumberLong(1)
            }
        ],
        "country_ids" : [
            {
                "name" : "DE",
                "count" : NumberLong(1)
            }
        ],
        "city_ids" : [
            {
                "name" : "Solingen",
                "count" : NumberLong(1)
            }
        ]
    },
    "unique_sources" : NumberLong(1),
    "requests" : NumberLong(1),
    "cache_hits" : NumberLong(0),
    "cache_misses" : NumberLong(1),
    "cache_hit_size" : NumberLong(0),
    "cache_refill_size" : NumberLong("170000000000")
}

Now, we need to aggregate these reports based on timestamp. So far, so easy:

db.channel_report.aggregate([{
  $group: {
    _id: {
      $dateToString: {
        format: "%Y",
        date: "$timestamp"
      }
    },
    sources_info: {
      $push: "$sources_info"
    }
  },
}]);

But now it gets difficult for me. As you might have already noticed, the sources_info object is the problem.

Instead of just "pushing" all the sources info into an array per group, we need to actually accumulate it.

So, if we have something like this:

{
  sources_info: [
    {
      browsers: [
        {
          name: "Chrome",
          count: 1
        }
      ]
    },
    {
      browsers: [
        {
          name: "Chrome",
          count: 1
        }
      ]
    }
  ]
}

The array should be reduced to this:

{
  sources_info:
    {
      browsers: [
        {
          name: "Chrome",
          count: 2
        }
      ]
    }
}

We migrated from MySQL to MongoDB for analytics, but I have no clue how to model this behaviour in Mongo. Judging from the docs, I almost think it is not possible, at least not with the current data structure.

Is there a nice solution for this? Or maybe even a different kind of data structure?

Cheers, Chris from StriveCDN

Upvotes: 2

Views: 6613

Answers (1)

Neil Lunn

Reputation: 151092

The basic problem you have is that you are using "named keys" where you should probably be using values under a consistent attribute path. This means that instead of keys like "browsers", each entry should simply carry "type": "browser", and so on.

The reasoning for this should become apparent from the general approaches to aggregating the data. It also really helps with querying in general. The approaches basically involve first coercing your initial data format into this kind of structure, and then aggregating it.
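To make that suggestion concrete, here is a rough sketch of what a report could look like when stored that way, and how short the aggregation becomes. The field name "sources" and the singular type values are assumptions for illustration, not something taken from your schema:

```javascript
// Assumed shape: a single "sources" array where each entry carries its own "type".
// Field names and type values here are illustrative only.
const suggestedReport = {
  channel_perm_id: "c361049fb4144b0e81b71c0b6cfdc296",
  timestamp: new Date("2017-09-14T00:42:54.510Z"),
  sources: [
    { type: "browser", name: "Chrome", count: 2 },
    { type: "operating_system", name: "Mac OS X", count: 2 },
    { type: "country", name: "DE", count: 1 }
  ]
};

// With that shape there is nothing to reshape before grouping:
// unwind the array and sum "count" per year, type and name.
const pipeline = [
  { "$unwind": "$sources" },
  { "$group": {
    "_id": {
      "year": { "$year": "$timestamp" },
      "type": "$sources.type",
      "name": "$sources.name"
    },
    "count": { "$sum": "$sources.count" }
  }}
];

// In the shell this would be passed to db.channel_report.aggregate(pipeline).
console.log(JSON.stringify(pipeline, null, 2));
```

Since every entry sits under the same path, a query on "sources.type" and "sources.name" works the same way for browsers, countries or anything else you add later.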

With most recent releases ( MongoDB 3.4.4 and greater ), we can work with your named keys via $objectToArray and manipulate as follows:

db.channel_report.aggregate([
  { "$project": {
    "timestamp": 1,
    "sources": {
      "$reduce": {
        "input": {
          "$map": {
            "input": { "$objectToArray": "$sources_info" },
            "as": "s",
            "in": {
              "$map": {
                "input": "$$s.v",
                "as": "v",
                "in": {
                  "type": "$$s.k",
                  "name": "$$v.name",
                  "count": "$$v.count"    
                }
              }
            }
          }     
        },
        "initialValue": [],
        "in": { "$concatArrays": ["$$value", "$$this"] }
      }
    }
  }},
  { "$unwind": "$sources" },
  { "$group": {
    "_id": { 
      "year": { "$year": "$timestamp" },
      "type": "$sources.type",
      "name": "$sources.name"
    },
    "count": { "$sum": "$sources.count" }
  }},
  { "$group": {
    "_id": { "year": "$_id.year", "type": "$_id.type" },
    "v": { "$push": { "name": "$_id.name", "count": "$count" } }  
  }},
  { "$group": {
    "_id": "$_id.year",
    "sources_info": {
      "$push": { "k": "$_id.type", "v": "$v" }  
    }  
  }},
  { "$addFields": {
    "sources_info": { "$arrayToObject": "$sources_info" }  
  }}
])

Taking that back a notch to MongoDB 3.4 ( which should be the default on most hosted services by now ), you could alternatively declare each key name manually:

db.channel_report.aggregate([
  { "$project": {
    "timestamp": 1,
    "sources": {
      "$concatArrays": [
        { "$map": {
          "input": "$sources_info.browsers",
          "in": {
            "type": "browsers",
            "name": "$$this.name",
            "count": "$$this.count"  
          }  
        }},
        { "$map": {
          "input": "$sources_info.operating_systems",
          "in": {
            "type": "operating_systems",
            "name": "$$this.name",
            "count": "$$this.count"  
          }  
        }},
        { "$map": {
          "input": "$sources_info.continent_ids",
          "in": {
            "type": "continent_ids",
            "name": "$$this.name",
            "count": "$$this.count"  
          }  
        }},
        { "$map": {
          "input": "$sources_info.country_ids",
          "in": {
            "type": "country_ids",
            "name": "$$this.name",
            "count": "$$this.count"  
          }  
        }},
        { "$map": {
          "input": "$sources_info.city_ids",
          "in": {
            "type": "city_ids",
            "name": "$$this.name",
            "count": "$$this.count"  
          }  
        }}
      ]  
    }  
  }},
  { "$unwind": "$sources" },
  { "$group": {
    "_id": { 
      "year": { "$year": "$timestamp" },
      "type": "$sources.type",
      "name": "$sources.name"
    },
    "count": { "$sum": "$sources.count" }
  }},
  { "$group": {
    "_id": { "year": "$_id.year", "type": "$_id.type" },
    "v": { "$push": { "name": "$_id.name", "count": "$count" } }  
  }},
  { "$group": {
    "_id": "$_id.year",
    "sources": {
      "$push": { "k": "$_id.type", "v": "$v" }  
    }  
  }},
  { "$project": {
    "sources_info": {
      "browsers": {
        "$arrayElemAt": [
          "$sources.v",
          { "$indexOfArray": [ "$sources.k", "browsers" ] }
        ]    
      },
      "operating_systems": {
        "$arrayElemAt": [
          "$sources.v",
          { "$indexOfArray": [ "$sources.k", "operating_systems" ] }
        ]    
      },
      "continent_ids": {
        "$arrayElemAt": [
          "$sources.v",
          { "$indexOfArray": [ "$sources.k", "continent_ids" ] }
        ]    
      },
      "country_ids": {
        "$arrayElemAt": [
          "$sources.v",
          { "$indexOfArray": [ "$sources.k", "country_ids" ] }
        ]    
      },
      "city_ids": {
        "$arrayElemAt": [
          "$sources.v",
          { "$indexOfArray": [ "$sources.k", "city_ids" ] }
        ]    
      }
    }    
  }}
])

We can even wind that back to MongoDB 3.2 by using $map and $filter in place of $indexOfArray, but the general approach is the main thing to explain.
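As a rough sketch of that MongoDB 3.2 variant, only the final $project stage needs to change. The helper lookupByKey and the keys list are hypothetical names introduced here for brevity; the operators themselves ( $filter, $map, $arrayElemAt ) are the ones mentioned above:

```javascript
// Hypothetical helper: builds the lookup expression for one key name
// using only $filter, $map and $arrayElemAt (no $indexOfArray).
function lookupByKey(key) {
  return {
    "$arrayElemAt": [
      { "$map": {
        // keep only the { k, v } entry whose "k" matches the key name
        "input": {
          "$filter": {
            "input": "$sources",
            "as": "s",
            "cond": { "$eq": [ "$$s.k", key ] }
          }
        },
        "as": "s",
        // and return just its "v" array
        "in": "$$s.v"
      }},
      0
    ]
  };
}

const keys = [
  "browsers", "operating_systems", "continent_ids", "country_ids", "city_ids"
];

// Replacement for the last $project stage of the pipeline above
const finalProject = {
  "$project": {
    "sources_info": keys.reduce(
      (acc, key) => Object.assign(acc, { [key]: lookupByKey(key) }), {})
  }
};

console.log(JSON.stringify(finalProject, null, 2));
```

One side effect of this form is that when a key is absent from a group, the filtered array is empty and the field is simply left out of the result.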

Concatenate arrays

The main thing that needs to happen is to take the data from the many different arrays with named keys and make a "single array" with a "type" property representing each key name. This is arguably how the data should be stored in the first place, and the first aggregation stage of either approach comes out like this:

/* 1 */
{
    "_id" : ObjectId("59b9d08e402025326e1a0f30"),
    "timestamp" : ISODate("2017-09-14T00:42:54.510Z"),
    "sources" : [ 
        {
            "type" : "browsers",
            "name" : "Chrome",
            "count" : NumberLong(2)
        }, 
        {
            "type" : "operating_systems",
            "name" : "Mac OS X",
            "count" : NumberLong(2)
        }, 
        {
            "type" : "continent_ids",
            "name" : "EU",
            "count" : NumberLong(1)
        }, 
        {
            "type" : "country_ids",
            "name" : "DE",
            "count" : NumberLong(1)
        }, 
        {
            "type" : "city_ids",
            "name" : "Solingen",
            "count" : NumberLong(1)
        }
    ]
}
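
If it helps to follow that reshaping outside the database, here is a rough plain-JavaScript equivalent of the first stage. The helper name flattenSources is made up for illustration and the sample is a trimmed copy of the document from the question; it is only a sketch of the idea, not what the server does internally:

```javascript
// Illustration only: flattenSources is a hypothetical helper, not a MongoDB API.
function flattenSources(report) {
  return {
    _id: report._id,
    timestamp: report.timestamp,
    // Object.entries plays the role of $objectToArray: [key, array] pairs
    sources: Object.entries(report.sources_info).flatMap(([type, entries]) =>
      // tag every entry with the key name it came from
      entries.map(({ name, count }) => ({ type, name, count }))
    )
  };
}

// Trimmed copy of the sample report from the question
const report = {
  _id: "59b9d08e402025326e1a0f30",
  timestamp: new Date("2017-09-14T00:42:54.510Z"),
  sources_info: {
    browsers: [{ name: "Chrome", count: 2 }],
    operating_systems: [{ name: "Mac OS X", count: 2 }],
    country_ids: [{ name: "DE", count: 1 }]
  }
};

const flat = flattenSources(report);
console.log(flat.sources);
```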

Unwind and Group

The key you want to accumulate on actually includes those "type" and "name" properties from "within" the array. Whenever you need to accumulate across documents on values from "within an array", you use $unwind so that those values can be accessed as part of the grouping key.

What this means is that after using $unwind on the combined array, you then want to $group on both of those keys and the reduced "timestamp" detail in order to $sum the "count" values.

Since you then have "sub-levels" of detail ( i.e. each browser name within browsers ), you use additional $group pipeline stages, gradually decreasing the granularity of the grouping keys and using $push to accumulate the details into arrays.
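
To illustrate the $unwind and first $group step on the two-report "Chrome" case from the question, here is a small plain-JavaScript simulation. The function sumCounts and the sample documents are assumptions made up for this sketch; the real work is of course done by the pipeline stages:

```javascript
// Illustration only: mimics $unwind plus the first $group in plain JavaScript.
// sumCounts is a hypothetical helper, not part of MongoDB.
function sumCounts(docs) {
  const totals = new Map();
  for (const doc of docs) {
    const year = doc.timestamp.getUTCFullYear();   // like { "$year": "$timestamp" }
    for (const s of doc.sources) {                 // like { "$unwind": "$sources" }
      // compound grouping key of year, type and name
      const key = JSON.stringify([year, s.type, s.name]);
      // like { "$sum": "$sources.count" }
      totals.set(key, (totals.get(key) || 0) + s.count);
    }
  }
  // turn the map back into documents shaped like the $group output
  return [...totals].map(([key, count]) => {
    const [year, type, name] = JSON.parse(key);
    return { _id: { year, type, name }, count };
  });
}

// Two made-up reports, already in the "single array" form
const docs = [
  { timestamp: new Date("2017-09-14T00:42:54Z"),
    sources: [{ type: "browsers", name: "Chrome", count: 1 }] },
  { timestamp: new Date("2017-09-15T10:00:00Z"),
    sources: [{ type: "browsers", name: "Chrome", count: 1 },
              { type: "browsers", name: "Firefox", count: 3 }] }
];

const grouped = sumCounts(docs);
console.log(grouped);
```

The two "Chrome" entries collapse into one with a count of 2, while "Firefox" stays separate because its name differs.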

In either case, if you omit the very last stage, the accumulated structure comes out as:

/* 1 */
{
    "_id" : 2017,
    "sources_info" : [ 
        {
            "k" : "continent_ids",
            "v" : [ 
                {
                    "name" : "EU",
                    "count" : NumberLong(1)
                }
            ]
        }, 
        {
            "k" : "city_ids",
            "v" : [ 
                {
                    "name" : "Solingen",
                    "count" : NumberLong(1)
                }
            ]
        }, 
        {
            "k" : "country_ids",
            "v" : [ 
                {
                    "name" : "DE",
                    "count" : NumberLong(1)
                }
            ]
        }, 
        {
            "k" : "browsers",
            "v" : [ 
                {
                    "name" : "Chrome",
                    "count" : NumberLong(2)
                }
            ]
        }, 
        {
            "k" : "operating_systems",
            "v" : [ 
                {
                    "name" : "Mac OS X",
                    "count" : NumberLong(2)
                }
            ]
        }
    ]
}

This really is the final state of the data, though not represented in the same form as it was originally found. It is arguably complete at this point as any further processing is merely cosmetic to output as named keys again.

Output to named keys

As shown, the two approaches either look up the array entries by the matching key name, or use $arrayToObject to transform the array content back into an object with named keys.

An alternative is to simply do that very last manipulation in code, as shown by this .map() example of manipulating the cursor result in the shell:

db.channel_report.aggregate([
  { "$project": {
    "timestamp": 1,
    "sources": {
      "$reduce": {
        "input": {
          "$map": {
            "input": { "$objectToArray": "$sources_info" },
            "as": "s",
            "in": {
              "$map": {
                "input": "$$s.v",
                "as": "v",
                "in": {
                  "type": "$$s.k",
                  "name": "$$v.name",
                  "count": "$$v.count"    
                }
              }
            }
          }     
        },
        "initialValue": [],
        "in": { "$concatArrays": ["$$value", "$$this"] }
      }
    }
  }},
  { "$unwind": "$sources" },
  { "$group": {
    "_id": { 
      "year": { "$year": "$timestamp" },
      "type": "$sources.type",
      "name": "$sources.name"
    },
    "count": { "$sum": "$sources.count" }
  }},
  { "$group": {
    "_id": { "year": "$_id.year", "type": "$_id.type" },
    "v": { "$push": { "name": "$_id.name", "count": "$count" } }  
  }},
  { "$group": {
    "_id": "$_id.year",
    "sources_info": {
      "$push": { "k": "$_id.type", "v": "$v" }  
    }  
  }},
  /*
  { "$addFields": {
    "sources_info": { "$arrayToObject": "$sources_info" }  
  }}
  */
]).map( d => Object.assign(d,{
  "sources_info": d.sources_info.reduce((acc,curr) =>
    Object.assign(acc,{ [curr.k]: curr.v }),{})
}))

Which of course applies to either aggregation pipeline approach.

And of course even $concatArrays can be replaced with $setUnion, as long as all the entries have a unique identifying combination of "name" and "type" ( as they appear to have ). Combined with modifying the final output by processing the cursor instead, that means you can apply the technique even as far back as MongoDB 2.6.

Final Output

The final output accumulates all the sub-keys and reconstructs the named keys from the last sample output. It really is aggregated, though the question only samples one document:

{
    "_id" : 2017,
    "sources_info" : {
        "continent_ids" : [ 
            {
                "name" : "EU",
                "count" : NumberLong(1)
            }
        ],
        "city_ids" : [ 
            {
                "name" : "Solingen",
                "count" : NumberLong(1)
            }
        ],
        "country_ids" : [ 
            {
                "name" : "DE",
                "count" : NumberLong(1)
            }
        ],
        "browsers" : [ 
            {
                "name" : "Chrome",
                "count" : NumberLong(2)
            }
        ],
        "operating_systems" : [ 
            {
                "name" : "Mac OS X",
                "count" : NumberLong(2)
            }
        ]
    }
}

Here every array entry under each key of sources_info is reduced down to its cumulative count across all entries sharing the same "name".

Upvotes: 12
