Reputation: 41
Recently I decided to work with Python Dask and SQLAlchemy. A test with read_sql_table works fine, but read_sql_query does not work. This is what I tried:
import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData, Table, select, Column, BigInteger, Numeric, String
server = 'HYPER-CKS12423\\INSTANCERISK'
database = 'Sigma'
username = 'CMS002'
password = 'V=!234-APC'
driver = 'ODBC+DRIVER+17+for+SQL+Server'
engine_str = 'mssql+pyodbc://{}:{}@{}/{}?driver={}'.format(username,
                                                           password,
                                                           server,
                                                           database,
                                                           driver)
HCC = Table('HISTORIC_COMUNICATIONS'
            ,MetaData()
            ,Column('CD_ID', BigInteger, primary_key=True)
            ,Column('Workload', String(255), nullable=False)
            ,Column('Transsmision', String(255), nullable=False)
            ,Column('TTL_Counts', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Message', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Source', Numeric(18, asdecimal=False), nullable=False)
            )
columns = [HCC.c.CD_ID, HCC.c.Workload, HCC.c.Transsmision, HCC.c.TTL_Counts, HCC.c.Message, HCC.c.Source]
query = select(HCC,HCC.c.TTL_Counts>600)
pd_fat_d = dd.read_sql_query(sql=query
                             ,con=engine_str
                             ,columns=columns
                             ,index_col='CD_ID'
                             ,npartitions=43)
# npartitions belongs to head(), not to print()
print(pd_fat_d.head(1000000, npartitions=2))
but I get this error:
pyodbc.ProgrammingError: ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '='. (102) (SQLExecDirectW); [42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Statement(s) could not be prepared. (8180)")
I followed the documentation and the guidance at https://sqlalche.me/e/14/f405, but it still doesn't work.
Upvotes: 0
Views: 1372
Reputation: 41
Here is the solution I implemented; I hope it helps you.
import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData, Table, select, Column, BigInteger, Numeric, String
server = 'HYPER-CKS12423\\INSTANCERISK'
database = 'Sigma'
username = 'CMS002'
password = 'V=!234-APC'
driver = 'ODBC+DRIVER+17+for+SQL+Server'
engine_str = 'mssql+pyodbc://{}:{}@{}/{}?driver={}'.format(username,
                                                           password,
                                                           server,
                                                           database,
                                                           driver)
HCC = Table('HISTORIC_COMUNICATIONS'
            ,MetaData()
            ,Column('CD_ID', BigInteger, primary_key=True)
            ,Column('Workload', String(255), nullable=False)
            ,Column('Transsmision', String(255), nullable=False)
            ,Column('TTL_Counts', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Message', Numeric(18, asdecimal=False), nullable=False)
            ,Column('Source', Numeric(18, asdecimal=False), nullable=False)
            )
columns = [HCC.c.CD_ID, HCC.c.Workload, HCC.c.Transsmision, HCC.c.TTL_Counts, HCC.c.Message, HCC.c.Source]
# Changed line: the filter now goes through .where() instead of being passed as a second argument to select()
query = select(HCC).where(HCC.c.TTL_Counts > 600)
pd_fat_d = dd.read_sql_query(sql=query
                             ,con=engine_str
                             ,columns=columns
                             ,index_col='CD_ID'
                             ,npartitions=43)
# npartitions belongs to head(), not to print()
print(pd_fat_d.head(1000000, npartitions=2))
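To see what the one-line change does, both statements can be compiled to text without any database connection. This is only a sketch: it assumes SQLAlchemy 1.4 or later (the version the error link refers to) and uses a trimmed-down, two-column copy of the table for illustration. Under that assumption, a condition passed as a second positional argument to select() is treated as one more selected expression rather than a filter, so no WHERE clause is generated, whereas .where() places it in the WHERE clause.

```python
from sqlalchemy import BigInteger, Column, MetaData, Numeric, Table, select

# Trimmed-down copy of the table, for illustration only.
HCC = Table(
    "HISTORIC_COMUNICATIONS",
    MetaData(),
    Column("CD_ID", BigInteger, primary_key=True),
    Column("TTL_Counts", Numeric(18, asdecimal=False), nullable=False),
)

condition = HCC.c.TTL_Counts > 600

# Original form: the condition is passed positionally, so (assuming
# SQLAlchemy 1.4+) it ends up in the SELECT list, not in a WHERE clause.
as_column = str(select(HCC, condition))

# Fixed form: the condition is attached with .where().
as_filter = str(select(HCC).where(condition))

print(as_column)
print(as_filter)
```

Printing the two strings side by side makes it easy to check which form actually filters rows before handing the statement to dd.read_sql_query.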
I have since switched to bindparam, so the changed line now looks like this:
query = select(HCC).where(HCC.c.TTL_Counts > bindparam('CQ2s')).params(CQ2s=600)
(Passing the value as a bound parameter keeps variables under control and helps prevent possible SQL injection.)
Note: remember to import bindparam before using it: from sqlalchemy import bindparam
Best regards.
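As a quick check of the bindparam version, the statement can be compiled without connecting to the server to confirm that the value travels separately from the SQL text. This is a minimal sketch that assumes SQLAlchemy 1.4 or later and reuses a trimmed-down, two-column copy of the table for illustration; it compiles with SQLAlchemy's default dialect, so the placeholder syntax may differ from what pyodbc finally sends.

```python
from sqlalchemy import BigInteger, Column, MetaData, Numeric, Table, bindparam, select

# Trimmed-down copy of the table, for illustration only.
HCC = Table(
    "HISTORIC_COMUNICATIONS",
    MetaData(),
    Column("CD_ID", BigInteger, primary_key=True),
    Column("TTL_Counts", Numeric(18, asdecimal=False), nullable=False),
)

# Same line as in the answer: a named placeholder, with its value
# supplied afterwards through .params().
query = select(HCC).where(HCC.c.TTL_Counts > bindparam("CQ2s")).params(CQ2s=600)

compiled = query.compile()   # default dialect, no database connection needed
sql_text = str(compiled)     # SQL text containing the :CQ2s placeholder
params = compiled.params     # parameter values kept apart from the SQL text

print(sql_text)
print(params)
```

Because the threshold lives in the parameter dictionary rather than in the SQL string, changing it does not require rebuilding the query text by hand.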
Upvotes: 2