Gizz

Reputation: 41

Reactive Extensions - raising async events and subscribing on specific threads

I have a series of modules that use an Rx publish/subscribe model.

Here is the event registration code (repeated per subscribing module):

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(module.OnDataEvent);  

The publisher is simple, thanks to José Romaniello's code:

public class EventPublisher : IEventPublisher
{
    private readonly ConcurrentDictionary<Type, object> _subjects =
        new ConcurrentDictionary<Type, object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    { 
        var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
        return subject.AsObservable(); 
    }
    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject; 
        if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
        { 
            ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
        }
    }
}

Now my problem: as you can see above, I used .ObserveOn(Scheduler.TaskPool) to hand every event, for every module, to a thread from the pool. This is because I have lots of events and modules. The problem, of course, is that the events lose their time order: events fired close to each other sometimes reach the OnDataEvent callback in the wrong order (each event passed to OnDataEvent carries a timestamp).

Is there a simple way to use Rx to ensure the correct order of events? Or could I write my own Scheduler to ensure each module gets the events in sequence?

The events are published in the correct sequence, of course.

Thanks in advance.

Upvotes: 2

Views: 2838

Answers (2)

Enigmativity

Reputation: 117175

Try using this implementation of the EventPublisher:

public class EventPublisher : IEventPublisher
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>();

    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        return _subject
            .Where(o => o is TEvent)
            .Select(o => (TEvent)o)
            .ObserveOn(_scheduler);
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

It uses an EventLoopScheduler to ensure that all events occur in order and on the same background thread.

Remove the ObserveOn from your subscription, because observing on another thread risks delivering the events in the wrong order again.
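For anyone who wants to check the ordering themselves, here is a minimal console sketch of the same idea. It assumes the System.Reactive NuGet package; the Sequence property, the "sensor-a"/"sensor-b" source names, and the OrderingDemo class are made up for the demo and are not part of the original code. It inlines the pipeline from GetEvent, adds the subscriber-side Where filter without any further ObserveOn, and collects what arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

// Hypothetical event type: only SourceName appears in the question;
// Sequence is added here so the arrival order can be checked.
public sealed class DataEvent
{
    public string SourceName { get; set; } = "";
    public int Sequence { get; set; }
}

public static class OrderingDemo
{
    // Publishes `count` interleaved events for two sources and returns the
    // sequence numbers that the "sensor-a" subscriber received, in arrival order.
    public static IList<int> Run(int count)
    {
        using (var scheduler = new EventLoopScheduler())
        using (var subject = new Subject<object>())
        {
            // Same shape as GetEvent<DataEvent>() above: filter by type,
            // then move delivery onto the single event-loop thread.
            var pending = subject
                .Where(o => o is DataEvent)
                .Select(o => (DataEvent)o)
                .ObserveOn(scheduler)
                // Subscriber side: filter by source, with no second ObserveOn.
                .Where(e => e.SourceName == "sensor-a")
                .Select(e => e.Sequence)
                .ToList()
                .ToTask(); // subscribes immediately, before anything is published

            for (var i = 0; i < count; i++)
            {
                subject.OnNext(new DataEvent { SourceName = "sensor-a", Sequence = i });
                subject.OnNext(new DataEvent { SourceName = "sensor-b", Sequence = i });
            }

            // Completion is queued behind the events, so the list is complete when it arrives.
            subject.OnCompleted();
            return pending.Result;
        }
    }

    public static void Main()
    {
        var received = Run(1000);
        // Compare arrival order with publication order.
        Console.WriteLine(received.SequenceEqual(Enumerable.Range(0, 1000)));
    }
}
```

In your real code the ToList/ToTask part would just be .Subscribe(module.OnDataEvent). Keep in mind that with a single EventLoopScheduler every module's callback runs on the same thread, so one slow OnDataEvent holds up all the others.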

Does this solve your problem?

Upvotes: 1

Ankur

Reputation: 33657

Try the Synchronize method, like this:

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize()
    .Subscribe(module.OnDataEvent);

That said, I tried your scenario with the same code and found that the data arrives in sequence and the callbacks don't overlap. Maybe this is something specific to your application.

Upvotes: 0
