Reputation: 225
I’m investigating the use of Spring Batch to process records from an encoded, zipped file. The records are variable length, with nested variable-length data fields encoded within them.
I’m new to Spring and Spring Batch; this is how I plan to structure the batch configuration.
My initial problem is understanding how to set up the ItemReader. I’ve looked at some examples using a FlatFileItemReader, but my difficulty is that it expects a LineMapper, and I don't see how that applies in my case (there is no concept of a line in the file).
There are some articles indicating the use of a custom BufferedReaderFactory, but it would be great to see a worked example of this.
Help would be appreciated.
Upvotes: 8
Views: 7257
Reputation: 3879
I tested that this simple configuration for reading lines from a zipped and encoded file in S3 works.
Key points:

- A GZIPBufferedReaderFactory that uses Apache's GZIPInputStreamFactory, set as the bufferedReaderFactory on the FlatFileItemReader.
- A SimpleStorageResourceLoader from Spring Cloud with an AmazonS3Client, used to get the zipped flat file in S3 and set as the resource on the FlatFileItemReader.

Note: reading into a String can easily be replaced by reading into a POJO.
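To illustrate that note, here is one hedged sketch of mapping each line into a POJO instead of a raw String. The Record class, its field names, and the delimited format are assumptions; DefaultLineMapper, DelimitedLineTokenizer, and BeanWrapperFieldSetMapper are standard Spring Batch classes.

```java
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

public class RecordMapperConfig {

    // Hypothetical POJO; your real record type goes here.
    public static class Record {
        private Long id;
        private String payload;
        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }
    }

    public static LineMapper<Record> recordLineMapper() {
        // Split each comma-delimited line into named fields (names assumed)
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "payload");

        // Bind the named fields onto the POJO's setters
        BeanWrapperFieldSetMapper<Record> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Record.class);

        DefaultLineMapper<Record> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }
}
```

You would then call flatFileItemReader.setLineMapper(recordLineMapper()) in place of the PassThroughLineMapper below, and change the reader's type parameter from String to Record.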
GZIPBufferedReaderFactory.java
Using Apache's GZIPInputStreamFactory:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

// Assuming Apache HttpClient's InputStreamFactory implementation
import org.apache.http.client.entity.GZIPInputStreamFactory;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZIPBufferedReaderFactory implements BufferedReaderFactory {

    private final GZIPInputStreamFactory gzipInputStreamFactory;

    public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        this.gzipInputStreamFactory = gzipInputStreamFactory;
    }

    @Override
    public BufferedReader create(Resource resource, String encoding) throws IOException {
        // Wrap the gzipped resource stream in a decompressing, decoding reader
        return new BufferedReader(new InputStreamReader(
                gzipInputStreamFactory.create(resource.getInputStream()), encoding));
    }
}
AWSConfiguration.java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AWSConfiguration {

    @Bean
    public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
        ClientConfiguration clientConfig = new ClientConfiguration();
        AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
        client.setRegion(region);
        return client;
    }
}
How you configure the AWSCredentialsProvider and Region beans can vary, and I won't detail that here since it is documented elsewhere.
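For completeness, one possible sketch of those two beans using AWS SDK for Java v1 defaults; the region choice is an assumption, so adjust it to your environment:

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AWSCredentialsConfiguration {

    @Bean
    public AWSCredentialsProvider awsCredentialsProvider() {
        // Resolves credentials from env vars, system properties,
        // ~/.aws/credentials, or the EC2 instance profile, in that order.
        return new DefaultAWSCredentialsProviderChain();
    }

    @Bean
    public Region region() {
        return Region.getRegion(Regions.EU_WEST_1); // assumed region
    }
}
```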
SignalsIndexBatchConfiguration.java
import com.amazonaws.services.s3.AmazonS3Client;
import org.apache.http.client.entity.GZIPInputStreamFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.core.io.s3.SimpleStorageResourceLoader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {

    @Autowired
    public AmazonS3Client s3Client;

    @Bean
    public GZIPInputStreamFactory gzipInputStreamFactory() {
        return new GZIPInputStreamFactory();
    }

    @Bean
    public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
    }

    @Bean
    public SimpleStorageResourceLoader simpleStorageResourceLoader() {
        return new SimpleStorageResourceLoader(s3Client);
    }

    @Bean
    @StepScope
    protected FlatFileItemReader<String> itemReader(
            SimpleStorageResourceLoader simpleStorageResourceLoader,
            GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
        FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
        flatFileItemReader.setLineMapper(new PassThroughLineMapper());
        return flatFileItemReader;
    }

    @Bean
    public Job job(Step step) {
        return jobBuilderFactory.get("job").start(step).build();
    }

    @Bean
    protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
        return stepBuilderFactory.get("step")
                .<String, String> chunk(200)
                .reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
                .processor(itemProcessor()) // itemProcessor() is not shown here; a writer must also be set before build()
                .faultTolerant()
                .build();
    }

    /*
     * These components are some of what we
     * get for free with the @EnableBatchProcessing annotation
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;
    /*
     * END Freebies
     */

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
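Once the context is up, the job can be launched roughly like this (a hedged sketch; the run.id parameter name is an arbitrary convention used so that each launch creates a new JobInstance, and the JobRunner class is hypothetical):

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;

public class JobRunner {

    private final JobLauncher jobLauncher;
    private final Job job;

    public JobRunner(JobLauncher jobLauncher, Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    public void run() throws Exception {
        // A unique parameter so repeated launches create new JobInstances
        JobParameters params = new JobParametersBuilder()
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}
```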
Upvotes: 4
Reputation: 904
From the Spring Batch feature request ticket (https://jira.spring.io/browse/BATCH-1750):
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;

public class GZIPResource extends InputStreamResource implements Resource {

    public GZIPResource(Resource delegate) throws IOException {
        super(new GZIPInputStream(delegate.getInputStream()));
    }
}
The custom GZipBufferedReaderFactory won't work with anything other than a FlatFileItemReader.

Edit: lazy version. This one doesn't try to open the file until getInputStream is called, which avoids exceptions caused by the file not existing yet when the Resource is created at program initialization (e.g. with autowiring).
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

public class GzipLazyResource extends FileSystemResource implements Resource {

    public GzipLazyResource(File file) {
        super(file);
    }

    public GzipLazyResource(String path) {
        super(path);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return new GZIPInputStream(super.getInputStream());
    }
}
Adding a similar getOutputStream override won't work, because Spring uses FileSystemResource.getFile, not FileSystemResource.getOutputStream.
Upvotes: 2
Reputation: 225
My confusion was based around the file handling in the custom ItemReader: if I were to open and process the file in the read() method, I would have to keep track of where I was in the file, etc. I managed to tackle this by creating a BufferedInputStream (new BufferedInputStream(new GZIPInputStream(new FileInputStream(file)))) in the constructor of the custom ItemReader, then processing that stream in the read() method on each iteration of the step.
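The pattern described above can be sketched without Spring at all, using only java.util.zip: open the decompressing stream once in the constructor, and let each read() call return the next record, returning null at the end (matching Spring Batch's ItemReader contract). Class and method names here are illustrative:

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineReaderDemo {

    static class GzipLineItemReader implements Closeable {
        private final BufferedReader reader;

        GzipLineItemReader(Path gzipFile) throws IOException {
            // Open the decompressing stream once, up front, so read()
            // never has to track a position in the file
            this.reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(Files.newInputStream(gzipFile)), "UTF-8"));
        }

        /** Returns the next line, or null when input is exhausted. */
        String read() throws IOException {
            return reader.readLine();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    /** Writes a small gzipped file, then reads it back line by line. */
    public static List<String> demo() {
        try {
            Path tmp = Files.createTempFile("records", ".gz");
            try (Writer w = new OutputStreamWriter(
                    new GZIPOutputStream(Files.newOutputStream(tmp)), "UTF-8")) {
                w.write("alpha\nbeta\n");
            }
            List<String> lines = new ArrayList<>();
            try (GzipLineItemReader r = new GzipLineItemReader(tmp)) {
                String line;
                while ((line = r.read()) != null) {
                    lines.add(line);
                }
            }
            Files.delete(tmp);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [alpha, beta]
    }
}
```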
Upvotes: 0
Reputation: 6630
If the gzipped file is a simple text file, you only need a custom BufferedReaderFactory; the LineMapper then gets the String of the current line.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZipBufferedReaderFactory implements BufferedReaderFactory {

    /** Default value for gzip suffixes. */
    private List<String> gzipSuffixes = new ArrayList<String>() {
        {
            add(".gz");
            add(".gzip");
        }
    };

    /**
     * Creates a BufferedReader for a gzip Resource; handles normal resources
     * too.
     *
     * @param resource
     * @param encoding
     * @return
     * @throws UnsupportedEncodingException
     * @throws IOException
     */
    @Override
    public BufferedReader create(Resource resource, String encoding)
            throws UnsupportedEncodingException, IOException {
        for (String suffix : gzipSuffixes) {
            // test for filename and description; description is used when
            // handling itemStreamResources
            if (resource.getFilename().endsWith(suffix)
                    || resource.getDescription().endsWith(suffix)) {
                return new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(resource.getInputStream()), encoding));
            }
        }
        return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
    }

    public List<String> getGzipSuffixes() {
        return gzipSuffixes;
    }

    public void setGzipSuffixes(List<String> gzipSuffixes) {
        this.gzipSuffixes = gzipSuffixes;
    }
}
simple itemreader configuration:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{jobParameters['input.file']}" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
    </property>
    <property name="strict" value="true" />
    <property name="bufferedReaderFactory">
        <bean class="your.custom.GZipBufferedReaderFactory" />
    </property>
</bean>
Upvotes: 4