Reputation: 13
I'm using spring batch with java configuration (new to this) and I'm running into a error when trying to use a ClassifierCompositeItemWriter so generate separate files based on a classifier.
The error im getting is org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
My configuration looks like follows:
package com.infonova.btcompute.batch.geneva.job;
import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusFinishedJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusInprogressJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTransferStatus;
import com.infonova.btcompute.batch.geneva.camel.GenevaJobLauncher;
import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.geneva.dto.GenveaDetailsTransactionDto;
import com.infonova.btcompute.batch.geneva.properties.GenevaDetailsExportJobProperties;
import com.infonova.btcompute.batch.geneva.rowmapper.GenevaDetailsTransactionsRowMapper;
import com.infonova.btcompute.batch.geneva.steps.*;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import com.infonova.product.batch.camel.CamelEnabledJob;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.classify.BackToBackPatternClassifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public abstract class AbstractGenevaDetailsExportJob extends CamelEnabledJob {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGenevaDetailsExportJob.class);
@Autowired
protected JobBuilderFactory jobBuilders;
@Autowired
protected StepBuilderFactory stepBuilders;
@Autowired
protected DataSource datasource;
@Autowired
private BillrunTransferStatusMapper billrunTransferStatusMapper;
@Autowired
protected JdbcTemplate jdbcTemplate;
public abstract GenevaDetailsExportJobProperties jobProperties();
@Bean
public RouteBuilder routeBuilder(final GenevaDetailsExportJobProperties jobProperties, final Job job) {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from(jobProperties.getConsumer())
.transacted("PROPAGATION_REQUIRED")
.routeId(jobProperties.getInputRouteName())
.process(genevaJobLauncher(job));
//.to("ftp://[email protected]?password=secret");
}
};
}
@Bean
public Processor genevaJobLauncher(Job job) {
return new GenevaJobLauncher(job);
}
@Bean
@StepScope
public GenevaDetailsReader reader() {
GenevaDetailsReader reader = new GenevaDetailsReader(jobProperties().getMandatorKey(),
jobProperties().getInvoiceType(), jobProperties().getSqlResourcePath());
reader.setSql("");
reader.setDataSource(datasource);
reader.setRowMapper(new GenevaDetailsTransactionsRowMapper());
reader.setFetchSize(jobProperties().getFetchSize());
return reader;
}
@Bean
@StepScope
public GenevaDetailsItemProcessor processor() {
return new GenevaDetailsItemProcessor();
}
@Bean
@StepScope
public ClassifierCompositeItemWriter writer() {
List<String> serviceCodes = new ArrayList<>();//billrunTransferStatusMapper.getServiceCodes(jobProperties().getMandatorKey());
Long billingTaskId = billrunTransferStatusMapper.getCurrentTaskId(jobProperties().getMandatorKey());
String countryKey = billrunTransferStatusMapper.getCountryKey(billingTaskId);
serviceCodes.add("BTCC");
serviceCodes.add("CCMS");
BackToBackPatternClassifier classifier = new BackToBackPatternClassifier();
classifier.setRouterDelegate(new GenveaDetailsRouterClassifier());
HashMap<String, Object> map = new HashMap<>();
for (String serviceCode : serviceCodes) {
map.put(serviceCode, genevaDetailsWriter(serviceCode, countryKey));
}
classifier.setMatcherMap(map);
ClassifierCompositeItemWriter<GenveaDetailsTransactionDto> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier(classifier);
return writer;
}
@Bean
@StepScope
public GenevaDetailsFlatFileItemWriter genevaDetailsWriter(String serviceCode, String countryKey) {
GenevaDetailsFlatFileItemWriter writer = new GenevaDetailsFlatFileItemWriter(jobProperties().getDelimiter());
FileNameGeneration fileNameGeneration = new FileNameGeneration();
try {
FileSystemResource fileSystemResource = new FileSystemResource(new File(jobProperties().getExportDir(), fileNameGeneration.generateFileName(jdbcTemplate,
serviceCode, countryKey)));
writer.setResource(fileSystemResource);
} catch (SQLException e) {
LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
}
return writer;
}
@Bean
public Job job() {
return jobBuilders.get(jobProperties().getJobName())
.start(setBillRunTransferStatusDetailInprogressStep())
.next(processGenevaDetailsStep())
.next(setBillRunTransferStatusProcessedStep())
.build();
}
@Bean
public Step setBillRunTransferStatusDetailInprogressStep() {
return stepBuilders.get("setBillRunTransferStatusDetailInprogressStep")
.tasklet(setBillRunTransferStatusDetailInprogress())
.build();
}
@Bean
public Tasklet setBillRunTransferStatusDetailInprogress() {
return new BillRunTranStatusInprogressJobAssignment(BillRunTransferStatus.SUMMARY.toString(), BillRunTransferStatus.DETAILS_INPROGRESS.toString(),
jobProperties().getMandatorKey(), jobProperties().getInvoiceTypeNum(), jobProperties().getReportTypeNum());
}
@Bean
public Step setBillRunTransferStatusProcessedStep() {
return stepBuilders.get("setBillRunTransferStatusProcessedStep")
.tasklet(setBillRunTransferStatusProcessed())
.build();
}
@Bean
public Tasklet setBillRunTransferStatusProcessed() {
return new BillRunTranStatusFinishedJobAssignment(BillRunTransferStatus.PROCESSED.toString());
}
@Bean
public Step processGenevaDetailsStep() {
return stepBuilders.get("processGenevaDetailsStep")
.<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize())
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
and my writer looks like:
package com.infonova.btcompute.batch.geneva.steps;
import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.*;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
@Component
public class GenevaDetailsFlatFileItemWriter extends FlatFileItemWriter<GenevaDetailsResultsDto> {
private static final Logger LOGGER = LoggerFactory.getLogger(GenevaDetailsFlatFileItemWriter.class);
@Autowired
protected JdbcTemplate jdbcTemplate;
@Autowired
private BillrunTransferStatusMapper billrunTransferStatusMapper;
private String delimiter;
public GenevaDetailsFlatFileItemWriter(String delimiter) {
this.delimiter = delimiter;
this.setLineAggregator(getLineAggregator());
this.setHeaderCallback(getHeaderCallback());
}
private DelimitedLineAggregator<GenevaDetailsResultsDto> getLineAggregator() {
DelimitedLineAggregator<GenevaDetailsResultsDto> delLineAgg = new DelimitedLineAggregator<>();
delLineAgg.setDelimiter(delimiter);
BeanWrapperFieldExtractor<GenevaDetailsResultsDto> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(getNames());
delLineAgg.setFieldExtractor(fieldExtractor);
return delLineAgg;
}
private String[] getHeaderNames() {
return new String[] {"Record ID", "Service Identifier", "Billing Account Reference", "Cost Description", "Event Cost",
"Event Date and Time", "Currency Code", "Charge Category", "Order Identifier", "Net Usage", "UOM",
"Quantity", "Service Start Date", "Service End Date"};
}
private String[] getNames() {
return new String[] {"RECORD_ID", "SERVICE_CODE", "BILLING_ACCOUNT_REFERENCE", "COST_DESCRIPTION", "EVENT_COST",
"EVENT_DATE_AND_TIME", "CURRENCY_CODE", "CHARGE_CATEGORY", "ORDER_IDENTIFIER", "NET_USAGE", "UOM",
"QUANTITY", "SERVICE_START_DATE", "SERVICE_END_DATE"};
}
private FlatFileHeaderCallback getHeaderCallback()
{
return new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
writer.write(String.join(delimiter, getHeaderNames()));
}
};
}
// @BeforeStep
// public void beforeStep(StepExecution stepExecution) {
// billingTaskId = (Long) stepExecution.getJobExecution().getExecutionContext().get("billingTaskId");
// FileNameGeneration fileNameGeneration = new FileNameGeneration();
//
// try {
// FileSystemResource fileSystemResource = new FileSystemResource(new File(exportDir, fileNameGeneration.generateFileName(jdbcTemplate,
// serviceCode, billrunTransferStatusMapper.getCountryKey(billingTaskId))));
// setResource(fileSystemResource);
// } catch (SQLException e) {
// LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
// }
// }
}
I have searched the web and cannot find a solution to this issue.
Upvotes: 1
Views: 1597
Reputation: 18423
What @Hansjoerg Wingeier wrote about ClassifierCompositeItemWriter
is correct, but the right way to resolve the problem is to register delegated writer(s) as stream(s) using AbstractTaskletStepBuilder.stream()
to let SB manage execution context lifecycle.
Upvotes: 1
Reputation: 4454
ClassifierCompositeItemWriter does not implement the ItemStream interface, hence the open method of your FlatFileItemWriter is never called.
The easiest thing to do is to call the open method when you create your classifier map:
for (String serviceCode : serviceCodes) {
FlatFileItemWriter writer =genevaDetailsWriter(serviceCode, countryKey);
writer.open (new ExecutionContext ());
map.put(serviceCode, writer);
}
Upvotes: 0