Dan Barowy

Reputation: 2270

How can I block only until the first thread finishes?

I recently came across a case where it would be handy to be able to spawn a bunch of threads, block and wait for exactly one answer (the first one to arrive), cancelling the rest of the threads and then unblocking.

For example, suppose I have a search function that takes a seed value. Let us stipulate that the search function can be trivially parallelized. Furthermore, suppose that our search space contains many potential solutions, that for some seed values the function will search indefinitely, but that at least one seed value will yield a solution in a reasonable amount of time.

It would be great if I could do this search in parallel, totally naively, like:

let seeds = [|0..100|]
Array.Parallel.map(fun seed -> Search(seed)) seeds

Sadly, Array.Parallel.map will block until all of the threads have completed. Bummer. I could always set a timeout in the search function, but then I'm almost certain to wait for the longest-running thread to finish; furthermore, for some problems, the timeout might not be long enough.

In short, I'd like something sort of like the UNIX sockets select() call, only for arbitrary functions. Is this possible? It doesn't have to be in a pretty data-parallel abstraction, as above, and it doesn't have to be F# code, either. I'd even be happy to use a native library and call it via P/Invoke.

Upvotes: 1

Views: 365

Answers (4)

Servy

Reputation: 203818

You can create a bunch of tasks and then use Task.WaitAny or Task.WhenAny to either synchronously wait for the first task to finish or create a task that will be completed when the first task finishes, respectively.

A simple synchronous example:

var tasks = new List<Task<int>>();
var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    int temp = i;
    tasks.Add(Task.Run(() =>
    {
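        // Placeholder for real work of variable duration.
        // Note: Thread.Sleep does not observe the token; real work must check it to stop early.
        Thread.Sleep(1000 * temp);

        return temp; // use the per-iteration copy, not the shared loop variable
    }, cts.Token));
}

// WaitAny returns the index of the first completed task, not its result.
int index = Task.WaitAny(tasks.ToArray());
int value = tasks[index].Result;
cts.Cancel();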

Or for an asynchronous version:

public static async Task<int> Foo()
{
    var tasks = new List<Task<int>>();
    var cts = new CancellationTokenSource();

    for (int i = 0; i < 10; i++)
    {
        int temp = i;
        tasks.Add(Task.Run(async () =>
        {
            await Task.Delay(1000 * temp, cts.Token);
            return temp;
        }));
    }

    var value = await await Task.WhenAny(tasks);
    cts.Cancel();

    return value;
}
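
Keep in mind that cancelling the token only stops work that actually checks it. Here is a minimal sketch of what that could look like for the search in the question. The `Search` method, the "seed 3 succeeds" rule, and the `FirstResult`/`RunSearches` names are all made up for illustration; substitute your own search and have it poll the token at sensible intervals.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class FirstResult
{
    // Hypothetical stand-in for the real search: seed 3 "finds" an answer at once,
    // every other seed keeps looping until cancellation is requested.
    static int Search(int seed, CancellationToken token)
    {
        while (seed != 3)
        {
            token.ThrowIfCancellationRequested(); // cooperative cancellation point
            Thread.Sleep(10);                     // placeholder for one unit of work
        }
        return seed * 100;
    }

    public static int RunSearches(int[] seeds)
    {
        using (var cts = new CancellationTokenSource())
        {
            // Capture the token once so the tasks never touch cts after it is disposed.
            CancellationToken token = cts.Token;

            Task<int>[] tasks = seeds
                .Select(seed => Task.Run(() => Search(seed, token), token))
                .ToArray();

            int index = Task.WaitAny(tasks);  // index of the first task to complete
            int value = tasks[index].Result;
            cts.Cancel();                     // ask the remaining searches to stop
            return value;
        }
    }
}
```

This sketch assumes at least one seed succeeds (otherwise `WaitAny` blocks forever) and that the first task to complete did so successfully. If a search can throw, you would want to loop over `WaitAny` and skip faulted tasks.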

Upvotes: 4

kwingho

Reputation: 422

let rnd = System.Random()

let search seed = async {
    let t = rnd.Next(10000)
    //printfn "seed: %d  ms: %d" seed t
    do! Async.Sleep t
    return sprintf "seed %d finish" seed
}

let processResult result = async {
    //Todo:
    printfn "%s" result
}

let cts = new System.Threading.CancellationTokenSource()
let ignoreFun _ = () //if you don't want handle

let tasks = 
    [0..10]
    |> List.map (fun i ->
        async {
            let! result = search i
            do! processResult result
            cts.Cancel()
        }
    )

Async.StartWithContinuations(Async.Parallel tasks, ignoreFun, ignoreFun, ignoreFun, cts.Token)

Upvotes: 3

paparazzo

Reputation: 45096

This seemed to work for me:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace CancellParallelLoops
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();

            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                if (Console.ReadKey().KeyChar == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit");
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    if (num == 1000) cts.Cancel();
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }
    }
}

Upvotes: 0

Jane M.

Reputation: 56

Try synchronizing all threads using an event object. When a thread finds a solution, it sets the event; all the other threads check the event state periodically and stop executing if it has already been set.

For more details, look here.
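
A rough sketch of the event idea in C#, assuming `ManualResetEventSlim` as the event object. The `EventSearch`/`FindFirst` names and the "only seed 7 succeeds" rule are hypothetical placeholders for a real search.

```csharp
using System;
using System.Threading;

static class EventSearch
{
    public static int FindFirst(int[] seeds)
    {
        // Signalled once any worker has found a solution.
        // Deliberately not disposed: workers may still poll it after we return.
        var found = new ManualResetEventSlim(false);
        int result = -1;
        int claimed = 0; // 0 = no winner yet, 1 = a worker has claimed the result

        foreach (int seed in seeds)
        {
            int s = seed;
            var worker = new Thread(() =>
            {
                // Each worker checks the event periodically and stops once it is set.
                while (!found.IsSet)
                {
                    if (s == 7) // hypothetical: only seed 7 ever finds a solution
                    {
                        // Only the first finisher may publish its result.
                        if (Interlocked.CompareExchange(ref claimed, 1, 0) == 0)
                        {
                            result = s * 2;
                            found.Set();
                        }
                        return;
                    }
                    Thread.Sleep(5); // placeholder for one unit of search work
                }
            });
            worker.IsBackground = true; // do not keep the process alive
            worker.Start();
        }

        found.Wait(); // block until the first solution arrives
        return result;
    }
}
```

The caller unblocks as soon as the event is set, and the remaining threads exit at their next check. As written, this blocks forever if no seed succeeds, so a real version would probably pass a timeout to `Wait`.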

Upvotes: 0
