Joshua

Reputation: 3719

How to serialize SqlAlchemy join query to JSON?

Can anyone help me design/find a JSON encoder that will work when using a join in my SQLAlchemy query and will serialize the results in the way I specify?

The query looks like so:

data = db.session.query(Post).join(Users).filter(Post.area == id, Users.id == Post.user_id).all()

I then do a JSON dump like so:

json_object = json.dumps(data, cls=AlchemyEncoder)

Traditionally I use my own JSON encoder which can be seen here:

import json

from sqlalchemy.ext.declarative import DeclarativeMeta


class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o.__class__, DeclarativeMeta):
            data = {}
            # Use the model's __json__() whitelist if it defines one, otherwise fall back to dir()
            fields = o.__json__() if hasattr(o, '__json__') else dir(o)
            for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                value = o.__getattribute__(field)
                try:
                    # Keep the value only if it is natively JSON-serializable
                    json.dumps(value)
                    data[field] = value
                except TypeError:
                    data[field] = None
            return data
        return json.JSONEncoder.default(self, o)

The function it references will look something like the following:

    def __json__(self):
        return ['id', 'user_id', 'message', 'media', 'time_uploaded', 'lifetime', 'votes', 'area']

However, there are columns in the table I join in the query that I would like to include in the JSON object I create. My JSON encoder can obviously only use columns found in the first table I query.
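
For illustration, this is roughly the combined object I would like the encoder to produce for each row, mixing Post columns with columns from the joined Users table (values are just placeholders):

{
    'id': '...', 'message': '...', 'media': '...', 'time_uploaded': '...',
    'lifetime': '...', 'votes': '...', 'area': '...',
    'username': '...', 'name': '...', 'profile_picture': '...'
}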

EDIT:

Here are what my two SQLAlchemy classes look like:

class Post(Base):
    __tablename__ = "post"

    id = db.Column('id', UUIDType(binary=False), primary_key=True)
    user_id = db.Column('user_id', UUIDType(binary=False), db.ForeignKey('users.id'))
    votes = db.Column('votes', db.Integer)
    message = db.Column('message', db.Unicode)
    media = db.Column('media', db.Unicode)
    lifetime = db.Column('lifetime', db.Integer)
    time_uploaded = db.Column('time_uploaded', db.DateTime, server_default=func.now())
    area = db.Column('area', db.Unicode)

    user = db.relationship('Users', foreign_keys=user_id)

    def __json__(self):
        return ['id', 'username', 'name', 'profile_picture', 'message', 'media', 'time_uploaded', 'lifetime', 'votes', 'area']


class Users(Base):
    __tablename__ = "users"

    id = db.Column('id', UUIDType(binary=False), primary_key=True)
    username = db.Column('username', db.Unicode, unique=True)
    email = db.Column('email', db.Unicode, unique=True)
    password = db.Column('password', db.Unicode)
    name = db.Column('name', db.Unicode)
    bio = db.Column('bio', db.Unicode)
    profile_picture = db.Column('profile_picture', db.Unicode) 

Upvotes: 3

Views: 5378

Answers (3)

Victor Egiazarian

Reputation: 1096

I've already answered a similar question, but I guess it'll be helpful here as well.

Just use the following query with only the required fields specified:

from sqlalchemy import select

users_posts = (
    await session.execute(
        select(
            Users.id.label("user_id"),  # Use label if you want to rename it
            Users.username,
            Users.email,
            Post.media,
            Post.message,
            # Any other columns you need from joined tables
        )
        .outerjoin(Post, onclause=Post.user_id == Users.id)
        .where(Post.area == area_id)
    )
).fetchall()

Then you could convert the rows to a pydantic model (or similar) to validate and post-process them (e.g. convert to JSON) in a more convenient way.

import uuid

import pydantic
from pydantic import ConfigDict


class UsersPostsDb(pydantic.BaseModel):
    model_config = ConfigDict(from_attributes=True)

    user_id: uuid.UUID
    username: str
    email: str
    media: str
    message: str

[UsersPostsDb.model_validate(users_post) for users_post in users_posts]  # That's it
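
If the end goal is a JSON string, a minimal follow-up sketch (assuming pydantic v2 and the UsersPostsDb model above) could be:

import json

validated = [UsersPostsDb.model_validate(row) for row in users_posts]

# model_dump(mode="json") converts UUIDs and similar types into JSON-friendly values
json_object = json.dumps([m.model_dump(mode="json") for m in validated])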

Upvotes: 0

Haji

Reputation: 1

Nowadays the query returns a Row, not a tuple, so I rewrote the encoder to handle Row objects:

import json

from sqlalchemy.engine import Row
from sqlalchemy.orm import DeclarativeMeta


class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Row):
            data = {}
            for obj in o:
                data.update(self.parse_sqlalchemy_object(obj))
            return data
        if isinstance(o.__class__, DeclarativeMeta):
            return self.parse_sqlalchemy_object(o)
        return json.JSONEncoder.default(self, o)

    def parse_sqlalchemy_object(self, o):
        data = {}
        fields = o.__json__() if hasattr(o, '__json__') else dir(o)
        for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class', 'registry']]:
            value = o.__getattribute__(field)
            try:
                json.dumps(value)
                data[field] = value
            except TypeError:
                data[field] = None
        return data
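
For example, with a 2.0-style query the encoder can then be used like this (a sketch, assuming the Post and Users models from the question):

from sqlalchemy import select

rows = session.execute(
    select(Post, Users).join(Users, Users.id == Post.user_id)
).all()  # a list of Row objects

json_object = json.dumps(rows, cls=AlchemyEncoder)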

thank you

Upvotes: 0

DorElias

Reputation: 2313

SQLAlchemy puts whatever you pass to the query method into the SELECT part of the query, so if you want to get the 2 classes you can do:

db.session.query(Post, Users).join(Users).filter(Post.area == id, Users.id == Post.user_id).all()

this will return a list of tuples of the Post and Users classes (so you will have to change the encoder to be able to receive tuples)

EDIT:

here is a sample of how you can change your encoder to accept tuples:

class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, tuple):
            data = {}
            for obj in o:
                data.update(self.parse_sqlalchemy_object(obj))
            return data
        if isinstance(o.__class__, DeclarativeMeta):
            return self.parse_sqlalchemy_object(o)
        return json.JSONEncoder.default(self, o)

    def parse_sqlalchemy_object(self, o):
        data = {}
        fields = o.__json__() if hasattr(o, '__json__') else dir(o)
        for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
            value = o.__getattribute__(field)
            try:
                json.dumps(value)
                data[field] = value
            except TypeError:
                data[field] = None
        return data
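
Putting it together, usage then looks something like this (a sketch combining the query above with the updated encoder):

data = db.session.query(Post, Users).join(Users).filter(
    Post.area == id, Users.id == Post.user_id
).all()

# each element of data is a (Post, Users) tuple, which the encoder merges into one dict
json_object = json.dumps(data, cls=AlchemyEncoder)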

Upvotes: 3
