Reputation: 41290

Parallel exists function in F#

Motivation

I have a long-running boolean function that should be applied to each element of an array, and I want to return immediately once any element satisfies the condition. I would like to do the search in parallel and terminate the other threads as soon as the first thread to finish returns a positive answer.

Question

What is a good way to implement a parallel exists function in F#? Since my goal is performance, an efficient solution is preferred to an easy or idiomatic one.

Test case

Suppose that I want to find out whether a value exists in an array. The comparison function (equals) is simulated as a computationally expensive one:

open System
open System.Diagnostics
open System.Threading

// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
        if (token.IsCancellationRequested) then
            if (throwOnCancel) then token.ThrowIfCancellationRequested()
            false
        else
            let ms = int64 (seconds * 1000.0)
            let sw = new Stopwatch()
            sw.Start()
            let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))

            // Loop to simulate a computationally intensive operation
            let rec loop i = 
                // Periodically check to see if the user has requested 
                // cancellation or if the time limit has passed
                let check = seconds = 0.0 || i % checkInterval = 0
                if check && token.IsCancellationRequested then
                    if throwOnCancel then token.ThrowIfCancellationRequested()
                    false
                elif check && sw.ElapsedMilliseconds > ms then
                    true
                else 
                  loop (i + 1)

            // Start the loop with 0 as the first value
            loop 0

let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y

The array consists of 1000 randomly generated elements, and the value being searched for is guaranteed to be in the second half of the array (so a sequential search has to go through at least half of the array):

let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;

#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential

Upvotes: 3

Views: 306

Answers (2)

Daniel

Reputation: 47914

I think you can take the following steps:

  1. Spawn a number of workers (threads or async computations), and pass each an equal slice of the array and a cancellation token that is shared by all workers.

  2. When a worker finds the searched item, it calls Cancel on the shared CancellationTokenSource (each worker should check the cancellation state of the token on each iteration and bail out if needed).

I don't have time at the moment to write the code, so there could be some detail I'm omitting.

This answer, and related question, may be helpful.

UPDATE

This is an example of what I have in mind:

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

let getChunks size array =
  let rec loop s n =
    seq {
      if n > 0 then
        let r = n - size
        if r > 0 then
          yield (s, size)
          yield! loop (s + size) r
        else yield (s, size + r)
    }      
  loop 0 (Array.length array)

[<Literal>]
let CHUNK_SIZE = 3

let parallelExists f (array:_[]) =
  use cts = new CancellationTokenSource()
  let rec checkSlice i n = 
    if n > 0 && not cts.IsCancellationRequested then
      if f array.[i] then cts.Cancel()
      else checkSlice (i + 1) (n - 1)
  let workers =
    array
    |> getChunks CHUNK_SIZE
    |> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
    |> Seq.toArray
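  try 
    Task.WaitAll(workers, cts.Token)
    // If a worker cancelled just as every task finished, WaitAll may
    // return normally, so report the token state instead of assuming false
    cts.IsCancellationRequested
  with :? OperationCanceledException -> true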

Usage

let array = Array.init 10 id
let exists = 
  array |> parallelExists (fun i ->
    Thread.Sleep(500)
    i = 9)
printfn "%b" exists //true
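
As an alternative sketch of the same early-exit idea (not the code above), the TPL's Parallel.For can stop scheduling further iterations once a match is found, via ParallelLoopState.Stop. The name parallelExistsFor is hypothetical, and iterations that are already running still finish their call to f, so treat this as a rough illustration rather than a tuned implementation:

```fsharp
open System
open System.Threading.Tasks

// parallelExistsFor is a hypothetical name used only for this sketch
let parallelExistsFor (f: 'a -> bool) (array: 'a[]) =
  // Ref cell so the loop body can record a hit
  // (closures cannot capture a `let mutable` local)
  let found = ref false
  let check i (state: ParallelLoopState) =
    if f array.[i] then
      found.Value <- true
      // Ask the loop not to start any further iterations
      state.Stop()
  Parallel.For(0, array.Length, Action<int, ParallelLoopState>(fun i state -> check i state))
  |> ignore
  // Parallel.For returns only after all running iterations have finished
  found.Value

let hit = parallelExistsFor (fun i -> i = 9) (Array.init 10 id)
printfn "%b" hit
```

Compared with the token-based version, this leaves the partitioning of the array to the TPL instead of using a fixed chunk size.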

Upvotes: 3

David Grenier

Reputation: 1110

The F# PowerPack has PSeq.exists, which maps to PLINQ's ParallelEnumerable.Any, which is part of the BCL. There's also ParallelEnumerable.First.
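
Without the PowerPack, the PLINQ call can also be made directly from F#. A minimal sketch, assuming a hypothetical wrapper name plinqExists (only ParallelEnumerable.AsParallel and ParallelEnumerable.Any are the real BCL calls here):

```fsharp
open System
open System.Linq

// plinqExists is a hypothetical wrapper name used only for this sketch
let plinqExists (f: 'a -> bool) (xs: seq<'a>) =
    // Wrap the F# function in a Func delegate, as PLINQ expects
    ParallelEnumerable.Any(ParallelEnumerable.AsParallel(xs), Func<'a, bool>(f))

let found = plinqExists (fun x -> x = 42) [| 1 .. 100 |]
printfn "%b" found
```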

I tried decompiling the implementation but couldn't immediately understand what was going on. So instead I ran the following side-effecting code to confirm that it uses some sort of cancellation once it has found the element:

let elems = seq {
    for x = 0 to 1000000 do
        printfn "test"
        yield x }

open System
open System.Linq;;

ParallelEnumerable.First (ParallelEnumerable.AsParallel(elems), Func<_,_>(fun x -> x = 1))

Upvotes: 1
