Reputation: 149
Here's what I tried:
dask_rf = dd.from_pandas(pd.read_sql('select ...', conn_cx_Oracle), npartitions=10)
This gives me a 'large object' warning and recommends using client.scatter. The problem is that client.scatter appears to require the data to be loaded into a Pandas dataframe first, but avoiding that is exactly why I'm using Dask in the first place, given my RAM limitations.
The Oracle table is too large to read using Dask's read_sql_table because read_sql_table does not filter the table in any way.
Ideas? Dask not applicable to my use case?
Edit - Per the answer below, and after researching how to do so, here is my attempt to use a SQLAlchemy expression:
from sqlalchemy import create_engine, Table, Column, String, MetaData, select
sql_engine = create_engine(f'oracle+cx_oracle://username:password@environment')
metadata = MetaData(bind=sql_engine)
table_reference = Table('table', metadata, autoload=True, schema='schema')
s = select([table_reference]).where(table_reference.c.field_to_filter == filtered_value)
import dask.dataframe as dd
dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema='schema')
dask_df.count()
Dask Series Structure:
npartitions=1
action_timestamp    int64
vendor_name           ...
dtype: int64
Dask Name: dataframe-count-agg, 1996 tasks
dask_df.count().compute()
DatabaseError: (cx_Oracle.DatabaseError) ORA-02391: exceeded simultaneous SESSIONS_PER_USER limit (Background on this error at: http://sqlalche.me/e/4xp6)
Why is it exceeding the session limit when trying to connect to Oracle?
Edit #2 - Just in case it's helpful, I have performed additional tests. I wanted to confirm that SQLAlchemy works on its own, which I verified via:
result = sql_engine.execute(s)
type(result)
sqlalchemy.engine.result.ResultProxy
result.fetchone()
Results were displayed
This seems to rule out SQLAlchemy/Oracle issues, so any ideas what to try next?
Upvotes: 0
Views: 1651
Reputation: 179
I'm looking for the same thing right now.
To avoid being stuck... you may not have enough RAM, but you possibly have a lot of free storage. So... a suggestion for now:
# imports
import pandas as pd
import cx_Oracle as cx
import dask.dataframe as dd
# Connection stuff
...
conn = ...
# Query
qry = "SELECT * FROM HUGE_TABLE"
# Pandas Chunks
for ix, chunk in enumerate(pd.io.sql.read_sql(qry, conn, ..., chunksize=1000000)):
    pd.DataFrame(chunk).to_csv(f"chunk_{ix}.csv", sep=";")  # or to_parquet
# Dask dataframe reading from files (chunks)
dataset = dd.read_csv("chunk_*.csv", sep=";", blocksize=32e6)  # or read_parquet
Since this is IO-intensive and you are performing sequential operations, it may take a while.
My suggestion for a quicker export is to partition your table and perform the chunk export in parallel, one job per partition, as sketched below.
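A rough sketch of that parallel export, not a drop-in solution: it slices the table by a numeric key instead of real Oracle partitions, and the key column (ID_COL), credentials, and bounds are placeholders you would replace with your own.

```python
# Hedged sketch: export slices of HUGE_TABLE in parallel, one worker per slice.
# ID_COL, the bounds, and the credentials are placeholders; with a partitioned
# table you could instead select "FROM HUGE_TABLE PARTITION (pN)" in each job.
from concurrent.futures import ThreadPoolExecutor

import cx_Oracle as cx
import pandas as pd

def export_slice(ix, lo, hi):
    conn = cx.connect("username/password@environment")  # one session per worker
    qry = f"SELECT * FROM HUGE_TABLE WHERE ID_COL >= {lo} AND ID_COL < {hi}"
    for jx, chunk in enumerate(pd.read_sql(qry, conn, chunksize=1_000_000)):
        chunk.to_csv(f"chunk_{ix}_{jx}.csv", sep=";", index=False)  # or to_parquet
    conn.close()

bounds = [(0, 1_000_000), (1_000_000, 2_000_000), (2_000_000, 3_000_000)]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(export_slice, ix, lo, hi) for ix, (lo, hi) in enumerate(bounds)]
    for f in futures:
        f.result()  # surface any per-slice errors
```

The dd.read_csv("chunk_*.csv", ...) call from the snippet above then picks up all the chunk files the same way.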
Upvotes: 2
Reputation: 57319
Problem is that it appears that client.scatter requires data to be loaded into a Pandas dataframe first
That is because you are calling Pandas code here
dd.from_pandas(pd.read_sql('select ...', conn_cx_Oracle), npartitions=10)
# pd.read_sql('select ...', conn_cx_Oracle) # <<-----
According to the read_sql_table docstring, you should be able to pass in a SQLAlchemy expression object.
The reason that read_sql_table doesn't accept arbitrary SQL query strings is that it needs to be able to partition your query, so each task loads only a chunk of the whole. This is a tricky thing to do for the many dialects out there, so we rely on sqlalchemy to do the formatting.
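A minimal sketch of what that can look like, following the 1.x-style SQLAlchemy setup from the question's edit; the table, schema, filter column, filter value, and 'id_col' index column are placeholders, not the asker's real names:

```python
# Sketch: pass a SQLAlchemy expression (not a raw SQL string) to dd.read_sql_table
# so Dask can add its own per-partition WHERE clauses on index_col.
import dask.dataframe as dd
from sqlalchemy import create_engine, MetaData, Table, select

uri = 'oracle+cx_oracle://username:password@environment'
engine = create_engine(uri)
metadata = MetaData(bind=engine)                  # SQLAlchemy 1.x style, as in the edit above
tbl = Table('table', metadata, autoload=True, schema='schema')

filtered_value = 'some_value'                     # placeholder filter value
s = select([tbl]).where(tbl.c.field_to_filter == filtered_value)

ddf = dd.read_sql_table(
    s,                    # expression object, so Dask can split the query
    uri,                  # each task opens its own connection from this URI
    index_col='id_col',   # numeric/date column used to compute partition boundaries
    npartitions=10,
    schema='schema',
)
```

If the ORA-02391 session error persists, one thing to try is lowering npartitions or the number of concurrent workers, since each partition's read opens its own Oracle session.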
Upvotes: 1