KAs

Reputation: 1868

Implementing producer-consumer in Java with BlockingQueue seems to lose data after consuming

I am building an API in Spring Boot. Every time saveRecord is called, it runs in a thread from a thread pool. What I intend to do is buffer records in a BlockingQueue; when its size reaches 400, take 400 records out of the BlockingQueue and write them to a file (only for testing).

private int putRecordBatchSize = 400;
private AtomicInteger counter = new AtomicInteger(0);
private BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2000, true);

@Async
public void saveRecord() throws UnsupportedEncodingException {
    int cur = counter.incrementAndGet();
    if(buffer.offer(cur + "\n") == false) {   // offer() returns false when full; add() would throw instead
        logger.error(format("buffer add failed. buffer_remaining_cap=[%d]", buffer.remainingCapacity()));
    }

    if(buffer.size() >= putRecordBatchSize) {
        logger.info("[PutRecordBatch] start.");
        List<String> drainedRecords = new ArrayList<>(putRecordBatchSize);
        buffer.drainTo(drainedRecords, putRecordBatchSize);

        //TODO test code
        drainedRecords.forEach(k -> {
            try {
                Files.write(Paths.get("text.txt"), k.getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        try {
            Thread.sleep(3000); // mocking a time-consuming operation
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        logger.info("[PutRecordBatch] end.");
        buffer.clear();
    }
}

But from text.txt, it looks like the data added to the queue between the start and end of the drainTo operation is somehow missing, as shown below. Could anyone help me out? Thanks a lot!

389
390
391
392
393
394
395
396
397
398
399
400  // This is the border for every drainTo operation
430
431
432
433
434
435
436
437
438
439

Upvotes: 0

Views: 944

Answers (1)

Thomas Kläger

Reputation: 21620

For me it's no surprise that you are losing records:

buffer.drainTo(drainedRecords, putRecordBatchSize);

removes putRecordBatchSize elements from the buffer.

After writing out those records, you do a

buffer.clear();

which removes those elements that have been added while you were processing the previous elements.

Since buffer.drainTo(drainedRecords, putRecordBatchSize); already removes the elements that you are processing there is no need for buffer.clear();!
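The loss can be reproduced deterministically in a single thread, no timing luck required. The sketch below (class and method names are made up for illustration) simulates producers adding 30 records while the drained batch is being "written", then shows that `clear()` silently discards them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClearRace {

    static int lostRecords() {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2000);
        for (int i = 1; i <= 400; i++) {
            buffer.add(i);                      // batch fills up to the threshold
        }

        List<Integer> drained = new ArrayList<>(400);
        buffer.drainTo(drained, 400);           // removes records 1..400 from the queue

        // While the batch is being written out, other threads keep producing:
        for (int i = 401; i <= 430; i++) {
            buffer.add(i);
        }

        int aboutToBeLost = buffer.size();      // 30 unprocessed records sit in the queue
        buffer.clear();                         // ...and clear() discards all of them
        return aboutToBeLost;
    }

    public static void main(String[] args) {
        System.out.println("records silently discarded: " + lostRecords());
    }
}
```

Dropping the `buffer.clear()` line is the whole fix: `drainTo` has already removed exactly the elements being processed, so everything still in the queue belongs to the next batch.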

Upvotes: 2
