Reputation: 33
Using the LiChess database (~4 billion games split across multiple textfiles), I want to construct a table called users
where for each user, the table stores the highest rated player the user has beaten and the highest rated player the user has lost to.
After processing a game, I want to perform an upsert query where I want to replace the defeated
and lostto
columns as necessary. This would result in around 4 billion queries. I am using C++ to process the text file (it's decently fast) and am using libpqxx to connect to the database. Currently the database processes around 1 million games every 5 minutes.
I am using Postgresql 14 on an AWS EC2 r6i.xlarge instance with 32GB
of RAM.
Currently, the way I am upserting is that I have two prepared statements:
-- Winner-side upsert: $1 = user id, $2 = name of the defeated opponent, $3 = that opponent's rating.
-- A new row starts with an empty lostto and an lrating of 0. An existing row only takes the
-- incoming defeated/drating pair when the incoming drating is strictly greater than the stored
-- one; otherwise both columns are assigned their current values.
PREPARE upsert_win_query (varchar(40), varchar(40), integer) AS
INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, $2, $3, '', 0)
ON CONFLICT (id) DO UPDATE SET
defeated = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.defeated ELSE users.defeated END,
drating = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.drating ELSE users.drating END;
-- Loser-side upsert: $1 = user id, $2 = name of the opponent lost to, $3 = that opponent's rating.
-- Mirror image of the statement above: a new row starts with an empty defeated and a drating of 0,
-- and lostto/lrating are replaced only by a strictly greater incoming lrating.
PREPARE upsert_loss_query (varchar(40), varchar(40), integer) AS
INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, '', 0, $2, $3)
ON CONFLICT (id) DO UPDATE SET
lostto = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lostto ELSE users.lostto END,
lrating = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lrating ELSE users.lrating END;
Furthermore, I tried batching my queries so that I call W.exec()
every 2000 games. Changing the batch size didn't seem to change the performance much.
The full C++ code is below:
#include <fstream>
#include <iostream>
#include <string>
#include <chrono>
#include <pqxx/pqxx>
#include <libssh/libssh.h>
#include <libssh/libsshpp.hpp>
using namespace std;
using namespace pqxx;
// Number of games whose EXECUTE statements are accumulated before main() sends them to the server.
constexpr int batch_size = 2000;

// Tag values of the game currently being parsed by main().
std::string white;  // text taken from the [White "..."] line
std::string black;  // text taken from the [Black "..."] line
std::string wr;     // text taken from the [WhiteElo "..."] line
std::string br;     // text taken from the [BlackElo "..."] line
char res = '\0';    // character main() copies out of the [Result "..."] line

// Scratch values main() fills in once it knows who won.
std::string u;   // winner's name
std::string o;   // loser's name
std::string r;   // loser's rating text
std::string r2;  // winner's rating text

// Count of [Result ...] lines seen; main() prints it at the end.
int games = 0;
int main(int argc, char *argv[]) {
chrono::steady_clock::time_point begin = chrono::steady_clock::now();
string fname = argv[1];
string sql = "";
string sql2 = "";
try {
connection C("dbname = lichessdb user = ubuntu password = password \
hostaddr = 127.0.0.1 port = 5432");
if (C.is_open()) {
cout << "Opened database successfully: " << C.dbname() << endl;
} else {
cout << "Can't open database" << endl;
return 1;
}
work W(C);
int cnt = 0;
sql = "DROP INDEX IF EXISTS users_id_drating_idx; " \
"DROP INDEX IF EXISTS users_id_lrating_idx;";
W.exec(sql.c_str());
sql = "PREPARE upsert_win_query (varchar(40), varchar(40), integer) AS " \
"INSERT INTO users (id, defeated, drating, lostto, lrating) VALUES ($1, $2, $3, "", 0) " \
"ON CONFLICT (id) " \
"DO UPDATE SET " \
"defeated = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.defeated ELSE users.defeated END," \
"drating = CASE WHEN EXCLUDED.drating > users.drating THEN EXCLUDED.drating ELSE users.drating END;\n" \
"PREPARE upsert_loss_query (varchar(40), varchar(40), integer) AS " \
"INSERT INTO users (id, lostto, lrating) VALUES ($1, $2, $3) " \
"ON CONFLICT (id) " \
"DO UPDATE SET " \
"lostto = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lostto ELSE users.lostto END," \
"lrating = CASE WHEN EXCLUDED.lrating > users.lrating THEN EXCLUDED.lrating ELSE users.lrating END;";
W.exec(sql.c_str());
sql = "";
ifstream file(fname);
if (file.is_open()) {
string line;
while (getline(file, line)) {
if (line[1] == 'W') {
if (line[6] == ' ') {
white = line.substr(8, line.length() - 10);
} else if (line[6] == 'E') {
wr = line.substr(11, line.length() - 13);
}
} else if (line[1] == 'B') {
if (line[6] == ' ') {
black = line.substr(8, line.length() - 10);
} else if (line[6] == 'E') {
br = line.substr(11, line.length() - 13);
// parse the winner
if (res == '1') {
u = white;
o = black;
r = br;
r2 = wr;
} else if (res == '0') {
u = black;
o = white;
r = wr;
r2 = br;
} else {
// continue if draw
continue;
}
// continue if game is indeterminate
if (r2[0] == '?' || r[0] == '?') continue;
sql += "EXECUTE upsert_win_query('" + u + "', '" + o + "'," + r + ")\n;";
sql2 += "EXECUTE upsert_loss_query('" + o + "', '" + u + "'," + r2 + ");\n";
cnt++;
if (cnt == batch_size) {
cnt = 0;
W.exec(sql.c_str());
W.exec(sql2.c_str());
sql = "";
sql2 = "";
}
}
} else if (line[1] == 'R') {
if (line[2] == 'e') {
games++;
if (line[11] != '2') {
res = line[9];
}
}
}
}
file.close();
}
if (cnt != 0) {
W.exec(sql.c_str());
W.exec(sql2.c_str());
}
sql = "DEALLOCATE upsert_win_query; DEALLOCATE upsert_loss_query;";
W.exec(sql.c_str());
W.commit();
cout << "Added data successfully" << endl;
C.disconnect();
} catch (const exception &e) {
cerr << e.what() << endl;
return 1;
}
chrono::steady_clock::time_point end = chrono::steady_clock::now();
cout << games << endl;
cout << "Time difference = " << chrono::duration_cast<chrono::microseconds>(end - begin).count() << "[microseconds]" << endl;
return 0;
}
An example output is
Opened database successfully: lichessdb
Added data successfully
1048440
Time difference = 269071101[microseconds]
Finally, I have tried optimizing memory following this blog.
I'm new to postgresql (and sql in general) so any help with optimizing this would be greatly appreciated!
Upvotes: 1
Views: 218
Reputation: 9068
Would something like this help:
-- A single INSERT statement that supplies three rows in one VALUES list.
INSERT INTO products (product_no, name, price) VALUES
(1, 'Cheese', 9.99),
(2, 'Bread', 1.99),
(3, 'Milk', 2.99);
And this question may offer some suggestions: What is the fastest way to insert data into postgres using pqxx
It references the COPY command: https://www.postgresql.org/docs/current/sql-copy.html
Upvotes: 0