FVlad

Reputation: 317

Missing messages from TopicProcessor in Reactor 3

I'm running a simple test in which I publish messages to a TopicProcessor from 4 threads, and the subscriber simply adds each message to a collection. The code is the following:

@Test
public void testProcessingMessages() throws Exception {
    int numberOfMessages = 1000;

    TopicProcessor<Integer> processor = TopicProcessor.create();

    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Queue<Integer> messages = new ConcurrentLinkedQueue<>();

    processor.subscribe(messages::add);

    AtomicInteger counter = new AtomicInteger(0);
    for (int i = 0; i < numberOfMessages; i++) {
        executorService.submit(() -> {
            processor.onNext(counter.incrementAndGet());
        });
    }

    Thread.sleep(10000);

    assertEquals(numberOfMessages, messages.size());
}

But the assertion at the end fails: the collection usually holds only around 980-990 messages instead of the expected 1000. Am I missing something?

Upvotes: 1

Views: 480

Answers (1)

FVlad

Reputation: 317

The problem was that TopicProcessor.create creates a processor that expects all publishing to happen from a single thread, so calling onNext concurrently from the 4 executor threads can lose messages. TopicProcessor.share should be used instead when producing from multiple threads.
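
To illustrate why a single-producer assumption can drop messages, here is a minimal sketch in plain Java. It does not use Reactor and is not TopicProcessor's actual implementation. The classes ProducerClaimSketch, SingleProducerSlots and MultiProducerSlots are hypothetical names made up for this example. The sketch assumes that a publisher first claims a slot index and then writes its message there. If the claim is a plain read-then-write, two threads can claim the same slot and one message overwrites the other. An atomic claim avoids that.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class ProducerClaimSketch {

    /** Hypothetical stand-in for a buffer that hands out one slot per published message. */
    abstract static class Slots {
        final AtomicIntegerArray slots;

        Slots(int capacity) {
            slots = new AtomicIntegerArray(capacity);
        }

        /** Returns the index of the slot the caller may write to. */
        abstract int claim();

        void publish(int value) {
            slots.set(claim(), value);
        }

        /** Counts the slots that hold a message (all messages here are non-zero). */
        int delivered() {
            int count = 0;
            for (int i = 0; i < slots.length(); i++) {
                if (slots.get(i) != 0) {
                    count++;
                }
            }
            return count;
        }
    }

    /** Plain read-then-write claim: only correct when a single thread publishes. */
    static final class SingleProducerSlots extends Slots {
        private int next; // plain field, no synchronization

        SingleProducerSlots(int capacity) {
            super(capacity);
        }

        @Override
        int claim() {
            int index = next; // two threads may read the same value here...
            next = index + 1; // ...and then both write into the same slot
            return index;
        }
    }

    /** Atomic claim: every caller gets a distinct index, whichever thread it runs on. */
    static final class MultiProducerSlots extends Slots {
        private final AtomicInteger next = new AtomicInteger();

        MultiProducerSlots(int capacity) {
            super(capacity);
        }

        @Override
        int claim() {
            return next.getAndIncrement();
        }
    }

    /** Publishes the values 1..messages from a pool of the given size; returns how many survived. */
    static int publishFromThreads(Slots target, int messages, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> target.publish(counter.incrementAndGet()));
        }
        pool.shutdown();
        try {
            // Wait for the publishers to finish instead of sleeping for a fixed time.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("publishers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return target.delivered();
    }

    public static void main(String[] args) {
        int messages = 1000;
        // May print fewer than 1000, and the number can differ between runs.
        System.out.println("single-producer claim, 4 threads: "
                + publishFromThreads(new SingleProducerSlots(messages), messages, 4));
        // The atomic claim keeps every message.
        System.out.println("multi-producer claim, 4 threads:  "
                + publishFromThreads(new MultiProducerSlots(messages), messages, 4));
    }
}
```

In the test from the question, the corresponding change is to build the processor with TopicProcessor.share rather than TopicProcessor.create. The available overloads differ between Reactor 3 releases, so check the Javadoc for the version in use.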

Upvotes: 3
