tesla1060

F# parallel design pattern

I am trying to run the same task several times in parallel in an F# console project.

The task is as follows:

  1. Dequeue a table name from a ConcurrentQueue (this queue contains the table names my program needs to process)
  2. Open a SqlDataReader for the table
  3. Write each row in the SqlDataReader to a StreamWriter
  4. Zip the file created by the StreamWriter
  5. Repeat 1 - 4

So basically each task is a while loop (written as a recursive function) that continuously processes tables. I would like to start 4 of these tasks in parallel. I would also like to be able to stop execution with a keystroke on the console, say the Enter key, but execution should only stop once the current task has completed step 4.

I have tried the following

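open System.Collections.Concurrent
open System.Data.SqlClient

// Table (with DBName/TBName) and Table2CSV are my own definitions, not shown here
let rec DownloadHelper (cq:ConcurrentQueue<Table>) sqlConn = 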
    let success, tb = cq.TryDequeue()
    if success then
        printfn "Processing %s %s" tb.DBName tb.TBName
        Table2CSV tb.DBName tb.TBName sqlConn
        DownloadHelper cq sqlConn

let DownloadTable (cq:ConcurrentQueue<Table>) connectionString=
    use con = new SqlConnection(connectionString)
    con.Open()
    DownloadHelper cq con

let asyncDownloadTask = async { return DownloadTable cq connectionString}
let asyncMultiDownload =
    asyncDownloadTask
    |> List.replicate 4
    |> Async.Parallel
asyncMultiDownload
|>Async.RunSynchronously
|>ignore

There are two problems with the code above:

  1. It blocks the main thread, so I don't know how to implement the keystroke part.
  2. I am not sure how to stop execution gracefully.

My second attempt uses a CancellationToken:

let tokenSource = new CancellationTokenSource()

let cq = PrepareJobs connectionString
let asyncDownloadTask = async {  DownloadTable cq connectionString}
let task = async {  
            asyncDownloadTask
            |> List.replicate 4
            |> Async.Parallel
            |>ignore}
let val1 = Async.Start(task, cancellationToken =tokenSource.Token)
Console.ReadLine() |> ignore
tokenSource.Cancel()
Console.ReadLine() |> ignore
0

But the task does not seem to start at all.

Answers (1)

Grundoon

There are three problems with your code.

First, DownloadHelper should process only one table. By making it recursive, you are taking too much control and inhibiting parallelism.

Second, simply placing an operation in an async expression does not magically make it async. Unless the DownloadTable function itself is async, the code will block until it is finished.

So when you run four downloads in parallel, once started, they will all run to completion, regardless of the cancellation token.
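To see the second point in isolation, here is a small sketch contrasting a blocking wait with a truly asynchronous one. The names `blockingStep`, `asyncStep` and `runWithCancelAfter` are hypothetical, and `Thread.Sleep` merely stands in for blocking library code such as a synchronous SqlDataReader loop. Both steps run inside `async { }` with a token that is cancelled after about 50 ms; only the step built on the async primitive `Async.Sleep` actually observes the cancellation.

```fsharp
open System
open System.Diagnostics
open System.Threading

// Hypothetical step that blocks a thread: the token is never consulted here
let blockingStep (finished: bool ref) = async {
    Thread.Sleep(300)        // blocks; cancellation cannot interrupt this call
    finished.Value <- true   // reached even if the token was cancelled meanwhile
}

// Hypothetical step that waits asynchronously: Async.Sleep observes cancellation
let asyncStep (finished: bool ref) = async {
    do! Async.Sleep(10000)
    finished.Value <- true   // not reached when the token is cancelled during the sleep
}

// Run a step with a token cancelled after 50 ms and report
// (did it finish?, was it cancelled?, elapsed milliseconds)
let runWithCancelAfter (step: bool ref -> Async<unit>) =
    let finished = ref false
    use cts = new CancellationTokenSource()
    cts.CancelAfter(50)
    let sw = Stopwatch.StartNew()
    let cancelled =
        try
            Async.RunSynchronously(step finished, cancellationToken = cts.Token)
            false
        with :? OperationCanceledException -> true
    finished.Value, cancelled, sw.ElapsedMilliseconds
```

With this sketch, `runWithCancelAfter blockingStep` still reports the step as finished, while `runWithCancelAfter asyncStep` is cancelled long before the 10 second sleep would have ended.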

Thirdly, in your second example you build a computation with Async.Parallel but then pipe it to ignore. That throws away the Async value itself, so it is never run, which is why your task does nothing! I think what you wanted to do was throw away the result of the async, not the async itself.
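A minimal sketch of that difference, using a hypothetical `work` item that only counts how often it runs. `Async.Parallel` just builds a description of the work; piping the description to `ignore` discards it, whereas binding it with `let!` runs it and discards only the resulting array.

```fsharp
open System.Threading

// Counts how many times the hypothetical work item has actually run
let counter = ref 0

let work () = async { Interlocked.Increment(&counter.contents) |> ignore }

// BUG (as in the question): the Async<unit[]> value is thrown away,
// so none of the four work items is ever started
let broken = async {
    List.init 4 (fun _ -> work ())
    |> Async.Parallel
    |> ignore
}

// FIX: let! runs the parallel computation; only its unit[] result is discarded
let working = async {
    let! _ = List.init 4 (fun _ -> work ()) |> Async.Parallel
    return ()
}
```

Running `broken` leaves the counter at 0; running `working` afterwards brings it to 4.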

Here's my version of your code, to demonstrate these points.

First, a dummy function that takes up time:

let longAtomicOperation milliSecs = 
    let sw = System.Diagnostics.Stopwatch()
    let r = System.Random()
    let mutable total = 0.0
    sw.Start()
    while sw.ElapsedMilliseconds < int64 milliSecs do
        total <- total + sin (r.NextDouble())
    // return something
    total

// test
#time
longAtomicOperation 2000
#time
// Real: 00:00:02.000, CPU: 00:00:02.000, GC gen0: 0, gen1: 0, gen2: 0

Note that this function is not async -- once started it will run to completion.

Now let's put it in an async:

let asyncTask id = async {  
    // note that NONE of the operations are async
    printfn "Started %i" id
    let result = longAtomicOperation 5000 // 5 seconds
    printfn "Finished %i" id
    return result
    }

None of the operations in the async block are async, so we are not getting any benefit.

Here's the code to create four tasks in parallel:

let fourParallelTasks = async {
    let! results = 
        List.init 4 asyncTask
        |> Async.Parallel
    // ignore
    return ()
    }

The result of Async.Parallel is not ignored: it is bound to a value with let!, which forces the tasks to run. The async expression as a whole still returns unit.
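`Async.Ignore` expresses the same idea more compactly: it runs a computation and discards its result. Here is a sketch with the same shape as `fourParallelTasks`; `quickTask` is a hypothetical stand-in for `asyncTask` that finishes immediately instead of spinning for five seconds.

```fsharp
open System.Collections.Concurrent

// Records which task ids have started
let started = ConcurrentBag<int>()

// Hypothetical stand-in for asyncTask: records its id and returns at once
let quickTask (taskId: int) = async {
    started.Add taskId
    return float taskId
}

// Run four tasks in parallel; Async.Ignore runs the combined computation
// and throws away only its float[] result
let fourParallelTasksIgnored : Async<unit> =
    List.init 4 quickTask
    |> Async.Parallel
    |> Async.Ignore
```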

If we test it:

open System.Threading

// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourParallelTasks, cancellationToken = tokenSource.Token)

// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore

We get output that looks like this, even if a key is pressed, because once started, each task will run to completion:

press a key to cancel
Started 3
Started 1
Started 2
Started 0
Finished 1
Finished 3
Finished 2
Finished 0

On the other hand, if we create a serial version, like this:

let fourSerialTasks = async {
    let! result1 = asyncTask 1
    let! result2 = asyncTask 2
    let! result3 = asyncTask 3
    let! result4 = asyncTask 4
    // ignore
    return ()
    }

Then, even though each task is atomic, the cancellation token is checked between steps (at each let!), which allows the remaining tasks to be cancelled.

// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourSerialTasks, cancellationToken = tokenSource.Token)

// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore

The above code can be cancelled between each step when a key is pressed.
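To check this without pressing keys, here is a sketch in which the first step requests cancellation itself, partway through. The names `step`, `serialSteps`, `runSerial` and `ran` are hypothetical. Step 1 still runs to completion because it contains no async points, but the `do!` before step 2 sees the cancelled token, so steps 2 and 3 never start and `Async.RunSynchronously` raises an `OperationCanceledException`.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading

// Records which steps ran to completion, in order
let ran = ConcurrentQueue<int>()

// Hypothetical atomic step: optionally cancels the token midway,
// then carries on and records that it completed
let step (cts: CancellationTokenSource) stepId cancelMidway = async {
    if cancelMidway then cts.Cancel()
    ran.Enqueue stepId
}

let serialSteps cts = async {
    do! step cts 1 true    // requests cancellation, but still completes
    do! step cts 2 false   // cancellation is checked before this step starts
    do! step cts 3 false
}

// Returns true if the serial computation ended by cancellation
let runSerial () =
    use cts = new CancellationTokenSource()
    try
        Async.RunSynchronously(serialSteps cts, cancellationToken = cts.Token)
        false
    with :? OperationCanceledException -> true
```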

To process all elements of the queue this way in batches of four, just convert the parallel version into a loop:

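// processFourElementsAsync and QueueEmpty are placeholders: the function is
// assumed to process up to four queue elements in parallel (like
// fourParallelTasks) and to return QueueEmpty once nothing is left
let rec processQueueAsync() = async {
    let! result = processFourElementsAsync()
    if result <> QueueEmpty then
        do! processQueueAsync()
    // ignore
    return ()
    }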
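One way the placeholders in the loop could be filled in for a `ConcurrentQueue<string>` of table names is sketched below. `BatchResult`, `processTable` and the extra `cq`/`processed` parameters are hypothetical; `processTable` stands in for the real work of steps 2 to 4 in the question (read the table, write the CSV, zip it) and here only records the name. Each batch dequeues up to four names and runs them with `Async.Parallel`; because the loop recurses through `let!`/`do!`, a cancelled token is noticed before the next batch starts.

```fsharp
open System.Collections.Concurrent

// Hypothetical result type standing in for the QueueEmpty check
type BatchResult =
    | QueueEmpty
    | MoreWork

// Hypothetical stand-in for steps 2-4; here it only records the table name
let processTable (processed: ConcurrentBag<string>) (name: string) = async {
    processed.Add name
}

// Dequeue up to four table names and process them in parallel
let processFourElementsAsync (cq: ConcurrentQueue<string>) processed = async {
    let batch =
        [ for _ in 1 .. 4 do
            match cq.TryDequeue() with
            | true, name -> yield name
            | false, _ -> () ]
    let! _ = batch |> List.map (processTable processed) |> Async.Parallel
    return if cq.IsEmpty then QueueEmpty else MoreWork
}

// Keep taking batches of four until the queue is drained
let rec processQueueAsync cq processed = async {
    let! result = processFourElementsAsync cq processed
    if result <> QueueEmpty then
        do! processQueueAsync cq processed
}
```

Started with `Async.Start(processQueueAsync cq processed, cancellationToken = tokenSource.Token)`, this fits the keystroke pattern used earlier in the answer.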

Finally, to me, using async is not so much about running things in parallel as about writing non-blocking code. So if your library code is blocking, the async approach is not going to provide much benefit.

To ensure your code is non-blocking, you need to use the async versions of the SqlDataReader methods in your helper, such as ReadAsync and NextResultAsync.
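A minimal sketch of what a non-blocking row writer could look like. It is written against `DbDataReader`, the base class of `SqlDataReader`, so it can be tried with an in-memory `DataTable` instead of a database; the name `writeRowsAsync` and the comma-separated output format are assumptions, not part of the original code. `ReadAsync` and `WriteLineAsync` return .NET tasks, and `Async.AwaitTask` turns them into F# async computations.

```fsharp
open System.Data
open System.Data.Common
open System.IO

// Hypothetical helper: write each row of the reader as one comma-separated line.
// Waiting on ReadAsync/WriteLineAsync via Async.AwaitTask does not block a
// thread, and each let!/do! is also a point where cancellation is checked.
let rec writeRowsAsync (reader: DbDataReader) (writer: TextWriter) : Async<unit> = async {
    let! ct = Async.CancellationToken
    let! hasRow = reader.ReadAsync(ct) |> Async.AwaitTask
    if hasRow then
        let fields = [ for i in 0 .. reader.FieldCount - 1 -> string (reader.GetValue(i)) ]
        do! writer.WriteLineAsync(String.concat "," fields) |> Async.AwaitTask
        return! writeRowsAsync reader writer
}
```

Usage with an in-memory table: build a `DataTable`, call `table.CreateDataReader()` (which returns a `DataTableReader`, itself a `DbDataReader`) and pass it together with a `StreamWriter` or `StringWriter`.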