msuzuki

Reputation: 145

Spring Batch - FlatFileItemWriter Error 14416: Stream is already closed

Basically I have a Spring Batch job that queries a database, implements a Partitioner to get the jobs, and assigns the jobs to a ThreadPoolTaskExecutor in a slave step.

The Reader reads each job (partition) from the database. The Writer loads the data into a CSV file in Azure Blob Storage.

The job Partitioner and Reader work fine. The Writer writes to one file, then it closes, and the other jobs cannot finish because the stream is closed. I get the following error:

Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream   : Stream is already closed.
2021-06-02 08:24:42.307  WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv

Here is my Batch Configuration:

@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    String connectionString = "azureConnectionString";

    String containerName = "upload-demo";

    String endpoint = "azureHttpsEndpoint";

    String accountName = "azureAccountName";
    String accountKey = "accountKey";

    StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
    BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    @Qualifier("verticaDb")
    private DataSource verticaDataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private ConsoleItemWriter consoleItemWriter;

    @Autowired
    private ItemWriter itemWriter;

    @Bean
    public Job job() throws Exception {
        return jobs.get("job1")
                .start(masterStep(null, null))
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    @JobScope
    public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
                           @Value("#{jobParameters['endDate']}") String endDate) throws Exception {

        return steps.get("masterStep")
                .partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step slaveStep() throws Exception {
        return steps.get("slaveStep")
                .<MarketData, MarketData>chunk(100)
                .reader(pagingItemReader(null, null, null))
                .faultTolerant()
                .skip(NullPointerException.class)
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .writer(writer2(null, null, null))  //consoleItemWriter
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader pagingItemReader(
            @Value("#{stepExecutionContext['MarketName']}") String marketName,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['endDate']}") String endDate
            ) throws Exception {

System.out.println("Reading: " + marketName);

        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        Map<String, Order> sortKey = new HashMap<>();
        sortKey.put("xbin", Order.ASCENDING);
        sortKey.put("ybin", Order.ASCENDING);

        provider.setDataSource(this.verticaDataSource);
        provider.setDatabaseType("POSTGRES");
        provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
                        "LATITUDE AS latitude, LONGITUDE AS longitude, " +
                        "SUM(TOTALUPLINKVOLUME) AS totalDownlinkVol, SUM(TOTALDOWNLINKVOLUME) AS totalUplinkVol");
        provider.setFromClause("FROM views.geo_analytics");
        provider.setWhereClause(
                "WHERE market='" + marketName + "'" +
                        " AND STARTTIME >= '" + startDate + "'" +
                        " AND STARTTIME < '" + endDate + "'" +
                        " AND TOTALUPLINKVOLUME IS NOT NULL" +
                        " AND TOTALUPLINKVOLUME > 0" +
                        " AND TOTALDOWNLINKVOLUME IS NOT NULL" +
                        " AND TOTALDOWNLINKVOLUME > 0" +
                        " AND EPSG IS NOT NULL" +
                        " AND LATITUDE IS NOT NULL" +
                        " AND LONGITUDE IS NOT NULL" +
                        " AND XBIN IS NOT NULL" +
                        " AND YBIN IS NOT NULL"
        );
        provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
        provider.setSortKeys(sortKey);

        JdbcPagingItemReader reader = new JdbcPagingItemReader();
        reader.setDataSource(this.verticaDataSource);
        reader.setQueryProvider(provider.getObject());
        reader.setFetchSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper() {
            {
                setMappedClass((MarketData.class));
            }
        });
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
                                                 @Value("#{stepExecutionContext['marketName']}") String marketName,
                                                 @Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {

        AZBlobWriter<MarketData> writer = new AZBlobWriter<>();

        String fullPath = marketName + "_" + startDate + ".csv";
        String resourceString = "azure-blob://upload-demo/" + fullPath;

        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
        container2.createIfNotExists();

        AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
        Resource resource = storageResourcePatternResolver.getResource(resourceString);


System.out.println("Writter: " + resource.getURI().getPath().toString());

        writer.setResource(resource);
        writer.setStorage(container2);

        writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
                    {
                        setNames(new String[] {
                                "market",
                                "epsg",
                                "xbin",
                                "ybin",
                                "latitude",
                                "longitude",
                                "totalDownlinkVol",
                                "totalUplinkVol"
                        });
                    }
                });
            }
        });
        return writer;
    }
}

Previously I ran into other issues, such as setting up the Resource for the FlatFileItemWriter to point to Azure Blob: Spring Batch / Azure Storage account blob resource [container "foo", blob='bar'] cannot be resolved to absolute file path.

As suggested by @Mahmoud Ben Hassine, I made an implementation of the FlatFileItemWriter for Azure Blob.

I used the (GCP) implementation from this post as a base: how to configure FlatFileItemWriter to output the file to a ByteArrayRecource?

Here is the implementation of the Azure Blob:

public class AZBlobWriter<T> extends FlatFileItemWriter<T> {

    private CloudBlobContainer storage;
    private Resource resource;

    private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
    private OutputStream os;

    private String lineSeparator = DEFAULT_LINE_SEPARATOR;

    @Override
    public void write(List<? extends T> items) throws Exception {

        StringBuilder lines = new StringBuilder();
        for (T item : items) {
            lines.append(item).append(lineSeparator);
        }
        byte[] bytes = lines.toString().getBytes();
        try {
            os.write(bytes);
        }
        catch (IOException e) {
            throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
        }
        os.flush();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        try {
            os = ((WritableResource)resource).getOutputStream();
            String bucket = resource.getURI().getHost();
            String filePath = resource.getURI().getPath().substring(1);

            CloudBlockBlob blob = storage.getBlockBlobReference(filePath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (StorageException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
    }

    @Override
    public void close() {
        super.close();

        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setStorage(CloudBlobContainer storage) {
        this.storage = storage;
    }
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }
}

Any help is greatly appreciated. My apologies for the "dirty code", as I am still testing/developing it.

thx, Markus.

Upvotes: 1

Views: 756

Answers (1)

Mahmoud Ben Hassine

Reputation: 31600

You did not share the entire stack trace to see when this error happens exactly, but it seems that the close method is called more than once. I do not think this is due to a concurrency issue, as I see you are using one writer per thread in a partitioned step. So I would make this method "re-entrant" by checking whether the output stream is already closed before closing it (there is no isClosed method on an output stream, so you can use a custom boolean flag for that).
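A minimal sketch of such a guard, assuming a new streamClosed field on your custom writer (this field is not in your original code):

    private boolean streamClosed = false;

    @Override
    public void close() {
        super.close();
        // Only close the underlying Azure stream once; a second call would
        // trigger the "Stream is already closed" error from StorageOutputStream.
        if (os != null && !streamClosed) {
            try {
                os.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                streamClosed = true;
            }
        }
    }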

That said, I would first confirm that the close method is indeed called twice and, if so, investigate why that is and fix the root cause.
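One quick way to confirm that (diagnostic only, not production code) is to print a stack trace on every invocation, which shows who calls close() and how many times it runs per writer instance:

    @Override
    public void close() {
        // Diagnostic: dump the caller of every close() invocation.
        new Exception("close() called on writer " + System.identityHashCode(this))
                .printStackTrace();
        super.close();
        // ... existing stream-closing logic ...
    }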

Upvotes: 2
