Reputation: 7915
How to wait for subscribers method completion when calling onNext and the subscriber is on another thread/task? I miss something like await onNext() e.g. for Subject...
For example when running on the same Thread:
[Test, Explicit]
public void TestSyncOnSameTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject
.Subscribe(
o => Console.WriteLine("Received {1} on threadId:{0}",
Thread.CurrentThread.ManagedThreadId,
o),
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
Will print the output:
Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10
When running on separate Threads (e.g. to offload the UI-Thread)
[Test, Explicit]
public void TestObserveOnSeparateTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(
o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
},
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
The following result as output will apear:
Starting on threadId:10
Received 3 on threadId:11
As you can see... the using block is left cause the subject call OnNext() returns immediately and does not wait for the completion of the subscriber Task.
How to "await" the subscriber call? Is the order guaranteed? I have not found good documentation for the Synchronize() method and what this method really synchronizes.
I look for something like:
await subject.OnNext(3);
await subject.OnNext(2);
In the "old school way" I have used a BlockingCollection as queue and
public async Task<IResponse> Request(IRequest request)
{
var taskCompletionSource = new TaskCompletionSource<IResponse>();
Task<IResponse> tsk = taskCompletionSource.Task;
Action<IResponse> responseHandler = taskCompletionSource.SetResult;
this.requestProcessor.Process(request, responseHandler);
return await tsk.ConfigureAwait(false);
}
and the requestprocessor is something like:
while (true)
{
//blockingCollection is shared between caller and runner
blockingCollection.TryTake(out request, Timeout.Infinite);
// call callback when finished to return to awaited method
}
Now I want to use RX/Observables instead but this offloading topic hits me... It is more about publish/subscribe with Reactive Extensions but I think I need something like request/response here. Any way to do this with RX?
Upvotes: 1
Views: 1224
Reputation: 10783
Why is it that you want to block? My guess is you are with doing this because you think you need to, or becuse you are in a test and want to block until the end of the test to validate the results.
I will tackle the test theory first.
In Rx, there is the concept of Schedulers (the IScheduler
interface).
You use these to control where and when work should be performed.
E.g. you can specify that work can be scheduled on a TaskPool/ThreadPool, a dedicated Thread or the Dispatcher (WPF).
You can also specify if the work should be done as soon as possible, or some time in the future.
These types are the preferred way for controlling time/concurrency, so Task.Delay(1000 * o);
can be removed form your code.
To replicate your provided test in teh Rx idomatic way, you would rewrite it (without subjects, dont get me started) using the TestScheduler
and the supporting ITestObserver<T>
and ITestObservable<T>
types.
// Define other methods and classes here
public void TestObserveOnSeparateTask()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 3),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 1),
ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks)
);
var observer = scheduler.CreateObserver<int>();
var subscription = source
.ObserveOn(scheduler)
// .Subscribe(
// o =>
// {
// Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
// //Task.Delay(1000 * o);
// },
// () => Console.WriteLine("OnCompleted on threadId:{0}",
// Thread.CurrentThread.ManagedThreadId));
.Subscribe(observer);
scheduler.Start();
//Pretty silly test, as we apply no query/projection to the source.
// Note we add 1 tick due to it being a relative/cold observable and the ObserveOn scheduling will take one time slice to perform.
ReactiveAssert.AreElementsEqual(
new[] {
ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks + 1, 3),
ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks + 1, 2),
ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks + 1, 1),
ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks + 1)
},
observer.Messages);
}
Ok, but what if you are blocking because you want to wait form some sort of result, or you want to know the source has completed?
In these cases, you want to embrace Rx and the continution patterns you can apply.
With operators like SelectMany
, Concat
etc you can get some sequence to only fire after some other sequence has produced a value, or perhaps completed.
I refer to these patterns as Asynchronous gates.
You can see Matt Barrett explain them in this video at 51:03.
He shows how in a fully async(Rx) system how you would only execute some activities when other precondition events have occurred.
I dont think I can give a better example without fully knowing what you are trying to achieve, but I doubt the answer is blocking, if the question involves "the UI thread".
With regards to the Synchronize()
method, this is purely for implementation of IObservable<T>
sequences that are not following the contract of being serialised.
You can apply that operator to enforce that contract.
In theory should not need to be used, if everyone is playing by the rules.
**Edit to discuss CQRS/ES **
With regards to your CQRS part of your question, you also can do this if you reduce your coupling. I imagine you want the issuing of a command to be somewhat request/response. i.e. you want to know if the command was accepted by the system and processed successfully. This is one concern. A separate concern would be to asynchronously update read-models.
In this case I would probably just use a Task
/Task<T>
as the return value of the method that accepts the Command.
Independently, read-models would be subscribed to the observable sequence representation of the events that result from commands being issued to the aggregate root.
As a super contrived example, I have humped together the most minimal implementation of a DDD ES that I could come up with to show Commands, Aggregates, repositories/EventStore and observing the events produced off the command.
void Main()
{
var repository = new EventStore();
repository.Events
.OfType<FooEvent>()
.Subscribe(evt=>UpdateReadModel(evt));
var aggreate = new MyAggregate(Guid.NewGuid());
var command = new FooCommand(1);
aggreate.Handle(command);
repository.Save(aggreate);
}
public void UpdateReadModel(FooEvent fooEvent)
{
Console.WriteLine(fooEvent.SomeValue);
}
// Define other methods and classes here
public class FooCommand
{
public FooCommand(int someValue)
{
SomeValue = someValue;
}
public int SomeValue { get; private set; }
}
public class FooEvent
{
public FooEvent(int someValue)
{
SomeValue = someValue;
}
public int SomeValue { get; private set; }
}
public interface IAggregate
{
List<object> GetUncommittedEvents();
void Commit();
}
public class MyAggregate : IAggregate
{
private readonly Guid _id;
private readonly List<object> _uncommittedEvents = new List<object>();
public MyAggregate(Guid id)
{
_id = id;
}
public List<Object> GetUncommittedEvents()
{
return _uncommittedEvents;
}
public void Commit()
{
_uncommittedEvents.Clear();
}
public void Handle(FooCommand command)
{
//As a result of processing the FooCommand we emit the FooEvent
var fooEvent = new FooEvent(command.SomeValue);
_uncommittedEvents.Add(fooEvent);
}
}
public class EventStore
{
private readonly ISubject<object> _events = new ReplaySubject<object>();
public IObservable<object> Events { get { return _events.AsObservable(); } }
public void Save(IAggregate aggregate)
{
foreach (var evt in aggregate.GetUncommittedEvents())
{
_events.OnNext(evt);
}
aggregate.Commit();
}
}
Now in the real world, you would be using something like GetEventStore.com, NEventStore or Cedar.EventStore, but the structure remains the same. You push commands at the aggregate, it produces events, those events are saved to the eventsource, the event source then emits those event (out of band/Async) to the readmodels.
Upvotes: 3
Reputation: 117027
To get what you want done, you could do this:
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
using (
subject
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
}, () =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
are.WaitOne();
}
}
Console.WriteLine("Done.");
Upvotes: 3