Reputation: 1012
The following multithreaded example writes to a TopicProcessor from two different threads and reads from the TopicProcessor in two different threads. However, there is a race condition somewhere: not all events reach the subscribers, so the application hangs forever in processed.await(). Does anyone see why?
import reactor.core.publisher.Flux;
import reactor.extra.processor.TopicProcessor;
import reactor.extra.processor.WaitStrategy;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.Arrays.asList;

public class ReactorTopicProcessorSample {

    public static class Producer implements Callable<Void> {
        final String name;
        final List<String> data;
        final CountDownLatch producerCount;
        final TopicProcessor<String> topicProcessor;

        public Producer(String name, List<String> data, CountDownLatch submitted, TopicProcessor<String> topicProcessor) {
            this.name = name;
            this.data = data;
            this.producerCount = submitted;
            this.topicProcessor = topicProcessor;
        }

        @Override
        public Void call() throws Exception {
            producerCount.countDown();
            producerCount.await(); // wait until the other producer is submitted to be sure that they run in different threads
            Flux.fromIterable(data)
                    .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
                    .delayElements(Duration.ofMillis(10))
                    .subscribe(topicProcessor);
            System.out.println("Submitted " + name + " producer in thread " + Thread.currentThread().getName() + ".");
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) { // this sample doesn't hang every time. repeat a few times to make it reproducible
            realMain(args);
            System.out.println("\n--- the previous run was successful. running again ---\n");
        }
    }

    public static void realMain(String[] args) throws InterruptedException {
        List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
        List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");
        CountDownLatch producerCount = new CountDownLatch(2);
        CountDownLatch subscriberCount = new CountDownLatch(2);
        CountDownLatch processed = new CountDownLatch(
                (int) subscriberCount.getCount() * (numbers.size() + characters.size()));
        ExecutorService exec = Executors.newFixedThreadPool((int) producerCount.getCount());
        TopicProcessor<String> topicProcessor = TopicProcessor.<String>builder()
                .share(true)
                .name("topic-processor")
                .bufferSize(16)
                .waitStrategy(WaitStrategy.liteBlocking())
                .build();
        Flux<String> flux = Flux.from(topicProcessor)
                .doOnSubscribe(s -> subscriberCount.countDown());
        flux.subscribe(out -> {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        });
        flux.subscribe(out -> {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        });
        subscriberCount.await();
        exec.submit(new Producer("number", numbers, producerCount, topicProcessor));
        exec.submit(new Producer("character", characters, producerCount, topicProcessor));
        processed.await();
        exec.shutdown();
        topicProcessor.shutdown();
    }
}
Dependencies
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.3.2.RELEASE</version>
</dependency>
Example behavior: a subscriber receives only the characters or only the numbers, so the program waits forever in processed.await(). This does not happen on every run; sometimes it works as expected.
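To make a failing run visible instead of blocking forever, the wait on the latch can be bounded. The following is only a minimal sketch using the JDK's CountDownLatch.await(long, TimeUnit); the class BoundedAwaitSketch and the helper awaitOrReport are hypothetical names and not part of the sample above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedAwaitSketch {

    /**
     * Hypothetical helper: waits up to the given timeout and returns how many
     * events were still missing afterwards (0 if the latch reached zero).
     */
    public static long awaitOrReport(CountDownLatch processed, long timeout, TimeUnit unit) {
        try {
            if (processed.await(timeout, unit)) {
                return 0; // every expected event was counted down in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
        }
        // Timed out or interrupted: report the remaining count (it may still change concurrently).
        return processed.getCount();
    }

    public static void main(String[] args) {
        CountDownLatch processed = new CountDownLatch(3);
        processed.countDown();
        processed.countDown(); // simulate two of three events arriving
        long missing = awaitOrReport(processed, 100, TimeUnit.MILLISECONDS);
        System.out.println("missing events: " + missing); // prints "missing events: 1"
    }
}
```

In the sample, a call like awaitOrReport(processed, 5, TimeUnit.SECONDS) in place of processed.await() would report how many deliveries never happened on a bad run.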
Upvotes: 2
Views: 839
Reputation: 1271
If I understand correctly, you want two producers that produce data in parallel and two consumers that consume it in parallel.
First, you need to understand how Reactor (or RxJava) works. In particular, read about cold publishers: a cold publisher starts publishing data only once a subscriber subscribes to it.
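The cold behavior can be seen in a small sketch. It deliberately uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor, and the names ColdListPublisher, emitted and collect are hypothetical. It is single-threaded and not a fully spec-compliant publisher; it only shows that nothing is emitted before a subscription and that every subscriber gets its own replay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class ColdPublisherSketch {

    /** Hypothetical cold publisher: each subscriber gets its own replay of the list. */
    public static final class ColdListPublisher implements Flow.Publisher<String> {
        private final List<String> items;
        public final AtomicInteger emitted = new AtomicInteger(); // onNext calls across all subscribers

        public ColdListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;        // per-subscriber position in the list
                private long demand;      // outstanding requested items
                private boolean draining; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturating add
                    if (draining) return; // the outer loop will see the new demand
                    draining = true;
                    while (!done && demand > 0 && index < items.size()) {
                        demand--;
                        emitted.incrementAndGet();
                        subscriber.onNext(items.get(index++));
                    }
                    draining = false;
                    if (!done && index == items.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with unbounded demand and returns everything that was received. */
    public static List<String> collect(Flow.Publisher<String> publisher) {
        List<String> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        ColdListPublisher source = new ColdListPublisher(List.of("1", "2", "3"));
        System.out.println(source.emitted.get()); // prints 0: nothing happens before subscribing
        System.out.println(collect(source));      // prints [1, 2, 3]
        System.out.println(collect(source));      // prints [1, 2, 3] again: a fresh replay
        System.out.println(source.emitted.get()); // prints 6
    }
}
```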
Now, back to your code: if you look at the marble diagram of TopicProcessor, you will see that this class is meant to stream data in parallel to many consumers from a single producer. Your race condition is caused by improper usage of TopicProcessor: the same processor is subscribed to two publishers at once.
To fix the problem, merge the producers into a single Flux and subscribe the topic processor to that merged Flux. Here is the example:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.extra.processor.TopicProcessor;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;

public class ReactorTopicProcessorSampleFixed {

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) { // this sample doesn't hang every time. repeat a few times to make it reproducible
            realMain(args);
            System.out.println("\n--- the previous run was successful. running again ---\n");
        }
    }

    public static void realMain(String[] args) throws InterruptedException {
        List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
        List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");
        CountDownLatch subscriberCount = new CountDownLatch(2);
        CountDownLatch processed = new CountDownLatch((int) subscriberCount.getCount() * (numbers.size() + characters.size()));
        //the producers will not produce anything until a subscriber is subscribed to it.
        //I used subscribeOn to produce the data on different threads.
        Flux<String> mergedFlux = Flux.fromIterable(numbers)
                .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic())
                .mergeWith(Flux.fromIterable(characters)
                        .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
                        .subscribeOn(Schedulers.boundedElastic()));
        TopicProcessor<String> topicProcessor = TopicProcessor.share("topic-processor", 16);
        Flux<String> flux = Flux.from(topicProcessor).doOnSubscribe(s -> subscriberCount.countDown());
        flux.subscribe(out -> {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        });
        flux.subscribe(out -> {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        });
        subscriberCount.await();
        mergedFlux.subscribe(topicProcessor);
        processed.await();
        topicProcessor.shutdown();
    }
}
Running it produces output like this:
Subscriber in thread topic-processor-200 received "1" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "2" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "1" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "2" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "3" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "4" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "5" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "6" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "7" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "3" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "4" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "5" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "8" from thread boundedElastic-2
Subscriber in thread topic-processor-200 received "a" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "b" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "6" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "7" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "8" from thread boundedElastic-2
Subscriber in thread topic-processor-199 received "a" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "b" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "c" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "d" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "e" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "f" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "c" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "d" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "e" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "f" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "g" from thread boundedElastic-1
Subscriber in thread topic-processor-200 received "h" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "g" from thread boundedElastic-1
Subscriber in thread topic-processor-199 received "h" from thread boundedElastic-1
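The shape of the fix (merge all producers into one stream first, then fan that single stream out to every consumer) can also be sketched in plain Java without Reactor. This is only an illustration under assumptions: MergeThenFanOutSketch, run and the DONE sentinel are hypothetical names, the fan-out here is done synchronously by one dispatcher thread rather than in parallel, and it is not how TopicProcessor is implemented. The point it shows is that the merged stream ends only after every source has ended.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MergeThenFanOutSketch {

    // Unique end-of-source marker, compared by identity so it cannot clash with real data.
    private static final String DONE = new String("DONE");

    /** Merges all sources into one queue, then delivers every item to every consumer list. */
    public static List<List<String>> run(List<List<String>> sources, int consumers) {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();
        ExecutorService producers = Executors.newCachedThreadPool();
        for (List<String> source : sources) {
            producers.submit(() -> {
                try {
                    source.forEach(merged::add); // producers only ever touch the merged queue
                } finally {
                    merged.add(DONE);            // signal that this one source has finished
                }
            });
        }

        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            outputs.add(new ArrayList<>());
        }

        int completedSources = 0;
        try {
            // The merged stream completes only when ALL sources have completed.
            while (completedSources < sources.size()) {
                String item = merged.take();
                if (item == DONE) {
                    completedSources++;
                    continue;
                }
                for (List<String> out : outputs) {
                    out.add(item); // fan out: every consumer sees every item
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while dispatching", e);
        } finally {
            producers.shutdown();
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<String>> result = run(
                List.of(List.of("1", "2", "3"), List.of("a", "b", "c")), 2);
        // Interleaving between the two sources varies from run to run.
        result.forEach(received -> System.out.println("consumer received " + received));
    }
}
```

Each consumer list ends up with all six items, and the order within each source is preserved because every producer appends to a FIFO queue in order.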
Please let me know if this is what you are looking for.
Upvotes: 1