Reputation: 435
Background.
I am trying to figure out MailboxProcessor. The idea is to use it as a some kind of state machine and pass arguments around between the states and then quit. Some parts are going to have async communication so I made a Sleep there. It's a console application, making a Post does nothing because main thread quits and kills everything behind it. I am making a PostAndReply in main. Also, I have tried without
let sleepWorkflow = async
, doesn't make any difference.
Questions.
(I am probably doing something wrong)
Go24 is not async. Changing RunSynchronously to StartImmediate makes no visible difference. The end should be somewhere below GetMe instead. At the same time Done is printed after Fetch. Isn't the control supposed t be returned to the main thread on sleep?
Go24, wait go24 1, end Fetch 1 Done GetMe ...
Run time is terrible slow. Without delay in Fetch it's about 10s (stopwatch). I thought F# threads are lightweight and should use threadpool. According to debugger it takes appr 1s to create every and it looks like real threads.
Also, changing to [1..100] will "pause" the program for 100s, according to ProcessExplorer 100 threads are created during that time and only then everything is printed. I would actually prefer fewer threads and slow increase.
Code.
Program.fs
[<EntryPoint>]
let main argv =
let a = Mailbox.MessageBasedCounter.DoGo24 1
let a = Mailbox.MessageBasedCounter.DoFetch 1
let b = Mailbox.MessageBasedCounter.GetMe
let task i = async {
//Mailbox.MessageBasedCounter.DoGo24 1
let a = Mailbox.MessageBasedCounter.DoFetch i
return a
}
let stopWatch = System.Diagnostics.Stopwatch.StartNew()
let x =
[1..10]
|> Seq.map task
|> Async.Parallel
|> Async.RunSynchronously
stopWatch.Stop()
printfn "%f" stopWatch.Elapsed.TotalMilliseconds
printfn "a: %A" a
printfn "b: %A" b
printfn "x: %A" x
0 // return an integer exit code
Mailbox.fs
module Mailbox
#nowarn "40"
type parserMsg =
| Go24 of int
| Done
| Fetch of int * AsyncReplyChannel<string>
| GetMe of AsyncReplyChannel<string>
type MessageBasedCounter () =
/// Create the agent
static let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop() = async{
let! msg = inbox.Receive()
match msg with
| Go24 n ->
let sleepWorkflow = async{
printfn "Go24, wait"
do! Async.Sleep 4000
MessageBasedCounter.DoDone() // POST Done.
printfn "go24 %d, end" n
return! messageLoop()
}
Async.RunSynchronously sleepWorkflow
| Fetch (i, repl) ->
let sync = async{
printfn "Fetch %d" i
do! Async.Sleep 1000
repl.Reply( "Reply Fetch " + i.ToString() ) // Reply to the caller
return! messageLoop()
}
Async.RunSynchronously sync
| GetMe (repl) ->
let sync = async{
printfn "GetMe"
repl.Reply( "GetMe" ) // Reply to the caller
return! messageLoop()
}
Async.RunSynchronously sync
| Done ->
let sync = async{
printfn "Done"
return! messageLoop()
}
Async.RunSynchronously sync
}
// start the loop
messageLoop()
)
// public interface to hide the implementation
static member DoDone () = agent.Post( Done )
static member DoGo24 (i:int) = agent.Post( Go24(i) )
static member DoFetch (i:int) = agent.PostAndReply( fun reply -> Fetch(i, reply) )
static member GetMe = agent.PostAndReply( GetMe )
Upvotes: 1
Views: 813
Reputation: 243096
I'm not necessarily sure that this is the main problem, but the nested asyncs and Async.RunSynchrously
in the agent code look suspicious.
You do not need to create a nested async - you can just call asynchronous operations in the body of the match
clauses directly:
// the message processing function
let rec messageLoop() = async{
let! msg = inbox.Receive()
match msg with
| Go24 n ->
printfn "Go24, wait"
do! Async.Sleep 4000
MessageBasedCounter.DoDone()
printfn "go24 %d, end" n
return! messageLoop()
| Fetch (i, repl) ->
(...)
Aside from that, it is important to understand that the agent has exactly one instance of the body computation running. So, if you block the body of the agent, all other operations will be queued.
If you want to start some task (like the synchronous operations) in the background and resume the agent immediately, you can use Async.Start
inside the body (but be sure to call the main loop recursively in the main part of the body):
| Go24 n ->
// Create work item that will run in the background
let work = async {
printfn "Go24, wait"
do! Async.Sleep 4000
MessageBasedCounter.DoDone()
printfn "go24 %d, end" n }
// Queue the work in a thread pool to be processed
Async.Start(work)
// Continue the message loop, waiting for other messages
return! messageLoop()
Upvotes: 2