Reputation: 41
I have a series of modules that use an Rx publish/subscribe model.
Here is the event registration code (repeated per subscribing module):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
The publisher is simple, thanks to José Romaniello's code:
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>();
public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
Now my problem: as you can see above, I used .ObserveOn(Scheduler.TaskPool) to spin off a new thread from the pool for every module, for every event. This is because I have lots of events and modules. The problem, of course, is that the events get mixed up in time order: some events are fired close to each other and then end up calling the OnDataEvent callback in the wrong order (each OnDataEvent carries a timestamp).
Is there a simple way to use Rx to ensure the correct order of events? Or could I write my own Scheduler to ensure each module gets the events in sequence?
The events are published in the correct sequence, of course.
Thanks in advance.
Upvotes: 2
Views: 2838
Reputation: 117175
Try using this implementation of the EventPublisher:
public class EventPublisher : IEventPublisher
{
private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
private readonly Subject<object> _subject = new Subject<object>();
public IObservable<TEvent> GetEvent<TEvent>()
{
return _subject
.Where(o => o is TEvent)
.Select(o => (TEvent)o)
.ObserveOn(_scheduler);
}
public void Publish<TEvent>(TEvent sampleEvent)
{
_subject.OnNext(sampleEvent);
}
}
It uses an EventLoopScheduler to ensure that all events occur in order and on the same background thread.
Remove the ObserveOn from your subscription, because if you observe on another thread you risk having events occur in the wrong order again.
Does this solve your problem?
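For reference, here is a minimal sketch of how the subscription side might look with this publisher. It assumes the System.Reactive package; the DataEvent shape (in particular the Sequence property), the "sensor-1" source name and the Run helper are made up for illustration, and the IEventPublisher interface is left out to keep the sample self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical event type: the question only shows SourceName.
// Sequence stands in for the timestamp so ordering is easy to check.
public class DataEvent
{
    public string SourceName { get; set; }
    public int Sequence { get; set; }
}

// Same publisher as above, minus the interface.
public class EventPublisher
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
    private readonly Subject<object> _subject = new Subject<object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        return _subject
            .Where(o => o is TEvent)
            .Select(o => (TEvent)o)
            .ObserveOn(_scheduler); // all handlers run on one event-loop thread
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

public static class Program
{
    // Publishes 100 events from one thread and returns the order
    // in which the subscriber saw them.
    public static List<int> Run()
    {
        var publisher = new EventPublisher();
        var received = new List<int>();
        var done = new ManualResetEventSlim();

        // Note: no ObserveOn here; the publisher already moves delivery
        // onto its own EventLoopScheduler.
        var subscription = publisher.GetEvent<DataEvent>()
            .Where(e => e.SourceName == "sensor-1")
            .Subscribe(e =>
            {
                received.Add(e.Sequence);
                if (e.Sequence == 99) done.Set();
            });

        for (var i = 0; i < 100; i++)
        {
            publisher.Publish(new DataEvent { SourceName = "sensor-1", Sequence = i });
        }

        done.Wait();            // wait until the last event was handled
        subscription.Dispose();
        return received;
    }

    public static void Main()
    {
        var received = Run();
        Console.WriteLine(string.Join(",", received));
    }
}
```

With a single publishing thread, the sequence numbers should come out in the order they were published. The trade-off is that every subscriber shares one background thread, so a slow OnDataEvent handler in one module delays all the others.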
Upvotes: 1
Reputation: 33657
Try the Synchronize method, like this:
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool).Synchronize()
.Subscribe(module.OnDataEvent);
That said, I tried your scenario with the same code and found that the data arrives in sequence and doesn't overlap, so this may be something specific to your application.
Upvotes: 0