aaron mei
aaron mei

Reputation: 33

Slow Postgresql Upserts using libpqxx

Using the LiChess database (~4 billion games split across multiple textfiles), I want to construct a table called users where for each user, the table stores the highest rated player the user has beaten and the highest rated player the user has lost to.

After processing a game, I want to perform an upsert query where I want to replace the defeated and lostto columns as necessary. This would be result in around 4 billion queries. I am using C++ to process the text file (it's decently fast) and am using lipqxx to connect to the database. Currently the database processes around 1 million games every 5 minutes.

I am using Postgresql 14 on an AWS EC2 r6i.xlarge instance with 32GB of RAM.

Currently, the way I am upserting is that I have two prepared statements:

PREPARE upsert_win_query (varchar(40), varchar(40), integer) AS 
    INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, $2, $3, '', 0) 
    ON CONFLICT (id) DO UPDATE SET 
    defeated = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.defeated ELSE users.defeated END,
    drating = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.drating ELSE users.drating END;

PREPARE upsert_loss_query (varchar(40), varchar(40), integer) AS 
    INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, '', 0, $2, $3)
    ON CONFLICT (id) DO UPDATE SET 
    lostto = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lostto ELSE users.lostto END,
    lrating = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lrating ELSE users.lrating END;

Furthermore, I tried batching my queries so that I call W.exec() every 2000 games. Changing the batch size didn't seem to change the performance much.

The full C++ code is below:

#include <fstream>
#include <iostream>
#include <string>
#include <chrono>

#include <pqxx/pqxx>
#include <libssh/libssh.h>
#include <libssh/libsshpp.hpp>


using namespace std;
using namespace pqxx;


const int batch_size = 2000;
string white, black, wr, br; char res;
string u, o, r, r2;
int games = 0;


int main(int argc, char *argv[]) {
    chrono::steady_clock::time_point begin = chrono::steady_clock::now();
    string fname = argv[1];
    string sql = "";
    string sql2 = "";


    try {
        connection C("dbname = lichessdb user = ubuntu password = password \
                hostaddr = 127.0.0.1 port = 5432");
        if (C.is_open()) {
            cout << "Opened database successfully: " << C.dbname() << endl;
        } else {
            cout << "Can't open database" << endl;
            return 1;
        }

        work W(C);
        int cnt = 0;

        sql = "DROP INDEX IF EXISTS users_id_drating_idx; " \
            "DROP INDEX IF EXISTS users_id_lrating_idx;";
        

        W.exec(sql.c_str());

        sql = "PREPARE upsert_win_query (varchar(40), varchar(40), integer) AS " \
        "INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, $2, $3, "", 0) " \
        "ON CONFLICT (id) " \
            "DO UPDATE SET " \
        "defeated = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.defeated ELSE users.defeated END," \
        "drating = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.drating ELSE users.drating END;\n" \
        "PREPARE upsert_loss_query (varchar(40), varchar(40), integer) AS " \
            "INSERT INTO users (id, lostto, lrating) VALUES ($1, $2, $3) " \
        "ON CONFLICT (id) " \
        "DO UPDATE SET " \
        "lostto = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lostto ELSE users.lostto END," \
        "lrating = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lrating ELSE users.lrating END;";


        W.exec(sql.c_str());

        sql = "";

        ifstream file(fname);
        if (file.is_open()) {
            string line;
            while (getline(file, line)) {
                if (line[1] == 'W') {
                    if (line[6] == ' ') {
                        white = line.substr(8, line.length() - 10);
                    } else if (line[6] == 'E') {
                        wr = line.substr(11, line.length() - 13);
                    }
                } else if (line[1] == 'B') {
                    if (line[6] == ' ') {
                        black = line.substr(8, line.length() - 10);
                    } else if (line[6] == 'E') {
                        br = line.substr(11, line.length() - 13);

                        // parse the winner
                        if (res == '1') {
                            u = white;
                            o = black;
                            r = br;
                            r2 = wr;
                        } else if (res == '0') {
                            u = black;
                            o = white;
                            r = wr;
                            r2 = br;
                        } else {
                            // continue if draw
                            continue;
                        }

                        // continue if game is indeterminate
                        if (r2[0] == '?' || r[0] == '?') continue;

                        sql += "EXECUTE upsert_win_query('" + u + "', '" + o + "'," + r + ")\n;";
                        sql2 += "EXECUTE upsert_loss_query('" + o + "', '" + u + "'," + r2 + ");\n";    

                        cnt++;

                        if (cnt == batch_size) {
                            cnt = 0;
                            W.exec(sql.c_str());
                            W.exec(sql2.c_str());
                            sql = "";
                            sql2 = "";
                        }
                    }   
                } else if (line[1] == 'R') {
                    if (line[2] == 'e') {
                        games++;
                        if (line[11] != '2') {
                            res = line[9];
                        }
                    }
                }
            }
            file.close();
        }
        
        if (cnt != 0) {
            W.exec(sql.c_str());
            W.exec(sql2.c_str());
        }

        sql = "DEALLOCATE upsert_win_query; DEALLOCATE upsert_loss_query;";
        W.exec(sql.c_str());

        W.commit();

        cout << "Added data successfully" << endl;
        
        C.disconnect();
    } catch (const exception &e) {
        cerr << e.what() << endl;
        return 1;
    }

    chrono::steady_clock::time_point end = chrono::steady_clock::now();
    
    cout << games << endl;
    cout << "Time difference = " << chrono::duration_cast<chrono::microseconds>(end - begin).count() << "[microseconds]" << endl;

    return 0;
}

An example output is

Opened database successfully: lichessdb
Added data successfully
1048440
Time difference = 269071101[microseconds]

Finally, I have tried optimizing memory following this blog.

I'm new to postgresql (and sql in general) so any help with optimizing this would be greatly appreciated!

Upvotes: 1

Views: 218

Answers (1)

Joseph Larson
Joseph Larson

Reputation: 9068

Would something like this help:

INSERT INTO products (product_no, name, price) VALUES
    (1, 'Cheese', 9.99),
    (2, 'Bread', 1.99),
    (3, 'Milk', 2.99);

And this question may offer some suggestions: What is the fastest way to insert data into postgres using pqxx

It references the COPY command: https://www.postgresql.org/docs/current/sql-copy.html

Upvotes: 0

Related Questions