Reputation: 528
I'm not completely new to C#, but I'm not familiar enough with the language to know how to do what I need to do.
I have a file, call it File1.txt. File1.txt has 100,000 lines or so. I will duplicate File1.txt and call it File1_untested.txt. I will also create an empty file "Successes.txt". For each line in the file:
So, my question is, how can I multithread this?
My approach so far has been to create an object (LineChecker), give the object its line to check, and pass the object into a ThreadPool. I understand how to use ThreadPools for a few tasks with a CountdownEvent. However, it seems unreasonable to queue up 100,000 tasks all at once. How can I gradually feed the pool? Maybe 1000 lines at a time or something like that.
Also, I need to ensure that no two threads are adding to Successes.txt or removing from File1_untested.txt at the same time. I can handle this with lock(), right? What should I be passing into lock()? Can I use a static member of LineChecker?
I'm just trying to get a broad understanding of how something like this can be designed.
Upvotes: 4
Views: 329
Reputation: 27861
Since the test takes a relatively significant amount of time, it makes sense to use multiple CPU cores. However, only the relatively expensive test should be parallelized, not the reading/updating of the file, because the file I/O is relatively cheap.
Here is some example code that you can use:
Assuming that you have a relatively expensive Test method:
private bool Test(string line)
{
    // This test is expensive; replace the placeholder below with the real check
    throw new NotImplementedException();
}
Here is a code sample that can use multiple CPU cores for testing. It limits the number of items in the collection to 10, so that the thread reading from the file waits for the testing threads to catch up before it reads more lines. The input thread reads much faster than the other threads can test, so in the worst case we will have read about 10 more lines than the testing threads have picked up. This keeps memory consumption low.
// Requires: using System.Collections.Concurrent; using System.IO;
//           using System.Threading; using System.Threading.Tasks;
CancellationTokenSource cancellation_token_source = new CancellationTokenSource();
CancellationToken cancellation_token = cancellation_token_source.Token;
BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);
using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
    using (StreamWriter writer =
        new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
    {
        var input_task = Task.Factory.StartNew(() =>
        {
            try
            {
                while (!reader.EndOfStream)
                {
                    if (cancellation_token.IsCancellationRequested)
                        return;
                    // Passing the token lets a blocked Add wake up if the testing threads have failed
                    blocking_collection.Add(reader.ReadLine(), cancellation_token);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancelled while waiting for room in the collection; stop reading
            }
            finally
            {
                // In all cases, even in the case of an exception, mark that we are done adding
                // to the collection. Parallel.ForEach will not exit until CompleteAdding is called.
                blocking_collection.CompleteAdding();
            }
        });
        try
        {
            Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
            {
                bool test_result = Test(line);
                if (test_result)
                {
                    lock (writer)
                    {
                        writer.WriteLine(line);
                    }
                }
            });
        }
        catch
        {
            cancellation_token_source.Cancel(); // If Parallel.ForEach throws an exception, we inform the input thread to stop
            throw;
        }
        input_task.Wait(); // This makes sure that exceptions thrown in the input thread are propagated here
    }
}
Upvotes: 2
Reputation: 61959
If your "test" were fast, multithreading would not give you any advantage whatsoever, because your code would be 100% disk-bound, and presumably you have all of your files on the same disk: you cannot improve the throughput of a single disk with multithreading.
But since your "test" will be waiting for a response from a webserver, this means that the test is going to be slow, so there is plenty of room for improvement by multithreading. Basically, the number of threads you need depends on how many requests the webserver can be servicing simultaneously without degrading the performance of the webserver. This number might still be low, so you might end up not gaining anything, but at least you can try.
If your file is not really huge, then you can read it all at once, and write it all at once. If each line is only 80 characters long, then this means that your file is only 8 megabytes, which is peanuts, so you can read all the lines into a list, work on the list, produce another list, and in the end write out the entire list.
This will allow you to create a structure, say, MyLine, which contains the index of each line and the text of each line, so that you can sort all lines before writing them and do not have to worry about out-of-order responses from the server.
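To make that concrete, here is a minimal sketch of such a structure and of loading the whole file into it. MyLine is the name used above, but its exact shape, the LineLoader helper, and its Load method are assumptions made for illustration only.

```csharp
using System;
using System.IO;
using System.Linq;

// Hypothetical structure: pairs each line with its original position in the file.
public sealed class MyLine
{
    public MyLine(int index, string text)
    {
        Index = index;
        Text = text;
    }

    public int Index { get; }
    public string Text { get; }
}

public static class LineLoader
{
    // Reads the whole file into memory and tags each line with its zero-based index.
    public static MyLine[] Load(string path)
    {
        return File.ReadAllLines(path)
            .Select((text, index) => new MyLine(index, text))
            .ToArray();
    }
}
```

Keeping the index next to the text is what later lets you restore the original order with a plain sort.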
Then, what you need to do is use a bounded blocking queue like BlockingCollection, as @Paul suggested. BlockingCollection accepts its maximum capacity as a constructor parameter. This means that once its maximum capacity has been reached, any further attempts to add to it are blocked (the caller sits there waiting) until some items are removed. So, if you want to have up to 10 simultaneously pending requests, you would construct it as follows:
var sourceCollection = new BlockingCollection<MyLine>(10);
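As a small illustration of the capacity limit, the sketch below uses a capacity of 2 and the non-blocking TryAdd, so that a full collection shows up as a false return value rather than a waiting thread. The BoundedDemo class and its Run method are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedDemo
{
    // Returns the outcome of four TryAdd calls against a collection of capacity 2.
    public static bool[] Run()
    {
        using (var queue = new BlockingCollection<int>(2))
        {
            bool first = queue.TryAdd(1);   // fits
            bool second = queue.TryAdd(2);  // fits, the collection is now full
            bool third = queue.TryAdd(3);   // no room: TryAdd gives up at once, where Add would block
            queue.Take();                   // removing an item frees one slot
            bool fourth = queue.TryAdd(4);  // fits again
            return new[] { first, second, third, fourth };
        }
    }
}
```

With the blocking Add used in a real producer, the third call would simply wait until a consumer took an item.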
Your main thread will be stuffing sourceCollection with MyLine objects, and you will have 10 threads which block waiting to read MyLine objects from the collection. Each thread sends a request to the server, waits for a response, saves the result into a thread-safe resultCollection, and attempts to fetch the next item from sourceCollection.
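A rough sketch of that arrangement could look like the following. The Pipeline class, the test delegate standing in for the web request, and the choice of ConcurrentBag for resultCollection are all assumptions; the sketch also assumes that test never throws, since a real version would need to cancel the producer if every worker died.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical structure holding a line and its original position.
public sealed class MyLine
{
    public MyLine(int index, string text) { Index = index; Text = text; }
    public int Index { get; }
    public string Text { get; }
}

public static class Pipeline
{
    // test is a stand-in for the slow check (for example a web request); assumed not to throw.
    public static ConcurrentBag<MyLine> Run(
        IEnumerable<string> lines, Func<string, bool> test, int workerCount = 10)
    {
        var resultCollection = new ConcurrentBag<MyLine>();
        using (var sourceCollection = new BlockingCollection<MyLine>(workerCount))
        {
            // Each worker blocks until an item is available and stops after CompleteAdding.
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (MyLine item in sourceCollection.GetConsumingEnumerable())
                    {
                        if (test(item.Text))
                            resultCollection.Add(item);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            try
            {
                int index = 0;
                foreach (string text in lines)
                    sourceCollection.Add(new MyLine(index++, text)); // blocks while the collection is full
            }
            finally
            {
                sourceCollection.CompleteAdding(); // lets the workers' loops finish
            }

            Task.WaitAll(workers);
        }
        return resultCollection; // unordered: the lines still need sorting by Index
    }
}
```

The LongRunning option is only a hint that these workers spend most of their time waiting, so they should not occupy ordinary thread-pool threads.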
Instead of using multiple threads, you could use the async features of C#, but I am not terribly familiar with them, so I cannot advise you on precisely how you would do that.
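For what it is worth, one common way to do this with async is to cap the number of in-flight requests with a SemaphoreSlim. The following is only a sketch under assumptions: AsyncChecker, FilterAsync, and the testAsync delegate (a stand-in for the asynchronous web request) are hypothetical names, not something from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncChecker
{
    // Returns the lines that pass testAsync, in their original order.
    public static async Task<List<string>> FilterAsync(
        IReadOnlyList<string> lines, Func<string, Task<bool>> testAsync, int maxConcurrent = 10)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            // ToArray creates every task up front; the semaphore limits how many
            // are actually running testAsync at the same time.
            Task<bool>[] tasks = lines.Select(async line =>
            {
                await gate.WaitAsync();
                try
                {
                    return await testAsync(line);
                }
                finally
                {
                    gate.Release();
                }
            }).ToArray();

            // WhenAll returns results in the same order as the tasks, so passed[i] belongs to lines[i].
            bool[] passed = await Task.WhenAll(tasks);
            return lines.Where((line, i) => passed[i]).ToList();
        }
    }
}
```

Because the results come back in input order, no index bookkeeping or sorting is needed here, at the cost of creating one task object per line.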
In the end, copy the contents of resultCollection into a List, sort the list, and write it to the output file. (The copy into a separate List is probably a good idea because sorting the thread-safe resultCollection will probably be much slower than sorting a non-thread-safe List. I said probably.)
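That last step might be sketched like this, assuming resultCollection is a ConcurrentBag of MyLine and that all worker threads have already finished. ResultWriter and SortAndWrite are illustrative names only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical structure holding a line and its original position.
public sealed class MyLine
{
    public MyLine(int index, string text) { Index = index; Text = text; }
    public int Index { get; }
    public string Text { get; }
}

public static class ResultWriter
{
    // Copies the results into a plain list, restores file order, and writes them out.
    public static List<MyLine> SortAndWrite(ConcurrentBag<MyLine> resultCollection, string outputPath)
    {
        var sorted = new List<MyLine>(resultCollection);   // copy out of the thread-safe collection
        sorted.Sort((a, b) => a.Index.CompareTo(b.Index)); // back to original line order
        File.WriteAllLines(outputPath, sorted.Select(l => l.Text));
        return sorted;
    }
}
```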
Upvotes: 2