Ojomio

Reputation: 1053

How to make use of bindparam() in a custom Compiled expression?

I based my code on @zzzeek's answer to this question. I extended it slightly so that it handles NULLs and ARRAY values for PostgreSQL.

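from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import FromClause
from sqlalchemy.sql.sqltypes import NULLTYPE


class values(FromClause):
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
            c._make_proxy(self, c.name)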


@compiles(values)
def compile_values(element, compiler, asfrom=False, **kw):
    columns = element.columns

    def render(elem, column):
        # NULLs are rendered as untyped literals.
        if elem is None:
            return compiler.render_literal_value(elem, NULLTYPE)
        # ARRAY values get an explicit cast to the column type.
        if isinstance(column.type, ARRAY):
            return compiler.visit_array(elem) + '::' + str(column.type)
        # Everything else is inlined into the statement text as a literal.
        return compiler.render_literal_value(elem, column.type)

    v = "VALUES %s" % ", ".join(
        "(%s)" % ", ".join(
            render(elem, column)
            for elem, column in zip(tup, columns))
        for tup in element.list
    )
    if asfrom:
        if element.alias_name:
            v = "(%s) AS %s (%s)" % (v, element.alias_name, (", ".join(c.name for c in element.columns)))
        else:
            v = "(%s)" % v
    return v

Everything worked fine until it turned out that I couldn't insert values containing a "%" sign into this VALUES clause: they are inlined into the resulting statement, and this seems to cause binding problems.

I guess that using bindparam() instead of render_literal_value() would avoid this error. But everything under @compiles has to return plain text, am I right? How could I change this to get a parameter-based query?
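The binding problem can be reproduced without a database. The following is a minimal sketch, assuming a driver that uses the "pyformat" paramstyle and treats "%" in the statement text as the start of a placeholder, much like Python's own % interpolation. The statement strings and parameter names are made up for illustration, and repr() is only a stand-in for the driver's quoting.

```python
# A statement with one inlined literal containing "%" and one real placeholder.
inlined = "VALUES ('100%', %(param_1)s)"

try:
    inlined % {"param_1": 42}
    error = None
except (TypeError, ValueError) as exc:
    # The "%" inside the inlined literal is parsed as a (broken) placeholder.
    error = exc

# When the value travels as a parameter, the statement text holds no stray "%".
bound = "VALUES (%(param_1)s, %(param_2)s)"
# repr() stands in for the quoting a real driver would apply.
rendered = bound % {"param_1": repr("100%"), "param_2": 42}
```

In the first statement the interpolation fails because of the inlined "%"; in the second one the same value passes through untouched because it is never part of the statement text.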

Upvotes: 3

Views: 1289

Answers (1)

Ojomio

Reputation: 1053

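I got it. The actual values of bind parameters are kept on the SQLCompiler object, which is generally dialect-specific. Here (link to GitHub) is where bindparams get stored on the SQLCompiler instance during query compilation. So a function under @compiles can still return plain text: passing a bindparam() to compiler.process() returns the placeholder text, and the value is recorded on the compiler.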
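As a toy model of that mechanism (plain Python, not SQLAlchemy's actual classes; ToyCompiler, process_bind and render_values are hypothetical names used only for illustration), rendering can return text while collecting the values on the compiler as a side effect:

```python
class ToyCompiler:
    """Hypothetical stand-in for a SQL compiler; not SQLAlchemy's real class."""

    def __init__(self):
        # Placeholder name -> value, filled in while the statement is rendered.
        self.binds = {}

    def process_bind(self, value):
        # Register the value and return only the placeholder text.
        name = "param_%d" % (len(self.binds) + 1)
        self.binds[name] = value
        return "%(" + name + ")s"


def render_values(compiler, rows):
    # Returns plain text; the values end up in compiler.binds.
    return "VALUES " + ", ".join(
        "(" + ", ".join(compiler.process_bind(v) for v in row) + ")"
        for row in rows
    )


compiler = ToyCompiler()
sql = render_values(compiler, [(1, "100%"), (2, "50% off")])
```

After this runs, sql contains only placeholders, and compiler.binds holds the four values keyed by placeholder name, so a "%" inside a value never reaches the statement text.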

So the final version of my code snippet looks like this:

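from sqlalchemy import bindparam
from sqlalchemy.dialects.postgresql import array
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import ClauseElement, FromClause
from sqlalchemy.sql.sqltypes import NULLTYPE


class values(FromClause):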
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
            c._make_proxy(self, c.name)


@compiles(values)
def compile_values(clause, compiler, asfrom=False, **kw):
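    def decide(value, column):
        # Empty array literals need an explicit cast, because PostgreSQL
        # cannot infer the element type of an empty array.
        add_type_hint = False
        if isinstance(value, array) and not value.clauses:  # for empty array literals
            add_type_hint = True

        if isinstance(value, ClauseElement):
            # SQL expressions are compiled as usual.
            intermediate = compiler.process(value)
            if add_type_hint:
                intermediate += '::' + str(column.type)
            return intermediate

        elif value is None:
            # NULL is rendered inline, cast to the column type.
            return compiler.render_literal_value(
                value,
                NULLTYPE
            ) + '::' + str(column.type)
        else:
            # Plain values become anonymous bind parameters: compiler.process()
            # returns the placeholder text and records the value on the compiler.
            return compiler.process(
                bindparam(
                    None,
                    value=compiler.render_literal_value(
                        value,
                        column.type
                    ).strip("'")
                )
            ) + '::' + str(column.type)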

    columns = clause.columns
    v = "VALUES %s" % ", ".join(
        "(%s)" % ", ".join(
            decide(elem, column)
            for elem, column in zip(tup, columns))
        for tup in clause.list
    )
    if asfrom:
        if clause.alias_name:
            v = "(%s) AS %s (%s)" % (v, clause.alias_name, (", ".join(c.name for c in clause.columns)))
        else:
            v = "(%s)" % v
    return v

Upvotes: 2
