Reputation: 953
I'm trying to create a function that takes an SQL SELECT query as a parameter and uses dask to read its results into a dask DataFrame with the dask.dataframe.read_sql_query function. I am new to dask and to SQLAlchemy.
I first tried this:
import dask.dataframe as dd
query = "SELECT name, age, date_of_birth from customer"
df = dd.read_sql_query(sql=query, con=con_string, index_col="name", npartitions=10)
As you probably already know, this won't work because the sql parameter has to be an SQLAlchemy selectable and, more importantly, TextClause isn't supported.
I then wrapped the query in a select like this:
import dask.dataframe as dd
from sqlalchemy import sql
query = "SELECT name, age, date_of_birth from customer"
sa_query = sql.select(sql.text(query))
df = dd.read_sql_query(sql=sa_query, con=con_string, index_col="name")
This fails too, with a very weird error that I have been trying to solve. The problem is that dask needs to infer the types of the columns, and it does so by reading the first head_rows rows of the table (5 rows by default) and inferring the types from them. This line in the dask codebase adds a LIMIT ? to the query, which ends up being
SELECT name, age, date_of_birth from customer LIMIT param_1
The param_1 doesn't get substituted at all with the right value, 5 in this case. It then fails on the next line, https://github.com/dask/dask/blob/main/dask/dataframe/io/sql.py#L119, which evaluates the SQL expression.
sqlalchemy.exc.ProgrammingError: (mariadb.ProgrammingError) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'SELECT name, age, date_of_birth from customer
LIMIT ?' at line 1
[SQL: SELECT SELECT name, age, date_of_birth from customer
LIMIT ?]
[parameters: (5,)]
(Background on this error at: https://sqlalche.me/e/14/f405)
I can't understand why param_1 wasn't substituted with the value of head_rows. One can see from the error message that it detects a parameter that needs to be substituted, but for some reason it doesn't actually substitute it.
Perhaps I didn't correctly create the SQLAlchemy selectable?
I could simply use pandas.read_sql and create a dask dataframe from the resulting pandas dataframe, but that defeats the purpose of using dask in the first place.
I have the following constraints:
Passing meta to the custom function is not an option because it would require the caller to create it. However, passing a meta attribute to read_sql_query and setting head_rows=0 is completely ok as long as there's an efficient way to retrieve/create it.
How can I go about correctly reading an SQL query into a dask dataframe?
Upvotes: 4
Views: 10551
Reputation: 333
I ran into the same limitation, but after going through the documentation I realized that the issue might stem from an incorrect construction of the SQLAlchemy select clause. Here's the revised code that resolved the problem for me:
import dask.dataframe as dd
from sqlalchemy import sql
conn_str = "your sqlalchemy connection string here"
# named pk rather than id, to avoid shadowing the Python builtin
pk = sql.column("numeric_pk_id_column")
col1 = sql.column("some_other_column")
table = sql.table("table", schema="schema")
# positional columns work on SQLAlchemy 1.4+; select([...]) is the legacy 1.x form
select_statement = sql.select(pk, col1).select_from(table)
df = dd.read_sql_query(select_statement, conn_str, index_col=pk)
ref: https://www.oreilly.com/library/view/scaling-python-with/9781098119867/ch04.html Example 4-4
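If it helps to see what this construction sends to the database, here is a minimal sketch that only builds the statement and prints the SQL text SQLAlchemy generates for it. It assumes SQLAlchemy 1.4 or newer and reuses the placeholder column, table, and schema names from the snippet above (the variable name pk is mine); nothing is executed against a database.

```python
from sqlalchemy import sql

# Placeholder names, as in the answer above; swap in your own.
pk = sql.column("numeric_pk_id_column")
col1 = sql.column("some_other_column")
table = sql.table("table", schema="schema")

# Build the selectable from column/table constructs instead of raw text.
select_statement = sql.select(pk, col1).select_from(table)

# str() renders the statement with the default dialect, without executing it.
compiled_sql = str(select_statement)
print(compiled_sql)
```

Because the statement is assembled from column and table objects, there is a single SELECT keyword in the output, and dask can append its own LIMIT to it.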
Upvotes: 0
Reputation: 16561
The crux of the problem is this line:
sa_query = sql.select(sql.text(query))
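What is happening is that select() emits its own SELECT keyword in front of text that already starts with SELECT, so the generated statement begins with SELECT SELECT, which is invalid SQL and fails downstream.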
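On the param_1 confusion in the question: the `LIMIT ?` in the error message is a bound-parameter placeholder, and the `[parameters: (5,)]` line shows the value 5 being sent alongside the statement, so the placeholder itself looks fine. A minimal sketch with the standard-library sqlite3 module illustrates this; the in-memory customer table and its rows are made up for the example.

```python
import sqlite3

# Hypothetical in-memory stand-in for the `customer` table from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO customer VALUES (?, ?)",
    [(f"user{i}", 20 + i) for i in range(10)],
)

# A bound LIMIT parameter works: the driver receives the value 5 separately.
ok_rows = conn.execute(
    "SELECT name, age FROM customer LIMIT ?", (5,)
).fetchall()

# The doubled SELECT keyword is what the database rejects.
try:
    conn.execute("SELECT SELECT name, age FROM customer LIMIT ?", (5,))
    doubled_select_error = None
except sqlite3.OperationalError as exc:
    doubled_select_error = str(exc)

print(len(ok_rows))
print(doubled_select_error)
```

The first statement returns rows, while the second fails with a syntax error even though the placeholder and its value are identical in both.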
Let's first create a test database:
# create a test database (using https://stackoverflow.com/a/64898284/10693596)
from sqlite3 import connect
from dask.datasets import timeseries
con = "test.sql"  # same file as the sqlite:///test.sql URI used below
db = connect(con)
# create a pandas df and store it (the timestamp index is dropped to make
# sure that the index is numeric)
df = (
    timeseries(start="2000-01-01", end="2000-01-02", freq="1h", seed=0)
    .compute()
    .reset_index(drop=True)
)
df.to_sql("ticks", db, if_exists="replace")
Next, let's try to get things working with pandas without sqlalchemy:
from pandas import read_sql_query
con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
meta = read_sql_query(sql=query, con=con).set_index("index")
print(meta)
# id name x y
# index
# 0 998 Ingrid 0.760997 -0.381459
# 1 1056 Ingrid 0.506099 0.816477
# 2 1056 Laura 0.316556 0.046963
Now, let's add sqlalchemy functions:
from pandas import read_sql_query
from sqlalchemy.sql import text, select
con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
sa_query = select(text(query))
meta = read_sql_query(sql=sa_query, con=con).set_index("index")
# OperationalError: (sqlite3.OperationalError) near "SELECT": syntax error
# [SQL: SELECT SELECT * FROM ticks LIMIT 3]
# (Background on this error at: https://sqlalche.me/e/14/e3q8)
Note the SELECT SELECT caused by running sqlalchemy.select on an existing query. This can cause problems. How to fix this? In general, I don't think there's a safe and robust way of transforming arbitrary SQL queries into their sqlalchemy equivalent, but if this is for an application where you know that users will only run SELECT statements, you can manually sanitize the query before passing it to sqlalchemy.select:
from dask.dataframe import read_sql_query
from sqlalchemy.sql import select, text
con = "sqlite:///test.sql"
query = "SELECT * FROM ticks"
def _remove_leading_select_from_query(query):
    # strip only the first "SELECT " so that select() can add its own
    if query.startswith("SELECT "):
        return query.replace("SELECT ", "", 1)
    else:
        return query
sa_query = select(text(_remove_leading_select_from_query(query)))
ddf = read_sql_query(sql=sa_query, con=con, index_col="index")
print(ddf)
print(ddf.head(3))
# Dask DataFrame Structure:
# id name x y
# npartitions=1
# 0 int64 object float64 float64
# 23 ... ... ... ...
# Dask Name: from-delayed, 2 tasks
# id name x y
# index
# 0 998 Ingrid 0.760997 -0.381459
# 1 1056 Ingrid 0.506099 0.816477
# 2 1056 Laura 0.316556 0.046963
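The helper above only matches an upper-case "SELECT " at the very start of the string. If user-supplied queries may come with leading whitespace, lower-case keywords, or a newline after SELECT, a slightly more tolerant variant could look like the sketch below. The name strip_leading_select is hypothetical, and this is still plain string surgery rather than SQL parsing: queries that start with a comment or a WITH clause are returned unchanged and would still not work with this approach.

```python
import re

# Match one leading SELECT keyword: any case, optional leading whitespace,
# followed by at least one whitespace character (space, tab, or newline).
_LEADING_SELECT = re.compile(r"^\s*SELECT\s+", flags=re.IGNORECASE)


def strip_leading_select(query: str) -> str:
    """Return the query without its first SELECT keyword, if it has one."""
    return _LEADING_SELECT.sub("", query, count=1)


print(strip_leading_select("  select name, age FROM customer"))
print(strip_leading_select("SELECT\n  *\nFROM ticks"))
```

It can be dropped in where _remove_leading_select_from_query is called, i.e. select(text(strip_leading_select(query))).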
Upvotes: 3