Reputation: 1053
I based my code on @zzzeek's answer to this question. I extended it a little so that it takes NULLs and ARRAYs into account for PostgreSQL.
class values(FromClause):
    """FROM-clause element that is compiled into a SQL ``VALUES`` list.

    ``columns`` is the iterable of column objects describing each position
    of a row; every further positional argument is one row of values. The
    optional ``alias_name`` keyword is stored as both ``alias_name`` and
    ``name``.
    """

    named_with_column = True

    def __init__(self, columns, *args, **kw):
        alias = kw.pop('alias_name', None)
        self._column_args = columns
        self.list = args
        self.alias_name = alias
        self.name = alias

    def _populate_column_collection(self):
        """Proxy every declared column onto this element under its own name."""
        for column in self._column_args:
            column._make_proxy(self, column.name)
@compiles(values)
def compile_values(element, compiler, asfrom=False, **kw):
    """Render a ``values`` element as an inline SQL ``VALUES`` list.

    Every value is written into the statement text as a literal:

    * ``None`` is rendered through ``render_literal_value`` with ``NULLTYPE``;
    * values of ``ARRAY``-typed columns go through ``compiler.visit_array``
      and get a ``::<column type>`` cast appended;
    * anything else is rendered through ``render_literal_value`` with the
      column's own type.

    When ``asfrom`` is true the text is wrapped in parentheses and, if the
    element has an alias name, followed by ``AS <alias> (<column names>)``.
    """
    columns = element.columns

    def render_cell(elem, column):
        # The None check comes first so column.type is never consulted for NULLs.
        if elem is None:
            return compiler.render_literal_value(elem, NULLTYPE)
        if isinstance(column.type, ARRAY):
            return compiler.visit_array(elem) + '::' + str(column.type)
        return compiler.render_literal_value(elem, column.type)

    rendered_rows = []
    for tup in element.list:
        cells = [render_cell(elem, column) for elem, column in zip(tup, columns)]
        rendered_rows.append("(%s)" % ", ".join(cells))
    v = "VALUES %s" % ", ".join(rendered_rows)

    if not asfrom:
        return v
    if element.alias_name:
        column_names = ", ".join(c.name for c in element.columns)
        return "(%s) AS %s (%s)" % (v, element.alias_name, column_names)
    return "(%s)" % v
Everything worked fine until it turned out that I couldn't put values containing a "%" sign into this VALUES clause: they get inlined into the resulting statement, and this seems to cause binding problems.
I guess that if, instead of render_literal_value(),
we used bindparam(),
we could avoid such an error. But everything under @compiles
should return plain text, am I right? How could I amend this to get a parameter-based query?
Upvotes: 3
Views: 1289
Reputation: 1053
I got it. The actual values of bindparams are kept in an object called SQLCompiler,
which is generally dialect-specific.
Here (link to GitHub) is where bindparams get stored in a SQLCompiler
instance during the query compilation process.
So the final version of my code snippet looks like this:
class values(FromClause):
    """FROM-clause element that is compiled into a SQL ``VALUES`` list.

    ``columns`` is the iterable of column objects describing each position
    of a row, and each remaining positional argument is one row of values.
    The optional ``alias_name`` keyword is kept as both ``alias_name`` and
    ``name``.
    """

    named_with_column = True

    def __init__(self, columns, *args, **kw):
        alias = kw.pop('alias_name', None)
        self._column_args = columns
        self.list = args
        self.alias_name = alias
        self.name = alias

    def _populate_column_collection(self):
        """Proxy every declared column onto this element under its own name."""
        for column in self._column_args:
            column._make_proxy(self, column.name)
@compiles(values)
def compile_values(clause, compiler, asfrom=False, **kw):
    """Render a ``values`` element as a parameterised SQL ``VALUES`` list.

    Plain Python values are emitted as bound parameters rather than inlined
    literals, so characters such as ``%`` or ``'`` never reach the statement
    text. Each cell is rendered by ``decide``:

    * a ``ClauseElement`` is compiled as-is; an empty ``array`` literal also
      gets a ``::<column type>`` cast appended;
    * ``None`` is rendered as a NULL literal cast to the column type;
    * any other value becomes an anonymous bind parameter cast to the
      column type.

    When ``asfrom`` is true the text is wrapped in parentheses and, if the
    element has an alias name, followed by ``AS <alias> (<column names>)``.
    """

    def decide(value, column):
        """Return the SQL text for one cell of the VALUES list."""
        if isinstance(value, ClauseElement):
            rendered = compiler.process(value)
            # Only an empty array literal gets the explicit cast here.
            if isinstance(value, array) and not value.clauses:
                rendered += '::' + str(column.type)
            return rendered

        if value is None:
            return compiler.render_literal_value(
                value,
                NULLTYPE
            ) + '::' + str(column.type)

        # Bind the raw value and let the column type handle conversion.
        # Binding render_literal_value() output would send SQL-escaped text
        # (doubled quotes, stripped apostrophes) as data, and would fail for
        # types that have no literal renderer.
        return compiler.process(
            bindparam(None, value=value, type_=column.type)
        ) + '::' + str(column.type)

    columns = clause.columns
    rendered_rows = [
        "(%s)" % ", ".join(
            decide(elem, column) for elem, column in zip(tup, columns)
        )
        for tup in clause.list
    ]
    v = "VALUES %s" % ", ".join(rendered_rows)

    if not asfrom:
        return v
    if clause.alias_name:
        column_names = ", ".join(c.name for c in clause.columns)
        return "(%s) AS %s (%s)" % (v, clause.alias_name, column_names)
    return "(%s)" % v
Upvotes: 2