Reputation: 500
I am using spring batch
in the spring-boot
application. The Spring Boot version is 2.3.3.RELEASE
.
I have a multi-step job
, which validates xml file header
in the first step then read
transaction
in chunk oriented step
, do some business logic
on each transaction and then write
it back to xml file. In the third
and final step when i try to delete the input file
, it throws the FileSystemException
.
UPDATE: Even after the job is completed i am not able to delete the input file.
Spring batch configuration
package com.trax.europeangateway.config;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.JAXBException;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.batch.item.xml.builder.StaxEventItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.util.ClassUtils;
import com.trax.europeangateway.factory.XMLSchemaValidatorFactory;
import com.trax.europeangateway.itemprocessor.omegaxml.PIExtractorItemProcessor;
import com.trax.europeangateway.itemprocessor.omegaxml.PIRemoverItemProcessor;
import com.trax.europeangateway.itemwriter.omegaxml.EdsClientItemWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFooterCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlHeaderCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller;
import com.trax.europeangateway.listener.JobResultListener;
import com.trax.europeangateway.listener.StepResultListener;
import com.trax.europeangateway.model.dto.FileInformationHeaderDto;
import com.trax.europeangateway.model.dto.ProcessorWriterDto;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
import com.trax.europeangateway.model.enums.Directory;
import com.trax.europeangateway.service.AfterJobService;
import com.trax.europeangateway.service.ExtractHeaderOmegaXml;
import com.trax.europeangateway.util.FileUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration
public class OmegaXmlBatchConfig {
@Autowired
PIExtractorItemProcessor pIExtractorItemProcessor;
@Autowired
StepResultListener stepResultListener;
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
private FileUtils fileUtils;
@Value( "${eugateway.batch.chunk.size}" )
private int chunkSize;
@JobScope
@Bean (name = "extractOmegaXmlHeaderStep")
public Step extractOmegaXmlHeaderStep(StepBuilderFactory steps , XMLSchemaValidatorFactory factory,
@Value("#{jobParameters['file.path']}") String filePath,
@Value("#{jobParameters['submission.account']}") String submitter) {
return steps.get("omegaXmlExtractHeaderStep")
.tasklet((contribution, chunkContext) -> {
//factory.validatePayload(filePath, submitter);
FileInformationHeaderDto fileInformation = new ExtractHeaderOmegaXml().readHeader(filePath);
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobExecutionContext.put("file.information", fileInformation);
return RepeatStatus.FINISHED;
}).build();
}
@Bean (name = "OmegaXmlExtractAndReplacePersonalDataStep")
public Step jobStep(ItemStreamReader<TransactionPositionReport> omegaXmlItemReader,
CompositeItemProcessor<TransactionPositionReport,
ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor,
CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("OmegaXmlExtractAndReplacePersonalDataStep")
.<TransactionPositionReport, ProcessorWriterDto>chunk(chunkSize)
.reader(omegaXmlItemReader)
.processor(omegaXmlExtractAndRemoveItemProcessor)
.writer(omegaXmlCompositeItemWriter)
.faultTolerant()
.listener(stepResultListener)
.build();
}
@JobScope
@Bean (name = "moveFileStep")
public Step moveFileStep(StepBuilderFactory steps , AfterJobService afterJobService,
@Value("#{jobParameters['file.path']}") String filePath,
@Value("#{jobParameters['submission.account']}") String submitter) {
return steps.get("moveFileStep")
.tasklet((contribution, chunkContext) -> {
Path inputFile = new File(filePath).toPath();
Files.delete(inputFile);
log.info("Submitter {}: File {} moved with status {}",submitter, filePath, status);
return RepeatStatus.FINISHED;
}).build();
}
@Primary
@Bean("omegaXmlJob")
public Job extractPersonalDataJob(Step extractOmegaXmlHeaderStep, Step OmegaXmlExtractAndReplacePersonalDataStep,
Step moveFileStep, JobResultListener jobListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("omegaXmlExtractAndReplacePersonalDataJob")
.incrementer(new RunIdIncrementer())
.start(extractOmegaXmlHeaderStep)
.next(OmegaXmlExtractAndReplacePersonalDataStep)
.next(moveFileStep)
.listener(jobListener)
.build();
}
@StepScope
@Bean(destroyMethod="doClose")
public ItemStreamReader<TransactionPositionReport> omegaXmlItemReader(@Value("#{jobParameters['file.path']}") String path) {
Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
transactionMarshaller.setMappedClass(TransactionPositionReport.class);
transactionMarshaller.setPackagesToScan(ClassUtils.getPackageName(TransactionPositionReport.class));
transactionMarshaller.setSupportJaxbElementClass(true);
transactionMarshaller.setSupportDtd(true);
log.debug("Generating StaxEventItemReader");
return new StaxEventItemReaderBuilder<TransactionPositionReport>()
.name("transactionPositionReport")
.resource(new FileSystemResource(path))
.addFragmentRootElements("transaction")
.unmarshaller(transactionMarshaller)
.build();
}
@Bean
@JobScope
OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['file.information']}") FileInformationHeaderDto fileInformation){
return new OmegaXmlHeaderCallBack(fileInformation);
}
@Bean(destroyMethod="")
OmegaXmlFooterCallBack getOmegaXmlFooterCallBack(){
return new OmegaXmlFooterCallBack();
}
@JobScope
@Bean(name = "staxTransactionWriter", destroyMethod="")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
@Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
marshaller.setSupportJaxbElementClass(true);
marshaller.setCheckForXmlRootElement(false);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
.headerCallback(omegaXmlHeaderCallBack)
.footerCallback(getOmegaXmlFooterCallBack())
.build();
}
@StepScope
@Bean
public PIExtractorItemProcessor extractItemProcessor() {
log.debug("Generating PIExtractorItemProcessor");
return new PIExtractorItemProcessor();
}
@Bean
public PIRemoverItemProcessor removeItemProcessor() {
log.debug("Generating PIRemoverItemProcessor");
return new PIRemoverItemProcessor();
}
@Bean
@StepScope
CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor() {
log.debug("Generating CompositeItemProcessor");
CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
return itemProcessor;
}
@Bean
public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
log.debug("Generating EdsClientItemWriter");
return new EdsClientItemWriter<>();
}
@StepScope
@Bean(destroyMethod="")
public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
log.debug("Generating OmegaXmlFileWriter");
return new OmegaXmlFileWriter(staxTransactionItemWriter(omegaXmlHeaderCallBack, path, submissionAccount));
}
@StepScope
@Bean(destroyMethod="")
public CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
log.debug("Generating CompositeItemWriter");
CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(omegaXmlHeaderCallBack, path, submissionAccount)));
return compositeItemWriter;
}
}
Stack trace when try to delete the file
2021-06-27 10:14:09,270 DEBUG c.t.e.l.StepResultListener [europeanGateway-1] StepResultListener called afterStep for submitter 12345
2021-06-27 10:14:09,278 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [OmegaXmlExtractAndReplacePersonalDataStep] executed in 1s873ms
2021-06-27 10:14:09,355 INFO o.s.b.c.j.SimpleStepHandler [europeanGateway-1] Executing step: [moveFileStep]
2021-06-27 10:14:09,387 ERROR o.s.b.c.s.AbstractStep [europeanGateway-1] Encountered an error executing step moveFileStep in job omegaXmlExtractAndReplacePersonalDataJob
java.nio.file.FileSystemException: \xbus\europeangateway\processing\PRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) ~[?:1.8.0_291]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) ~[?:1.8.0_291]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) ~[?:1.8.0_291]
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269) ~[?:1.8.0_291]
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) ~[?:1.8.0_291]
at java.nio.file.Files.delete(Files.java:1126) ~[?:1.8.0_291]
at com.trax.europeangateway.config.OmegaXmlBatchConfig.lambda$1(OmegaXmlBatchConfig.java:119) ~[classes/:?]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_291]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_291]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_291]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at com.sun.proxy.$Proxy142.execute(Unknown Source) [?:?]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_291]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_291]
2021-06-27 10:14:09,406 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [moveFileStep] executed in 50ms
2021-06-27 10:14:09,437 ERROR c.t.e.l.JobResultListener [europeanGateway-1] ExtractPersonalData failed for submission account 12345
2021-06-27 10:14:09,451 INFO o.s.b.c.l.s.SimpleJobLauncher$1 [europeanGateway-1] Job: [SimpleJob: [name=omegaXmlExtractAndReplacePersonalDataJob]] completed with the following parameters: [{currentTime=1624785246755, submission.account=12345, file.path=\xbus\europeangateway\processing\PRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML}] and the following status: [FAILED] in 2s508ms
Upvotes: 0
Views: 1774
Reputation: 31640
Your issue is that you are using a CompositeItemWriter
with two delegates but you are not registering the delegates as streams in the step. What happens in this case is that the ItemStream#close
method is not called and hence resources are not correctly released.
As mentioned in the reference documentation, you need to manually register delegate writers (edsClientItemWriter
and omegaXmlFileWriter
in your case) as item streams in the step.
Upvotes: 2