Reputation: 317
I'm running a simple test in which I publish messages to a TopicProcessor from 4 threads, and the subscriber simply adds them to a collection. The code is the following:
@Test
public void testProcessingMessages() throws Exception {
    int numberOfMessages = 1000;
    TopicProcessor<Integer> processor = TopicProcessor.create();
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    Queue<Integer> messages = new ConcurrentLinkedQueue<>();
    processor.subscribe(messages::add);
    AtomicInteger counter = new AtomicInteger(0);
    for (int i = 0; i < numberOfMessages; i++) {
        executorService.submit(() -> {
            processor.onNext(counter.incrementAndGet());
        });
    }
    Thread.sleep(10000);
    assertEquals(numberOfMessages, messages.size());
}
But the assertion at the end fails: the collection usually holds only around 980-990 messages instead of the expected 1000. Am I missing something?
Upvotes: 1
Views: 480
Reputation: 317
The problem was that TopicProcessor.create creates a processor that expects publishing from a single thread. TopicProcessor.share should be used when producing from multiple threads.
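To illustrate why a publisher built for a single thread can drop messages, here is a rough sketch using only the JDK. It is not Reactor's code: the class and method names (SequencerSketch, SingleProducerSequencer, MultiProducerSequencer, publish) are hypothetical, and it assumes the loss comes from producers claiming buffer slots with a non-atomic counter, so two threads can end up writing to the same slot.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical illustration only; none of these types exist in Reactor.
public class SequencerSketch {

    /** Hands out the next slot index to a producer. */
    interface Sequencer {
        int next();
    }

    /** Single-producer style: plain read-modify-write, only safe from one thread. */
    static final class SingleProducerSequencer implements Sequencer {
        private int cursor = 0;

        @Override
        public int next() {
            return cursor++; // racy: two threads may receive the same slot
        }
    }

    /** Multi-producer style: atomic claim, safe from many threads. */
    static final class MultiProducerSequencer implements Sequencer {
        private final AtomicInteger cursor = new AtomicInteger();

        @Override
        public int next() {
            return cursor.getAndIncrement(); // every caller gets a unique slot
        }
    }

    /** Publishes count messages from the given number of threads and returns how many distinct slots were filled. */
    static int publish(Sequencer sequencer, int count, int threads) throws InterruptedException {
        AtomicIntegerArray slots = new AtomicIntegerArray(count);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            pool.submit(() -> slots.set(sequencer.next(), 1));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        int filled = 0;
        for (int i = 0; i < count; i++) {
            filled += slots.get(i);
        }
        return filled;
    }

    public static void main(String[] args) throws InterruptedException {
        // The single-producer count may come out below 1000 when threads collide.
        System.out.println("single-producer: " + publish(new SingleProducerSequencer(), 1000, 4));
        System.out.println("multi-producer:  " + publish(new MultiProducerSequencer(), 1000, 4));
    }
}
```

In the test from the question, the corresponding change is to build the processor with TopicProcessor.share instead of TopicProcessor.create. The available overloads of share differ between Reactor versions, so check the Javadoc for the version in use.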
Upvotes: 3