Reputation: 647
I am trying to load a large amount of data (40 million rows in total) into one table on a PostgreSQL server, in small batches (6000 rows per CSV). I thought HikariCP would be ideal for this purpose.
This is the throughput I get for my data insertion using Java 8 (1.8.0_65), the Postgres JDBC driver 9.4.1211, and HikariCP 2.4.3:
6000 rows in 4 minutes and 42 seconds.
What am I doing wrong and how can I improve the speed of insertion?
A few more words about my setup:
The program inserts each row asynchronously, using a large thread pool to hold the requests, as below:
private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
The DataSource configuration is:
private DataSource getDataSource() {
    if (datasource == null) {
        LOG.info("Establishing dataSource");
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(userName);
        config.setPassword(password);
        config.setMaximumPoolSize(600); // M4.large 648 connections tops
        config.setAutoCommit(true); // I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows
        config.addDataSourceProperty("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
        config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out));
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "1000");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.setConnectionTimeout(1000);
        datasource = new HikariDataSource(config);
    }
    return datasource;
}
This is where I read the source data:
private void readMetadata(String inputMetadata, String source) {
    BufferedReader br = null;
    FileReader fr = null;
    try {
        br = new BufferedReader(new FileReader(inputMetadata));
        String sCurrentLine = br.readLine(); // skip header
        if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) {
            callAsyncInsert(sCurrentLine, source);
        }
        while ((sCurrentLine = br.readLine()) != null) {
            callAsyncInsert(sCurrentLine, source);
        }
    } catch (IOException e) {
        LOG.error(ExceptionUtils.getStackTrace(e));
    } finally {
        try {
            if (br != null)
                br.close();
            if (fr != null)
                fr.close();
        } catch (IOException ex) {
            LOG.error(ExceptionUtils.getStackTrace(ex));
        }
    }
}
I am inserting the data asynchronously (or trying to, with JDBC!):
private void callAsyncInsert(final String line, String source) {
    Future<?> future = executorService.submit(new Runnable() {
        public void run() {
            try {
                dataLoader.insertRow(line, source);
            } catch (SQLException e) {
                LOG.error(ExceptionUtils.getStackTrace(e));
                try {
                    errorBufferedWriter.write(line);
                    errorBufferedWriter.newLine();
                    errorBufferedWriter.flush();
                } catch (IOException e1) {
                    LOG.error(ExceptionUtils.getStackTrace(e1));
                }
            }
        }
    });
    try {
        if (future.get() != null) {
            LOG.info("$$$$$$$$" + future.get().getClass().getName());
        }
    } catch (InterruptedException e) {
        LOG.error(ExceptionUtils.getStackTrace(e));
    } catch (ExecutionException e) {
        LOG.error(ExceptionUtils.getStackTrace(e));
    }
}
My DataLoader.insertRow is below:
public void insertRow(String row, String source) throws SQLException {
    String[] splits = getRowStrings(row);
    Connection conn = null;
    PreparedStatement preparedStatement = null;
    try {
        if (splits.length == 15) {
            String ... = splits[0];
            //blah blah blah
            String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ";
            conn = getConnection();
            preparedStatement = conn.prepareStatement(insertTableSQL);
            preparedStatement.setString(1, column1);
            //blah blah blah
            preparedStatement.executeUpdate();
            counter.incrementAndGet();
            //if (counter.get() % 1000 == 0) {
            //    conn.commit();
            //}
        } else {
            LOG.error("Invalid row:" + row);
        }
    } finally {
        /*if (conn != null) {
            conn.close(); // Do preparedStatement.close(); rather connection.close
        }*/
        if (preparedStatement != null) {
            preparedStatement.close();
        }
    }
}
When monitoring in pgAdmin 4, I noticed a few things:
Upvotes: 2
Views: 4477
Reputation: 11114
You absolutely want to use batched inserts, with the statement being prepared outside of the loop, and auto-commit off. In pseudo-code:
PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
while ( <data> ) {
    stmt.setString(1, column1);
    //blah blah blah
    stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
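Fleshed out into compilable Java, one CSV file could be loaded on a single connection like the sketch below. The two-column xyz layout, the batch size of 1000, and the naive comma split are stand-ins, not your real schema or parser; substitute the actual column list and parsing:

import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.DataSource;

public class BatchLoader {

    private static final int BATCH_SIZE = 1000; // illustrative batch/commit interval

    // Loads one CSV file on a single pooled connection using JDBC batching.
    public void loadFile(DataSource dataSource, String csvPath) throws Exception {
        String sql = "insert into xyz(col1, col2) values (?, ?)"; // placeholder table/columns
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql);
             BufferedReader br = new BufferedReader(new FileReader(csvPath))) {

            conn.setAutoCommit(false);              // commit per batch, not per row
            br.readLine();                          // skip header
            String line;
            int pending = 0;
            while ((line = br.readLine()) != null) {
                String[] splits = line.split(",");  // simplistic CSV split for the sketch
                if (splits.length < 2) {
                    continue;                       // skip malformed rows
                }
                stmt.setString(1, splits[0]);
                stmt.setString(2, splits[1]);
                stmt.addBatch();                    // queue the row instead of executing it
                if (++pending == BATCH_SIZE) {
                    stmt.executeBatch();            // one round trip for the whole batch
                    conn.commit();
                    pending = 0;
                }
            }
            if (pending > 0) {                      // flush the tail of the file
                stmt.executeBatch();
                conn.commit();
            }
        }
    }
}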
Even a single thread on a single connection should be able to insert > 5000 rows/sec.
UPDATE: If you want to multi-thread it, the number of connections should be roughly 1.5x or 2x the number of database CPU cores. The number of processing threads should match that, and each processing thread should handle one CSV file using the pattern above. However, you may find that many concurrent inserts into the same table create too much lock contention in the DB, in which case you need to back off the number of processing threads until you find the optimal concurrency.
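A minimal sketch of that layout, assuming four database cores, placeholder connection settings and file names, and the hypothetical BatchLoader.loadFile from the sketch above:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class ParallelLoad {

    public static void main(String[] args) throws Exception {
        int dbCores = 4;                    // assumption: adjust to the actual server
        int connections = dbCores * 2;      // pool sized to ~2x database cores

        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb"); // placeholder URL
        config.setUsername("user");                                 // placeholder credentials
        config.setPassword("password");
        config.setAutoCommit(false);
        config.setMaximumPoolSize(connections);
        HikariDataSource dataSource = new HikariDataSource(config);

        // One processing thread per pooled connection; each thread loads one CSV file.
        ExecutorService pool = Executors.newFixedThreadPool(connections);
        List<String> csvFiles = Arrays.asList("part1.csv", "part2.csv"); // placeholder files
        final BatchLoader loader = new BatchLoader();
        for (final String file : csvFiles) {
            pool.submit(() -> {
                try {
                    loader.loadFile(dataSource, file);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        dataSource.close();
    }
}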
A properly sized pool and concurrency should easily be able to achieve >20K rows/sec.
Also, please upgrade to HikariCP v2.6.0.
Upvotes: 5