Reputation: 1046
I need to write a huge amount of data into MySQL.
I want to do it in several threads for performance. I would like to use Spring Batch partitioning, but I have never done this before.
My Spring Batch Java config (partial):
@Bean
ItemWriter<Event> writer() throws SQLException {
    return new CustomJdbcBatchDataWriter();
}

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(threadsAmount);
    return asyncTaskExecutor;
}

@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1())
            .end()
            .build();
}

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Event, Event>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}
And my custom JDBC writer (I had to write it myself to disable autocommit, for performance):
public class CustomJdbcBatchDataWriter implements ItemWriter<Event> {

    @Override
    public void write(List<? extends Event> items) throws Exception {
        String sql = "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        try (Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/batch?useSSL=false&useServerPrepStmts=false&rewriteBatchedStatements=true",
                "user", "password");
             PreparedStatement ps = connection.prepareStatement(sql)) {

            connection.setAutoCommit(false);
            for (Event p : items) {
                try {
                    ps.setString(1, p.getId());
                    // Setting rest of data into prepared statement
                    ...
                    ps.addBatch();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            ps.executeBatch();
            connection.commit();
        }
    }
}
How can I configure this so that different threads insert different data, to gain performance? Any help is really appreciated.
Upvotes: 0
Views: 1235
Reputation: 3868
Here's something to get you started. I haven't tested it, but it should at least get you close.
// updated this bean of yours; the others are new
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(partitionStep())
            .end()
            .build();
}

@Bean
public Step partitionStep() throws Exception {
    return stepBuilderFactory.get("partitionStep")
            .partitioner(step1())                 // leverage the step you already have
            .partitioner("step1", partitioner())
            .gridSize(10)                         // number of threads
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Partitioner partitioner() {
    // Use this partitioner to add ranges for your reader.
    // NOTE: your reader needs to be in @StepScope to pull from the step execution context.
    return new YourCustomPartitioner();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
}
Upvotes: 1