Don_Quijote

Reputation: 1046

Spring batch (spring boot - java config) multithreaded jdbc writer

I need to write a huge amount of data into MySQL.

I want to do it in several threads for performance. I would like to use Spring Batch partitioning, but I have never done this before.

My spring batch java config (part):

@Bean
ItemWriter<Event> writer() throws SQLException {
    return new CustomJdbcBatchDataWriter();
}

@Bean
public TaskExecutor taskExecutor(){
    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(threadsAmount);
    return asyncTaskExecutor;
}

@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1())
            .end()
            .build();
}

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Event, Event>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}

And my custom JDBC writer (I had to write it to disable autocommit, for performance):

public class CustomJdbcBatchDataWriter implements ItemWriter<Event> {

    @Override
    public void write(List<? extends Event> items) throws Exception {
        try (Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/batch?useSSL=false&useServerPrepStmts=false&rewriteBatchedStatements=true",
                "user", "password")) {
            connection.setAutoCommit(false);

            String sql = "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                for (Event p : items) {
                    ps.setString(1, p.getId());
                    //Setting rest of data into prepared statement
                    ...
                    ps.addBatch();
                }
                ps.executeBatch();
                connection.commit();
            }
        }
    }
}

How can I configure this so it will insert different data in different threads to gain performance? Any help is really appreciated.

Upvotes: 0

Views: 1235

Answers (1)

Dean Clark

Reputation: 3868

Here's something to get you started. I haven't tested it, but it should at least get you close.

//updated this bean of yours. the others are new
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) throws Exception {
    return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(partitionStep())
            .end()
            .build();
}

@Bean
public Step partitionStep() throws Exception {
    return stepBuilderFactory.get("partitionStep")
            .partitioner("step1", partitioner())
            .step(step1()) //leverage the step you already have
            .gridSize(10) //# of threads
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Partitioner partitioner() {
    //Use this partitioner to add ranges for your reader
    //NOTE: your reader needs to be in @StepScope to pull from the Step Execution Context
    return new YourCustomPartitioner();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
}
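The custom partitioner is left as a stub above. As a rough sketch (the class and method names here are my own, not from the question), the heart of a range-based partitioner is just splitting an id range into `gridSize` contiguous sub-ranges; a real `Partitioner` would put each `lo`/`hi` pair into its own `ExecutionContext` (e.g. under keys like `minId`/`maxId`) for a step-scoped reader to pick up:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitter {

    // Splits [min, max] into gridSize contiguous sub-ranges of near-equal size.
    // In a Spring Batch Partitioner.partition(int gridSize) implementation, each
    // long[]{lo, hi} pair would be stored in a separate ExecutionContext keyed
    // by a partition name ("partition0", "partition1", ...).
    public static List<long[]> split(long min, long max, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long total = max - min + 1;
        long size = total / gridSize;        // base size of each sub-range
        long remainder = total % gridSize;   // first 'remainder' ranges get one extra row
        long lo = min;
        for (int i = 0; i < gridSize; i++) {
            long hi = lo + size - 1 + (i < remainder ? 1 : 0);
            ranges.add(new long[]{lo, hi});
            lo = hi + 1;
        }
        return ranges;
    }
}
```

Each thread then reads (and writes) only its own id range, so the writers never insert the same rows.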

Upvotes: 1
