geekygeek

Reputation: 751

Fastest way to insert many rows of data?

Every 4 seconds, I have to store 32,000 rows of data. Each of these rows consists of one timestamp value and 464 double precision values. The column name for the timestamp is time, and the column names for the double precision values increase sequentially as channel1, channel2, ..., channel464.

I establish a connection as follows:

CONNECTION = f"postgres://{username}:{password}@{host}:{port}/{dbname}"#?sslmode=require"
self.TimescaleDB_Client = psycopg2.connect(CONNECTION)
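
For reference, the same connection can also be made with keyword arguments instead of a URI, which avoids escaping issues if the password contains special characters (a minimal sketch using the same variables as above):

self.TimescaleDB_Client = psycopg2.connect(
    host=host, port=port, dbname=dbname,
    user=username, password=password,
    # sslmode="require",  # uncomment if TLS is required, mirroring the commented-out URI option
)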

I then verify the TimescaleDB extension with the following:

def verifyTimeScaleInstall(self): 
    try: 
        sql_query = "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"
        cur = self.TimescaleDB_Client.cursor()
        cur.execute(sql_query)
        cur.close()
        self.TimescaleDB_Client.commit()
    except: 
        self.timescaleLogger.error("An error occurred in verifyTimeScaleInstall")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False

I then create a hypertable for my data with the following:

def createRAWDataTable(self): 
    try: 
        cur = self.TimescaleDB_Client.cursor()
        self.query_create_raw_data_table = None
        for channel in range(self.num_channel) :
            channel = channel + 1 
            if self.query_create_raw_data_table is None: 
                self.query_create_raw_data_table = f"CREATE TABLE IF NOT EXISTS raw_data (time TIMESTAMPTZ NOT NULL, channel{channel} REAL"
            else: 
                self.query_create_raw_data_table = self.query_create_raw_data_table + f", channel{channel} REAL"
        self.query_create_raw_data_table = self.query_create_raw_data_table + ");"
        self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time');"
        cur.execute(self.query_create_raw_data_table)
        cur.execute(self.query_create_raw_data_hypertable)
        self.TimescaleDB_Client.commit()
        cur.close()
    except:         
        self.timescaleLogger.error("An error occurred in createRAWDataTable")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False

I then insert the data into the hypertable using the following:

def insertRAWData(self, seconds): 
    try: 
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)

        raw_data_query = self.query_insert_raw_data

        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds,self.num_channel).astype(dtype)
        cur = self.TimescaleDB_Client.cursor()
        data = list()
        for iteration in range(num_iterations): 
            raw_data_row = matrix[iteration,:].tolist() #Select a particular row and all columns
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (time_string,)+tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment

        start_time = time.perf_counter()
        psycopg2.extras.execute_values(
            cur, raw_data_query, data, template=None, page_size=100
        )
        print(time.perf_counter() - start_time)
        self.TimescaleDB_Client.commit()
        cur.close()

    except: 
        self.timescaleLogger.error("An error occurred in insertRAWData")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False

The SQL Query String that I am referencing in the above code is obtained from the following:

def getRAWData_Query(self): 
    try: 
        self.query_insert_raw_data = None
        for channel in range(self.num_channel): 
            channel = channel + 1
            if self.query_insert_raw_data is None: 
                self.query_insert_raw_data = f"INSERT INTO raw_data (time, channel{channel}"
            else: 
                self.query_insert_raw_data = self.query_insert_raw_data + f", channel{channel}"
        self.query_insert_raw_data = self.query_insert_raw_data + ") VALUES %s;"
        return self.query_insert_raw_data
    except: 
        self.timescaleLogger.error("An error occurred in insertRAWData_Query")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False

As you can see, I am using psycopg2.extras.execute_values() to insert the values. To my understanding, this is one of the fastest ways to insert data. However, it takes about 80 seconds for me to insert this data. It is on quite a beefy system with 12 cores/24 threads, SSDs, and 256 GB of RAM. Can this be done faster? It just seems quite slow.

I would like to use TimescaleDB and am evaluating its performance, but I need the writes to complete within roughly 2 seconds for it to be acceptable.
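
For reference, execute_values folds rows into multi-row INSERT statements in batches of page_size; with page_size=100, the 32,000 rows above go out as roughly 320 statements. A larger value (the number below is only an assumption to illustrate) reduces the statement count:

psycopg2.extras.execute_values(
    cur, raw_data_query, data, template=None,
    page_size=1000  # assumed value; tune for your row width and memory budget
)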

Edit: I have tried to use pandas to perform the insert, but it took longer, at about 117 seconds. The following is the function that I used.

def insertRAWData_Pandas(self, seconds): 
    try: 
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)

        raw_data_query = self.query_insert_raw_data
        dtype = "float32"
        matrix = np.random.rand(self.fs*seconds,self.num_channel).astype(dtype)
        pd_df_dict = {}
        pd_df_dict["time"] = list()
        for iteration in range(num_iterations):
            time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z") 
            pd_df_dict["time"].append(time_string)
            current_time = current_time + time_increment
        for channel in range(self.num_channel): 
            pd_df_dict[f"channel{channel}"] = matrix[:,channel].tolist()



        start_time = time.perf_counter()
        pd_df = pd.DataFrame(pd_df_dict)
        pd_df.to_sql('raw_data', self.engine, if_exists='append')
        print(time.perf_counter() - start_time)

    except: 
        self.timescaleLogger.error("An error occurred in insertRAWData_Pandas")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False
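
For reference, two to_sql parameters that the call above leaves at their defaults: index controls whether the DataFrame index is written as an extra column, and chunksize bounds how many rows are written per batch (the value below is an assumption):

pd_df.to_sql(
    'raw_data', self.engine, if_exists='append',
    index=False,      # don't write the DataFrame index as an extra column
    chunksize=10000,  # assumed batch size; rows are written in batches of this many
)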

Edit: I have tried to use CopyManager, and it appears to produce the best results, at around 74 seconds. Still not what I was after, however.

def insertRAWData_PGCOPY(self, seconds): 
    try: 
        insert_start_time = datetime.now(pytz.timezone("MST"))
        current_time = insert_start_time
        num_iterations = seconds * self.fs
        time_increment = timedelta(seconds=1/self.fs)
        dtype = "float32"
        matrix = np.random.rand(num_iterations,self.num_channel).astype(dtype)
        data = list()
        for iteration in range(num_iterations): 
            raw_data_row = matrix[iteration,:].tolist() #Select a particular row and all columns
            #time_string = current_time.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
            raw_data_values = (current_time,)+tuple(raw_data_row)
            data.append(raw_data_values)
            current_time = current_time + time_increment
        channelList = list()
        for channel in range(self.num_channel): 
            channel = channel + 1
            channelString = f"channel{channel}"
            channelList.append(channelString)
        channelList.insert(0,"time")
        cols = tuple(channelList)

        start_time = time.perf_counter()
        mgr = CopyManager(self.TimescaleDB_Client, 'raw_data', cols)
        mgr.copy(data)
        self.TimescaleDB_Client.commit()
        print(time.perf_counter() - start_time)

    except: 
        self.timescaleLogger.error("An error occurred in insertRAWData_PGCOPY")
        tb = traceback.format_exc()
        self.timescaleLogger.exception(tb) 
        return False
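
For comparison, psycopg2 itself can stream COPY data without the pgcopy dependency via cursor.copy_expert. A minimal sketch, assuming the same cols tuple and data list built above, and that text (CSV) rendering of the timestamps and floats is acceptable:

import io

def copy_rows_with_copy_expert(conn, table, cols, rows):
    """Stream rows to PostgreSQL via COPY ... FROM STDIN in CSV text format."""
    buf = io.StringIO()
    for row in rows:
        # Timestamps and floats are rendered as text; the server parses them during COPY.
        buf.write(",".join(str(value) for value in row) + "\n")
    buf.seek(0)
    col_list = ", ".join(cols)
    with conn.cursor() as cur:
        cur.copy_expert(f"COPY {table} ({col_list}) FROM STDIN WITH (FORMAT csv)", buf)
    conn.commit()

# usage, reusing the objects from insertRAWData_PGCOPY:
# copy_rows_with_copy_expert(self.TimescaleDB_Client, 'raw_data', cols, data)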

I tried to modify the following values in postgresql.conf. There wasn't a noticeable performance improvement.

wal_level = minimal
fsync = off
synchronous_commit = off
wal_writer_delay = 2000ms
commit_delay = 100000

I have tried to modify the chunk size, according to one of the comments below, using the following in my createRAWDataTable() function. However, there wasn't an improvement in the insert times. Perhaps this was also to be expected, given that I haven't been accumulating data: the database has only held a few samples, at most about 1 minute's worth, over the course of my testing.

self.query_create_raw_data_hypertable = "SELECT create_hypertable('raw_data', 'time', chunk_time_interval => INTERVAL '3 day', if_not_exists => TRUE);"

Edit: For anyone reading this, I was able to pickle and insert a 32000x464 float32 numpy matrix in about 0.5 seconds with MongoDB, which is what my final solution is. Perhaps MongoDB just handles this workload better in this case.
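
The MongoDB insert path itself isn't shown above; one way to store a pickled matrix of that size (roughly 59 MB, which exceeds MongoDB's 16 MB BSON document limit) is GridFS, sketched below with an assumed connection string, database name, and metadata field:

import pickle
import gridfs
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed connection string
fs = gridfs.GridFS(client["raw"])                          # assumed database name

def insert_frame(matrix, timestamp):
    """Store one pickled float32 matrix as a GridFS file with a time attribute."""
    payload = pickle.dumps(matrix, protocol=pickle.HIGHEST_PROTOCOL)
    fs.put(payload, time=timestamp)  # "time" is an assumed metadata field name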

Upvotes: 1

Views: 2937

Answers (3)

Shashank Goud

Reputation: 37

You can use the SQLAlchemy library to do it, and also calibrate the chunksize while you are at it.

Appending the data should take less than 74 seconds, since I perform a similar kind of insertion and it takes me about 40-odd seconds.

Another possibility is to use pandas.DataFrame.to_sql with a callable passed as method. It will increase performance drastically.

In comparison to plain to_sql (150 s) or to_sql with method='multi' (196 s), the callable method did the job in just 14 s.

A comparative summary of the different methods is best shown in the linked image: Comparison for fast method to post data into postgres.
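
For reference, a COPY-based insertion method along the lines of the one in the pandas documentation looks roughly like the sketch below (not necessarily the exact function behind my 14 s figure):

import csv
import io

def psql_insert_copy(table, conn, keys, data_iter):
    """Insertion method for DataFrame.to_sql that routes rows through COPY."""
    dbapi_conn = conn.connection  # raw psycopg2 connection underneath SQLAlchemy
    with dbapi_conn.cursor() as cur:
        buf = io.StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)
        columns = ", ".join(f'"{k}"' for k in keys)
        table_name = f"{table.schema}.{table.name}" if table.schema else table.name
        cur.copy_expert(f"COPY {table_name} ({columns}) FROM STDIN WITH CSV", buf)

# df.to_sql("raw_data", engine, if_exists="append", index=False, method=psql_insert_copy)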

Upvotes: 1

Miranda

Reputation: 41

I have two initial suggestions that may help with overall performance.

  1. The default hypertable you are creating will "chunk" your data into 7-day periods (this means each chunk will hold around 4,838,400,000 rows of data, given your parameters). Since your data is so granular, you may want to use a different chunk size. Check out the docs here for info on the optional chunk_time_interval argument. Changing the chunk size should help with insert and query speed, and it will also give you better compression performance if needed later on (see the sketch after this list).

  2. As others have stated, playing around with batch inserts should also help. If you haven't checked out this stock data tutorial, I would highly recommend it. Using pgcopy and its CopyManager class could help with inserting DataFrame objects more quickly.
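
For point 1, a rough sketch of adjusting the chunk interval, reusing the cursor from your code; the INTERVAL '1 hour' value is only illustrative, not a recommendation:

cur = self.TimescaleDB_Client.cursor()
# When creating the hypertable, pass an explicit interval:
cur.execute(
    "SELECT create_hypertable('raw_data', 'time', "
    "chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);"
)
# For an existing hypertable, the interval used for future chunks can be changed:
cur.execute("SELECT set_chunk_time_interval('raw_data', INTERVAL '1 hour');")
self.TimescaleDB_Client.commit()
cur.close()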

Hopefully, some of this information can be helpful to your situation!

disclosure: I am part of the Timescale team 😊

Upvotes: 1

Khanzadeh_AH

Reputation: 139

One of the fastest ways is to

  • first create a pandas data frame of your data that you want to insert into the DB
  • then use the data frame to bulk-insert your data into the DB

Here is a way you can do it: How to write data frame to postgres?
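
A minimal sketch of that approach (the connection string and timestamps are assumptions; the 8 kHz spacing is derived from the 32,000 rows per 4 seconds in the question):

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://username:password@host:5432/dbname")  # assumed credentials
matrix = np.random.rand(32000, 464).astype("float32")
df = pd.DataFrame(matrix, columns=[f"channel{i + 1}" for i in range(464)])
# 32,000 rows per 4 seconds -> 8 kHz, i.e. 125 microseconds between samples (assumed spacing).
df["time"] = pd.date_range("2022-01-01", periods=len(df), freq="125us", tz="UTC")
df.to_sql("raw_data", engine, if_exists="append", index=False)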

Upvotes: 0
