nimeshkiranverma

Reputation: 1428

MapReduce in PyMongo

My MongoDB collection, Impressions, has documents in the following format:

   {
        _uid: 10,
        "impressions": [
            {
                "pos": 6,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 0,
                "id": 128,
                "service": "electronics"
            },
            {
                "pos": 2,
                "id": 127,
                "service": "furniture"
            },
            {
                "pos": 2,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            }
        ]
      },
     {
        _uid: 11,
        "impressions": [
            {
                "pos": 1,
                "id": 124,
                "service": "furniture"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            },
            {
                "pos": 1,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 21,
                "id": 122,
                "service": "furniture"
            },
            {
                "pos": 3,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 121,
                "service": "electronics"
            }
            ]
         },
            .
            .
            .
            .
            .

Each document in the collection has an "impressions" key, which is an array of dictionaries. In each dictionary, "id" is the id of the entity, "service" is the service type, and "pos" is the position of the item in the search results page. My aim is to count the number of impressions for every "id" in each service category. For the above data, with "service" == "furniture", I want these aggregation results:

[
{"id": 123,"impressions_count":2},
{"id": 127,"impressions_count":1},
{"id": 124,"impressions_count":1},
{"id": 122,"impressions_count":1}
]
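To make the target concrete, the same count can be sketched in plain Python over the two sample documents above. This is only an illustration of the desired result, not a MongoDB query; the names `sample_docs` and `count_impressions` are made up for this sketch.

```python
from collections import Counter

# The two sample documents shown above.
sample_docs = [
    {"_uid": 10, "impressions": [
        {"pos": 6, "id": 123, "service": "furniture"},
        {"pos": 0, "id": 128, "service": "electronics"},
        {"pos": 2, "id": 127, "service": "furniture"},
        {"pos": 2, "id": 125, "service": "electronics"},
        {"pos": 10, "id": 124, "service": "electronics"},
    ]},
    {"_uid": 11, "impressions": [
        {"pos": 1, "id": 124, "service": "furniture"},
        {"pos": 10, "id": 124, "service": "electronics"},
        {"pos": 1, "id": 123, "service": "furniture"},
        {"pos": 21, "id": 122, "service": "furniture"},
        {"pos": 3, "id": 125, "service": "electronics"},
        {"pos": 10, "id": 121, "service": "electronics"},
    ]},
]

def count_impressions(docs, service):
    """Count impressions per id, keeping only the given service."""
    counts = Counter(
        imp["id"]
        for doc in docs
        for imp in doc["impressions"]
        if imp["service"] == service
    )
    return [{"id": k, "impressions_count": v} for k, v in counts.items()]

print(count_impressions(sample_docs, "furniture"))
```

Every impression is counted, so an id that shows up in several documents (123 here) accumulates across them.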

I tried to aggregate on "id" using MapReduce via the following function in a Python script:

def fetch_impressions():
    try:
        imp_collection = get_mongo_connection('Impressions')
        map = Code("""
                function(){
                    for( x in this.impressions){
                        var flat_id = x['id'];
                        var service_type = x['service']
                        emit(parseInt(flat_id),1);
                        }
                    };
                """)

        reduce = Code("""
                        function(a,b){
                            return Array.sum(b);
                            };
                        """)

        results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
        return results
    except Exception as e:
        raise Exception(e)

But I'm getting the results as None, probably because of a faulty map function. I'm new to JavaScript and Mongo, kindly help!

Upvotes: 3

Views: 8214

Answers (2)

I made a tool that lets you run MongoDB Map/Reduce in Python

https://mreduce.com

import random
import threading

import bson
import pymongo

import mreduce


mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")

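def map_func(document):
    # Emit one (id, 1) pair per impression, keyed by the impression's own id.
    for impression in document["impressions"]:
        yield impression["id"], 1

def reduce_func(id, counts):
    # Sum the emitted 1s to get the impression count for this id.
    return sum(counts)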

worker_functions = {
    "exampleMap": map_func,
    "exampleReduce": reduce_func
}

api = mreduce.API(
    api_key = "...",
    mongo_client = mongo_client
)

project_id = "..."

thread = threading.Thread(
    target=api.run,
    args=[project_id, worker_functions]
)
thread.start()

job = api.submit_job(
    projectId=project_id,
    mapFunctionName="exampleMap",
    reduceFunctionName="exampleReduce",
    inputDatabase="db",
    inputCollection="impressions",
    outputDatabase="db",
    outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
    print("Key: " + str(key) + ", Value: " + str(value))
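
Before submitting a job, the map and reduce logic itself can be checked locally in plain Python. The sketch below is only an in-memory imitation of the emit/group/reduce flow: `run_local` is a hypothetical helper written for this illustration, not part of mreduce or PyMongo, and the two documents are cut-down versions of the question's data.

```python
from collections import defaultdict

def map_func(document):
    # Emit one (id, 1) pair per impression in the document.
    for impression in document["impressions"]:
        yield impression["id"], 1

def reduce_func(key, values):
    # Sum the emitted 1s for a single key.
    return sum(values)

def run_local(docs, mapper, reducer):
    """Hypothetical in-memory stand-in for a map/reduce run."""
    grouped = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):
            grouped[key].append(value)
    return {key: reducer(key, values) for key, values in grouped.items()}

docs = [
    {"_uid": 10, "impressions": [{"id": 123, "service": "furniture"},
                                 {"id": 128, "service": "electronics"}]},
    {"_uid": 11, "impressions": [{"id": 123, "service": "furniture"}]},
]
print(run_local(docs, map_func, reduce_func))
```

Note that the map step has to read the id from each impression, not from the enclosing document, which only carries "_uid".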

Upvotes: 1

Sede

Reputation: 61225

You can use the aggregation framework:

import pymongo
conn = pymongo.MongoClient()
db = conn.test
col =  db.collection

for doc in col.aggregate([{'$unwind': '$impressions'}, 
    {'$match': {'impressions.service': 'furniture'}}, 
    {'$group': {'_id': '$impressions.id', 'impressions_count': {'$sum': 1}}}, 
    ]):
    print(doc)
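
Since the question asks for counts in each category, the same stages can be generated per service, or grouped on a compound key to cover all services in one pass. A minimal sketch, assuming the document layout from the question; `build_pipeline` is a hypothetical helper name, and the result would be passed to `col.aggregate(...)` on a live collection.

```python
def build_pipeline(service=None):
    """Build an $unwind/$match/$group pipeline.

    With a service name, count impressions per id for that service only.
    With service=None, count per (service, id) pair across all services.
    """
    pipeline = [{"$unwind": "$impressions"}]
    if service is not None:
        pipeline.append({"$match": {"impressions.service": service}})
        group_id = "$impressions.id"
    else:
        group_id = {"service": "$impressions.service",
                    "id": "$impressions.id"}
    pipeline.append(
        {"$group": {"_id": group_id, "impressions_count": {"$sum": 1}}}
    )
    return pipeline

print(build_pipeline("electronics"))
print(build_pipeline())
```

With the compound key, each output document's `_id` holds both the service and the id, so one query covers every category.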

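Alternatively, you can trim the array before unwinding it, using the $map and $setDifference operators. This can be more efficient because fewer elements reach $unwind. Note that $setDifference treats its inputs as sets, so an id that appears more than once for the matching service within a single document is counted only once for that document.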

col.aggregate([
    {"$project": {"impressions": {"$setDifference": [
        {"$map": {
            "input": "$impressions",
            "as": "imp",
            "in": {"$cond": {
                "if": {"$eq": ["$$imp.service", "furniture"]},
                "then": "$$imp.id",
                "else": 0
            }}
        }},
        [0]
    ]}}},
    {"$unwind": "$impressions"},
    {"$group": {"_id": "$impressions", "impressions_count": {"$sum": 1}}}
])

Which yields:

{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}
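
On MongoDB 3.2 or newer, the `$filter` operator is another way to trim the array before `$unwind`. Unlike a set operator, it keeps every matching element, including ids repeated within one document. A sketch under that version assumption; `build_filter_pipeline` is a hypothetical helper name, and the pipeline would be passed to `col.aggregate(...)`.

```python
def build_filter_pipeline(service):
    """Keep only impressions of one service, then count them per id."""
    return [
        # Drop non-matching array elements inside each document first.
        {"$project": {"impressions": {"$filter": {
            "input": "$impressions",
            "as": "imp",
            "cond": {"$eq": ["$$imp.service", service]},
        }}}},
        # One output document per remaining impression.
        {"$unwind": "$impressions"},
        # Count the impressions per id.
        {"$group": {"_id": "$impressions.id",
                    "impressions_count": {"$sum": 1}}},
    ]

print(build_filter_pipeline("furniture"))
```

Because the filtered elements are still full sub-documents, the `$group` stage reads the id from `$impressions.id`, not from `$impressions`.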

Upvotes: 3
