Eli G

Reputation: 41

Flask storing large dataframes across multiple requests

I have a Flask web app that uses a large DataFrame (hundreds of megabytes). The DataFrame is used by several different machine learning models in the app. I want to create the DataFrame only once in the application and use it across multiple requests, so that the user can build different models based on the same data. The Flask session is not built for large data, so that is not an option. I also do not want to recreate the DataFrame on every request, especially when the source of the data is a CSV file (yuck).

I have a solution that works, but I cannot find any discussion of it on Stack Overflow. That makes me suspicious that my solution may not be a good design idea. I have always worked on the assumption that a well-beaten path in software development is a path well chosen.

My solution is simply to create a data holder class with one class variable:

class DataHolder:
    # class attribute, shared by every piece of code that imports DataHolder
    dataFrameHolder = None

Now dataFrameHolder is shared across all instances of the class (like a static variable in Java), since it lives in the server process's memory.

I can now create the DataFrame once and put it into the DataHolder class:

import pandas as pd
from dataholder import DataHolder

# read_sql_query already returns a DataFrame; re-wrap only to pick the columns needed
result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2', ....])
DataHolder.dataFrameHolder = df

I can then access the stored DataFrame from any code that imports the DataHolder class, anywhere in the application, including across different requests:

.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.
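For context, a minimal sketch of how this looks inside a view function (the /build_model route name is hypothetical, and do_some_model is the model-building code from above); note that the class attribute is shared per worker process, so with multiple Gunicorn/uWSGI workers each worker would hold its own copy:

from flask import Flask, jsonify
from dataholder import DataHolder

app = Flask(__name__)

@app.route('/build_model')
def build_model():
    # Every request handled by this worker process sees the same DataFrame object.
    modelDataFrame = DataHolder.dataFrameHolder
    if modelDataFrame is None:
        return jsonify(error="data not loaded yet"), 503
    do_some_model(modelDataFrame)
    return jsonify(status="ok")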

Is this a bad idea, a good idea, or is there something else that I am not aware of that already solves the problem?

Upvotes: 4

Views: 2125

Answers (2)

Shaji Ahmed

Reputation: 91

I had a similar problem: I was importing CSVs (hundreds of MBs) and creating DataFrames on the fly for each request, which, as you said, was yucky! I also tried the Redis way to cache it, and that improved performance for a while, until I realized that making changes to the underlying data meant updating the cache as well.

Then I found a world beyond CSV: more performant file formats like Pickle, Feather, Parquet, and others. You can read more about them here. You can import/export CSVs all you want, but use intermediate formats for processing.
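For example, a minimal sketch of that round-trip with pandas (the data.csv filename is hypothetical; to_feather/to_parquet need pyarrow installed, or fastparquet for Parquet):

import pandas as pd

# One-time import from the slow source format...
df = pd.read_csv("data.csv")           # hypothetical source file

# ...then persist in a faster intermediate format for later use.
df.to_feather("data.feather")          # needs pyarrow
df.to_parquet("data.parquet")          # needs pyarrow or fastparquet

# Subsequent loads are much faster than re-parsing the CSV.
df = pd.read_feather("data.feather")
df = pd.read_parquet("data.parquet")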

I did run into some issues, though. I have read that Pickle has security issues, even though I still use it. Feather wouldn't let me write some object-typed columns in my data; it needed them categorized. Your mileage may vary, but if you have good clean data, use Feather.

More recently I've found that I can manage large data using Datatable instead of Pandas, storing it in the Jay format for even better read/write performance.

This does, however, mean rewriting the bits of code that use Pandas to use Datatable instead, but I believe the APIs are very similar. I have not yet done it myself because of a very large codebase, but you can give it a try.
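A minimal sketch of the Datatable/Jay round-trip, assuming the datatable package is installed and a hypothetical data.csv source:

import datatable as dt

# One-time conversion: read the CSV and save it as a Jay file.
DT = dt.fread("data.csv")          # hypothetical source file
DT.to_jay("data.jay")

# Later loads memory-map the Jay file, so they are essentially instant.
DT = dt.fread("data.jay")

# Convert back to pandas only where existing code still needs a DataFrame.
df = DT.to_pandas()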

Upvotes: 1

Rob Raymond

Reputation: 31226

Redis can be used. My use case involves smaller DataFrames, so I have not tested this with larger ones, but it allows me to serve 3-second ticking data to multiple browser clients. pyarrow serialisation/deserialisation is performing well. It works locally and across AWS, GCloud and Azure.

GET route

import sys
from io import BytesIO
from flask import Response

@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
    c = mycache()                  # mycache is the utility class below
    data = c.redis().get(key)
    resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
    resp.headers["key"] = key
    resp.headers["type"] = c.redis().get(f"{key}.type")
    resp.headers["size"] = sys.getsizeof(data)
    resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
    return resp
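A hedged sketch of pulling a cached frame back out through this route from a client; the host and key are hypothetical, the "type" header mirrors the str(type(value)) that the cache class stores, and pyarrow.deserialize is deprecated from pyarrow 2.0 onwards (matching the fallback handling in the cache class below):

import requests
import pandas as pd
import pyarrow as pa

# Hypothetical host and key, just to show the shape of the call.
resp = requests.get("http://localhost:5000/cacheget/dfsensor")

if resp.headers.get("type") == str(pd.DataFrame):
    df = pa.deserialize(resp.content)   # deprecated in pyarrow >= 2.0
    print(df.head())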

sample route to put dataframe into cache

import pandas as pd
from flask import request, jsonify

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
    c = mycache()                  # mycache is the utility class below
    dfsensor = c.get("dfsensor")
    newsensor = pd.json_normalize(request.get_json())
    newsensor[["x","y"]] = newsensor[["epoch", "value"]]
    newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
    newsensor["amin"] = newsensor["value"]
    newsensor["amax"] = newsensor["value"]
    newsensor = newsensor.drop(columns=["x","y"])

    # add new data from serial interface to start of list (append old data to new data).
    # default time as now to new data
    dfsensor = newsensor.append(dfsensor, sort=False)
    # keep size down - only last 500 observations
    c.set("dfsensor", dfsensor[:500])
    del dfsensor

    return jsonify(result={"status":"ok"})
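And a hedged example of feeding the route, assuming a local dev server and that a dfsensor frame has already been seeded in Redis; the epoch/value field names match what the handler pulls out of the normalized JSON:

import time
import requests

# Hypothetical reading from the serial interface.
payload = {"epoch": int(time.time()), "value": 21.7}

resp = requests.post("http://localhost:5000/sensor_data", json=payload)
print(resp.json())   # expects {'result': {'status': 'ok'}}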

utility class

import json
import os
import pickle

import pandas as pd
import pyarrow as pa
import redis

import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import Optional, Union

class mycache():
    __redisClient:Redis
    CONFIGKEY = "cacheconfig"

    def __init__(self) -> None:
        try:
            ep = os.environ["REDIS_HOST"]
        except KeyError:
            if os.environ["HOST_ENV"] == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif os.environ["HOST_ENV"] == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif os.environ["HOST_ENV"] == "AZURE":
                #os.environ["REDIS_HOST"] = "redis://ignore:[email protected]"
                pass # should be set in azure env variable
            elif os.environ["HOST_ENV"] == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            else:
                raise RuntimeError("could not initialise redis")  # no known redis setup

        #self.__redisClient = redis.Redis(host=os.environ["REDIS_HOST"])
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)
        self.alog = logenv.alog()

    def redis(self) -> Redis:
        return self.__redisClient


    def exists(self, key:str) -> bool:
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key: str) -> Optional[Union[DataFrame, dict, str]]:
        keytype = "{k}.type".format(k=key)
        valuetype = self.__redisClient.get(keytype)
        if valuetype is None:
            if (key.split(".")[-1] == "pickle"):
                return pickle.loads(self.redis().get(key))
            else:
                ret = self.redis().get(key)
                if ret is None:
                    return ret
                else:
                    return ret.decode()
        elif valuetype.decode() == str(pd.DataFrame):
            # fallback to pickle serialized form if pyarrow fails
            # https://issues.apache.org/jira/browse/ARROW-7961
            try:
                return pa.deserialize(self.__redisClient.get(key))
            except pa.lib.ArrowIOError as err:
                self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                return pickle.loads(self.redis().get(f"{key}.pickle"))
            except OSError as err:
                if "Expected IPC" in str(err):
                    self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
                    return pickle.loads(self.redis().get(f"{key}.pickle"))
                else:
                    raise err

        elif valuetype.decode() == str(type({})):
            return json.loads(self.__redisClient.get(key).decode())
        else:
            return self.__redisClient.get(key).decode() # type: ignore

    def set(self, key: str, value: Union[DataFrame, dict, str]) -> None:
        if self.__redisClient is None:
            return
        keytype = "{k}.type".format(k=key)

        if str(type(value)) == str(pd.DataFrame):
            self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config["pickle"]:
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                # issue should be transient through an upgrade....
                # once switched off data can go away
                self.redis().expire(f"{key}.pickle", 60*60*24)
        elif str(type(value)) == str(type({})):
            self.__redisClient.set(key, json.dumps(value))
        else:
            self.__redisClient.set(key, value)

        self.__redisClient.set(keytype, str(type(value)))


if __name__ == '__main__':
    os.environ["HOST_ENV"] = "LOCAL"
    r = mycache()
    rr = r.redis()
    for k in rr.keys("cache*"):
        print(k.decode(), rr.ttl(k))
        print(rr.get(k.decode()))

Upvotes: 2
