Reputation: 41290
I have a long-running boolean function that should be applied to each element of an array, and I want to return immediately once any element satisfies the condition. I would like to do the search in parallel and terminate the other threads as soon as the first thread to finish returns a correct answer.
What is a good way to implement a parallel exists function in F#? Since my goal is performance, an efficient solution is preferred to an easy or idiomatic one.
Suppose that I want to find out whether a value exists in an array. The comparison function (equals) is simulated as a computationally expensive one:
open System
open System.Diagnostics
open System.Threading
// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
    if (token.IsCancellationRequested) then
        if (throwOnCancel) then token.ThrowIfCancellationRequested()
        false
    else
        let ms = int64 (seconds * 1000.0)
        let sw = new Stopwatch()
        sw.Start()
        let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))
        // Loop to simulate a computationally intensive operation
        let rec loop i =
            // Periodically check to see if the user has requested
            // cancellation or if the time limit has passed
            let check = seconds = 0.0 || i % checkInterval = 0
            if check && token.IsCancellationRequested then
                if throwOnCancel then token.ThrowIfCancellationRequested()
                false
            elif check && sw.ElapsedMilliseconds > ms then
                true
            else
                loop (i + 1)
        // Start the loop with 0 as the first value
        loop 0
let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y
The array consists of 1000 randomly generated elements, and the value being searched for is guaranteed to be in the second half of the array (so a sequential search has to go through at least half of the array):
let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;
#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential
Upvotes: 3
Views: 306
Reputation: 47914
I think you can take the following steps:
Spawn a number of workers (threads or async computations), and pass each an equal slice of the array and a cancellation token that is shared by all workers.
When a worker finds the searched item, it calls Cancel on the shared CancellationTokenSource (each worker should check the cancellation state of the token on each iteration and bail out if needed).
I don't have time at the moment to write the code, so there could be some detail I'm omitting.
This answer, and related question, may be helpful.
This is an example of what I have in mind:
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
let getChunks size array =
    let rec loop s n =
        seq {
            if n > 0 then
                let r = n - size
                if r > 0 then
                    yield (s, size)
                    yield! loop (s + size) r
                else
                    yield (s, size + r)
        }
    loop 0 (Array.length array)
[<Literal>]
let CHUNK_SIZE = 3
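let parallelExists f (array:_[]) =
    use cts = new CancellationTokenSource()
    // Scan n elements starting at index i; cancel the shared source as soon as a match is found
    let rec checkSlice i n =
        if n > 0 && not cts.IsCancellationRequested then
            if f array.[i] then cts.Cancel()
            else checkSlice (i + 1) (n - 1)
    let workers =
        array
        |> getChunks CHUNK_SIZE
        // The explicit Action selects the StartNew overload that returns a non-generic Task
        |> Seq.map (fun (s, c) -> Task.Factory.StartNew(Action(fun () -> checkSlice s c)))
        |> Seq.toArray
    try
        Task.WaitAll(workers, cts.Token)
        // A match found by the last task to finish may cancel only after the wait has completed,
        // so read the flag instead of assuming false
        cts.IsCancellationRequested
    with :? OperationCanceledException -> true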
Usage:
let array = Array.init 10 id
let exists =
    array |> parallelExists (fun i ->
        Thread.Sleep(500)
        i = 9)
printfn "%b" exists //true
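The example above starts one task per chunk of 3 elements. A variant closer to the "equal slice per worker" step described earlier could look like the sketch below. It is only a sketch under assumptions: the name parallelExistsBySlice and the choice of one worker per logical core are hypothetical, not something from the original answer, and it waits for every worker to notice the cancellation instead of returning while workers are still running.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical variant: one worker per logical core, each scanning one contiguous slice.
let parallelExistsBySlice (f: 'a -> bool) (array: 'a[]) =
    use cts = new CancellationTokenSource()
    let workerCount = max 1 (min Environment.ProcessorCount array.Length)
    // Round up so that the slices cover the whole array
    let sliceSize = (array.Length + workerCount - 1) / workerCount
    let checkSlice start =
        let stop = min array.Length (start + sliceSize)
        let mutable i = start
        // Stop at the end of the slice or as soon as any worker has signalled a match
        while i < stop && not cts.IsCancellationRequested do
            if f array.[i] then cts.Cancel()
            i <- i + 1
    let workers =
        [| for w in 0 .. workerCount - 1 ->
            Task.Factory.StartNew(Action(fun () -> checkSlice (w * sliceSize))) |]
    // Wait for all workers; each one exits after its current call to f once a match is signalled
    Task.WaitAll(workers)
    cts.IsCancellationRequested

let hit = parallelExistsBySlice (fun i -> Thread.Sleep(100); i = 9) (Array.init 10 id)
let miss = parallelExistsBySlice (fun i -> i = 42) (Array.init 10 id)
printfn "%b %b" hit miss // true false
```

Because the wait is not cancellable here, the result is delayed by at most one in-flight call to f per worker, but the CancellationTokenSource is never disposed while a worker might still call Cancel on it.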
Upvotes: 3
Reputation: 1110
The F# PowerPack has PSeq.exists, which maps to PLINQ's ParallelEnumerable.Any, which is part of the BCL. There's also ParallelEnumerable.First.
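For reference, calling ParallelEnumerable.Any directly from F#, without the PowerPack, might look like the following minimal sketch. The helper name plinqExists is hypothetical, and how quickly PLINQ stops the remaining work after a match is up to the runtime.

```fsharp
open System
open System.Linq

// Hypothetical helper: a parallel "exists" built on PLINQ's ParallelEnumerable.Any.
let plinqExists (f: 'a -> bool) (xs: 'a[]) =
    xs.AsParallel().Any(Func<'a, bool>(f))

let sample = [| 1 .. 1000 |]
printfn "%b" (plinqExists (fun x -> x = 750) sample) // true
printfn "%b" (plinqExists (fun x -> x = -1) sample)  // false
```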
I tried to decompile it but couldn't understand right away what was going on. So instead I ran the following side-effecting code to confirm that it uses some sort of cancellation once it has found the element:
let elems = seq {
    for x = 0 to 1000000 do
        printfn "test"
        yield x }
open System
open System.Linq;;
ParallelEnumerable.First (ParallelEnumerable.AsParallel(elems), Func<_,_>(fun x -> x = 1))
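A quieter way to run the same experiment is to count how many elements are pulled from the source instead of printing a line for each one. This is a sketch only: the counter name pulled is hypothetical, and the count it reports depends on how PLINQ partitions and buffers the input, so it will vary between runs and machines.

```fsharp
open System
open System.Linq
open System.Threading

// Ref cell so that the lazily evaluated sequence can update the counter from any thread.
let pulled = ref 0
let counted = seq {
    for x = 0 to 1000000 do
        Interlocked.Increment(pulled) |> ignore
        yield x }

let found =
    ParallelEnumerable.First(ParallelEnumerable.AsParallel(counted), Func<_,_>(fun x -> x = 1))

// If the search is cut short, the number pulled stays well below 1000001.
printfn "found %d after pulling %d of 1000001 elements" found pulled.Value
```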
Upvotes: 1