Reputation: 1868
I am trying to build an API in Spring Boot. Every time saveRecord is called, it runs in a thread from a thread pool. What I intend to do is buffer records in a BlockingQueue and, once its size reaches 400, take 400 records out of the BlockingQueue and write them to a file (for testing only).
private int putRecordBatchSize = 400;
private AtomicInteger counter = new AtomicInteger(0);
private BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2000, true);

@Async
public void saveRecord() throws UnsupportedEncodingException {
    int cur = counter.incrementAndGet();
    if (buffer.add(cur + "\n") == false) {
        logger.error(format("buffer add failed. buffer_remaining_cap=[%d]", buffer.remainingCapacity()));
    }
    if (buffer.size() >= putRecordBatchSize) {
        logger.info("[PutRecordBatch] start.");
        List<String> drainedRecords = new ArrayList<>(putRecordBatchSize);
        buffer.drainTo(drainedRecords, putRecordBatchSize);
        //TODO test code
        drainedRecords.stream().forEach(k -> {
            try {
                Files.write(Paths.get("text.txt"), k.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        try {
            Thread.currentThread().sleep(3000); // mocking time-consuming operation
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("[PutRecordBatch] end.");
        buffer.clear();
    }
}
But judging from text.txt, the data added to the queue between the start and end of the drainTo operation goes missing, as shown below. Could anyone help me out? Thanks a lot!
389
390
391
392
393
394
395
396
397
398
399
400 // This is the border for every drainTo operation
430
431
432
433
434
435
436
437
438
439
Upvotes: 0
Views: 944
Reputation: 21620
To me it's no surprise that you are losing records:
buffer.drainTo(drainedRecords, putRecordBatchSize);
removes up to putRecordBatchSize elements from the buffer.
After writing out those records, you call
buffer.clear();
which removes every element that was added while you were processing the previous batch, so those records are never written.
Since buffer.drainTo(drainedRecords, putRecordBatchSize) already removes the elements that you are processing, there is no need for buffer.clear() at all!
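To make the difference concrete, here is a minimal standalone sketch (plain Java, no Spring, no file I/O). The class name DrainWithoutClear and the helpers drainBatch and remainingAfterBatch are hypothetical names made up for this illustration, and the 30 "late" records are an assumption chosen to mirror the gap in your output. It fills the queue with 400 records, drains them, simulates 30 more arriving during processing, and then reports how many are still buffered with and without the trailing clear().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names are illustrative and not taken from the question.
class DrainWithoutClear {

    // Moves up to batchSize elements out of the queue and returns them.
    // drainTo removes what it transfers, so no clear() is needed afterwards.
    static List<String> drainBatch(BlockingQueue<String> buffer, int batchSize) {
        List<String> drained = new ArrayList<>(batchSize);
        buffer.drainTo(drained, batchSize);
        return drained;
    }

    // Returns how many records remain buffered after one batch is handled,
    // optionally calling clear() the way the original saveRecord() does.
    static int remainingAfterBatch(boolean clearAfterwards) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2000, true);
        for (int i = 1; i <= 400; i++) {
            buffer.add(i + "\n");
        }

        drainBatch(buffer, 400);

        // Simulate records 401..430 arriving while the batch is being processed.
        for (int i = 401; i <= 430; i++) {
            buffer.add(i + "\n");
        }

        if (clearAfterwards) {
            buffer.clear(); // discards 401..430 before they are ever written
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("with clear():    " + remainingAfterBatch(true));  // prints 0
        System.out.println("without clear(): " + remainingAfterBatch(false)); // prints 30
    }
}
```

Without clear(), the 30 late records stay in the buffer and go out with the next batch. Note that in your real code several threads can pass the size check at the same moment, so a given drainTo call may return fewer than putRecordBatchSize elements; that is harmless here, but worth knowing if you rely on exact batch sizes.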
Upvotes: 2