Reputation: 2270
I recently came across a case where it would be handy to be able to spawn a bunch of threads, block and wait for exactly one answer (the first one to arrive), cancelling the rest of the threads and then unblocking.
For example, suppose I have a search function that takes a seed value. Let us stipulate that the search function can be trivially parallelized. Furthermore, suppose that our search space contains many potential solutions, that for some seed values the function will search indefinitely, and that at least one seed value will yield a solution in a reasonable amount of time.
It would be great if I could do this search in parallel, totally naively, like:
let seeds = [|0..100|]
Array.Parallel.map(fun seed -> Search(seed)) seeds
Sadly, Array.Parallel.map will block until all of the threads have completed. Bummer. I could always set a timeout in the search function, but then I'm almost certain to wait for the longest-running thread to finish; furthermore, for some problems, the timeout might not be long enough.
In short, I'd like something sort of like the UNIX sockets select() call, only for arbitrary functions. Is this possible? It doesn't have to be in a pretty data-parallel abstraction, as above, and it doesn't have to be F# code, either. I'd even be happy to use a native library and call it via P/Invoke.
Upvotes: 1
Views: 365
Reputation: 203818
You can create a bunch of tasks and then use Task.WaitAny or Task.WhenAny to either synchronously wait for the first task to finish or create a task that will be completed when the first task finishes, respectively.
A simple synchronous example:
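var tasks = new List<Task<int>>();
var cts = new CancellationTokenSource();
for (int i = 0; i < 10; i++)
{
    int temp = i; // copy the loop variable so each lambda captures its own value
    tasks.Add(Task.Run(() =>
    {
        //placeholder for real work of variable time
        //(Thread.Sleep does not observe the token; real work must check cts.Token itself)
        Thread.Sleep(1000 * temp);
        return temp;
    }, cts.Token));
}
// WaitAny returns the index of the first completed task, not its result
int index = Task.WaitAny(tasks.ToArray());
int value = tasks[index].Result;
cts.Cancel();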
Or for an asynchronous version:
public static async Task<int> Foo()
{
    var tasks = new List<Task<int>>();
    var cts = new CancellationTokenSource();
    for (int i = 0; i < 10; i++)
    {
        int temp = i;
        tasks.Add(Task.Run(async () =>
        {
            await Task.Delay(1000 * temp, cts.Token);
            return temp;
        }));
    }
    var value = await await Task.WhenAny(tasks);
    cts.Cancel();
    return value;
}
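Cancellation in .NET is cooperative: calling cts.Cancel() only sets a flag, so a CPU-bound search has to check the token itself or it will keep running. Here is a minimal sketch of that idea. The names FirstResultDemo, Search and FirstResult, and the rule that only seed 3 ever succeeds, are made up for illustration and are not part of the question.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FirstResultDemo
{
    // Hypothetical search: only seed 3 ever finds an answer (after about
    // 100 ms); every other seed keeps looking until it is cancelled.
    public static int Search(int seed, CancellationToken token)
    {
        for (int step = 0; ; step++)
        {
            // Cooperative cancellation: throws OperationCanceledException
            // once the token has been cancelled.
            token.ThrowIfCancellationRequested();
            if (seed == 3 && step >= 10) return seed;
            Thread.Sleep(10); // stand-in for one unit of real work
        }
    }

    public static int FirstResult(int[] seeds)
    {
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            // LongRunning hints that each search should get its own thread,
            // so never-ending seeds do not starve the thread pool.
            Task<int>[] tasks = seeds
                .Select(seed => Task.Factory.StartNew(
                    () => Search(seed, token),
                    token,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default))
                .ToArray();

            int index = Task.WaitAny(tasks); // index of the first finished task
            cts.Cancel();                    // ask the remaining searches to stop
            return tasks[index].Result;
        }
    }
}
```

Calling FirstResultDemo.FirstResult(new[] { 0, 1, 2, 3, 4 }) should return as soon as the seed-3 search finishes. Note that WaitAny also returns when a task faults, so real code would probably want to check the task's status before reading Result.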
Upvotes: 4
Reputation: 422
let rnd = System.Random()
let search seed = async {
    let t = rnd.Next(10000)
    //printfn "seed: %d ms: %d" seed t
    do! Async.Sleep t
    return sprintf "seed %d finish" seed
}
let processResult result = async {
    //Todo:
    printfn "%s" result
}
let cts = new System.Threading.CancellationTokenSource()
let ignoreFun _ = () // use this for any continuation you don't want to handle
let tasks =
    [0..10]
    |> List.map (fun i ->
        async {
            let! result = search i
            do! processResult result
            cts.Cancel()
        }
    )
Async.StartWithContinuations(Async.Parallel tasks, ignoreFun, ignoreFun, ignoreFun, cts.Token)
Upvotes: 3
Reputation: 45096
This seemed to work for me:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CancellParallelLoops
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();
            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();
            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
            });
            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    // Cancel the whole loop from inside once the item of interest is reached
                    if (num == 1000) cts.Cancel();
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }
            Console.ReadKey();
        }
    }
}
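If the only goal is to end the loop at the first answer, ParallelLoopState.Stop() may be an alternative to cancelling a token from inside the loop body. The following is a rough sketch, not a drop-in solution. StopOnFirstDemo and FindFirst are hypothetical names, and the rule that seed 2 succeeds while every other seed gives up after 50 steps is invented purely so the example terminates.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class StopOnFirstDemo
{
    // Hypothetical search: seed 2 succeeds after a few steps; every other
    // seed gives up after 50 steps or as soon as the loop has been stopped.
    public static int FindFirst(int[] seeds)
    {
        int found = -1; // -1 means "no seed produced a result"
        Parallel.ForEach(seeds, (seed, state) =>
        {
            for (int step = 0; step < 50 && !state.IsStopped; step++)
            {
                if (seed == 2 && step == 5)
                {
                    // Record the first answer only, then tell the loop to stop
                    // scheduling new iterations.
                    Interlocked.CompareExchange(ref found, seed, -1);
                    state.Stop();
                    return;
                }
                Thread.Sleep(10); // stand-in for one unit of real work
            }
        });
        return found;
    }
}
```

Parallel.ForEach limits how many iterations run at once, so an iteration that never returns and never polls state.IsStopped can hold a worker indefinitely. This shape therefore suits searches that check the stop flag regularly; for searches that may truly never return, the task-based approach is probably the safer fit.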
Upvotes: 0