Reputation: 3029
I can't work out why the following code is hanging at the call to GetTotal
. I don't seem to be able to debug inside the MailboxProcessor, so it's hard to see what's going on.
module Aggregator
open System
type Message<'T, 'TState> =
| Aggregate of 'T
| GetTotal of AsyncReplyChannel<'TState>
type Aggregator<'T, 'TState>(initialState, f) =
let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
let rec loop agg =
async {
let! message = inbox.Receive()
match message with
| Aggregate x -> return! loop (f agg x)
| GetTotal replyChannel ->
replyChannel.Reply(agg)
return! loop agg
}
loop initialState
)
member m.Aggregate x = myAgent.Post(Aggregate(x))
member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))
let myAggregator = new Aggregator<int, int>(0, (+))
myAggregator.Aggregate(3)
myAggregator.Aggregate(4)
myAggregator.Aggregate(5)
let totalSoFar = myAggregator.GetTotal
printfn "%d" totalSoFar
Console.ReadLine() |> ignore
It seems to work fine when using an identical MailboxProcessor directly, rather than wrapping in the Aggregator
class.
Upvotes: 3
Views: 373
Reputation: 243096
The problem is that you did not start the agent. You can either call Start
after you create the agent:
let myAgent = (...)
do myAgent.Start()
Alternatively, you can create the agent using MailboxProcessor<'T>.Start
instead of calling the constructor (I usually prefer this option, because it looks more functional):
let myAgent = MailboxProcessor<Message<'T, 'TState>>.Start(fun inbox -> (...) )
I suppose that you couldn't debug the agent, because the code inside agent wasn't actually running. I tried adding printfn "Msg: %A" message
right after the call to Receive
inside the agent (to print incoming messages for debugging) and I noticed that, after calling Aggregate
, no messages were actually received by the agent... (It only blocked after calling GetTotal
, which avaits reply)
As a side-note, I would probably turn GetTotal
into a method, so you'd call GetTotal()
. Properties are re-evaluated each time you access them, so your code does the same thing, but best practices don't recommend using properties that do complex work.
Upvotes: 7
Reputation: 6437
You forgot to start the mailbox:
open System
type Message<'T, 'TState> =
| Aggregate of 'T
| GetTotal of AsyncReplyChannel<'TState>
type Aggregator<'T, 'TState>(initialState, f) =
let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
let rec loop agg =
async {
let! message = inbox.Receive()
match message with
| Aggregate x -> return! loop (f agg x)
| GetTotal replyChannel ->
replyChannel.Reply(agg)
return! loop agg
}
loop initialState
)
member m.Aggregate x = myAgent.Post(Aggregate(x))
member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))
member m.Start() = myAgent.Start()
let myAggregator = new Aggregator<int, int>(0, (+))
myAggregator.Start()
myAggregator.Aggregate(3)
myAggregator.Aggregate(4)
myAggregator.Aggregate(5)
let totalSoFar = myAggregator.GetTotal
printfn "%d" totalSoFar
Console.ReadLine() |> ignore
Upvotes: 4