Reputation: 87
I find the user guide for the LMAX Disruptor on GitHub very basic. I have a setup with one producer and five consumers, and afterwards I need to combine the consumers' results. Is there a demo or example of an LMAX Disruptor diamond (one producer, five consumers, one concluding step)?
Thanks very much!
Upvotes: 1
Views: 861
Reputation: 76
You can pass several consumers via varargs to Disruptor.handleEventsWith(...). Afterwards, register the concluding step with a call to then(...) (fluent DSL). The second call ensures that each event has been handled by all consumers before it is passed to the concluding step.
A working example can look like this:
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import java.util.concurrent.*;

public class Diamond {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, 1024, executor, ProducerType.SINGLE, new SleepingWaitStrategy());

        // register five consumers and a final conclude
        disruptor.handleEventsWith(new Consumer(1), new Consumer(2), new Consumer(3), new Consumer(4), new Consumer(5)).then(new Conclude());

        disruptor.start();

        for (int i = 0; i < 3; i++) {
            disruptor.publishEvent((event, sequence, newValue) -> event.setValue(newValue), i);
        }

        disruptor.shutdown();
        executor.shutdown();
    }

    public static class Consumer implements EventHandler<LongEvent> {
        private int i;

        public Consumer(int i) { this.i = i; }

        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("Consumer: " + i);
            event.setValue(event.getValue() + 1);
        }
    }

    public static class Conclude implements EventHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("Conclude: " + event.getValue());
        }
    }

    public static class LongEvent {
        private long value;

        public void setValue(long value) {
            this.value = value;
        }

        public long getValue() {
            return this.value;
        }
    }
}
The events simply contain a long value. The consumers increment the value, and the final step prints it. The for loop puts three events with initial values 0, 1, and 2 into the ring.
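Note that the five consumers run in parallel, each on its own thread, and every consumer sees every event. The ring buffer only guarantees that Conclude sees an event after all five consumers have finished with it; it does not prevent several consumers from working on the same event at the same time. The unsynchronized increment of the LongEvent in Consumer is therefore a data race, so in real code let each consumer write to its own field or use an atomic value. Furthermore, note how the order of the consumers' output varies between runs.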
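If each of the five handlers should contribute its own result rather than update one shared field, one option is to give every consumer a separate slot in the event and let the concluding step combine the slots. The sketch below illustrates only that data layout, using plain java.util.concurrent rather than the Disruptor API. SlotEvent, process, and the CountDownLatch standing in for the barrier that then(...) sets up are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DiamondSlots {

    // Hypothetical event type: one result slot per consumer instead of a single shared value.
    static final class SlotEvent {
        final long input;
        final long[] results;

        SlotEvent(long input, int consumers) {
            this.input = input;
            this.results = new long[consumers];
        }
    }

    // Runs `consumers` workers in parallel on one event, then concludes by summing their slots.
    public static long process(long input, int consumers) {
        SlotEvent event = new SlotEvent(input, consumers);
        CountDownLatch done = new CountDownLatch(consumers);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        try {
            for (int c = 0; c < consumers; c++) {
                final int slot = c;
                pool.execute(() -> {
                    // Each worker writes only to its own slot, so no locking is needed.
                    event.results[slot] = event.input + 1;
                    done.countDown();
                });
            }
            // Stand-in for the barrier: conclude only after every worker has finished.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        } finally {
            pool.shutdown();
        }
        long sum = 0;
        for (long r : event.results) {
            sum += r;
        }
        return sum;
    }

    public static void main(String[] args) {
        for (long i = 0; i < 3; i++) {
            System.out.println("Conclude: " + process(i, 5));
        }
    }
}
```

With a Disruptor, the same idea would mean each EventHandler writes to its own field of the event and the handler registered via then(...) reads all of them. The latch here only mimics the ordering guarantee and does not reproduce the ring buffer's batching or throughput.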
Upvotes: 3