mkab
mkab

Reputation: 953

Reading an SQL query into a Dask DataFrame

I'm trying create a function that takes an SQL SELECT query as a parameter and use dask to read its results into a dask DataFrame using the dask.read_sql_query function. I am new to dask and to SQLAlchemy. I first tried this:

import dask.dataFrame as dd

query = "SELECT name, age, date_of_birth from customer"
df = dd.read_sql_query(sql=query, con=con_string, index_col="name", npartitions=10)

As you probably already know, this won't work because the sql parameter has to be an SQLAlchemy selectable and more importantly, TextClause isn't supported.

I then wrapped the query behind a select like this:

import dask.dataFrame as dd
from sqlalchemy import sql

query = "SELECT name, age, date_of_birth from customer"
sa_query = sql.select(sql.text(query))
df = dd.read_sql_query(sql=sa_query, con=con_string, index_col="name")

This fails too with a very weird error that I have been trying to solve. The problem is that dask needs to infer the types of the columns and it does so by reading the first head_row rows in the table - 5 rows by default - and infer the types there. This line in the dask codebase adds a LIMIT ? to the query, which ends up being

SELECT name, age, date_of_birth from customer LIMIT param_1

The param_1 doesn't get substituted at all with the right value - 5 in this case. It then fails on the next line, https://github.com/dask/dask/blob/main/dask/dataframe/io/sql.py#L119, tjat evaluates the SQL expression.

sqlalchemy.exc.ProgrammingError: (mariadb.ProgrammingError) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'SELECT name, age, date_of_birth from customer 
 LIMIT ?' at line 1
[SQL: SELECT SELECT name, age, date_of_birth from customer 
 LIMIT ?]
[parameters: (5,)]
(Background on this error at: https://sqlalche.me/e/14/f405)

I can't understand why param_1 wasn't substituted with the value of head_rows. One can see from the error message that it detects there's a parameter that needs to be used for the substitution but for some reason it doesn't actually substitute it.

Perhaps, I didn't correctly create the SQLAlchemy selectable?

I can simply use pandas.read_sql and create a dask dataframe from the resulting pandas dataframe but that defeats the purpose of using dask in the first place.

I have the following constraints:

How can I go about correctly reading an SQL query into dask dataframe?

Upvotes: 4

Views: 10551

Answers (2)

Faizan
Faizan

Reputation: 333

I encountered the same limitation while attempting the solution, but after delving into the documentation, I realized that the issue might stem from an incorrect construction of the SQL Alchemy select clause. Here's the revised code that resolved the problem for me:

import dask.dataframe as dd
from sqlalchemy import sql

        
        conn_str = "you sqlalchemy connection string here"
        id = sql.column("numeric_pk_id_column")
        col1 = sql.column("some_other_column")
        table = sql.table("table", schema = "schema")


        select_statement = sql.select([id,
                                       col1]
                                      ).select_from(table)

        df = dd.read_sql_query(select_statement, conn_str, index_col=id)

ref: https://www.oreilly.com/library/view/scaling-python-with/9781098119867/ch04.html Example 4-4

Upvotes: 0

SultanOrazbayev
SultanOrazbayev

Reputation: 16561

The crux of the problem is this line:

sa_query = sql.select(sql.text(query))

What is happening is that we are constructing a nested SELECT query, which can cause a problem downstream.

Let's first create a test database:

# create a test database (using https://stackoverflow.com/a/64898284/10693596)
from sqlite3 import connect

from dask.datasets import timeseries

con = "delete_me_test.sqlite"
db = connect(con)

# create a pandas df and store (timestamp is dropped to make sure
# that the index is numeric)
df = (
    timeseries(start="2000-01-01", end="2000-01-02", freq="1h", seed=0)
    .compute()
    .reset_index()
)
df.to_sql("ticks", db, if_exists="replace")

Next, let's try to get things working with pandas without sqlalchemy:

from pandas import read_sql_query

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
meta = read_sql_query(sql=query, con=con).set_index("index")

print(meta)
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963

Now, let's add sqlalchemy functions:

from pandas import read_sql_query
from sqlalchemy.sql import text, select

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
sa_query = select(text(query))
meta = read_sql_query(sql=sa_query, con=con).set_index("index")
# OperationalError: (sqlite3.OperationalError) near "SELECT": syntax error
# [SQL: SELECT SELECT * FROM ticks LIMIT 3]
# (Background on this error at: https://sqlalche.me/e/14/e3q8)

Note the SELECT SELECT due to running sqlalchemy.select on an existing query. This can cause problems. How to fix this? In general, I don't think there's a safe and robust way of transforming arbitrary SQL queries into their sqlalchemy equivalent, but if this is for an application where you know that users will only run SELECT statements, you can manually sanitize the query before passing it to sqlalchemy.select:

from dask.dataframe import read_sql_query
from sqlalchemy.sql import select, text

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks"


def _remove_leading_select_from_query(query):
    if query.startswith("SELECT "):
        return query.replace("SELECT ", "", 1)
    else:
        return query


sa_query = select(text(_remove_leading_select_from_query(query)))
ddf = read_sql_query(sql=sa_query, con=con, index_col="index")

print(ddf)
print(ddf.head(3))
# Dask DataFrame Structure:
#                   id    name        x        y
# npartitions=1
# 0              int64  object  float64  float64
# 23               ...     ...      ...      ...
# Dask Name: from-delayed, 2 tasks
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963

Upvotes: 3

Related Questions