Reputation: 2465
I currently am using observables to manage messages being generated on bus which are being pushed over various streams.
All works well but as messages can come in, it's possible for the system to try and write multiple messages to the stream at once (i.e. messages coming in from multiple threads) or that messages are published quicker than they can be written to the stream... as you can image, this causes issues when writing.
Hence I'm trying to figure out how I can organize things so that when messages come in only one will be processed at a time. Any thoughts?
public class MessageStreamResource : IResourceStartup
{
private readonly IBus _bus;
private readonly ISubject<string> _sender;
public MessageStreamResource(IBus bus)
{
_bus = bus;
_senderSubject = new Subject<string>();
//`All` can publish messages at the same time as it's
//collecting data being generated from different threads
_bus.All.Subscribe(message => Observable.Start(() => ProcessMessage(message), TaskPoolScheduler.Default));
//Note the above hops off the calls context so that the
//writing to the stream wont slow down the caller.
}
public void Configure(IAppBuilder app)
{
app.Map("/stream", async context =>
{
...
await context.Response.WriteAsync("Lets party!\n");
await context.Response.Body.FlushAsync();
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
});
...
await HoldOpenTask;
});
}
private void ProcessMessage(IMessage message)
{
_sender.OnNext(message.Payload);
}
}
Upvotes: 1
Views: 211
Reputation: 61666
If I understood the question correctly, this possibly can be done with SemaphoreSlim
:
// ...
var semaphore = new SemaphoreSlim(initialCount: 1);
var unSubscribe = _sender.Subscribe(async t =>
{
//PROBLEM HERE
//I only want this callback to be executed
//one at a time...
await semaphore.WaitAsync();
try
{
await context.Response.WriteAsync($"{t}\n");
await context.Response.Body.FlushAsync();
}
finally
{
semaphore.Release();
}
});
SemaphoreSlim
is IDisposable
, make sure to dispose of it when appropriate.
Updated, from the second look, MapExtensions.Map
accepts Action<IAppBuilder>
, so you're passing an async void
lambda, essentially creating a bunch of fire-and-forget asynchronous operations. The Map
call will return to the caller, while they may still be lingering around. This is most likely not what you want, is it?
Upvotes: 2