Reputation: 973
I am trying to poll an API as fast and as efficiently as possible to get market data. Each request can return data for up to batchSize markets, and the API allows at most 3 concurrent requests (any more and it throws errors).
I may be requesting data from many more than batchSize different markets.
I continuously loop through all of the markets, requesting the data in batches, one batch per thread and 3 threads at any time.
The total number of markets (and hence batches) can change at any time.
I'm using the following code:
    private static object lockObj = new object();

    private void PollMarkets()
    {
        const int NumberOfConcurrentRequests = 3;
        for (int i = 0; i < NumberOfConcurrentRequests; i++)
        {
            int batch = 0;
            Task.Factory.StartNew(async () =>
            {
                while (true)
                {
                    if (markets.Count > 0)
                    {
                        List<string> batchMarketIds;
                        lock (lockObj)
                        {
                            var numBatches = (int)Math.Ceiling((double)markets.Count / batchSize);
                            batchMarketIds = markets.Keys.Skip(batch * batchSize).Take(batchSize).ToList();
                            batch = (batch + 1) % numBatches;
                        }
                        var marketData = await GetMarketData(batchMarketIds);
                        // Do something with marketData
                    }
                    else
                    {
                        await Task.Delay(1000); // wait for some markets to be added.
                    }
                }
            });
        }
    }
Even though there is a lock in the critical section, each thread starts with batch = 0 (each thread is often polling for duplicate data).
If I change batch to a private volatile field the above code works as I want it to (volatile and lock).
So for some reason my lock doesn't work? I feel like it's something obvious but I'm missing it.
I believe it is best here to use a lock rather than a volatile field. Is that correct?
Thanks
Upvotes: 0
Views: 138
Reputation: 5843
In my opinion, you should use a Queue<> to create a job pipeline.
Something like this:
    private int batchSize = 10;
    private Queue<int> queue = new Queue<int>();

    private void AddMarket(params int[] marketIDs)
    {
        lock (queue)
        {
            foreach (var marketID in marketIDs)
            {
                queue.Enqueue(marketID);
            }
            if (queue.Count >= batchSize)
            {
                Monitor.Pulse(queue);
            }
        }
    }

    private void Start()
    {
        for (var tid = 0; tid < 3; tid++)
        {
            Task.Run(async () =>
            {
                while (true)
                {
                    List<int> toProcess;
                    lock (queue)
                    {
                        if (queue.Count < batchSize)
                        {
                            Monitor.Wait(queue);
                            continue;
                        }
                        toProcess = new List<int>(batchSize);
                        for (var count = 0; count < batchSize; count++)
                        {
                            toProcess.Add(queue.Dequeue());
                        }
                        if (queue.Count >= batchSize)
                        {
                            Monitor.Pulse(queue);
                        }
                    }
                    var marketData = await GetMarketData(toProcess);
                }
            });
        }
    }
Upvotes: 1
Reputation: 4670
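The issue is that you declare the batch variable inside the for loop. Each iteration creates a new variable, so each task's lambda captures its own batch starting at 0 instead of sharing one. The lock itself works fine; it just guards three separate counters. That is also why moving batch to a field fixed it: the field is shared by all the tasks, and since every read and write happens inside the lock, it doesn't need to be volatile.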
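A minimal sketch of the shared-counter version, in case it helps. The names BatchPoller and NextBatch, the five demo market IDs, BatchSize = 2, and the use of a SortedDictionary (chosen only so the key order is predictable) are my assumptions, not your real code. I also moved the empty check inside the lock so the modulo can never divide by zero if the markets are removed between the check and the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchPoller
{
    private static readonly object lockObj = new object();

    // Assumed demo data: 5 markets, 2 per batch, so 3 batches.
    public const int BatchSize = 2;
    public static readonly SortedDictionary<string, int> Markets =
        new SortedDictionary<string, int>
        {
            ["m1"] = 0, ["m2"] = 0, ["m3"] = 0, ["m4"] = 0, ["m5"] = 0
        };

    // One index shared by every task. It is only touched while holding
    // lockObj, so it does not need to be volatile.
    private static int batch = 0;

    // Returns the next batch of market IDs and advances the shared index.
    public static List<string> NextBatch()
    {
        lock (lockObj)
        {
            if (Markets.Count == 0) return new List<string>();
            var numBatches = (int)Math.Ceiling((double)Markets.Count / BatchSize);
            batch %= numBatches; // the market count may have shrunk since the last call
            var ids = Markets.Keys.Skip(batch * BatchSize).Take(BatchSize).ToList();
            batch = (batch + 1) % numBatches;
            return ids;
        }
    }

    public static async Task Main()
    {
        // Three workers, mirroring the three concurrent requests in the question.
        var workers = Enumerable.Range(0, 3).Select(worker => Task.Run(() =>
        {
            for (var i = 0; i < 2; i++)
            {
                var ids = NextBatch();
                Console.WriteLine($"worker {worker}: {string.Join(",", ids)}");
            }
        }));
        await Task.WhenAll(workers);
    }
}
```

Which worker gets which batch varies from run to run, but consecutive calls to NextBatch walk through the batches in order and wrap around, so the workers no longer all start from batch 0. In your code, the await GetMarketData(...) call would go where the Console.WriteLine is, still outside the lock.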
Upvotes: 2