Reputation: 41
I have a Flask web app that uses a large DataFrame ( hundreds of Megs). The DataFrame is used in the app for several different machine learning models. I want to create the DataFrame only once in the application and use it across multiple requests so that the user may build different models based on the same data. The Flask session is not built for large data, so that is not an option. I do not want to go back and recreate the DataFrame in case the source of the data is a csv file(yuck). be
I have a solution that works, but I cannot find any discussion of this solution in stack overflow. That makes me suspicious that my solution may not be a good design idea. I have always used the assumption that a well beaten path in software development is a path well chosen.
My solution is simply to create a data holder class with one class variable:
class DataHolder:
dataFrameHolder = None
Now the dataFrameHolder is known across all class instances (like a static variable in Java) since it is stored in memory on the server.
I can now create the DataFrame once, put it into the DataHolder class:
import pandas as pd
from dataholder import DataHolder
result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2',....]
DataHolder.dataFrameHolder = df
Then access that DataFrame from any code that imports the DataHolder class. I can then use the stored DataFrame anywhere in the application, including across different requests:
.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.
Is this a bad idea, a good idea, or is there something else that I am not aware of that already solves the problem?
Upvotes: 4
Views: 2125
Reputation: 91
I had a similar problem, as I was importing CSVs (100s of MBs) and creating DataFrames on the fly for each request, which as you said was yucky! I also tried the REDIS way, to cache it, and that improved performance for a while, until I realized that making changes to the underlying data meant updating the cache as well.
Then I found a world beyond CSV, and more performant file formats like Pickle, Feather, Parquet, and others. You can read more about them here. You can import/export CSVs all you want, but use intermediate formats for processing.
I did run into some issues though. I have read that Pickle has security issues, even though I still use it. Feather wouldn't let me write some object
types in my data, it needed them categorized
. Your mileage may vary, but if you have good clean data, use Feather.
And more recently I've found that I manage large data using Datatable instead of Pandas, and storing them in Jay for even better read/write performance.
This does however mean re-writing bits of code that use Pandas into DataTable, but I believe the APIs are very similar. I have not yet done it myself, because of very large codebase, but you can give it a try.
Upvotes: 1
Reputation: 31226
Redis can be used. My use case is smaller data frames so have not tested with larger data frames. This allows me to provide 3 second ticking data to multiple browser clients. pyarrow serialisation / deserialisation is performing well. Works locally and across AWS/GCloud and Azure
@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
c = mycache()
data = c.redis().get(key)
resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
resp.headers["key"] = key
resp.headers["type"] = c.redis().get(f"{key}.type")
resp.headers["size"] = sys.getsizeof(data)
resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
return resp
@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
c = mycache()
dfsensor = c.get("dfsensor")
newsensor = json_normalize(request.get_json())
newsensor[["x","y"]] = newsensor[["epoch", "value"]]
newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
newsensor["amin"] = newsensor["value"]
newsensor["amax"] = newsensor["value"]
newsensor = newsensor.drop(columns=["x","y"])
# add new data from serial interface to start of list (append old data to new data).
# default time as now to new data
dfsensor = newsensor.append(dfsensor, sort=False)
# keep size down - only last 500 observations
c.set("dfsensor", dfsensor[:500])
del dfsensor
return jsonify(result={"status":"ok"})
import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)
class mycache():
__redisClient:Redis
CONFIGKEY = "cacheconfig"
def __init__(self) -> None:
try:
ep = os.environ["REDIS_HOST"]
except KeyError:
if os.environ["HOST_ENV"] == "GCLOUD":
os.environ["REDIS_HOST"] = "redis://10.0.0.3"
elif os.environ["HOST_ENV"] == "EB":
os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
elif os.environ["HOST_ENV"] == "AZURE":
#os.environ["REDIS_HOST"] = "redis://ignore:[email protected]"
pass # should be set in azure env variable
elif os.environ["HOST_ENV"] == "LOCAL":
os.environ["REDIS_HOST"] = "redis://127.0.0.1"
else:
raise "could not initialise redis"
return # no known redis setup
#self.__redisClient = redis.Redis(host=os.environ["REDIS_HOST"])
self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
self.__redisClient.ping()
# get config as well...
self.config = self.get(self.CONFIGKEY)
if self.config is None:
self.config = {"pyarrow":True, "pickle":False}
self.set(self.CONFIGKEY, self.config)
self.alog = logenv.alog()
def redis(self) -> Redis:
return self.__redisClient
def exists(self, key:str) -> bool:
if self.__redisClient is None:
return False
return self.__redisClient.exists(key) == 1
def get(self, key:str) -> Union[DataFrame, str]:
keytype = "{k}.type".format(k=key)
valuetype = self.__redisClient.get(keytype)
if valuetype is None:
if (key.split(".")[-1] == "pickle"):
return pickle.loads(self.redis().get(key))
else:
ret = self.redis().get(key)
if ret is None:
return ret
else:
return ret.decode()
elif valuetype.decode() == str(pd.DataFrame):
# fallback to pickle serialized form if pyarrow fails
# https://issues.apache.org/jira/browse/ARROW-7961
try:
return pa.deserialize(self.__redisClient.get(key))
except pa.lib.ArrowIOError as err:
self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
return pickle.loads(self.redis().get(f"{key}.pickle"))
except OSError as err:
if "Expected IPC" in str(err):
self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
return pickle.loads(self.redis().get(f"{key}.pickle"))
else:
raise err
elif valuetype.decode() == str(type({})):
return json.loads(self.__redisClient.get(key).decode())
else:
return self.__redisClient.get(key).decode() # type: ignore
def set(self, key:str, value:Union[DataFrame, str]) -> None:
if self.__redisClient is None:
return
keytype = "{k}.type".format(k=key)
if str(type(value)) == str(pd.DataFrame):
self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
if self.config["pickle"]:
self.redis().set(f"{key}.pickle", pickle.dumps(value))
# issue should be transient through an upgrade....
# once switched off data can go away
self.redis().expire(f"{key}.pickle", 60*60*24)
elif str(type(value)) == str(type({})):
self.__redisClient.set(key, json.dumps(value))
else:
self.__redisClient.set(key, value)
self.__redisClient.set(keytype, str(type(value)))
if __name__ == '__main__':
os.environ["HOST_ENV"] = "LOCAL"
r = mycache()
rr = r.redis()
for k in rr.keys("cache*"):
print(k.decode(), rr.ttl(k))
print(rr.get(k.decode()))
Upvotes: 2