David542
David542

Reputation: 110382

Doing Class.objects.filter(...) pattern in python

I am looking to use the pattern used in django models of Model.objects.filter(...) to build filters across data. This would probably be a good use case of pandas, but I'm more interested in improving my python (first) before trying that out.

If I have the following data:

DATA = [
    {'id': 1, 'name': 'brad', 'color':'red'},
    {'id': 2, 'name': 'sylvia', 'color':'blue'},
]

I would like to build something similar to the following:

class MyData:
    objects = <something>

And set the objects equivalent to a "ModelManager" and then do the filtering from there so that I can call:

MyData.objects.filter(id>1)

And get:

[
    {'id': 2, 'name': 'sylvia', 'color':'blue'}
]

Of course I can do something as simple as:

res = [_ for _ in DATA if _['id'] > 1]

But I'm more interested in designing the pattern itself -- the trivial nature of the example is just meant to show what I'm looking to accomplish.

What would be a good, basic way to do this properly? Here's the relevant class in django for it: https://github.com/django/django/blob/master/django/db/models/query.py#L185.

Upvotes: 5

Views: 14652

Answers (4)

slackmart
slackmart

Reputation: 4934

The following is an example where I'm creating a new NoteQuerySet class which inherits from django.db.models.QuerySet. After, I'm taking advantage of the as_manager method, by doing so, the objects manager is overriden preserving all the operations a manager is supposed to have.

So, in order to get the results you want, I've created a new custom_filter method, which operates over the NoteQuerySet.data and uses a dictionary for tracking and making it easy to add new filters.

As you can see, I'm creating a new custom_filter rather than overriding the objects.filter; this is intentional so you don't lose the native filtering. Also notice the operator built-in module for mapping easily strings to operations.

models.py

import operator

from collections import namedtuple

from django.db import models


class NoteQuerySet(models.QuerySet):
    data =  [
        {'id': 1, 'name': 'brad', 'color':'red'},
        {'id': 2, 'name': 'sylvia', 'color':'blue'},
        {'id': 3, 'name': 'sylwia', 'color':'green'},
        {'id': 4, 'name': 'shane', 'color':'red'},
    ]
    allowed_operations = {'gt': operator.gt, 'lt': operator.lt, 'eq': operator.eq}

    def custom_filter(self, **kwargs):
        """
        >>> kwargs = {'name': 'sylwia', 'id__gt': 1}
        dict_items([('name', 'sylwia'), ('id__gt', 1)])
        """
        operation = namedtuple('Q', 'op key value')
        def parse_filter(item):
            """item is expected to be a tuple with exactly two elements
            >>> parse_filter(('id__gt', 2))
            Q(op=<built-in function gt>, key='id', value=2)
            """
            key, *op = item[0].split('__')
            # no value after __ means exact value query, e.g. name='sylvia'
            op = op or ['eq']
            return operation(self.allowed_operations[op[0]], key, item[1])

        filtered_data = self.data.copy()
        for item in map(parse_filter, kwargs.items()):
            filtered_data = [
                entry for entry in filtered_data if item.op(entry[item.key], item.value)
            ]
        return filtered_data


class Note(models.Model):
    text = models.CharField(max_length=250)

    objects = NoteQuerySet.as_manager()

All the logic till now is implemented in the models module. Next, a possible use case is shown in a ListView.

views.py

from django.views.generic import ListView

from .models import Note


class ResultsApplicationView(ListView):
    model = Note
    template_name = 'results.html'

    def get_context_data(self, **kwargs):
        kwargs = super().get_context_data(**kwargs)
        if 'extra' not in kwargs:
            kwargs['extra'] = self.model.objects.custom_filter(id__lt=3, color='red')
        return kwargs

results.html

  <h1>Notes</h1>
  {% for note in object_list %}
    {{note}}
  {% endfor %}

  {{ extra }}

UPDATE: Non django implementation:

import operator

from collections import namedtuple


