Reputation: 2301
In a situation with multiple 'request threads' and one dumb 'worker thread', the request threads must queue.
Consider two possibilities:
1. Each request thread calls Monitor.Wait on its own dedicated object, which goes into a FIFO queue. When a result arrives, the oldest object is pulsed.
2. All request threads take a number and call Monitor.Wait on a shared object. When a result arrives, Monitor.PulseAll is called on the shared object, and all request threads check to see whether their number is up.
There may be other options, but please ignore them for the purposes of this question.
Question: when there are lots of queuing threads, which scenario is more efficient?
The lock objects are just 'new object()' instances.
My gut feeling is that scenario 1 is more efficient, since only one thread wakes up when a pulse happens, and bare object instances are very resource-light (right?). However, I do not understand the mechanics of Wait very well. Perhaps more resources are required when more objects are being 'monitored'?
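(For reference, my current understanding of the canonical Wait/Pulse pattern is sketched below: wait inside a loop that re-checks a shared flag, because a Pulse issued before the Wait is simply lost. The names _gate and _resultReady are illustrative only.)

//canonical Monitor usage: re-check a condition in a loop so that a pulse
//sent before Wait is not lost, and spurious wakeups are harmless
private static readonly object _gate = new object();
private static bool _resultReady;

private static void Consumer()
{
    lock (_gate)
    {
        while (!_resultReady)    //guards against lost pulses and spurious wakeups
            Monitor.Wait(_gate); //releases _gate while blocked, reacquires on wake
        _resultReady = false;    //consume the signal
    }
}

private static void Producer()
{
    lock (_gate)
    {
        _resultReady = true;  //set the condition before pulsing
        Monitor.Pulse(_gate); //wake one waiting thread
    }
}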
Thanks in advance for your insights.
I have written code below to illustrate the two scenarios.
Further explanation:
In my situation, the 'worker' thread accepts work and produces results asynchronously. It does not know which request each result belongs to, except that results are always produced in the same order in which the requests were received.
Although I do have an application for this, this question should be treated as academic. Please do not waste your time questioning the underlying assumptions or suggesting alternative solutions. However, questions to clarify the intent of the question are welcome.
using System;
using System.Collections.Generic;
using System.Threading;

namespace praccmd.threads
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            TestResets();
            Console.WriteLine("press key");
            Console.ReadKey();
        }

        private static void TestResets()
        {
            //lock object per work request
            Console.WriteLine("----lock object per work request----");
            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadLockObjPerRequest);
                t.Name = "Thread_object_per_request_" + i;
                t.Start();
            }

            //give the request threads time to enqueue their tickets;
            //otherwise the loop below can exit before any work has arrived
            Thread.Sleep(100);

            //now pretend to be the WorkDone event
            while (_ticketQueue.Count > 0)
            {
                Thread.Sleep(50); //relies on timing: each waiter must reach Wait before its ticket is pulsed
                lock (_receiveLock)
                {
                    var doneTicketNext = _ticketQueue.Dequeue();
                    lock (doneTicketNext)
                    {
                        Monitor.Pulse(doneTicketNext);
                        Monitor.Wait(doneTicketNext); //wait for the request thread's "thanks" pulse
                    }
                }
            }

            //shared lock object (pulseall), one id per request
            Console.WriteLine("----shared lock object----");
            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadSharedLock);
                t.Name = "Thread_shared_lock_object_" + i;
                t.Start();
            }

            Thread.Sleep(100);

            //now pretend to be the WorkDone event
            while (_ticketNumberQueue.Count > 0)
            {
                Thread.Sleep(50);
                lock (_sharedReceiveLock)
                {
                    lock (_sharedLock)
                    {
                        _sharedLock.TicketNumber = _ticketNumberQueue.Dequeue();
                        Monitor.PulseAll(_sharedLock); //wake every waiter; each checks its own number
                    }
                    lock (_sharedThanksLock) Monitor.Wait(_sharedThanksLock);
                }
            }
        }

        //infrastructure for lock-object-per-request
        private static readonly object _sendLock = new object();
        private static readonly object _receiveLock = new object();
        private static readonly Queue<object> _ticketQueue = new Queue<object>();

        private static object TakeATicket()
        {
            var ticket = new object();
            _ticketQueue.Enqueue(ticket);
            return ticket;
        }

        //lock-object-per-request thread
        private static void ThreadLockObjPerRequest()
        {
            var name = Thread.CurrentThread.Name;
            object ticket;
            lock (_sendLock)
            {
                ticket = TakeATicket();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }
            var myResult = string.Empty;
            lock (ticket)
            {
                Monitor.Wait(ticket); //block until this ticket is pulsed
                //myResult = GetResultFromAStaticVariable();
                Console.WriteLine(name + " gets its data.");
                Monitor.Pulse(ticket); //"thanks" pulse back to the worker
            }
            //do something with myResult
        }

        //infrastructure for shared-lock
        private class SharedLock { public int TicketNumber { get; set; } }
        private static readonly SharedLock _sharedLock = new SharedLock { TicketNumber = 0 };
        private static readonly object _sharedReceiveLock = new object();
        private static readonly object _sharedThanksLock = new object();
        private static readonly object _ticketIncrementLock = new object();
        private static int _ticketNumber = 0;
        private static readonly Queue<int> _ticketNumberQueue = new Queue<int>();

        private static int TakeATicketNumber()
        {
            lock (_ticketIncrementLock)
            {
                _ticketNumberQueue.Enqueue(++_ticketNumber);
                return _ticketNumber;
            }
        }

        //thread for shared-lock
        private static void ThreadSharedLock()
        {
            var name = Thread.CurrentThread.Name;
            int ticketNumber;
            lock (_sendLock)
            {
                ticketNumber = TakeATicketNumber();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }
            var myResult = string.Empty;
            do
            {
                lock (_sharedLock)
                {
                    Monitor.Wait(_sharedLock); //woken by PulseAll; check whose turn it is
                    if (_sharedLock.TicketNumber == ticketNumber)
                    {
                        myResult = "response"; //GetResultFromAStaticVariable();
                        Console.WriteLine(name + " gets its data.");
                    }
                }
            } while (myResult.Length == 0);
            lock (_sharedThanksLock) Monitor.Pulse(_sharedThanksLock);
            //do something with myResult
        }
    }
}
Upvotes: 2
Views: 980
Reputation: 1062770
Performance is always tricky, and will depend a lot on your specific context; you would probably have to measure it to get a good answer for that, noting that it probably depends on the number of expected outstanding tasks, etc.
The way I work this multiplexer scenario is using the Task API; for each new incoming request, a TaskCompletionSource<T> is created and enqueued (synchronized) into a queue. Let's say each result (when it arrives later) is an int:
private readonly Queue<TaskCompletionSource<int>> queue
    = new Queue<TaskCompletionSource<int>>();

public Task<int> MakeRequest(...) {
    var source = new TaskCompletionSource<int>();
    lock(queue) {
        queue.Enqueue(source);
    }
    return source.Task;
}
and then the worker, as results come in, can do something like:
private void SetNextResult(int value) {
    TaskCompletionSource<int> source;
    lock(queue) {
        source = queue.Dequeue();
    }
    source.SetResult(value);
}
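As an aside (my addition, not part of the answer): the same queue discipline extends naturally to failures. If the worker faults, the oldest pending task can be faulted instead of completed via TaskCompletionSource<T>.SetException, so the caller's await or .Result rethrows the exception. A sketch, where SetNextError is a hypothetical companion to SetNextResult:

private void SetNextError(Exception error) {
    TaskCompletionSource<int> source;
    lock(queue) {
        source = queue.Dequeue(); //fault the oldest pending request
    }
    source.SetException(error); //caller's await / .Result will rethrow
}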
The nice thing about this is that it allows each individual caller to decide how they want to respond to the delayed work:
- .Wait / .Result to block
- .ContinueWith to add a callback
- await to use a state-machine-based continuation
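Putting the two halves together, here's a minimal self-contained sketch of how this could be wired up (the RequestMultiplexer and Demo names are my own, not from the answer):

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class RequestMultiplexer {
    //pending requests, oldest first; results are matched to requests by order
    private readonly Queue<TaskCompletionSource<int>> queue
        = new Queue<TaskCompletionSource<int>>();

    public Task<int> MakeRequest() {
        var source = new TaskCompletionSource<int>();
        lock(queue) {
            queue.Enqueue(source);
        }
        return source.Task;
    }

    public void SetNextResult(int value) {
        TaskCompletionSource<int> source;
        lock(queue) {
            source = queue.Dequeue();
        }
        source.SetResult(value);
    }
}

class Demo {
    static void Main() {
        var mux = new RequestMultiplexer();

        //ten "request threads": each caller just keeps its own Task<int>
        var pending = new List<Task<int>>();
        for (int i = 0; i < 10; i++) pending.Add(mux.MakeRequest());

        //the worker delivers results strictly in request order
        for (int i = 0; i < 10; i++) mux.SetNextResult(i * 100);

        Console.WriteLine(string.Join(", ", Task.WhenAll(pending).Result));
    }
}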
Upvotes: 4