I am trying to run the same task several times in parallel in an F# console project.
The task works from a ConcurrentQueue (this queue contains the table names my program needs to process).
So basically each task is a while loop (posed as a recursion) that continuously processes tables. I would like to start 4 tasks in parallel, and I would also like to stop execution with a user keystroke on the console, say the Enter key. But execution should only be stopped once the current task has completed step 4.
I have tried the following
let rec DownloadHelper (cq:ConcurrentQueue<Table>) sqlConn =
    let success, tb = cq.TryDequeue()
    if success then
        printfn "Processing %s %s" tb.DBName tb.TBName
        Table2CSV tb.DBName tb.TBName sqlConn
        DownloadHelper cq sqlConn

let DownloadTable (cq:ConcurrentQueue<Table>) connectionString =
    use con = new SqlConnection(connectionString)
    con.Open()
    DownloadHelper cq con

let asyncDownloadTask = async { return DownloadTable cq connectionString }

let asyncMultiDownload =
    asyncDownloadTask
    |> List.replicate 4
    |> Async.Parallel

asyncMultiDownload
|> Async.RunSynchronously
|> ignore
There are two problems with the above code,
My second try, below, uses a CancellationToken:
let tokenSource = new CancellationTokenSource()
let cq = PrepareJobs connectionString
let asyncDownloadTask = async { DownloadTable cq connectionString}
let task = async {
    asyncDownloadTask
    |> List.replicate 4
    |> Async.Parallel
    |> ignore }
let val1 = Async.Start(task, cancellationToken =tokenSource.Token)
Console.ReadLine() |> ignore
tokenSource.Cancel()
Console.ReadLine() |> ignore
0
But it seems I am not even able to start the task at all.
Upvotes: 3
Views: 502
Reputation: 2764
There are three problems with your code.
First, DownloadHelper should process one table only. By making it recursive, you are taking too much control and inhibiting parallelism.
Second, simply placing an operation in an async expression does not magically make it async. Unless the DownloadTable function itself is async, the code will block until it is finished.
So when you run four downloads in parallel, once started, they will all run to completion, regardless of the cancellation token.
Third, in your second example you call Async.Parallel but then throw the output away, which is why your task does nothing!
I think what you wanted to do was throw away the result of the async, not the async itself.
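To make the third point concrete, here is a small sketch. The names ran, work, discarded and bound are made up for this illustration and do not come from your code. Piping the result of Async.Parallel into ignore only builds a description of the work, while binding it with let! actually runs it:

```fsharp
open System.Collections.Concurrent

// Records which work items actually ran (hypothetical helper for this sketch).
let ran = ConcurrentBag<int>()

// A trivial work item that just records its id.
let work id = async { ran.Add id }

// Builds the parallel computation, then discards it without running it.
let discarded = async {
    List.init 4 work |> Async.Parallel |> ignore
}

// Binds the parallel computation with let!, which runs all four items.
let bound = async {
    let! _ = List.init 4 work |> Async.Parallel
    return ()
}

Async.RunSynchronously discarded
let countAfterDiscarded = ran.Count
printfn "after discarded: %d" countAfterDiscarded   // 0

Async.RunSynchronously bound
let countAfterBound = ran.Count
printfn "after bound: %d" countAfterBound           // 4
```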
Here's my version of your code, to demonstrate these points.
First, a dummy function that takes up time:
let longAtomicOperation milliSecs =
    let sw = System.Diagnostics.Stopwatch()
    let r = System.Random()
    let mutable total = 0.0
    sw.Start()
    while sw.ElapsedMilliseconds < int64 milliSecs do
        total <- total + sin (r.NextDouble())
    // return something
    total
// test
#time
longAtomicOperation 2000
#time
// Real: 00:00:02.000, CPU: 00:00:02.000, GC gen0: 0, gen1: 0, gen2: 0
Note that this function is not async -- once started it will run to completion.
Now let's put it in an async:
let asyncTask id = async {
    // note that NONE of the operations are async
    printfn "Started %i" id
    let result = longAtomicOperation 5000 // 5 seconds
    printfn "Finished %i" id
    return result
}
None of the operations in the async block are async, so we are not getting any benefit.
Here's the code to create four tasks in parallel:
let fourParallelTasks = async {
    let! results =
        List.init 4 asyncTask
        |> Async.Parallel
    // ignore
    return ()
}
The result of Async.Parallel is not ignored, but is bound to a value with let!, which forces the tasks to be run. The async expression as a whole still returns unit, though.
If we test it:
open System.Threading
// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourParallelTasks, cancellationToken = tokenSource.Token)
// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore
We get output that looks like this, even if a key is pressed, because once started, each task will run to completion:
press a key to cancel
Started 3
Started 1
Started 2
Started 0
Finished 1
Finished 3
Finished 2
Finished 0
On the other hand, if we create a serial version, like this:
let fourSerialTasks = async {
    let! result1 = asyncTask 1
    let! result2 = asyncTask 2
    let! result3 = asyncTask 3
    let! result4 = asyncTask 4
    // ignore
    return ()
}
Then, even though each task is atomic, the cancellation token is checked between steps, which allows cancellation of the subsequent tasks.
// start the task
let tokenSource = new CancellationTokenSource()
do Async.Start(fourSerialTasks, cancellationToken = tokenSource.Token)
// wait for a keystroke
System.Console.WriteLine("press a key to cancel")
System.Console.ReadLine() |> ignore
tokenSource.Cancel()
System.Console.ReadLine() |> ignore
The above code can be cancelled between each step when a key is pressed.
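If you want to see this without sitting at the keyboard, here is a minimal sketch that simulates the keystroke by cancelling from inside step 2. The names log, step and serial are invented for this illustration. Step 2 still finishes, but the check at the next bind should stop steps 3 and 4 from starting:

```fsharp
open System
open System.Threading

// Records what happened, in order (hypothetical helper for this sketch).
let log = ResizeArray<string>()
let cts = new CancellationTokenSource()

// A non-async step that requests cancellation while step 2 is running,
// standing in for the user pressing a key at that moment.
let step i = async {
    log.Add (sprintf "step %d" i)
    if i = 2 then cts.Cancel()
}

let serial = async {
    do! step 1
    do! step 2
    do! step 3
    do! step 4
}

// RunSynchronously raises OperationCanceledException when the token fires.
try
    Async.RunSynchronously(serial, cancellationToken = cts.Token)
with :? OperationCanceledException ->
    log.Add "cancelled"

printfn "%A" (List.ofSeq log)   // ["step 1"; "step 2"; "cancelled"]
```

This is the behaviour you asked for: the step in progress is allowed to complete, and only the work after it is abandoned.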
To process all elements of the queue this way in batches of four, just convert the parallel version into a loop:
let rec processQueueAsync() = async {
    let! result = processFourElementsAsync()
    if result <> QueueEmpty then
        do! processQueueAsync()
    // ignore
    return ()
}
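I left processFourElementsAsync and QueueEmpty undefined above. One possible shape for them is sketched below, under some assumptions: the BatchResult type, the MoreWork case and processOneAsync are hypothetical names, the queue holds plain strings rather than your Table type, and the queue is passed in explicitly instead of being captured.

```fsharp
open System.Collections.Concurrent

// Hypothetical result type; only QueueEmpty is named in the loop above.
type BatchResult =
    | QueueEmpty
    | MoreWork

// Hypothetical stand-in for the real per-table work (Table2CSV in the question).
let processOneAsync (name: string) = async {
    printfn "Processing %s" name
}

// Dequeue up to four items, process them in parallel,
// and report whether the queue ran dry while filling the batch.
let processFourElementsAsync (cq: ConcurrentQueue<string>) = async {
    let batch =
        [ for _ in 1 .. 4 do
            match cq.TryDequeue() with
            | true, name -> yield name
            | _ -> () ]
    let! _ = batch |> List.map processOneAsync |> Async.Parallel
    return (if batch.Length < 4 then QueueEmpty else MoreWork)
}

// Same loop as above, with the queue threaded through as a parameter.
let rec processQueueAsync (cq: ConcurrentQueue<string>) = async {
    let! result = processFourElementsAsync cq
    if result <> QueueEmpty then
        do! processQueueAsync cq
}

// Usage: six table names are handled as a batch of four, then a batch of two.
let demoQueue = ConcurrentQueue<string>([ "t1"; "t2"; "t3"; "t4"; "t5"; "t6" ])
processQueueAsync demoQueue |> Async.RunSynchronously
```

Because the cancellation token is checked at each let! and do!, a loop like this can be stopped between batches but not in the middle of one.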
Finally, to me, using async is not about running things in parallel so much as it is to write non-blocking code. So if your library code is blocking, the async approach is not going to provide too much benefit.
To ensure your code is non-blocking, you need to use the async versions of the SqlDataReader methods in your helper, such as NextResultAsync.
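As a rough sketch of what that could look like: the function below is written against DbConnection and DbDataReader, the base classes that SqlConnection and SqlDataReader derive from, so it does not need the SqlClient package to compile. The name countRowsAsync and the row-counting logic are purely illustrative and assume an already opened connection; your Table2CSV would write the rows out instead.

```fsharp
open System.Data.Common

// Counts the rows returned by a query without blocking a thread while waiting.
// Assumes conn is already open; pass a SqlConnection in real code.
let countRowsAsync (conn: DbConnection) (sql: string) = async {
    // Pick up the token supplied to Async.Start / Async.RunSynchronously.
    let! ct = Async.CancellationToken
    use cmd = conn.CreateCommand()
    cmd.CommandText <- sql
    use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTask
    // Recursive loop: each ReadAsync is a point where cancellation can take effect.
    let rec loop count = async {
        let! hasRow = reader.ReadAsync(ct) |> Async.AwaitTask
        if hasRow then
            return! loop (count + 1)
        else
            return count
    }
    return! loop 0
}
```

Passing the token into ExecuteReaderAsync and ReadAsync is what lets a cancellation request interrupt a download partway through, rather than only between tables.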