Reputation: 1
I have the following code in F# which I think is sufficiently concurrent to utilize the 4 cores of my machine. Yet CPU usage is limited to one core.
member x.Solve problemDef =
    use flag = new ManualResetEventSlim(false)
    let foundSoFar = MSet<'T>()
    let workPile = MailboxProcessor<seq<'T>>.Start(fun inbox ->
        let remaining = ref 0
        let rec loop() = async {
            let! data = inbox.Receive()
            let data = data |> Seq.filter (not << foundSoFar.Contains) |> Array.ofSeq
            foundSoFar.UnionWith data
            let jobs = ref -1
            for chunk in data |> Seq.distinct |> Seq.chunked 5000 do
                Async.Start <| async {
                    Seq.collect problemDef.generators chunk
                    |> Array.ofSeq
                    |> inbox.Post
                }
                incr jobs
            remaining := !remaining + !jobs
            if (!remaining = 0 && !jobs = -1) then
                flag.Set() |> ignore
            else
                return! loop()
        }
        loop()
    )
    workPile.Post problemDef.initData
    flag.Wait() |> ignore
    foundSoFar :> seq<_>
I use the MailboxProcessor as a workpile from where I get chunks of elements, filter them through a HashSet and create tasks with the new elements whose results are inserted in the workpile. This is repeated until no new elements are produced. The aim of this code is to asynchronously insert the chunks in the workpile, thus the use of tasks. My problem is that there is no parallelism.
Edit: thanks to @jon-harrop I solved the concurrency problem, which was due to the lazy nature of seq, and reworked the code following the suggestions. Is there any way to get rid of the ManualResetEvent without using a discriminated union as the message type of the agent (to support the asking message)?
Upvotes: 0
Views: 227
Reputation: 48687
You're mixing high-level concurrency primitives (tasks and agents) with ManualResetEventSlim, which is very bad. Can you use PostAndReply instead?
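A minimal sketch of what the PostAndReply approach could look like, not the asker's actual solver. The names Msg, Start, Work, solve and pending are hypothetical, a plain HashSet stands in for MSet, and Array.chunkBySize stands in for Seq.chunked. Note that it does use a discriminated union as the message type, so the reply channel can travel with the first message:

```fsharp
open System.Collections.Generic

// Hypothetical message type: Start carries the reply channel, Work carries generated items.
type Msg<'T> =
    | Start of 'T[] * AsyncReplyChannel<HashSet<'T>>
    | Work of 'T[]

let solve (generators: 'T -> seq<'T>) (initData: 'T[]) : HashSet<'T> =
    let agent = MailboxProcessor<Msg<'T>>.Start(fun inbox ->
        // Only the agent body touches the set, so no locking is needed.
        let foundSoFar = HashSet<'T>()
        // pending = messages that have been or will be posted but are not yet processed.
        let rec loop (reply: AsyncReplyChannel<HashSet<'T>> option) (pending: int) = async {
            let! msg = inbox.Receive()
            let reply, data =
                match msg with
                | Start (d, ch) -> Some ch, d
                | Work d -> reply, d
            // HashSet.Add returns false for items that were already present.
            let fresh = data |> Array.filter (fun x -> foundSoFar.Add x)
            let chunks = fresh |> Array.chunkBySize 5000
            for chunk in chunks do
                Async.Start <| async {
                    // Array.collect is eager, so the work runs inside this async.
                    chunk |> Array.collect (generators >> Array.ofSeq) |> Work |> inbox.Post
                }
            let pending = pending - 1 + chunks.Length
            match reply with
            | Some ch when pending = 0 -> ch.Reply foundSoFar
            | _ -> return! loop reply pending
        }
        loop None 1)
    // Blocks the caller until the agent replies; no ManualResetEventSlim required.
    agent.PostAndReply(fun ch -> Start (initData, ch))

// Usage with a toy generator (an assumption for illustration only).
let found = solve (fun n -> if n < 20 then seq [ n + 1; n * 2 ] else Seq.empty) [| 1 |]
```

One caveat of this sketch: if a generator throws inside a spawned async, its Work message is never posted and the caller waits forever, so real code would need error handling there.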
You're using Seq to do "work" in the spawned task, which is lazy, so it won't actually do anything until after it is posted back. Can you force evaluation inside the task with something like Array.ofSeq?
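The laziness is easy to see in isolation. In the sketch below, generators is a hypothetical stand-in for problemDef.generators that counts how often it is called; Seq.collect alone does not call it at all, and the calls only happen once Array.ofSeq enumerates the sequence:

```fsharp
// Counter showing when the generator actually runs.
let calls = ref 0

// Hypothetical stand-in for problemDef.generators.
let generators (x: int) =
    calls.Value <- calls.Value + 1
    [ x + 1 ]

let chunk = [| 1; 2; 3 |]

// Builds a lazy sequence: generators has not been called yet.
let lazyResult = Seq.collect generators chunk
let callsBeforeForcing = calls.Value

// Enumerating the sequence is what triggers the work.
let forced = Array.ofSeq lazyResult
let callsAfterForcing = calls.Value

// Forcing inside the async means the work runs where the async runs,
// not on whichever thread later enumerates the posted sequence.
let work = async { return Seq.collect generators chunk |> Array.ofSeq }
let result = Async.RunSynchronously work
```

If the lazy sequence were posted to the agent unforced, the agent itself would end up doing all the generation while filtering, which is consistent with seeing only one busy core.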
The way you are using Task is anomalous. It might be more idiomatic to switch to Async.Start.
Without a complete solution I cannot validate any of my guesses...
Regarding "think is sufficiently concurrent to utilize the 4 cores": your mental model of multicore parallelism might be quite off the mark.
Upvotes: 1
Reputation: 243096
Without a complete example, I found it quite difficult to understand what your code does (perhaps because it combines quite a few different concurrent programming primitives, which makes it a bit hard to follow).
Anyway, the body of MailboxProcessor is executed only once (if you want to get concurrency using plain agents, you need to start multiple agents). In the body of the agent, you start a task that runs problemDef.generators for each chunk.
This means that problemDef.generators should run in parallel. However, the code that calls foundSoFar.Contains and foundSoFar.UnionWith, as well as Seq.distinct, is always run sequentially.
So, if problemDef.generators is a simple and efficient function, the overhead of tracking foundSoFar (which is done sequentially) is probably larger than what you gain by parallelization.
I'm not familiar with MSet<'T>, but if it is (or if you replaced it with) a thread-safe mutable set, then you should be able to run some of the unioning right in the Task.StartNew (in parallel with other unioning).
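As a rough illustration of that idea, here is a sketch that uses ConcurrentDictionary as a thread-safe set, so each worker can deduplicate its own output. This is an assumption about what a replacement for MSet could look like, and generators, processChunk and the sample chunks are hypothetical:

```fsharp
open System.Collections.Concurrent

// ConcurrentDictionary used as a thread-safe set; the byte value is a dummy.
let foundSoFar = ConcurrentDictionary<int, byte>()

// Hypothetical stand-in for problemDef.generators.
let generators (n: int) = seq [ n + 1; n * 2 ]

// Runs on a worker thread: generates new items and keeps only those
// that this thread was the first to add to the shared set.
let processChunk (chunk: int[]) =
    chunk
    |> Seq.collect generators
    |> Seq.filter (fun x -> foundSoFar.TryAdd(x, 0uy))
    |> Array.ofSeq

let chunks = [| [| 1; 2; 3 |]; [| 2; 3; 4 |] |]

// Generation and unioning both happen in parallel across chunks.
let fresh = chunks |> Array.Parallel.map processChunk |> Array.concat
```

Because TryAdd succeeds for exactly one caller per key, every new element appears in fresh once, regardless of which chunk produced it first. Whether this beats the sequential version depends on how expensive generators is relative to the contention on the shared set.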
PS: As I said, it is difficult to tell without running the code, so my thinking may be completely wrong!
Upvotes: 2