Reputation: 29179
I have the following code to test https://github.com/fsprojects/FSharpx.Async/blob/master/src/FSharpx.Async/BlockingQueueAgent.fs
However, it prints Done 0
at the end. It seems the [ enqueue(); enqueue(); enqueue() ]
is not run?
let ag = new BlockingQueueAgent<int option>(500)
let enqueue() = async { for i = 0 to 1000 do ag.Add (Some i) }
[ enqueue(); enqueue(); enqueue() ] |> Async.Parallel |> Async.Ignore |> Async.Start
ag.Add None
let mutable x = 0
let rec dequeue() =
async {
let! m = ag.AsyncGet()
match m with
| Some v ->
x <- x + v
return! dequeue()
| None ->
printfn "Done %d" x
}
dequeue() |> Async.RunSynchronously
Upvotes: 0
Views: 49
Reputation: 80765
It seems that you have a race condition: ag.Add None
happens before the enqueue
copies even have a chance to run, because you're starting them on a background thread with Async.Start
. In order to maintain the correct order of operations, make them all part of a single async workflow:
let enqueueAll = async {
do! [ enqueue(); enqueue(); enqueue() ] |> Async.Parallel |> Async.Ignore
ag.Add None
}
Async.Start enqueueAll
This way, ag.Add None
won't be executed until Async.Parallel
finishes, which will happen after all copies of enqueue()
are done.
Upvotes: 1