Glenn Karl Guiyab
Glenn Karl Guiyab

Reputation: 55

Pymongo Aggregate with multiple conditions: lookup, unwind, redact, cond, sort and limit

# Status values the $redact stage below tests "status_value" against.
done_status = ['BAD_PU', 'TO_WH', 'RCVDPORT', 'RCVD', 'BAD_DEL', 'MISSFLT', 'OFFLOAD']

# Join shipments with their status-history rows, filter the joined documents,
# sort by the history row's rec_timestamp (descending) and keep one document.
shipments = db.db_shipment.aggregate([{
    # Attach history rows whose fk_shipment_id equals this shipment's _id.
    "$lookup":{
        "from":"db_shipment_status_history",
        "localField":"_id",
        "foreignField":"fk_shipment_id",
        "as":"shipment_status_history_collection"
        }
    },
    # Emit one document per joined history row.
    {"$unwind":
        "$shipment_status_history_collection"},
        # NOTE(review): "$cond" is closed right after "$if", so "$then" and
        # "$else" end up as siblings of "$cond" inside the $redact expression.
        # That object therefore has three fields, which the server rejects
        # with "An object representing an expression must have exactly one
        # field".
        {"$redact":{"$cond":{ "$if": { "status_value": {"$in": done_status } } },
                "$then": "$$KEEP"
                ,"$else":"$$PRUNE"
                }
            },
        # Newest history row first.
        {"$sort":
            {'shipment_status_history_collection.rec_timestamp':-1}},
        # Keep only the first document after sorting.
        {"$limit":1},
        # Project the shipment's pkey and _code fields.
        {"$project":{"pkey":"$pkey","_code":"$_code"}}
    ])

error:

pymongo.errors.OperationFailure: An object representing an expression must have exactly one field: { $cond: { $if: { status_value: { $in: [ "BAD_PU", "TO_WH", "RCVDPORT", "RCVD", "BAD_DEL", "MISSFLT", "OFFLOAD" ] } } }, $else: "$$PRUNE", $then: "$$KEEP" }

How do I fix this error? I'm trying to attach the latest shipment status history entry to each shipment record, keeping only history entries whose status value is in the given list of status values.

Upvotes: 3

Views: 1767

Answers (1)

Oluwafemi Sule
Oluwafemi Sule

Reputation: 38922

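Update the $redact stage of your aggregation pipeline. if, then and else are keys of the $cond operator's argument, not operators in themselves, so they must be written without the $ prefix and nested inside $cond, e.g. {"$cond": {"if": <condition>, "then": "$$KEEP", "else": "$$PRUNE"}}.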

Also, the $in aggregation operator is passed a two-element array: the first element is the value to look for, and the second element is the array that is searched for it.

Mongo 3.6

# Status values the sub-pipeline accepts on a status-history row.
assigned_status = ['DEL_ASSIGNED', 'PU_ASSIGNED']

# Shipment statuses to match: the "pending" and "processing" entries of
# messenger_active_status['data'], joined with `+`.
messenger_pipeline_status = (
    messenger_active_status['data']['pending']
    + messenger_active_status['data']['processing']
)

# Sub-pipeline meant to run inside a `$lookup`; `$$pkey` must be supplied by
# that `$lookup`'s `let` clause.
subpipeline = [
    # Keep history rows that belong to the shipment, were recorded for
    # `fk_user_id`, and carry one of the accepted status values.
    {'$match': {'$expr': {'$and': [
        {'$eq': ['$fk_shipment_id', '$$pkey']},
        {'$eq': ['$fk_messenger_id', fk_user_id]},
        {'$in': ['$status_value', assigned_status]},
    ]}}},
    # Newest row first, then keep only that row.
    {'$sort': {'rec_timestamp': -1}},
    {'$limit': 1},
    # Only the shipment reference is passed on to the outer pipeline.
    {'$project': {'fk_shipment_id': 1}},
]

# Main pipeline (uses `$lookup` with `let`/`pipeline`, i.e. MongoDB 3.6+):
# select shipments by status and flags, join the newest matching
# status-history row via `subpipeline`, and flatten the result.
pipeline = [
    # Filter shipments by status and by the is_deleted / is_postponed /
    # is_active flags.
    {
        '$match': {
            'status_value': {'$in': messenger_pipeline_status},
            'is_deleted': False,
            'is_postponed': False,
            'is_active': True,
        }
    },

    # Run `subpipeline` against db_shipment_status_history, exposing the
    # shipment's pkey to it as `$$pkey`.
    {
        '$lookup': {
            'from': 'db_shipment_status_history',
            'let': {'pkey': '$pkey'},
            'pipeline': subpipeline,
            'as': 'shipment_status_history'
        }
    },

    # Drop shipments for which the sub-pipeline returned no history row.
    {
        '$match': {
            'shipment_status_history': {
                '$ne': []
            }
        }
    },

    # Turn the joined array into a single embedded document.
    {
        '$unwind': '$shipment_status_history'
    },

    # Flatten sender/receiver sub-documents into top-level fields.
    {
        '$project': {
            '_id': 1,
            'pkey': 1,
            '_code': 1,
            'date_created': 1,
            'sender_full_name': '$sender.full_name',
            'sender_raw_address': '$sender.raw_address',
            'sender_formatted_address': '$sender.formatted_address',
            'receiver_full_name': '$receiver.full_name',
            'receiver_raw_address': '$receiver.raw_address',
            'receiver_formatted_address': '$receiver.formatted_address',
            'status_name': 1,
            'team_value': 1,
            'cs_name': 1,
            # NOTE(review): this exposes the history row's fk_shipment_id
            # under the name fk_messenger_id -- confirm that is intended.
            'fk_messenger_id': '$shipment_status_history.fk_shipment_id'
        }
    }
]

result = db.db_shipment.aggregate(pipeline)
print(list(result))

[Edit] Mongo 3.2

The following aggregation pipeline produces results similar to the one above and is a valid query for Mongo 3.2.

messenger_pipeline_status = ['MISSFLT', 'OFFLOAD']

# MongoDB 3.2-compatible pipeline: `$lookup` can only do an equality join
# here, so the per-row filtering is done afterwards with `$filter`, and the
# newest matching history row per shipment is picked with `$sort` + `$group`.
# `assigned_status` and `fk_user_id` are expected to be defined by the caller.
pipeline = [
    # Filter shipments by status and by the is_deleted / is_postponed /
    # is_active flags.
    {
        '$match': {
            'status_value': {'$in': messenger_pipeline_status},
            'is_deleted': False,
            'is_postponed': False,
            'is_active': True,
        }
    },

    # Attach every history row whose fk_shipment_id equals the shipment's pkey.
    {
        "$lookup": {
            'from': 'db_shipment_status_history',
            'localField': 'pkey',
            'foreignField': 'fk_shipment_id',
            'as': 'shipment_status_history'
        }
    },

    # Drop shipments that have no history rows at all.
    {
        '$match': {
            'shipment_status_history': {
                '$ne': []
            }
        }
    },

    {
        '$project': {
            '_id': 1,
            'pkey': 1,
            '_code': 1,
            'date_created': 1,
            'sender_full_name': '$sender.full_name',
            'sender_raw_address': '$sender.raw_address',
            'sender_formatted_address': '$sender.formatted_address',
            'receiver_full_name': '$receiver.full_name',
            'receiver_raw_address': '$receiver.raw_address',
            'receiver_formatted_address': '$receiver.formatted_address',
            'status_name': 1,
            'team_value': 1,
            'cs_name': 1,
            # Keep only history rows recorded for `fk_user_id` with one of
            # the assigned statuses.
            'shipment_status_history': {
                '$filter': {
                    'input': '$shipment_status_history',
                    'as': 'shipment',
                    'cond': {
                        '$and': [
                            # The join already guarantees fk_shipment_id ==
                            # pkey; the user id has to be compared against
                            # the row's messenger reference.
                            {'$eq': ['$$shipment.fk_messenger_id', fk_user_id]},
                            # `$in` as an aggregation expression only exists
                            # from MongoDB 3.4, so membership is spelled out
                            # as an `$or` of `$eq` tests. An empty
                            # assigned_status gives `$or: []`, which is false.
                            {'$or': [
                                {'$eq': ['$$shipment.status_value', status]}
                                for status in assigned_status
                            ]},
                        ]
                    }
                }
            },
        }
    },

    # One document per remaining history row; shipments whose filtered array
    # is empty are dropped here.
    {
        '$unwind': '$shipment_status_history'
    },

    # Newest history row first, so `$first` below picks it.
    {
        '$sort': {
            'shipment_status_history.rec_timestamp': -1,
        }
    },

    # Collapse back to one document per shipment, keeping the newest row.
    {
        '$group': {
            '_id': '$pkey',
            'doc': {
                '$first': '$$CURRENT'
            }
        }
    },

    {
        '$unwind': '$doc'
    },

    # Lift the fields out of `doc` so the output is flat.
    {
        '$project': {
            # The earlier $project emits `_id`, so that is the field to read.
            '_id': '$doc._id',
            'pkey': '$doc.pkey',
            '_code': '$doc._code',
            'date_created': '$doc.date_created',
            'sender_full_name': '$doc.sender_full_name',
            'sender_raw_address': '$doc.sender_raw_address',
            'sender_formatted_address': '$doc.sender_formatted_address',
            'receiver_full_name': '$doc.receiver_full_name',
            'receiver_raw_address': '$doc.receiver_raw_address',
            'receiver_formatted_address': '$doc.receiver_formatted_address',
            'status_name': '$doc.status_name',
            'team_value': '$doc.team_value',
            'cs_name': '$doc.cs_name',
            # NOTE(review): this exposes the history row's fk_shipment_id
            # under the name fk_messenger_id -- confirm that is intended.
            'fk_messenger_id': '$doc.shipment_status_history.fk_shipment_id'
        }
    },
]

res = db.db_shipment.aggregate(pipeline)

Upvotes: 1

Related Questions