slizb

Reputation: 6170

How to create a large pandas dataframe from an sql query without running out of memory?

I have trouble querying a table of more than 5 million records from an MS SQL Server database. I want to select all of the records, but my code seems to fail when selecting too much data into memory.

This works:

import pandas.io.sql as psql
sql = "SELECT TOP 1000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

...but this does not work:

sql = "SELECT TOP 2000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

It returns this error:

File "inference.pyx", line 931, in pandas.lib.to_object_array_tuples
(pandas\lib.c:42733) Memory Error

I have read here that a similar problem exists when creating a dataframe from a csv file, and that the work-around is to use the 'iterator' and 'chunksize' parameters like this:

read_csv('exp4326.csv', iterator=True, chunksize=1000)

Is there a similar solution for querying from an SQL database? If not, what is the preferred work-around? Should I use some other methods to read the records in chunks? I read a bit of discussion here about working with large datasets in pandas, but it seems like a lot of work to execute a SELECT * query. Surely there is a simpler approach.

Upvotes: 86

Views: 145436

Answers (11)

shailesh Chanderiya

Reputation: 1

You can use the chunksize option. If RAM is limited, set it to a value of up to 6-7 digits, for example:

# Collect the chunks in a list, then concatenate once after the loop
df1 = []
for chunk in pd.read_sql(sql, engine, params=(fromdt, todt, filecode), chunksize=100000):
    df1.append(chunk)
dfs = pd.concat(df1, ignore_index=True)

Upvotes: 0

George

Reputation: 21

Full code using SQLAlchemy and a with statement:

import pandas as pd
import sqlalchemy
from sqlalchemy import text
from sqlalchemy.orm import Session

db_engine = sqlalchemy.create_engine(db_url, pool_size=10, max_overflow=20)
with Session(db_engine) as session:
    sql_qry = text("Your query")
    data = pd.concat(pd.read_sql(sql_qry, session.connection().execution_options(stream_results=True), chunksize=500000), ignore_index=True)

You can try to change chunksize to find the optimal size for your case.

Upvotes: 0

BossRoyce

Reputation: 211

Here is a one-liner. I was able to load 49 million records into the dataframe without running out of memory.

dfs = pd.concat(pd.read_sql(sql, engine, chunksize=500000), ignore_index=True)

Upvotes: 2

You can update your Airflow version. For example, I got the error shown below on version 2.2.3, using docker-compose with these settings:

  • AIRFLOW__CORE__EXECUTOR=CeleryExecutor

mysq 6.7

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

redis:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M" 

airflow-webserver:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

airflow-scheduler:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

airflow-worker:

#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"

error: Task exited with return code Negsignal.SIGKILL

But after updating to version 2.3.4 (FROM apache/airflow:2.3.4), the pulls run without problems, using the same resources configured in docker-compose.


My DAG extractor function:

import logging

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# connect_sql_server and generate_schema are helpers not shown here.
def getDataForSchema(table, conecction, tmp_path, **kwargs):
    conn = connect_sql_server(conecction)

    query_count = f"select count(1) from {table['schema']}.{table['table_name']}"
    logging.info(f"query: {query_count}")
    real_count_rows = pd.read_sql_query(query_count, conn)

    # get the table schema
    metadataquery = f"SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns \
        where table_name = '{table['table_name']}' and table_schema = '{table['schema']}'"
    #logging.info(f"query metadata: {metadataquery}")
    metadata = pd.read_sql_query(metadataquery, conn)
    schema = generate_schema(metadata)
    #logging.info(f"schema: {schema}")

    # query the table to extract, one chunk at a time
    query = f" SELECT {table['custom_column_names']} FROM {table['schema']}.{table['table_name']} "
    logging.info(f"query data: {query}")
    chunksize = table["partition_field"]
    data = pd.read_sql_query(query, conn, chunksize=chunksize)

    count_rows = 0
    pqwriter = None
    iteraccion = 0
    for df_row in data:
        print(f"chunk {iteraccion}: {count_rows} rows written so far out of {real_count_rows.iat[0, 0]}")
        #logging.info(df_row.to_markdown())
        if iteraccion == 0:
            # open the parquet writer once, on the first chunk
            parquetName = f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
            pqwriter = pq.ParquetWriter(parquetName, schema)
        tableData = pa.Table.from_pandas(df_row, schema=schema, safe=False, preserve_index=True)
        #logging.info(f" tabledata {tableData.column(17)}")
        pqwriter.write_table(tableData)
        #logging.info(f"parquet name:::{parquetName}")
        # alternative: write the dataframe straight to parquet
        #df_row.to_parquet(parquetName)
        iteraccion = iteraccion + 1
        count_rows += len(df_row)
        del df_row
        del tableData
    if pqwriter:
        print("Closing parquet file")
        pqwriter.close()
    del data
    del chunksize
    del iteraccion

Upvotes: 0

Owais Ajaz

Reputation: 264

chunksize alone still loads all the data into memory; stream_results=True is the answer. It uses a server-side cursor that fetches the rows in the given chunks and saves memory. I use it in many pipelines, and it may also help when you load historical data.

stream_conn = engine.connect().execution_options(stream_results=True)

Then use pd.read_sql with chunksize:

pd.read_sql("SELECT * FROM SOURCE", stream_conn , chunksize=5000)

Upvotes: 2

Ehsan Fathi

Reputation: 708

You can use Server Side Cursors (a.k.a. stream results)

import pandas as pd
from sqlalchemy import create_engine

def process_sql_using_pandas():
    engine = create_engine(
        "postgresql://postgres:pass@localhost/example"
    )
    conn = engine.connect().execution_options(
        stream_results=True)

    for chunk_dataframe in pd.read_sql(
            "SELECT * FROM users", conn, chunksize=1000):
        print(f"Got dataframe w/{len(chunk_dataframe)} rows")
        # ... do something with dataframe ...

if __name__ == '__main__':
    process_sql_using_pandas()

As others mentioned in the comments, using the chunksize argument on its own, as in pd.read_sql("SELECT * FROM users", engine, chunksize=1000), does not solve the problem: it still loads all the data into memory and then hands it to you chunk by chunk.

More explanation here

Upvotes: 2

Julien Kervizic

Reputation: 131

The best way I found to handle this is to use the SQLAlchemy stream_results connection option:

conn = engine.connect().execution_options(stream_results=True)

Then pass the conn object to pandas:

pd.read_sql("SELECT *...", conn, chunksize=10000)

This ensures that the cursor is handled server-side rather than client-side.
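What the server-side cursor changes can be illustrated at the DB-API level. This is only a sketch, not pandas' actual implementation: iter_frames is a hypothetical helper, and the in-memory SQLite users table is stand-in data. It builds one DataFrame per cursor.fetchmany() call. Whether that saves memory depends on the driver: some drivers (psycopg2's default cursor is one known example) buffer the whole result client-side at execute(), while a server-side cursor only transfers the next batch on each fetch.

```python
import sqlite3

import pandas as pd


def iter_frames(con, sql, chunksize):
    """Yield DataFrames of at most `chunksize` rows (hypothetical helper)."""
    cur = con.cursor()
    try:
        cur.execute(sql)
        # Column names come from the DB-API cursor description
        columns = [d[0] for d in cur.description]
        while True:
            rows = cur.fetchmany(chunksize)
            if not rows:
                break
            yield pd.DataFrame.from_records(rows, columns=columns)
    finally:
        cur.close()


# Stand-in data: 2500 rows in an in-memory SQLite table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(i, f"user{i}") for i in range(1, 2501)],
)
conn.commit()

chunk_lengths = [len(df) for df in iter_frames(conn, "SELECT * FROM users", 1000)]
print(chunk_lengths)
conn.close()
```

With 2500 rows and a chunk size of 1000, the generator yields two full frames and one partial frame.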

Upvotes: 6

DharsanB

Reputation: 1

If you want to limit the number of rows in the output, just take the first chunk:

data = next(pd.read_sql(sql, cnxn, chunksize=1000000))

Upvotes: -1

flying_fluid_four

Reputation: 764

Code solution and remarks.

# Create empty list
dfl = []  

# Create empty dataframe
dfs = pd.DataFrame()  

# Start Chunking
for chunk in pd.read_sql(query, con=conct, chunksize=10000000):

    # Start Appending Data Chunks from SQL Result set into List
    dfl.append(chunk)

# Start appending data from list to dataframe
dfs = pd.concat(dfl, ignore_index=True)

However, my memory analysis tells me that even though the memory is released after each chunk is extracted, the list keeps growing and occupies that memory, so there is no net gain in free RAM.

Would love to hear what the author / others have to say.
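One way around the growing list, when the full table is not actually needed in memory, is to reduce each chunk to a small result and let it go before reading the next one. A minimal sketch, assuming a hypothetical MyTable with an amount column in an in-memory SQLite database (the table contents and the running-mean task are made up for illustration):

```python
import sqlite3

import pandas as pd

# Stand-in data: 10000 rows in an in-memory SQLite table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MyTable (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany(
    "INSERT INTO MyTable (id, amount) VALUES (?, ?)",
    [(i, float(i % 10)) for i in range(1, 10001)],
)
conn.commit()

row_count = 0
amount_sum = 0.0

# Each chunk is reduced to two numbers and then dropped, so only one
# chunk-sized DataFrame is referenced at any time.
for chunk in pd.read_sql_query(
    "SELECT id, amount FROM MyTable", conn, chunksize=1000
):
    row_count += len(chunk)
    amount_sum += float(chunk["amount"].sum())

mean_amount = amount_sum / row_count
print(row_count, mean_amount)
conn.close()
```

This only helps for work that can be done incrementally (sums, counts, filters, writing out to a file). If the final step really needs every row in one DataFrame, the peak memory is still the size of that DataFrame.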

Upvotes: 22

ThePhysicist

Reputation: 1862

Update: Make sure to check out the answer below, as Pandas now has built-in support for chunked loading.

You could simply try to read the input table chunk-wise and assemble your full dataframe from the individual pieces afterwards, like this:

import pandas as pd
import pandas.io.sql as psql
chunk_size = 10000
offset = 0
dfs = []
while True:
  sql = "SELECT * FROM MyTable ORDER BY ID LIMIT %d OFFSET %d" % (chunk_size, offset)
  dfs.append(psql.read_frame(sql, cnxn))
  offset += chunk_size
  if len(dfs[-1]) < chunk_size:
    break
full_df = pd.concat(dfs)

It is also possible that the whole dataframe is simply too large to fit in memory. In that case you have no option other than to restrict the number of rows or columns you select.
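Restricting columns and shrinking dtypes can be combined with chunked reading. The sketch below is an illustration under assumptions, not a benchmark: load_compact is a hypothetical helper, and the MyTable layout (ID, status, qty, note) is made up. It selects only the needed columns, downcasts the integer column per chunk, and converts the repetitive text column to a categorical after concatenation.

```python
import sqlite3

import pandas as pd

# Stand-in data: a wide-ish table with a bulky text column
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE MyTable (ID INTEGER PRIMARY KEY, status TEXT, qty INTEGER, note TEXT)"
)
conn.executemany(
    "INSERT INTO MyTable VALUES (?, ?, ?, ?)",
    [(i, "open" if i % 2 else "closed", i % 100, "x" * 50) for i in range(1, 5001)],
)
conn.commit()


def load_compact(sql, con, chunksize):
    """Read in chunks and shrink dtypes along the way (hypothetical helper)."""
    parts = []
    for chunk in pd.read_sql_query(sql, con, chunksize=chunksize):
        # Values 0..99 fit in the smallest signed integer type
        chunk["qty"] = pd.to_numeric(chunk["qty"], downcast="integer")
        parts.append(chunk)
    df = pd.concat(parts, ignore_index=True)
    # Convert after concat so all chunks share one set of categories
    df["status"] = df["status"].astype("category")
    return df


full_df = pd.read_sql_query("SELECT * FROM MyTable", conn)
compact_df = load_compact("SELECT ID, status, qty FROM MyTable", conn, chunksize=1000)
conn.close()

full_bytes = int(full_df.memory_usage(deep=True).sum())
compact_bytes = int(compact_df.memory_usage(deep=True).sum())
print(full_bytes, compact_bytes)
```

How much this saves depends entirely on the real data; the point is only that dropping unused columns in the SELECT and choosing smaller dtypes both reduce the size of the final DataFrame.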

Upvotes: 65

Kamil Sindi

Reputation: 22832

As mentioned in a comment, starting from pandas 0.15, you have a chunksize option in read_sql to read and process the query chunk by chunk:

sql = "SELECT * FROM My_Table"
for chunk in pd.read_sql_query(sql , engine, chunksize=5):
    print(chunk)

Reference: http://pandas.pydata.org/pandas-docs/version/0.15.2/io.html#querying
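If the goal is to move the data somewhere else rather than to print it, each chunk can be written out as soon as it arrives. A minimal sketch, assuming a small stand-in My_Table in an in-memory SQLite database and a temporary CSV path (both are made up for illustration):

```python
import os
import sqlite3
import tempfile

import pandas as pd

# Stand-in data: 25 rows in an in-memory SQLite table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE My_Table (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO My_Table VALUES (?, ?)",
    [(i, f"row{i}") for i in range(1, 26)],
)
conn.commit()

out_path = os.path.join(tempfile.mkdtemp(), "my_table.csv")

sql = "SELECT * FROM My_Table"
for i, chunk in enumerate(pd.read_sql_query(sql, conn, chunksize=10)):
    # First chunk creates the file with a header; later chunks append rows
    chunk.to_csv(
        out_path,
        mode="w" if i == 0 else "a",
        header=(i == 0),
        index=False,
    )
conn.close()

exported = pd.read_csv(out_path)
print(len(exported))
```

Reading the file back at the end is only there to check the export; in a real pipeline that step would be skipped.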

Upvotes: 83
