jelle

Reputation: 770

Memory consumption of a Java file stream

I'm reading a file through Java NIO straight into a stream. Each line launches an async HTTP request, which is handled later in a callback. Every 10,000 records I upload the results to a server and clear the records, which frees that memory.
I start with the byte array, which stays in memory the whole time. The HTTP client (Apache's CloseableHttpAsyncClient) fires the requests asynchronously, so they are all fired at once at the beginning.
Is there a way to limit the lambda stream so that only a bounded number of lines are processed at the same time, thus controlling my memory use?

new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
    .lines()
    .map(line -> CsvLine.create(line))
    .filter(line -> !line.isHeader())
    .forEach(line -> getResult(line, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(HttpResponse response) {
            try {
                result.addLine(response);
            } catch (IOException e) {
                LOGGER.error("IOException, cannot write to server", e);
                todo.set(-1); // finish in error
            } finally {
                todo.decrementAndGet();
            }
        }

        @Override
        public void failed(Exception ex) {
            handleError();
        }

        @Override
        public void cancelled() {
            handleError();
        }
    }));

Upvotes: 4

Views: 476

Answers (1)

Hank D

Reputation: 6471

You could use a Semaphore to throttle the stream so that at most a fixed number of async requests are outstanding at any time. It might look like this:

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
        .lines()
        .map(line -> CsvLine.create(line))
        .filter(line -> !line.isHeader())
        .forEach(line -> {
            try {
                if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    handleTimeout();
                } else {
                    getResult(line, new FutureCallback<HttpResponse>() {
                        @Override
                        public void completed(HttpResponse response) {
                            try {
                                result.addLine(response);
                            } catch (IOException e) {
                                LOGGER.error("IOException, cannot write to server", e);
                                todo.set(-1); // finish in error
                            } finally {
                                todo.decrementAndGet();
                                semaphore.release();
                            }
                        }

                        @Override
                        public void failed(Exception ex) {
                            handleError();
                            semaphore.release();
                        }

                        @Override
                        public void cancelled() {
                            handleError();
                            semaphore.release();
                        }
                    });
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can react to it
                Thread.currentThread().interrupt();
            }

        });
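
To see the throttling pattern on its own, here is a minimal, self-contained sketch. It is not the asker's code: the class ThrottleDemo, the method runThrottled, and the thread pool standing in for the HTTP client are all hypothetical, and the "request" is just a short sleep. The producer loop blocks on acquire() once the limit is reached, and every completion releases a permit, so the number of outstanding tasks should never exceed the limit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from the original post.
final class ThrottleDemo {

    /**
     * Submits `tasks` simulated async requests while allowing at most
     * `maxConcurrent` of them to be outstanding at once. Returns the highest
     * number of requests that were observed in flight at the same time.
     */
    static int runThrottled(int tasks, int maxConcurrent) throws InterruptedException {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Assumption: this pool stands in for the async HTTP client's worker threads.
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire(); // producer blocks here once the limit is reached
                CompletableFuture
                    .runAsync(() -> {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        sleepQuietly(5); // simulated request latency
                        inFlight.decrementAndGet();
                    }, pool)
                    // Release on success and on failure alike, like the three callbacks above.
                    .whenComplete((ignored, error) -> permits.release());
            }
            // Taking every permit back means all outstanding tasks have finished.
            permits.acquire(maxConcurrent);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The exact peak varies between runs, but it should not exceed 4.
        System.out.println("peak in-flight = " + runThrottled(50, 4));
    }
}
```

The blocking acquire() is what bounds memory: the stream cannot read the next line until a slot frees up. Using tryAcquire with a timeout, as in the snippet above, trades that unbounded wait for an explicit timeout path.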

Upvotes: 1
