P. S.R.

Reputation: 149

How to load SUBSET of large Oracle table into Dask dataframe?

Here's what I tried:

dask_rf = dd.from_pandas(pd.read_sql('select ...', conn_cx_Oracle), npartitions=10)

This gives me a 'large object' warning and recommends using client.scatter. The problem is that client.scatter appears to require the data to be loaded into a Pandas dataframe first, which defeats the purpose: RAM limitations are why I'm using Dask in the first place.

The Oracle table is too large to read using Dask's read_sql_table because read_sql_table does not filter the table in any way.

Any ideas? Or is Dask simply not applicable to my use case?

Edit - Per the answer below, and after researching how to do so, here is my attempt to use a SQLAlchemy expression instead:

from sqlalchemy import create_engine, Table, Column, String, MetaData, select

sql_engine = create_engine('oracle+cx_oracle://username:password@environment')

metadata = MetaData(bind=sql_engine)

table_reference = Table('table', metadata, autoload=True, schema='schema')

s = select([table_reference]).where(table_reference.c.field_to_filter == filtered_value)

import dask.dataframe as dd

dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema='schema')

dask_df.count()

Dask Series Structure:
npartitions=1
action_timestamp    int64
vendor_name           ...
dtype: int64
Dask Name: dataframe-count-agg, 1996 tasks

dask_df.count().compute()

DatabaseError: (cx_Oracle.DatabaseError) ORA-02391: exceeded simultaneous SESSIONS_PER_USER limit (Background on this error at: http://sqlalche.me/e/4xp6)

Why is it trying to connect to Oracle so many times?

Edit #2 - In case it's helpful, I performed some additional tests. I wanted to confirm that SQLAlchemy works on its own, which I verified via:

result = sql_engine.execute(s)

type(result)

sqlalchemy.engine.result.ResultProxy

result.fetchone()

Results were displayed

This seems to rule out SQLAlchemy/Oracle issues, so any ideas what to try next?

Upvotes: 0

Views: 1651

Answers (2)

Victor Faro

Reputation: 179

I'm looking for the same thing right now.

So you don't stay stuck: you may not have enough RAM, but you probably have plenty of free storage. Here is a suggestion for now.

# imports
import pandas as pd 
import cx_Oracle as cx
import dask.dataframe as dd

# Connection stuff
...
conn = ...

# Query
qry = "SELECT * FROM HUGE_TABLE"

# Pandas Chunks
for ix, chunk in enumerate(pd.read_sql(qry, conn, ..., chunksize=1000000)):
    chunk.to_csv(f"chunk_{ix}.csv", sep=";")  # or to_parquet

# Dask dataframe reading from files (chunks)
dataset = dd.read_csv("chunk_*.csv", sep=";", blocksize=32e6)  # or read_parquet

Since this is IO intensive and you are performing sequential operations, it may take a while.

My suggestion for a quicker "export" is to partition your table and run the chunk export in parallel, one job per partition, roughly as sketched below.
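If your table has a numeric column with known bounds, the parallel export could look something like this untested sketch. HUGE_TABLE comes from the query above, but ID_COL, the bounds, and all connection details are placeholders you would need to adapt:

# Parallel chunk export, one slice per worker (rough sketch)
from concurrent.futures import ThreadPoolExecutor
import cx_Oracle as cx
import pandas as pd

dsn = cx.makedsn("host", 1521, service_name="service")  # placeholder

def export_slice(ix, lo, hi):
    # Each worker opens its own connection and dumps one slice to disk
    conn = cx.connect("username", "password", dsn)
    try:
        qry = f"SELECT * FROM HUGE_TABLE WHERE ID_COL >= {lo} AND ID_COL < {hi}"
        pd.read_sql(qry, conn).to_parquet(f"chunk_{ix}.parquet")
    finally:
        conn.close()

# Split the ID range into slices and export them concurrently
bounds = list(range(0, 8_000_000 + 1, 1_000_000))
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(export_slice, ix, lo, hi)
               for ix, (lo, hi) in enumerate(zip(bounds[:-1], bounds[1:]))]
    for f in futures:
        f.result()  # surface any export errors

Keep max_workers below your SESSIONS_PER_USER limit, since every worker holds its own Oracle session.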

Upvotes: 2

MRocklin

Reputation: 57319

Problem is that it appears that client.scatter requires data to be loaded into a Pandas dataframe first

That is because you are calling Pandas code here:

dd.from_pandas(pd.read_sql('select ...', conn_cx_Oracle), npartitions=10)
#              pd.read_sql('select ...', conn_cx_Oracle) # <<-----

According to the read_sql_table docstring, you should be able to pass in a SQLAlchemy expression object.

The reason read_sql_table doesn't accept arbitrary SQL query strings is that it needs to be able to partition your query, so that each task loads only a chunk of the whole. This is a tricky thing to do for the many dialects out there, so we rely on SQLAlchemy to do the formatting.
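Adapted to the filter in the question, the SQLAlchemy-expression route might look roughly like this untested sketch; the table name, column names, connection string, and npartitions are placeholders, and newer Dask versions expose a separate dd.read_sql_query for query expressions:

import dask.dataframe as dd
from sqlalchemy import sql

# Build the expression from lightweight column/table objects (placeholder names)
id_col = sql.column("id_col")          # numeric column Dask can partition on
field = sql.column("field_to_filter")
s = (sql.select([id_col, field])
     .select_from(sql.table("table"))
     .where(field == "filtered_value"))

dask_df = dd.read_sql_table(
    s,
    "oracle+cx_oracle://username:password@environment",
    index_col=id_col,    # should be one of the selected columns
    npartitions=10,
)

Note that each partition opens its own connection when it is computed, so running many tasks concurrently can exceed Oracle's SESSIONS_PER_USER limit, which may explain the ORA-02391 error above.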

Upvotes: 1
