Adam Morris
Adam Morris

Reputation: 8545

Easy convert betwen SQLAlchemy column types and python data types?

I'm looking for an easy python way to compare column types from SQLAlchemy to base types. For example, if my column type is a VARCHAR of any length, I want to read it as a string.

I can read the column type okay, but I'm not sure an easy way to verify it's basic type... it would be nice if I could use something like "if isinstance(mycolumn, int)" - but I'm new to python and not sure how this would work.

Here's what I have so far:

from sqlalchemy import MetaData
from sqlalchemy import create_engine, Column, Table
engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
meta = MetaData()
meta.bind = engine
meta.reflect()
datatable = meta.tables['my_data_table']
[c.type for c in datatable.columns]

Output:

[INTEGER(display_width=11), DATE(), VARCHAR(length=127), DOUBLE(precision=None, scale=None, asdecimal=True)]

My end purpose is twofold, first because I want to format the output based on the type when I load it into my jQuery jqGrid. The second, is I'm slowly converting non-normalized data tables into a normalized structure, and want to ensure that I keep my types consistent - (to make sure my numbers in the previous table are stored as numbers and not strings...)

Upvotes: 21

Views: 28734

Answers (4)

Sereger
Sereger

Reputation: 1091

Just use the python_type attribute available in all AQLAlchemy types:

[c.type.python_type for c in datatable.columns]

Upvotes: 25

Simon Streicher
Simon Streicher

Reputation: 2826

Python types to SQL types:

I struggled with the problem of creating SQL tables on-the-fly with default sql-types. I ended up with the following handy functions for all my a python type to a sql-type conversion needs. To go from sql-type to python type is trivial as will be explained in the next section.

import sqlalchemy
import numpy as np

import datetime
import decimal

_type_py2sql_dict = {
 int: sqlalchemy.sql.sqltypes.BigInteger,
 str: sqlalchemy.sql.sqltypes.Unicode,
 float: sqlalchemy.sql.sqltypes.Float,
 decimal.Decimal: sqlalchemy.sql.sqltypes.Numeric,
 datetime.datetime: sqlalchemy.sql.sqltypes.DateTime,
 bytes: sqlalchemy.sql.sqltypes.LargeBinary,
 bool: sqlalchemy.sql.sqltypes.Boolean,
 datetime.date: sqlalchemy.sql.sqltypes.Date,
 datetime.time: sqlalchemy.sql.sqltypes.Time,
 datetime.timedelta: sqlalchemy.sql.sqltypes.Interval,
 list: sqlalchemy.sql.sqltypes.ARRAY,
 dict: sqlalchemy.sql.sqltypes.JSON
}

def type_py2sql(pytype):
    '''Return the closest sql type for a given python type'''
    if pytype in _type_py2sql_dict:
        return _type_py2sql_dict[pytype]
    else:
        raise NotImplementedError(
            "You may add custom `sqltype` to `"+str(pytype)+"` assignment in `_type_py2sql_dict`.")

def type_np2py(dtype=None, arr=None):
    '''Return the closest python type for a given numpy dtype'''

    if ((dtype is None and arr is None) or
        (dtype is not None and arr is not None)):
        raise ValueError(
            "Provide either keyword argument `dtype` or `arr`: a numpy dtype or a numpy array.")

    if dtype is None:
        dtype = arr.dtype

    #1) Make a single-entry numpy array of the same dtype
    #2) force the array into a python 'object' dtype
    #3) the array entry should now be the closest python type
    single_entry = np.empty([1], dtype=dtype).astype(object)

    return type(single_entry[0])

def type_np2sql(dtype=None, arr=None):
    '''Return the closest sql type for a given numpy dtype'''
    return type_py2sql(type_np2py(dtype=dtype, arr=arr))

Some usecases:

>>> sqlalchemy.Column(type_py2sql(int))
Column(None, BigInteger(), table=None)

>>> type_py2sql(type('hello'))
sqlalchemy.sql.sqltypes.Unicode

>>> type_np2sql(arr=np.array([1.,2.,3.]))
sqlalchemy.sql.sqltypes.Float

How I chose my conversion set:

What I did was to map all the sql-types to their equivalent python types. I then printed which python type corresponds to which sql-types and picked the best sql-type for each python type. Here is the code I used to generate this mapping:

