Reputation: 7877
I'm using Guava's EventBus to kick off some processing and report results. Here's a very simple compilable example:
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Test {
    public static class InitiateProcessing { }
    public static class ProcessingStarted { }
    public static class ProcessingResults { }
    public static class ProcessingFinished { }

    public static EventBus bus = new EventBus();

    @Subscribe
    public void receiveStartRequest(InitiateProcessing evt) {
        System.out.println("Got processing request - starting processing");
        bus.post(new ProcessingStarted());
        System.out.println("Generating results");
        bus.post(new ProcessingResults());
        System.out.println("Generating more results");
        bus.post(new ProcessingResults());
        bus.post(new ProcessingFinished());
    }

    @Subscribe
    public void processingStarted(ProcessingStarted evt) {
        System.out.println("Processing has started");
    }

    @Subscribe
    public void resultsReceived(ProcessingResults evt) {
        System.out.println("got results");
    }

    @Subscribe
    public void processingComplete(ProcessingFinished evt) {
        System.out.println("Processing has completed");
    }

    public static void main(String[] args) {
        Test t = new Test();
        bus.register(t);
        bus.post(new InitiateProcessing());
    }
}
I use these events as a way for other software components to react in preparation for this processing. For example, they may have to save their current state before processing and restore it after.
I would expect the output of this program to be:
Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed
Instead, the actual output is:
Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed
The event that is supposed to indicate that processing has started actually happens after the actual processing ("generating results").
After looking at the source code, I understand why it's behaving this way. Here's the relevant source code for the EventBus.
/**
 * Drain the queue of events to be dispatched. As the queue is being drained,
 * new events may be posted to the end of the queue.
 */
void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
        return;
    }
    // dispatch event (omitted)
What's happening is that, since I'm already dispatching the top-level InitiateProcessing event, the rest of the events just get pushed to the end of the queue. I would like this to behave similarly to .NET events, where invoking the event doesn't return until all handlers have completed.
I don't quite understand the reason for this implementation. Sure, the events are guaranteed to be in order, but the order of the surrounding code gets completely distorted.
Is there any way to get the bus to behave as described and produce the desired output? I did read in the Javadocs that
The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the @AllowConcurrentEvents annotation.
But I don't think this applies here - I'm seeing this issue in a single-threaded application.
Edit
The cause of the issue here is that I'm posting from within a subscriber. Since the event bus is not reentrant, these "sub-posts" get queued up and are handled after the first handler completes. I can comment out the if (isDispatching.get()) { return; } section in the EventBus source and everything behaves as I would expect, so the real question is: what potential problems have I introduced by doing so? It seems the designers made a conscious decision to not allow reentrancy.
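To make the two behaviours concrete, here is a minimal JDK-only sketch. ToyBus is a hypothetical stand-in, not Guava's implementation: it has a single handler, string events, and a reentrant flag that switches between queuing nested posts and delivering them immediately.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical toy bus for illustration only; this is NOT Guava's EventBus.
class ToyBus {
    private final Queue<String> queue = new ArrayDeque<>();
    private final boolean reentrant;
    private boolean dispatching = false;
    private Consumer<String> handler = e -> { };

    ToyBus(boolean reentrant) { this.reentrant = reentrant; }

    void register(Consumer<String> handler) { this.handler = handler; }

    void post(String event) {
        if (reentrant) {
            handler.accept(event); // deliver immediately, even from inside a handler
            return;
        }
        queue.add(event);
        if (dispatching) {
            return; // nested post: leave it queued for the outer drain loop
        }
        dispatching = true;
        try {
            String next;
            while ((next = queue.poll()) != null) {
                handler.accept(next);
            }
        } finally {
            dispatching = false;
        }
    }
}

class ToyBusDemo {
    // Runs a small cascade and returns the order in which things happened.
    static List<String> run(boolean reentrant) {
        List<String> log = new ArrayList<>();
        ToyBus bus = new ToyBus(reentrant);
        bus.register(event -> {
            if (event.equals("Initiate")) {
                log.add("request received");
                bus.post("Started");
                log.add("generating results");
                bus.post("Results");
            } else {
                log.add("handled " + event);
            }
        });
        bus.post("Initiate");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("queued:    " + run(false));
        System.out.println("reentrant: " + run(true));
    }
}
```

In queued mode both nested events are handled only after the initiating handler returns. In reentrant mode each one is handled at the point where it is posted, which is the ordering the question expects.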
Upvotes: 13
Views: 15428
Reputation: 703
I know this question is 4 years old, but I just ran into the same problem today. There is a simple (and counter-intuitive) change to get the behavior you want. Per https://stackoverflow.com/a/53136251/1296767, you can use an AsyncEventBus with a DirectExecutor:
public static EventBus bus = new AsyncEventBus(MoreExecutors.newDirectExecutorService());
Running your test code with the above change, the results are exactly what you want:
Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed
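One way to see why this could work: a direct executor runs each task on the calling thread and only returns from execute once the task has finished. Assuming AsyncEventBus hands every handler call straight to its executor (an assumption based on the linked answer, not verified against the source here), a nested post would then complete before the outer handler continues. A minimal JDK-only sketch, where Runnable::run is a stand-in for Guava's direct executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class DirectExecutorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for a direct executor: execute() simply calls task.run() inline.
        Executor direct = Runnable::run;

        direct.execute(() -> {
            log.add("outer start");
            // Nested submission runs to completion before the outer task continues.
            direct.execute(() -> log.add("inner ran"));
            log.add("outer end");
        });
        log.add("after execute");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The inner task is logged between "outer start" and "outer end", which mirrors a nested event being handled in the middle of the handler that posted it.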
Upvotes: 3
Reputation: 1700
I'll try to summarize Guava's EventBus event delivery behaviour:
If an event E1 is posted at moment t1, all subscribers are notified. If one of the subscribers itself posts an event in its @Subscribe method (a tiny moment later), the "new" event E2 is enqueued and delivered afterwards. "Afterwards" here means: after all @Subscribe methods for E1 from t1 have returned.
Compare this kind of "cascaded" event posting to breadth first tree traversal.
This seems to be an explicitly chosen design of EventBus.
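The tree analogy can be sketched in a few lines. The cascade below is hypothetical: handling E1 posts E2 and E3, and handling E2 posts E4. The breadth-first order corresponds to the queued delivery described above, and the depth-first order to what a reentrant bus would do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CascadeOrderDemo {
    // Hypothetical cascade: the events that the subscribers of each event post.
    static final Map<String, List<String>> POSTS = new HashMap<>();
    static {
        POSTS.put("E1", Arrays.asList("E2", "E3"));
        POSTS.put("E2", Arrays.asList("E4"));
    }

    // Queued delivery: follow-up events wait until earlier events are fully delivered.
    static List<String> breadthFirst(String root) {
        List<String> delivered = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            String event = queue.poll();
            delivered.add(event);
            queue.addAll(POSTS.getOrDefault(event, Collections.emptyList()));
        }
        return delivered;
    }

    // Reentrant delivery: each follow-up event is delivered as soon as it is posted.
    static List<String> depthFirst(String root) {
        List<String> delivered = new ArrayList<>();
        visit(root, delivered);
        return delivered;
    }

    private static void visit(String event, List<String> delivered) {
        delivered.add(event);
        for (String child : POSTS.getOrDefault(event, Collections.emptyList())) {
            visit(child, delivered);
        }
    }

    public static void main(String[] args) {
        System.out.println("breadth-first: " + breadthFirst("E1"));
        System.out.println("depth-first:   " + depthFirst("E1"));
    }
}
```

With this cascade, breadth-first delivery gives E1, E2, E3, E4, while depth-first gives E1, E2, E4, E3.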
Upvotes: 7
Reputation: 110054
EventBus generally operates on the principle that the code posting an event to the bus shouldn't care about what the subscribers do with the events or when, other than that the order the events were posted in is respected (in the case of a synchronous event bus anyway).
If you want specific methods to be called at specific times in the course of your method and you want to be sure those methods complete before your method continues (as you seem to in your example), why not call those methods directly? When you use an event bus, you're explicitly separating your code from what exactly happens in response to a given event. This is desirable in many cases and is the main reason EventBus exists, but it doesn't seem to be quite what you want here.
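As a rough sketch of what calling the methods directly could look like, the example below uses a plain listener interface. ProcessingListener, DirectCallProcessor and DirectCallDemo are hypothetical names made up for illustration; the messages are collected in a list so the order is easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface; not part of Guava or the original code.
interface ProcessingListener {
    void processingStarted();
    void resultsReceived();
    void processingFinished();
}

class DirectCallProcessor {
    private final List<ProcessingListener> listeners = new ArrayList<>();
    private final List<String> log;

    DirectCallProcessor(List<String> log) { this.log = log; }

    void addListener(ProcessingListener listener) { listeners.add(listener); }

    // Each notification is an ordinary method call, so it finishes before the next line runs.
    void process() {
        log.add("Got processing request - starting processing");
        for (ProcessingListener l : listeners) l.processingStarted();
        log.add("Generating results");
        for (ProcessingListener l : listeners) l.resultsReceived();
        log.add("Generating more results");
        for (ProcessingListener l : listeners) l.resultsReceived();
        for (ProcessingListener l : listeners) l.processingFinished();
    }
}

class DirectCallDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DirectCallProcessor processor = new DirectCallProcessor(log);
        processor.addListener(new ProcessingListener() {
            @Override public void processingStarted() { log.add("Processing has started"); }
            @Override public void resultsReceived() { log.add("got results"); }
            @Override public void processingFinished() { log.add("Processing has completed"); }
        });
        processor.process();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The trade-off is the one described above: the processor now knows about its listeners' interface, in exchange for a guaranteed ordering relative to the surrounding code.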
Upvotes: 7
Reputation: 4843
While posting to the EventBus does not return until all "subscribers" have been signaled, those subscribers might NOT have begun execution. That means that when the first bus.post returns, you continue to the next post without any intervening subscriber having begun to process. From the Javadoc:
public void post(Object event)
Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers, and regardless of any exceptions thrown by subscribers. If no subscribers have been subscribed for event's class, and event is not already a DeadEvent, it will be wrapped in a DeadEvent and reposted.
Parameters: event - event to post.
Upvotes: 2