Michael De Paz

Reputation: 41

Dask Dataframe doesn't bind from read_sql_query

I recently started working with Dask and SQLAlchemy. A test with read_sql_table works fine, but read_sql_query does not. This is what I tried:

import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData, Table, select, Column, BigInteger, Numeric, String

server = 'HYPER-CKS12423\\INSTANCERISK'
database = 'Sigma'
username = 'CMS002'
password = 'V=!234-APC'
driver = 'ODBC+DRIVER+17+for+SQL+Server'

engine_str = 'mssql+pyodbc://{}:{}@{}/{}?driver={}'.format(username,
                                                            password,
                                                            server,
                                                            database,
                                                            driver)

HCC = Table('HISTORIC_COMUNICATIONS'
            ,MetaData()
            ,Column('CD_ID', BigInteger, primary_key=True)
            ,Column('Workload', String(255), nullable=False)
            ,Column('Transsmision', String(255), nullable=False)
            ,Column('TTL_Counts', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Message', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Source', Numeric(18, asdecimal=False), nullable=False)
            )

columns = [HCC.c.CD_ID, HCC.c.Workload, HCC.c.Transsmision, HCC.c.TTL_Counts, HCC.c.Message, HCC.c.Source]

query = select(HCC,HCC.c.TTL_Counts>600)

pd_fat_d = dd.read_sql_query(sql=query
                                ,con=engine_str
                                ,columns = columns
                                ,index_col='CD_ID'
                                ,npartitions=43)

print(pd_fat_d.head(1000000, npartitions=2))

but I get this error:

pyodbc.ProgrammingError: ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '='. (102) (SQLExecDirectW); [42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Statement(s) could not be prepared. (8180)")

I followed the documentation and the guidance at https://sqlalche.me/e/14/f405, but it still doesn't work.

Upvotes: 0

Views: 1372

Answers (1)

Michael De Paz

Reputation: 41

This is the solution I implemented; I hope it helps.

import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData, Table, select, Column, BigInteger, Numeric, String

server = 'HYPER-CKS12423\\INSTANCERISK'
database = 'Sigma'
username = 'CMS002'
password = 'V=!234-APC'
driver = 'ODBC+DRIVER+17+for+SQL+Server'

engine_str = 'mssql+pyodbc://{}:{}@{}/{}?driver={}'.format(username,
                                                            password,
                                                            server,
                                                            database,
                                                            driver)

HCC = Table('HISTORIC_COMUNICATIONS'
            ,MetaData()
            ,Column('CD_ID', BigInteger, primary_key=True)
            ,Column('Workload', String(255), nullable=False)
            ,Column('Transsmision', String(255), nullable=False)
            ,Column('TTL_Counts', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Message', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Source', Numeric(18, asdecimal=False), nullable=False)
            )

columns = [HCC.c.CD_ID, HCC.c.Workload, HCC.c.Transsmision, HCC.c.TTL_Counts, HCC.c.Message, HCC.c.Source]

############### Change this line ################
query = select(HCC).where(HCC.c.TTL_Counts > 600)

pd_fat_d = dd.read_sql_query(sql=query
                                ,con=engine_str
                                ,columns = columns
                                ,index_col='CD_ID'
                                ,npartitions=43)

# npartitions is an argument of head(), not of print()
print(pd_fat_d.head(1000000, npartitions=2))
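To see why the changed line matters, here is a minimal sketch that compiles both forms to SQL text without connecting to a database. It assumes SQLAlchemy 1.4 or later, where every positional argument to select() is treated as a column expression. The table is a trimmed-down copy of the one above (two columns only), and the names positional_sql and where_sql are mine, just for illustration.

```python
from sqlalchemy import BigInteger, Column, MetaData, Numeric, Table, select

# Trimmed-down copy of the table definition; two columns are enough here.
HCC = Table(
    "HISTORIC_COMUNICATIONS",
    MetaData(),
    Column("CD_ID", BigInteger, primary_key=True),
    Column("TTL_Counts", Numeric(18, asdecimal=False), nullable=False),
)

# Positional form from the question: the comparison is rendered as one more
# item in the SELECT list, so no WHERE clause is produced.
positional_sql = str(select(HCC, HCC.c.TTL_Counts > 600))

# Form from this answer: the comparison is rendered as a WHERE clause.
where_sql = str(select(HCC).where(HCC.c.TTL_Counts > 600))

print(positional_sql)
print(where_sql)
```

Printing both strings side by side shows that only the .where() version filters rows. A comparison inside the SELECT list is most likely what SQL Server rejected, although I have not verified that against the exact statement Dask generated.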

I have since switched to bindparam, so the changed line now looks like this:

query = select(HCC).where(HCC.c.TTL_Counts > bindparam('CQ2s')).params(CQ2s=600)

(This passes the value as a bound parameter instead of writing it into the SQL text, which keeps the variables under control and helps prevent injection.)

Note: bindparam has to be imported before it can be used: from sqlalchemy import bindparam
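If you want to check what the bindparam version sends, the following sketch compiles the statement with SQLAlchemy's default dialect and inspects the result. No database connection is needed. The table is again a trimmed-down copy, and sql_text and bound_values are names I made up for the example. The placeholder style will differ once the statement is compiled for pyodbc.

```python
from sqlalchemy import (BigInteger, Column, MetaData, Numeric, Table,
                        bindparam, select)

# Trimmed-down copy of the table definition; two columns are enough here.
HCC = Table(
    "HISTORIC_COMUNICATIONS",
    MetaData(),
    Column("CD_ID", BigInteger, primary_key=True),
    Column("TTL_Counts", Numeric(18, asdecimal=False), nullable=False),
)

query = select(HCC).where(HCC.c.TTL_Counts > bindparam("CQ2s")).params(CQ2s=600)

# Compile with the default dialect to look at the statement and its values.
compiled = query.compile()
sql_text = str(compiled)        # SQL text containing a named placeholder
bound_values = compiled.params  # values that travel separately from the text

print(sql_text)
print(bound_values)
```

The SQL text only contains the placeholder for CQ2s, while the value 600 sits in the parameter dictionary. That separation is what makes bound parameters safer than formatting values into the query string.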

Best regards.

Upvotes: 2
