lance-java

Reputation: 28061

How to wait for all Disruptor messages to be consumed in a test case

I'm trying to unit test an application which publishes and consumes messages using the Disruptor ring buffer. In the test I would like to:

  1. Configure the Disruptor and its EventHandlers
  2. Publish messages onto the ring buffer
  3. Wait for all messages to be consumed by all EventHandlers
  4. Perform some assertions

Is there a simple way that I can wait for the messages to be consumed (step 3 above)?

Upvotes: 0

Views: 82

Answers (1)

lance-java

Reputation: 28061

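I ended up solving this with a loop that completes when RingBuffer.remainingCapacity() is equal to Disruptor.getBufferSize(). The remaining capacity only returns to the full buffer size once the gating handlers have caught up with everything that was published, so at that point the ring buffer is drained: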

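import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MyTest {
    private Disruptor<MyEvent> disruptor;

    @BeforeEach
    public void beforeEach() {
        // Step 1: configure the Disruptor and its EventHandlers, then start it
        disruptor = new Disruptor<>(...);
        disruptor.handleEventsWith(...);
        disruptor.start();
    }

    @AfterEach
    public void afterEach() {
        disruptor.shutdown();
    }

    @Test
    public void myTest() throws Exception {
        // Step 2: publish messages onto the ring buffer
        disruptor.publishEvent(...);
        disruptor.publishEvent(...);

        // Step 3: block until every published event has been consumed
        awaitRingBuffer();

        // Step 4: perform assertions
        assertThat(...);
    }

    // Polls until all slots are free again, i.e. the handlers have caught up
    private void awaitRingBuffer() throws InterruptedException {
        RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();
        while (ringBuffer.remainingCapacity() != disruptor.getBufferSize()) {
            Thread.sleep(10);
        }
    }
}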
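
One caveat with the loop above is that it has no upper bound, so if a handler stalls or dies the test would presumably hang rather than fail. Below is a minimal sketch of a bounded version of the same polling idea. The `Await` class and its `until` method are hypothetical names of my own, not part of the Disruptor API, and the 10 ms poll interval is just carried over from the loop above.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper (not part of the Disruptor API). */
final class Await {
    private Await() {}

    /**
     * Polls the condition every 10 ms until it returns true.
     * Throws TimeoutException if the timeout elapses first.
     */
    static void until(BooleanSupplier condition, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so nanoTime overflow is handled correctly
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(10);
        }
    }
}
```

In the test, the body of awaitRingBuffer() could then become something like `Await.until(() -> disruptor.getRingBuffer().remainingCapacity() == disruptor.getBufferSize(), Duration.ofSeconds(5));`, where the 5 second limit is an arbitrary choice. A timeout then surfaces as a failed test instead of a stuck build.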

Upvotes: 0