#********** SQL to Python: one to one **********
type_sql2py_dict = {}
for key in sqlalchemy.types.__dict__['__all__']:
    sqltype = getattr(sqlalchemy.types, key)

    if 'python_type' in dir(sqltype) and not sqltype.__name__.startswith('Type'):
        try:
            typeinst = sqltype()
        except TypeError as e: #List/array wants inner-type
            typeinst = sqltype(None)

        try:
            type_sql2py_dict[sqltype] = typeinst.python_type
        except NotImplementedError:
            pass

#********** Python to SQL: one to many **********
type_py2sql_dict = {}
for key, val in type_sql2py_dict.items():
    if not val in type_py2sql_dict:
        type_py2sql_dict[val] = [key]
    else:
        type_py2sql_dict[val].append(key)

And here is the output of type_py2sql_dict under sqlalchemy version 1.3.5:

{int: [sqlalchemy.sql.sqltypes.INTEGER,
  sqlalchemy.sql.sqltypes.BIGINT,
  sqlalchemy.sql.sqltypes.SMALLINT,
  sqlalchemy.sql.sqltypes.Integer,
  sqlalchemy.sql.sqltypes.SmallInteger,
  sqlalchemy.sql.sqltypes.BigInteger],
 str: [sqlalchemy.sql.sqltypes.CHAR,
  sqlalchemy.sql.sqltypes.VARCHAR,
  sqlalchemy.sql.sqltypes.NCHAR,
  sqlalchemy.sql.sqltypes.NVARCHAR,
  sqlalchemy.sql.sqltypes.TEXT,
  sqlalchemy.sql.sqltypes.Text,
  sqlalchemy.sql.sqltypes.CLOB,
  sqlalchemy.sql.sqltypes.String,
  sqlalchemy.sql.sqltypes.Unicode,
  sqlalchemy.sql.sqltypes.UnicodeText,
  sqlalchemy.sql.sqltypes.Enum],
 float: [sqlalchemy.sql.sqltypes.FLOAT,
  sqlalchemy.sql.sqltypes.REAL,
  sqlalchemy.sql.sqltypes.Float],
 decimal.Decimal: [sqlalchemy.sql.sqltypes.NUMERIC,
  sqlalchemy.sql.sqltypes.DECIMAL,
  sqlalchemy.sql.sqltypes.Numeric],
 datetime.datetime: [sqlalchemy.sql.sqltypes.TIMESTAMP,
  sqlalchemy.sql.sqltypes.DATETIME,
  sqlalchemy.sql.sqltypes.DateTime],
 bytes: [sqlalchemy.sql.sqltypes.BLOB,
  sqlalchemy.sql.sqltypes.BINARY,
  sqlalchemy.sql.sqltypes.VARBINARY,
  sqlalchemy.sql.sqltypes.LargeBinary,
  sqlalchemy.sql.sqltypes.Binary],
 bool: [sqlalchemy.sql.sqltypes.BOOLEAN, sqlalchemy.sql.sqltypes.Boolean],
 datetime.date: [sqlalchemy.sql.sqltypes.DATE, sqlalchemy.sql.sqltypes.Date],
 datetime.time: [sqlalchemy.sql.sqltypes.TIME, sqlalchemy.sql.sqltypes.Time],
 datetime.timedelta: [sqlalchemy.sql.sqltypes.Interval],
 list: [sqlalchemy.sql.sqltypes.ARRAY],
 dict: [sqlalchemy.sql.sqltypes.JSON]}

Upvotes: 16

Anurag
Anurag

Reputation: 180

You can do a str(column.type) this will give you the type as a string. In you code

    from sqlalchemy import MetaData
    from sqlalchemy import create_engine, Column, Table
    engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
    meta = MetaData()
    meta.bind = engine
    meta.reflect()
    datatable = meta.tables['my_data_table']
    [str(c.type) for c in datatable.columns]

you will get a list with the data types.Hope this helps you

Upvotes: 0

Adam Morris
Adam Morris

Reputation: 8545

One solution is to do the conversion manually - for example, this works:

def convert(self, saType):
    type = "Unknown"
    if isinstance(saType,sqlalchemy.types.INTEGER):
        type = "Integer"
    elif isinstance(saType,sqlalchemy.types.VARCHAR):
        type = "String"
    elif isinstance(saType,sqlalchemy.types.DATE):
        type = "Date"
    elif isinstance(saType,sqlalchemy.dialects.mysql.base._FloatType):
        type = "Double"
    return type

Not sure if this is a normal python way of doing things... I still think like a java programmer.

Upvotes: 7

Related Questions