Reputation: 751
Every 4 seconds, I have to store 32,000 rows of data. Each row consists of one timestamp value and 464 double precision values. The column name for the timestamp is time, and the column names for the double precision values increase sequentially as channel1, channel2, ..., channel464.
I establish a connection as follows:
CONNECTION = f"postgres://{username}:{password}@{host}:{port}/{dbname}"#?sslmode=require"
self.TimescaleDB_Client = psycopg2.connect(CONNECTION)
I then verify the TimescaleDB extension with the following:
def verifyTimeScaleInstall(self):
    try:
        sql_query = "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"
        cur = self.TimescaleDB_Client.cursor()
        cur.execute(sql_query)
        cur.close()
        self.TimescaleDB_Client.commit()
    except:
        self.timescaleLogger.error("An error occurred in verifyTimeScaleInstall")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
I then create a hypertable for my data with the following:
def createRAWDataTable(self):
    try:
        cur = self.TimescaleDB_Client.cursor()
        self.query_create_raw_data_table = None
        for channel in range(self.num_channel):
            channel = channel + 1
            if self.query_create_raw_data_table is None:
                self.query_create_raw_data_table = f"CREATE TABLE IF NOT EXISTS raw_data (time TIMESTAMPTZ NOT NULL, channel{channel} REAL"
            else:
                self.query_create_raw_data_table = self.query_create_raw_data_table + f", channel{channel} REAL"
        self.query_create_raw_data_table = self.query_create_raw_data_table + ");"
        self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time');"
        cur.execute(self.query_create_raw_data_table)
        cur.execute(self.query_create_raw_data_hypertable)
        self.TimescaleDB_Client.commit()
        cur.close()
    except:
        self.timescaleLogger.error("An error occurred in createRAWDataTable")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
I then insert the data into the hypertable using the following:
def insertRAWData(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        raw_data_query = self.query_insert_raw_data
        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds, self.num_channel).astype(dtype)
        cur = self.TimescaleDB_Client.cursor()
        data = list()
        for iteration in range(num_iterations):
            raw_data_row = matrix[iteration, :].tolist()  # Select a particular row and all columns
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (time_string,) + tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment
        start_time = time.perf_counter()
        psycopg2.extras.execute_values(
            cur, raw_data_query, data, template=None, page_size=100
        )
        print(time.perf_counter() - start_time)
        self.TimescaleDB_Client.commit()
        cur.close()
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
The SQL Query String that I am referencing in the above code is obtained from the following:
def getRAWData_Query(self):
    try:
        self.query_insert_raw_data = None
        for channel in range(self.num_channel):
            channel = channel + 1
            if self.query_insert_raw_data is None:
                self.query_insert_raw_data = f"INSERT INTO raw_data (time, channel{channel}"
            else:
                self.query_insert_raw_data = self.query_insert_raw_data + f", channel{channel}"
        self.query_insert_raw_data = self.query_insert_raw_data + ") VALUES %s;"
        return self.query_insert_raw_data
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData_Query")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
As you can see, I am using psycopg2.extras.execute_values() to insert the values. To my understanding, this is one of the fastest ways to insert data. However, it takes about 80 seconds for me to insert this data, and that is on quite a beefy system with 12 cores/24 threads, SSDs, and 256 GB of RAM. Can this be done faster? It just seems quite slow.
I would like to use TimescaleDB and am evaluating its performance, but I am looking for the write to complete within about 2 seconds for it to be acceptable.
Edit: I have tried to use pandas to perform the insert, but it took longer, at about 117 seconds. The following is the function that I used.
def insertRAWData_Pandas(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        raw_data_query = self.query_insert_raw_data
        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds, self.num_channel).astype(dtype)
        pd_df_dict = {}
        pd_df_dict["time"] = list()
        for iteration in range(num_iterations):
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            pd_df_dict["time"].append(time_string)
            current_time = current_time + time_increment
        for channel in range(self.num_channel):
            pd_df_dict[f"channel{channel}"] = matrix[:, channel].tolist()
        start_time = time.perf_counter()
        pd_df = pd.DataFrame(pd_df_dict)
        pd_df.to_sql('raw_data', self.engine, if_exists='append')
        print(time.perf_counter() - start_time)
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData_Pandas")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
Edit: I have tried to use CopyManager, and it appears to be producing the best results, at around 74 seconds. Still not what I was after, however.
def insertRAWData_PGCOPY(self, seconds):
    try:
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        dtype = "float32"
        matrix = np.random.rand(num_iterations, self.num_channel).astype(dtype)
        data = list()
        for iteration in range(num_iterations):
            raw_data_row = matrix[iteration, :].tolist()  # Select a particular row and all columns
            #time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (current_time,) + tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment
        channelList = list()
        for channel in range(self.num_channel):
            channel = channel + 1
            channelString = f"channel{channel}"
            channelList.append(channelString)
        channelList.insert(0, "time")
        cols = tuple(channelList)
        start_time = time.perf_counter()
        mgr = CopyManager(self.TimescaleDB_Client, 'raw_data', cols)
        mgr.copy(data)
        self.TimescaleDB_Client.commit()
        print(time.perf_counter() - start_time)
    except:
        self.timescaleLogger.error("An error occurred in insertRAWData_PGCOPY")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb)
        return False
I tried to modify the following values in postgresql.conf. There wasn't a noticeable performance improvement.
wal_level = minimal
fsync = off
synchronous_commit = off
wal_writer_delay = 2000ms
commit_delay = 100000
I have tried to modify the chunk size, according to one of the comments below, using the following in my createRAWDataTable() function. However, there wasn't an improvement in the insert times. Perhaps this was to be expected, given that I haven't been accumulating data: the database has only held a few samples, perhaps at most 1 minute's worth, over the course of my testing.
self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time', chunk_time_interval => INTERVAL '3 day',if_not_exists => TRUE);"
Edit: For anyone reading this, I was able to pickle and insert a 32000x464 float32 numpy matrix in about 0.5 seconds with MongoDB, which is what my final solution is. Perhaps MongoDB just does better with this workload in this case.
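For reference, a minimal sketch of what such a pickle-and-insert approach could look like with pymongo; this is an assumption, not the asker's actual code. Note that a 32000x464 float32 block is roughly 59 MB, which exceeds MongoDB's 16 MB document limit, so the sketch stores the blob via GridFS. The URI, database name, and metadata fields are placeholders.

import pickle
from datetime import datetime, timezone

import gridfs
import numpy as np
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
db = client["daq"]                                  # hypothetical database name
fs = gridfs.GridFS(db)

# One 4-second block: 32,000 rows x 464 channels of float32 (~59 MB pickled)
matrix = np.random.rand(32000, 464).astype("float32")
blob = pickle.dumps(matrix, protocol=pickle.HIGHEST_PROTOCOL)

# Store the pickled block as a single GridFS file, with some metadata on the file document
file_id = fs.put(blob, start_time=datetime.now(timezone.utc), rows=32000, channels=464)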
Upvotes: 1
Views: 2937
Reputation: 37
You can use the sqlalchemy library to do it, and also calibrate the chunksize while you are at it. Appending the data should take less than 74 seconds, since I perform a similar kind of insertion and it takes me about 40-odd seconds.
Another possibility is to use pandas.DataFrame.to_sql with a callable passed as the method argument. It will increase the performance drastically: in comparison to plain to_sql (150s) or to_sql with method='multi' (196s), the callable method did the job in just 14s. A sketch of this is given below.
(A comparative summary of the different methods was given as an image in the original answer.)
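A minimal sketch of that callable approach, assuming a SQLAlchemy engine and the raw_data table from the question. The COPY-based callable follows the pattern documented for the to_sql method argument; the DSN and the sample DataFrame are placeholders, not the answerer's exact code.

import csv
from io import StringIO

import numpy as np
import pandas as pd
from sqlalchemy import create_engine


def copy_insert(table, conn, keys, data_iter):
    # to_sql "method" callable that loads each batch with PostgreSQL COPY
    with conn.connection.cursor() as cur:  # raw psycopg2 cursor from the SQLAlchemy connection
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)
        columns = ", ".join(f'"{k}"' for k in keys)
        cur.copy_expert(f'COPY "{table.name}" ({columns}) FROM STDIN WITH CSV', buf)


engine = create_engine("postgresql+psycopg2://user:password@host:5432/dbname")  # placeholder DSN

# Build a DataFrame shaped like the question's data: 32,000 rows x 464 channels
fs, seconds, num_channel = 8000, 4, 464
times = pd.date_range("2023-01-01", periods=fs * seconds,
                      freq=pd.Timedelta(seconds=1 / fs), tz="UTC")
df = pd.DataFrame(np.random.rand(fs * seconds, num_channel).astype("float32"),
                  columns=[f"channel{i + 1}" for i in range(num_channel)])
df.insert(0, "time", times)

df.to_sql("raw_data", engine, if_exists="append", index=False,
          chunksize=10_000, method=copy_insert)

With COPY doing the actual load, chunksize mainly controls how many rows are buffered per batch and can be calibrated as suggested above.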
Upvotes: 1
Reputation: 41
I have two initial suggestions that may help with overall performance.
The default hypertable you are creating will "chunk" your data into 7-day periods (this means each chunk will hold around 4,838,400,000 rows of data given your parameters). Since your data is so granular, you may want to use a different chunk size. Check out the docs here for info on the optional chunk_time_interval argument. Changing the chunk size should help with insert and query speed, and it will also give you better performance in compression if that is needed later on.
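For instance, a minimal sketch of passing chunk_time_interval when creating the hypertable; the 1-hour interval and the connection string are illustrative assumptions only, not a recommendation from the docs.

import psycopg2

conn = psycopg2.connect("postgres://user:password@host:5432/dbname")  # placeholder DSN
with conn.cursor() as cur:
    # At 8,000 rows/second, a 1-hour chunk is still ~28.8 million rows; tune to taste
    cur.execute(
        "SELECT create_hypertable('raw_data', 'time', "
        "chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);"
    )
conn.commit()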
As the individuals above stated, playing around with batch inserts should also help. If you haven't checked out this stock data tutorial, I would highly recommend it. Using pgcopy and its CopyManager could help with inserting DataFrame objects more quickly.
Hopefully, some of this information can be helpful to your situation!
disclosure: I am part of the Timescale team 😊
Upvotes: 1
Reputation: 139
One of the fastest ways to do it is described here: How to write data frame to postgres?
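A minimal sketch of one COPY-based approach in the spirit of that link (an assumption, not necessarily the exact method in the linked answer). It assumes plain psycopg2, the raw_data table from the question, and a DataFrame df already built with a time column plus channel1..channel464, as in the question's pandas function.

import io

import psycopg2

conn = psycopg2.connect("postgres://user:password@host:5432/dbname")  # placeholder DSN


def copy_dataframe(conn, df, table="raw_data"):
    # Serialize the DataFrame to an in-memory CSV and stream it into the table with COPY
    buf = io.StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_from(buf, table, sep=",", columns=list(df.columns))
    conn.commit()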
Upvotes: 0