Reputation: 846
I have the following block of code. I expected it to consume the blocking collection and, within the Subscribe() action, perform some async work that is awaited before Rx moves on to the next entry in the observable sequence.
What actually happened is that all the entries are processed (or appear to be) in parallel.
The code as it stands at the moment is as follows:
uploadQueues[workspaceId] // BlockingCollection<FileEvent>
    .GetConsumingEnumerable()
    .ToObservable()
    .SubscribeOn(new TaskPoolScheduler(new TaskFactory()))
    .Subscribe(
        async fileEvent =>
        {
            // process file event
            Debug.WriteLine($"Upload queue processor 1: {fileEvent.Event} | {fileEvent.SourcePath} => {fileEvent.DestPath}");
            await Task.Delay(TimeSpan.FromSeconds(1));
        });
Any pointers in the right direction would be greatly appreciated. I am also wondering whether, instead of using Rx, I should simply spawn a long-running TPL task that consumes from the blocking collection.
Thoughts?
Upvotes: 1
Views: 123
Reputation: 10783
[Upvoted @J.Lennon's answer]
Welcome to using Rx. As J.Lennon mentioned in their answer, there are some improvements to be made to your code.
As with any code, synchronous or asynchronous, we want to consider our resources and how we ensure that we clean them up afterwards. We also need to consider how to handle exceptions. In addition to these concerns, when working with asynchrony (and therefore concurrency) we have the additional concern of cancellation. We should provide the ability to cancel the operation, and we should also make sure that cancellation does what the consumer would expect.
While I think that Rx is not the best suited tool for reading off queues (see http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#Wont), if you really want to force Rx into the picture then you should also make sure that you meet the above concerns.
The code below uses:
- CancellationTokens to allow the GetConsumingEnumerable() to be cancelled (https://msdn.microsoft.com/en-us/library/dd395014(v=vs.110).aspx)
- EventLoopScheduler to ensure a single thread is dedicated to processing the queue. This also makes it serialized. (http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler)
- The Concat tip from above to ensure the async work you are doing is also serialised. (http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#Concat)
- OnError handler. However, this leads me back to why I don't like Rx for processing queues. What should you do in a failure? Attempt to re-read the message and fall into an infinite loop, or drop it on the floor and pretend it didn't happen? (http://introtorx.com/Content/v1.0.10621.0/03_LifetimeManagement.html#Subscribe)

Sample code:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

IDictionary<string, BlockingCollection<object>> uploadQueues = new Dictionary<string, BlockingCollection<object>>();

public IObservable<object> ListenToQueueEvents(string workspaceId)
{
    // Not sure what the return value is here, using `object` as a placeholder.
    // Note that we are using an overload that takes a `CancellationToken`.
    return Observable.Create<object>((obs, ct) =>
    {
        var els = new EventLoopScheduler(ts => new Thread(ts)
        {
            IsBackground = true, // ? or false? Should it stop the process from terminating if it is still running?
            Name = $"{workspaceId}Processor"
        });
        var subscription = uploadQueues[workspaceId] // BlockingCollection<FileEvent>
            .GetConsumingEnumerable(ct) // Allow cancellation while waiting for the next item
            .ToObservable(els)          // Serialise onto a single thread.
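            // FromAsync defers the call until Concat subscribes, so only one async operation runs at a time.
            // (Calling the method directly and using Task.ToObservable() would start every task as soon as its item arrives.)
            .Select(evt => Observable.FromAsync(() => TheAsyncThingIWasDoingInTheSubscribe(evt)))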
            .Concat()
            .Subscribe(obs);
        // You could try to dispose of the els (EventLoopScheduler), but I have had issues doing so in the past.
        // Leaving it as a background thread should allow it to die (but non-deterministically) :-(
        return Task.FromResult(subscription);
    });
}

private static Task<object> TheAsyncThingIWasDoingInTheSubscribe(object evt)
{
    // The return of the async thing you were doing in the subscribe
    return Task.FromResult(new object());
}
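For comparison with the reservation above about using Rx to read off queues, here is a minimal sketch of the non-Rx alternative the question asks about: a background task that drains the BlockingCollection and awaits each item before taking the next. The names QueueConsumer, ConsumeSequentiallyAsync and handler are hypothetical and not part of the original code; error handling is deliberately left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class QueueConsumer
{
    // Hypothetical helper: takes items from `queue` one at a time and awaits
    // `handler` for each item before moving on to the next one.
    public static Task ConsumeSequentiallyAsync<T>(
        BlockingCollection<T> queue,
        Func<T, Task> handler,
        CancellationToken ct = default(CancellationToken))
    {
        // Task.Run keeps the blocking GetConsumingEnumerable loop off the caller's thread.
        return Task.Run(async () =>
        {
            // The loop ends when CompleteAdding() has been called and the queue is empty,
            // or throws OperationCanceledException when `ct` is cancelled.
            foreach (var item in queue.GetConsumingEnumerable(ct))
            {
                await handler(item); // the next item is not taken until this completes
            }
        }, ct);
    }
}
```

Because the loop awaits, a thread created with TaskCreationOptions.LongRunning would be released at the first await anyway, which is why the sketch uses Task.Run. The trade-off is that a thread pool thread is blocked while the queue is empty.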
Upvotes: 2
Reputation: 3361
The main problem with your Rx query is the async/await part, because you aren't handling or taking care of SynchronizationContext.Current. But there are other problems with your code as well.
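One way to see why the items appear to be processed in parallel: Subscribe takes an Action<T>, so an async lambda passed to it compiles to an async void method. The call returns at the first await that is not already complete, and the caller is then free to push the next item. The sketch below reproduces that effect without Rx; Pump is a hypothetical stand-in for whatever invokes the callback, and the TaskCompletionSource only exists to keep the awaits pending.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for an observable source: calls onNext once per item.
    public static void Pump(IEnumerable<int> items, Action<int> onNext)
    {
        foreach (var item in items)
        {
            onNext(item); // returns as soon as the async lambda reaches a pending await
        }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>();

        // The async lambda is converted to Action<int>, i.e. async void.
        Pump(new[] { 1, 2, 3 }, async i =>
        {
            log.Add("start " + i);
            await gate.Task; // still pending, so control goes back to Pump
            log.Add("end " + i);
        });

        // Every item has been started and none has finished.
        var snapshot = new List<string>(log);
        gate.SetResult(true); // let the pending continuations finish
        return snapshot;
    }
}
```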
Starting with SubscribeOn: TaskPoolScheduler is an IDisposable class, so you should create and dispose of it properly.
Also, when you use the ToObservable() method, it doesn't use any IScheduler - check for details here. You could specify an EventLoopScheduler to guarantee that just one single thread/resource is used for processing (although that is not necessary, because GetConsumingEnumerable is already blocking one thread just to consume).
If you just want to simulate a delay, the best way to do it is:
enumerableItems.ToObservable()
    .Select(a => Observable.Return(a).DelaySubscription(TimeSpan.FromSeconds(1)))
    .Concat() // Keep push order
    .Subscribe(
        fileEvent =>
        {
            Debug.WriteLine(fileEvent);
        });
DelaySubscription is a little bit more efficient than Delay. Just be aware that the delay is not synchronous, so you will probably end up on another thread (a different Thread.ManagedThreadId), but that doesn't matter because the order of the sequence is kept.
Now if you want to use async/await with Rx and keep it single-threaded, that is a problem for another question...
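As a partial pointer for that follow-up, here is a minimal sketch of one common way to serialize async work in Rx: wrap each call in Observable.FromAsync and flatten with Concat. It does not pin the work to a single thread; it only ensures that one async operation finishes before the next one starts. ProcessAsync and the log list are hypothetical stand-ins for the real work, and the System.Reactive package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SerializedAsyncDemo
{
    // Hypothetical async handler standing in for the real upload work.
    private static async Task<string> ProcessAsync(int item, List<string> log)
    {
        log.Add("start " + item);
        await Task.Delay(TimeSpan.FromMilliseconds(50));
        log.Add("end " + item);
        return "done " + item;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await Observable.Range(1, 3)
            // FromAsync is lazy: ProcessAsync is not invoked until Concat subscribes.
            .Select(i => Observable.FromAsync(() => ProcessAsync(i, log)))
            // Concat subscribes to the next inner sequence only after the previous one completes.
            .Concat()
            .ToList(); // awaiting the observable waits for the whole sequence to complete
        return log;
    }
}
```

Swapping Concat for Merge would let the operations overlap again, so the choice of flattening operator is what controls the concurrency here.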
Upvotes: 2