Reputation: 770
I'm reading a file through a java nio interface, directly to a stream. This launches async http requests and handles these in the future. Every 10,000 records, I upload this result to a server and I clear the records, so this clears my memory consumption.
I start with the byte array, that stays in the memory constantly. The http client (commons CloseableHttpAsyncClient
) fires the requests async, so these are fired all at once in the beginning.
Is there a way to limit the lambda stream in a way that I can limit the number of lines that are processed at the same time? Thus controlling my memory.
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
.map(line -> CsvLine.create(line))
.filter(line -> !line.isHeader())
.forEach(line -> getResult(line, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
result.addLine(response);
} catch (IOException e) {
LOGGER.error("IOException, cannot write to server", e);
todo.set(-1); // finish in error
} finally {
todo.decrementAndGet();
}
}
@Override
public void failed(Exception ex) {
handleError();
}
@Override
public void cancelled() {
handleError();
}
}
));
Upvotes: 4
Views: 476
Reputation: 6471
You might try using a Semaphore to throttle your stream so that only a certain maximum async requests are outstanding at a time. It might look like this:
Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
.map(line -> CsvLine.create(line))
.filter(line -> !line.isHeader())
.forEach(line -> {
try {
if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) {
handleTimeout();
} else {
getResult(line, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
result.addLine(response);
} catch (IOException e) {
LOGGER.error("IOException, cannot write to server", e);
todo.set(-1); // finish in error
} finally {
todo.decrementAndGet();
semaphore.release();
}
}
@Override
public void failed(Exception ex) {
handleError();
semaphore.release();
}
@Override
public void cancelled() {
handleError();
semaphore.release();
}
}
);
}
} catch (InterruptedException e) {
// handle appropriately
}
});
Upvotes: 1