class DataQuerySet:
    allowed_operations = {
        'gt': operator.gt,
        'lt': operator.lt,
        'eq': operator.eq,
        'in': operator.contains,
    }

    def __init__(self, data):
        self.data = data

    def filter(self, **kwargs):
        """
        >>> kwargs = {'name': 'sylwia', 'id__gt': 1}
        >>> DataQuerySet().filter(**kwargs)
        [{'id': 3, 'name': 'sylwia', 'color': 'green'}]
        """
        operation = namedtuple('Q', 'op key value')
        def parse_filter(item):
            """item is expected to be a tuple with exactly two elements
            >>> parse_filter(('id__gt', 2))
            Q(op=<built-in function gt>, key='id', value=2)
            >>> parse_filter(('id__  ', 2))
            Q(op=<built-in function eq>, key='id', value=2)
            >>> parse_filter(('color__bad', 'red'))
            Traceback (most recent call last):
             ...
            AssertionError: 'bad' operation is not allowed
            """
            key, *op = item[0].split('__')
            # no value after __ means exact value query, e.g. name='sylvia'
            op = ''.join(op).strip() or 'eq'
            assert op in self.allowed_operations, f'{repr(op)} operation is not allowed'
            return operation(self.allowed_operations[op], key, item[1])

        filtered_data = self.data.copy()
        results = []
        for item in map(parse_filter, kwargs.items()):
            for entry in filtered_data:
                if item.op == operator.contains and all(item.op(entry[item.key], v) for v in item.value):
                    results.append(entry)
                elif item.op(entry[item.key], item.value):
                    results.append(entry)
        return results


class Data:

    def __init__(self, data):
        self._data = DataQuerySet(data)

    @property
    def objects(self):
        return self._data


if __name__ == '__main__':
    data = [
        {'id': 1, 'name': 'brad', 'color': 'red', 'tags': ['c++', 'javascript']},
        {'id': 2, 'name': 'sylvia', 'color': 'blue', 'tags': ['c++']},
        {'id': 3, 'name': 'sylwia', 'color': 'green', 'tags': ['c++', 'javascript', 'python']},
        {'id': 4, 'name': 'shane', 'color': 'red', 'tags': ['c++', 'javascript', 'python']},
    ]
    d = Data(data)
    print('Entries with id greater than 2:', d.objects.filter(id__gt=2))
    print('Entries with color="green":', d.objects.filter(color='green'))
    print('Entries with "python" in tags:', d.objects.filter(tags__in=['python']))

