Reputation: 145
The situation I am uncertain about concerns the use of a "thread-safe" PipeStream to which multiple threads can add messages to be written. If there is no queue of messages to be written, the current thread begins writing to the reading party. If there is a queue, and the queue grows while the pipe is writing, I want the thread that began writing to deplete the queue.
I "hope" that this design (demonstrated below) discourages the continuous entering/releasing of the SemaphoreSlim and decreases the number of tasks scheduled. I say "hope" because I should test whether this complication has any positive performance implications. However, before even testing this I should first understand whether the code does what I think it will, so please consider the following class, and below it a sequence of events:
Note: I understand that execution of tasks is not tied to any particular thread, but I find this is the easiest way to explain.
class SemaphoreExample
{
    // Wrapper around a NamedPipeClientStream
    private readonly MessagePipeClient m_pipe =
        new MessagePipeClient("somePipe");
    private readonly SemaphoreSlim m_semaphore =
        new SemaphoreSlim(1, 1);
    private readonly BlockingCollection<Message> m_messages =
        new BlockingCollection<Message>(new ConcurrentQueue<Message>());

    public Task Send<T>(T content)
        where T : class
    {
        if (!this.m_messages.TryAdd(new Message<T>(content)))
            throw new InvalidOperationException("No more requests!");
        Task dequeue = TryDequeue();
        return Task.FromResult(true);
        // In reality this class (and method) is more complex.
        // There is a similar pipe (and worker) in the other direction.
        // The "sent jobs" are kept in a dictionary and this method
        // returns a task belonging to a completion source tied
        // to the "sent job". The worker responsible for the other
        // pipe reads a response and sets the corresponding
        // completion source.
    }

    private async Task TryDequeue()
    {
        // Non-blocking attempt to enter the semaphore
        if (!this.m_semaphore.Wait(0))
            return; // someone else is already here
        try
        {
            Message message;
            while (this.m_messages.TryTake(out message))
            {
                await this.m_pipe.WriteAsync(message);
            }
        }
        finally { this.m_semaphore.Release(); }
    }
}
Is this sequence of events possible? Should I forget this idea altogether and have every call to "Send" await "TryDequeue" and the semaphore within it? Perhaps the potential performance implications of scheduling another task per method call are negligible, even at a "high" frequency.
UPDATE:
Following the advice of Alex, I am doing the following: the caller of "Send" specifies a "maxWorkload" integer that says how many items the caller is prepared to process (for other callers, in the worst case) before delegating any extra work to another thread. However, before the new thread is created, other callers of "Send" are given an opportunity to enter the semaphore, thereby possibly preventing the use of an additional thread.
So that no work is left lingering in the queue, any worker that successfully entered the semaphore and did some work must check, after exiting the semaphore, whether new work has been added. If so, the same worker will try to re-enter (if "maxWorkload" is not reached) or delegate work as described above.
Example below: Send now sets up "TryPool" as a continuation of "TryDequeue". "TryPool" only begins if "TryDequeue" returns true (i.e. did some work while having entered the semaphore).
// maxWorkload cannot be -1 for this method
private async Task<bool> TryDequeue(int maxWorkload)
{
    int currWorkload = 0;
    while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0))
    {
        try
        {
            currWorkload = await Dequeue(currWorkload, maxWorkload);
            if (currWorkload >= maxWorkload)
                return true;
        }
        finally
        {
            this.m_semaphore.Release();
        }
    }
    return false;
}

private Task TryPool()
{
    if (this.m_messages.Count == 0 || !this.m_semaphore.Wait(0))
        return Task.FromResult(false);
    return Task.Run(async () =>
    {
        do
        {
            try
            {
                await Dequeue(0, -1);
            }
            finally
            {
                this.m_semaphore.Release();
            }
        }
        while (this.m_messages.Count != 0 && this.m_semaphore.Wait(0));
    });
}

private async Task<int> Dequeue(int currWorkload, int maxWorkload)
{
    while (currWorkload < maxWorkload || maxWorkload == -1)
    {
        Message message;
        if (!this.m_messages.TryTake(out message))
            return currWorkload;
        await this.m_pipe.WriteAsync(message);
        currWorkload++;
    }
    return maxWorkload;
}
Upvotes: 2
Views: 3936
Reputation: 13224
I tend to call this pattern the "GatedBatchWriter", i.e. the first thread through the gate handles a batch of tasks: its own, plus a number of others on behalf of other writers, until it has done enough work.
This pattern is primarily useful when it is more efficient to batch work because of the overhead associated with that work, e.g. writing larger blocks to disk in one go instead of multiple small ones.
And yes, this particular pattern has a specific race condition to be aware of: the "responsible writer", i.e. the one that got through the gate, determines that no more messages are in the queue and stops, but has not yet released the semaphore (i.e. its write responsibility). Between those two decision points, a second writer arrives, enqueues its message, and fails to acquire write responsibility. Now there is a message in the queue that will not be delivered (or will be delivered late, when the next writer arrives).
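One way to close that window, sketched here under assumptions, is to re-check the queue after every release of the gate. The class name `GatedWriter`, the `string` message type, and the `write` delegate (standing in for the pipe's `WriteAsync`) are hypothetical and not part of the original code; the inline `out` declaration assumes C# 7 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's class; the pipe is replaced by a delegate.
sealed class GatedWriter
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private readonly Func<string, Task> _write;

    public GatedWriter(Func<string, Task> write) { _write = write; }

    public Task Send(string message)
    {
        // Enqueue first, then try to become the responsible writer.
        _queue.Enqueue(message);
        return DrainAsync();
    }

    private async Task DrainAsync()
    {
        // The outer condition is evaluated again after every Release().
        // A message enqueued while the gate was still held is therefore
        // seen by the writer that just left, instead of being stranded.
        while (!_queue.IsEmpty && _gate.Wait(0))
        {
            try
            {
                while (_queue.TryDequeue(out string message))
                    await _write(message);
            }
            finally
            {
                // SemaphoreSlim has no thread affinity, so releasing on a
                // different thread after an await is fine.
                _gate.Release();
            }
        }
    }
}
```

The reasoning is that a writer whose `Wait(0)` fails has already enqueued its message, so whoever holds the gate at that moment will find the queue non-empty when it re-checks after releasing.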
Additionally, what you are doing now is not fair in terms of scheduling. If there are many messages, the queue might never be empty, and the writer that got through the gate will be busy writing messages on behalf of the others for all eternity. You need to limit the batch size for the responsible writer.
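A minimal sketch of such a batch limit follows, assuming the responsible writer hands any remainder to a thread-pool task once its quota is used up. `BoundedGatedWriter`, `maxBatch`, and the `write` delegate are hypothetical names for illustration; the hand-off is fire-and-forget, so a real implementation would also need to observe exceptions from that task.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the gated writer with a per-caller quota.
sealed class BoundedGatedWriter
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private readonly Func<string, Task> _write;
    private readonly int _maxBatch;

    public BoundedGatedWriter(Func<string, Task> write, int maxBatch)
    {
        if (maxBatch < 1) throw new ArgumentOutOfRangeException(nameof(maxBatch));
        _write = write;
        _maxBatch = maxBatch;
    }

    public Task Send(string message)
    {
        _queue.Enqueue(message);
        return DrainAsync();
    }

    private async Task DrainAsync()
    {
        int written = 0;
        while (!_queue.IsEmpty && _gate.Wait(0))
        {
            try
            {
                // Stop as soon as this caller's quota is reached.
                while (written < _maxBatch && _queue.TryDequeue(out string message))
                {
                    await _write(message);
                    written++;
                }
            }
            finally
            {
                _gate.Release();
            }

            if (written >= _maxBatch)
            {
                // Quota used up: delegate leftovers so this caller can return.
                // Fire-and-forget; failures of the delegated task are not observed here.
                if (!_queue.IsEmpty)
                    _ = Task.Run(() => DrainAsync());
                return;
            }
        }
    }
}
```

The delegated task gets a fresh quota of its own, so under sustained load the work is passed along in chunks of `maxBatch` rather than pinning a single caller.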
Some other things you may want to change are:
- Message could contain a task completion token.

One more note: if there are a lot of messages, i.e. a high message load on average, a dedicated thread / long-running task handling the queue will generally perform better.
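The last two suggestions can be combined roughly as follows. This is a sketch under assumptions, not the original design: `PendingMessage`, `DedicatedWriter`, and the synchronous `write` delegate are hypothetical. A synchronous write is assumed because an `await` inside a `LongRunning` task would give the dedicated thread back at the first suspension.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical message type that carries its own completion token.
sealed class PendingMessage
{
    public PendingMessage(string payload) { Payload = payload; }
    public string Payload { get; }
    public TaskCompletionSource<bool> Completion { get; } =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

sealed class DedicatedWriter : IDisposable
{
    private readonly BlockingCollection<PendingMessage> _queue =
        new BlockingCollection<PendingMessage>(new ConcurrentQueue<PendingMessage>());
    private readonly Task _worker;

    public DedicatedWriter(Action<string> write)
    {
        // LongRunning hints to the scheduler that a dedicated thread is wanted.
        _worker = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; ends after CompleteAdding().
            foreach (PendingMessage m in _queue.GetConsumingEnumerable())
            {
                try
                {
                    write(m.Payload);
                    m.Completion.TrySetResult(true);
                }
                catch (Exception ex)
                {
                    // Report the failure to the sender instead of killing the worker.
                    m.Completion.TrySetException(ex);
                }
            }
        }, TaskCreationOptions.LongRunning);
    }

    public Task Send(string payload)
    {
        var m = new PendingMessage(payload);
        _queue.Add(m); // throws InvalidOperationException after Dispose()
        return m.Completion.Task;
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // lets the worker drain what is left and exit
        _worker.Wait();
        _queue.Dispose();
    }
}
```

With a single consumer there is no gate to contend for, so neither the stranded-message race nor the fairness problem arises; each sender simply awaits the task returned by `Send`.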
Upvotes: 2