__in operation accepts a list of values. This code assumes you want all of them to be present in the tags (that's why we use all(item.op(entry[item.key], v) for v in item.value)).

Upvotes: 2

Nizam Mohamed
Nizam Mohamed

Reputation: 9240

The OP wants to do this MyData.objects.filter(id>1).

Let's face it.

The problem is Python is greedy (eagerly evaluates expressions), not lazy like Haskell.
Watch David Beazley - Lambda Calculus from the Ground Up - PyCon 2019 for mind-bending λ thing.

Python evaluates id > 1 before calling filter. If we can stop the evaluation for now, we can pass the expression unevaluated to the filter function.

But we can delay expression evaluation until required if we enclose the expression in a function. That's the idea.

The function interface would be filter(lambda: id > 1) if we could implement it. This interface will be super versatile because any Python expression can be passed and abused.

The implementation;

if we invoke the lambda or any other function with the expression id > 1, Python looks up the name id in the local, enclosing, global scope or builtins depending on the context where the function is invoked.

If we can introduce an object with the name id somewhere in the look-up path before Python finds id in the builtins we can redefine the semantics of the expression.

I'm gonna do it with eval which evaluates expressions in the given context.

DATA = [
    {'id': 1, 'name': 'brad', 'color':'red'},
    {'id': 2, 'name': 'sylvia', 'color':'blue'},
]

def myfilter(a_lambda):
    return filter(lambda obj: eval(a_lambda.__code__, obj.copy()),
    DATA)

I pass a dict.copy to eval because eval modifies it's globals object.

See it in action in the context of Model class

In [1]: class Data(Model):
   ...:     name = str()
   ...:     id = int()
   ...:     color = str()
   ...: 

In [2]: Data.objects.create(**{"id": 1, "name": "brad", "color": "red"})

In [3]:     Data.objects.create(**{"id": 2, "name": "sylvia", "color": "blue"})

In [4]:     Data.objects.create(**{"id": 3, "name": "paul", "color": "red"})

In [5]:     Data.objects.create(**{"id": 4, "name": "brandon", "color": "yello"})

In [6]:     Data.objects.create(**{"id": 5, "name": "martin", "color": "green"})

In [7]:     Data.objects.create(**{"id": 6, "name": "annie", "color": "gray"})

In [8]: pprint([vars(obj) for obj in Data.objects.filter(lambda: id == 1)])
[{'color': 'red', 'id': 1, 'name': 'brad'}]

In [9]: pprint([vars(obj) for obj in Data.objects.filter(lambda: 1 <= id <= 2)])
[{'color': 'red', 'id': 1, 'name': 'brad'},
 {'color': 'blue', 'id': 2, 'name': 'sylvia'}]

In [10]: pprint([vars(obj) for obj in Data.objects.filter(lambda: color == "blue")])
[{'color': 'blue', 'id': 2, 'name': 'sylvia'}]

In [11]: pprint([vars(obj) for obj in Data.objects.filter(lambda: "e" in color and (name is "brad" or name is "sylvia"))])
[{'color': 'red', 'id': 1, 'name': 'brad'},
 {'color': 'blue', 'id': 2, 'name': 'sylvia'}]

In [12]: pprint([vars(obj) for obj in Data.objects.filter(lambda: id % 2 == 1)])
[{'color': 'red', 'id': 1, 'name': 'brad'},
 {'color': 'red', 'id': 3, 'name': 'paul'},
 {'color': 'green', 'id': 5, 'name': 'martin'}]

The Data class inherits from Model. The Model gives Data the __init__ method and a class attribute named objects that points to a MetaManager instance which is a descriptor.

The MetaManager returns a Manager instance to sub classes of Model upon access of objects attribute from the subclass. The MetaManger identifies the accessing class and passes that to the Manager instance. The Manager handles object creation, persistence and fetch.

The db is implemented as a class attribute of Manager for simplicity.

To stop abuse with global objects via functions the filter function raises an exception if a lambda is not passed.

from collections import defaultdict
from collections.abc import Callable


class MetaManager:
    def __get__(self, obj, objtype):
        if obj is None:
            return Manager(objtype)
        else:
            raise AttributeError(
                "Manger isn't accessible via {} instances".format(objtype)
            )


class Manager:
    _store = defaultdict(list)

    def __init__(self, client):
        self._client = client
        self._client_name = "{}.{}".format(client.__module__, client.__qualname__)

    def create(self, **kwargs):
        self._store[self._client_name].append(self._client(**kwargs))

    def all(self):
        return (obj for obj in self._store[self._client_name])

    def filter(self, a_lambda):
        if a_lambda.__code__.co_name != "<lambda>":
            raise ValueError("a lambda required")

        return (
            obj
            for obj in self._store[self._client_name]

            if eval(a_lambda.__code__, vars(obj).copy())
        )


class Model:
    objects = MetaManager()

    def __init__(self, **kwargs):
        if type(self) is Model:
            raise NotImplementedError

        class_attrs = self.__get_class_attributes(type(self))

        self.__init_instance(class_attrs, kwargs)

    def __get_class_attributes(self, cls):
        attrs = vars(cls)
        if "objects" in attrs:
            raise AttributeError(
                'class {} has an attribute named "objects" of type "{}"'.format(
                    type(self), type(attrs["objects"])
                )
            )
        attrs = {
            attr: obj
            for attr, obj in vars(cls).items()
            if not attr.startswith("_") and not isinstance(obj, Callable)
        }
        return attrs

    def __init_instance(self, attrs, kwargs_dict):
        for key, item in kwargs_dict.items():
            if key not in attrs:
                raise TypeError('Got an unexpected key word argument "{}"'.format(key))
            if isinstance(item, type(attrs[key])):
                setattr(self, key, item)
            else:
                raise TypeError(
                    "Expected type {}, got {}".format(type(attrs[key]), type(item))
                )


if __name__ == "__main__":
    from pprint import pprint

    class Data(Model):
        name = str()
        id = int()
        color = str()

    Data.objects.create(**{"id": 1, "name": "brad", "color": "red"})
    Data.objects.create(**{"id": 2, "name": "sylvia", "color": "blue"})
    Data.objects.create(**{"id": 3, "name": "paul", "color": "red"})
    Data.objects.create(**{"id": 4, "name": "brandon", "color": "yello"})
    Data.objects.create(**{"id": 5, "name": "martin", "color": "green"})
    Data.objects.create(**{"id": 6, "name": "annie", "color": "gray"})

    pprint([vars(obj) for obj in Data.objects.filter(lambda: id == 1)])
    pprint([vars(obj) for obj in Data.objects.filter(lambda: 1 <= id <= 2)])
    pprint([vars(obj) for obj in Data.objects.filter(lambda: color == "blue")])
    pprint(
        [
            vars(obj)
            for obj in Data.objects.filter(
                lambda: "e" in color and (name is "brad" or name is "sylvia")
            )
        ]
    )
    pprint([vars(obj) for obj in Data.objects.filter(lambda: id % 2 == 1)])

Upvotes: 4

j-i-l
j-i-l

Reputation: 10977

If you want the full django Model experience, i.e.:

  • create a new feature vector or data entry with datapoint = MyData(name='johndoe', color='green', ...) just like in django: e.g. new_user=User(username='johndoe', email='[email protected]');
  • use the MyData.objects for object management, like MyData.objects.filter(color__eq='yellow');

here is an approach on how the logic could look like.

First you need basically a naive ObjectManager class:

import collections
import operator
import inspect

class ObjectManager(collections.MutableSet):
    def __init__(self):
        # this will hold a list of all attributes from your custom class, once 
        # initiated
        self._object_attributes = None
        self._theset = set()
    def add(self, item):
        self._theset.add(item)
    def discard(self, item):
        self._theset.discard(item)
    def __iter__(self):
        return iter(self._theset)
    def __len__(self):
        return len(self._theset)
    def __contains__(self, item):
        try:
            return item in self._theset
        except AttributeError:
            return False

    def set_attributes(self, an_object):
        self._object_attributes = [
            a[0] for a in  inspect.getmembers(
                an_object, lambda a:not(inspect.isroutine(a))
            ) if not(a[0].startswith('__') and a[0].endswith('__'))
            ]

    def filter(self, **kwargs):
        """Filters your objects according to one or several conditions

        If several filtering conditions are present you can set the 
        combination mode to either 'and' or 'or'.
        """
        mode = kwargs.pop('mode', 'or')
        ok_objects = set()
        for kw in kwargs:
            if '__' in kw:
                _kw, op = kw.split('__')
                # only allow valid operators
                assert op in ('lt', 'le', 'eq', 'ne', 'ge', 'gt')
            else:
                op = 'eq'
                _kw = kw
            _oper = getattr(operator, op)
            # only allow access to valid object attributes
            assert _kw in self._object_attributes
            n_objects = (
                obj for obj in self 
                if _oper(getattr(obj, _kw), kwargs[kw])
                )
            if mode == 'and':
                if n_objects:
                    ok_objects = ok_objects.intersection(n_objects)\
                        if ok_objects else set(n_objects)
                else:
                    return set()

            else:
                ok_objects.update(n_objects)
        return ok_objects

    # feel free to add a `get_or_create`, `create`, etc. 

Now you attach an instance of this class as attribute to your MyData class and make sure all new objects are added to it:

class MyData:
    # initiate the object manager
    objects = ObjectManager()

    def __init__(self, uid, name, color):
        self.uid = uid
        self.name = name
        self.color = color

        # populate the list of query-able attributes on creation
        # of the first instance
        if not len(self.objects):
            self.objects.set_attributes(self)
        # add any new instance to the object manager
        self.objects.add(self)

Now you can import your feature vector:

DATA = [
    {'uid': 1, 'name': 'brad', 'color':'red'},
    {'uid': 2, 'name': 'sylvia', 'color':'blue'},
]
for dat in DATA:
    myData(**dat)

or create new instances:

d1 = MyData(uid=10, name='john', color='yellow')

and make use of the manager to filter your objects:

print([md.name for md in MyData.objects.filter(uid__ge=10)])
# > ['john']
print([md.name for md in MyData.objects.filter(mode='and',uid__ge=1,name__eq='john')])
# > ['john']
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['john', 'brad', 'sylvia']


If you cannot or don't want to change the class you want an object manager for, and you are willing to monkey patch around (note that I'm not advertising this!) you can even create a ObjectManager that can be hooked to an arbitrary class (built-in types won't work though) after definition or even initiation of some instances.

The idea is to monkey patch __init__ of the target class and add the objects attribute upon init of an instance of your ObjectManager:

import gc
import inspect
import collections
import operator
import wrapt  # not standard lib > pip install wrapt

class ObjectManager(collections.MutableSet):
    def __init__(self, attach_to):
        self._object_attributes = None
        # add self as class attribute
        attach_to.objects = self
        # monkey patch __init__ of your target class
        @wrapt.patch_function_wrapper(attach_to, '__init__')
        def n_init(wrapped, instance, args, kwargs):
            wrapped(*args, **kwargs)
            c_objects = instance.__class__.objects
            if not c_objects:
                c_objects.set_attributes(instance)
            c_objects.add(instance)
        # make sure to be up to date with the existing instances
        self._theset = set(obj for obj in gc.get_objects() if isinstance(obj, attach_to))
        # already fetch the attributes if instances exist
        if self._theset:
            self.set_attributes(next(iter(self._theset)))
        ...
        # the rest is identical to the version above

So now this is how you would use it:

class MyData:

    def __init__(self, uid, name, color):
        self.uid = uid
        self.name = name
        self.color = color

# create some instances
DATA = [
    {'uid': 1, 'name': 'brad', 'color':'red'},
    {'uid': 2, 'name': 'sylvia', 'color':'blue'},
]
my_datas = []
for dat in DATA:
    my_datas.append(myData(**dat))  # appending them just to have a reference
# say that ONLY NOW you decide you want to use an object manager
# Simply do:
ObjectManager(MyData)
# and you are done:
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['brad', 'sylvia']
# also any object you create from now on is included:
d1 = MyData(uid=10, name='john', color='yellow')
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['brad', 'sylvia', 'john']

Upvotes: 2

gelonida
gelonida

Reputation: 5630

Is this what you mean?

This solution depends on no external library and uses **kwargs, generators / closures and the @property decorator. So from a learning point of view it might be interesting.

If you manage to use Django to read the data, that is in your list, then this would probably be much better concerning Django compatibility as my code. It all depends on what your goal is. (Perfect imitation of django filters) or (learning about how to do a not so perfect imitation, but have the whole source code without dependencies)

DATA = [
    {'id': 1, 'name': 'brad',    'color':'red'},
    {'id': 2, 'name': 'sylvia',  'color':'blue'},
    {'id': 3, 'name': 'paul',    'color':'red'},
    {'id': 4, 'name': 'brandon', 'color':'yello'},
    {'id': 5, 'name': 'martin',  'color':'green'},
    {'id': 6, 'name': 'annie',  'color':'gray'},
]

class UnknownOperator(Exception):
    """ custom exception """

class FilterData:
    def __init__(self, data):
        self.data = data

    def _filter_step(self, key, value, data):
        if not "__" in key:
            return (entry for entry in data if entry[key] == value)
        else:
            key, operator = key.split("__")
            if operator == "gt":  # greater than
                return (entry for entry in data if entry[key] > value)
            elif operator == "lt":  # less than
                return (entry for entry in data if entry[key] < value)
            elif operator == "startswith":  # starts with
                return (entry for entry in data if entry[key].startswith(value))
            elif operator == "in":  # starts with
                return (entry for entry in data if entry[key] in value)
            else:
                raise UnknownOperator("operator %s is unknown" % operator)

    def _exclude_step(self, key, value, data):
        if not "__" in key:
            return (entry for entry in data if entry[key] != value)
        else:
            key, operator = key.split("__")
            if operator == "gt":  # greater than
                return (entry for entry in data if entry[key] <= value)
            elif operator == "lt":  # less than
                return (entry for entry in data if entry[key] >= value)
            elif operator == "startswith":  # starts with
                return (entry for entry in data if not entry[key].startswith(value))
            elif operator == "in":  # starts with
                return (entry for entry in data if entry[key] not in value)
            else:
                raise UnknownOperator("operator %s is unknown" % operator)


    def filter(self, **kwargs):
        data = (entry for entry in self.data)
        for key, value in kwargs.items():
            data = self._filter_step(key, value, data)

        return FilterData(data)

    def exclude(self, **kwargs):
        data = (entry for entry in self.data)
        for key, value in kwargs.items():
            data = self._exclude_step(key, value, data)

        return FilterData(data)

    def all(self):
        return FilterData(self.data)

    def count(self):
        cnt = 0
        for cnt, entry in enumerate(self.data, 1):
            pass
        return cnt

    def __iter__(self):
        for entry in self.data:
            yield entry

# make it even more look like django managers / filters
class DataManager:
    def __init__(self, data):
        self.data = data
    @property
    def objects(self):
        return FilterData(self.data)


fdata = FilterData(DATA)

assert [v["id"] for v in fdata.filter(name="paul")] == [3]
assert [v["id"] for v in fdata.filter(color="red")] == [1, 3]
assert [v["id"] for v in fdata.filter(id__gt=2)] == [3, 4, 5, 6]
assert [v["id"] for v in fdata.filter(color__startswith="gr")] == [5, 6]

fmgr = DataManager(DATA)

assert [v["id"] for v in fmgr.objects.filter(name="paul")] == [3]
assert [v["id"] for v in fmgr.objects.filter(color="red")] == [1, 3]
assert [v["id"] for v in fmgr.objects.filter(id__gt=2)] == [3, 4, 5, 6]
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr")] == [5, 6]
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr", id__lt=6)] == [5]
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr", id__lt=6)] == [5]

assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr").filter(id__lt=6)] == [5]

assert fmgr.objects.filter(color__startswith="gr").filter(id__lt=6).count() == 1
assert fmgr.objects.filter(id__gt=2).count() == 4
assert fmgr.objects.count() == 6
assert [v["id"] for v in fmgr.objects.all()] == list(range(1, 7))

Upvotes: 1

Related Questions