Reputation: 9377
When using MailboxProcessor agents in F#, what is the preferred way to communicate between them? Wrapping the agents in objects like:
type ProcessAgent(saveAgent:SaveAgent) = ...
type SaveAgent() = ...
let saveAgent = new SaveAgent()
let processAgent = new ProcessAgent(saveAgent)
or what about:
type ProcessAgent(cont:string -> unit) = ...
type SaveAgent() = ...
let saveAgent = new SaveAgent()
let processAgent = new ProcessAgent(fun z -> saveAgent.Add z)
or maybe even something like:
type ProcessAgent() = ...
type SaveAgent() = ...
let saveAgent = new SaveAgent()
let processAgent = new ProcessAgent()
processAgent.Process item (fun z -> saveAgent.Add z)
Also, is there ever any reason to wrap a normal function, one that is not maintaining any kind of state, in an agent?
Upvotes: 2
Views: 224
Reputation: 243096
The key thing about encapsulating the agents in classes is that it lets you break the direct dependencies between them. So, you can create the individual agents and then connect them into a bigger "agent network" just by registering event handlers, calling methods, etc.
An agent can essentially expose three kinds of members:
Actions are members of type 'T -> unit. They send a message to the agent without waiting for any reply. This is essentially wrapping a call to agent.Post.
Blocking actions are members of type 'T -> Async<'R>. This is useful when you send a message to the agent and then want to wait for a response (or for confirmation that the action was processed). These do not block the underlying thread (they are asynchronous), but the calling workflow is suspended until the reply arrives. This is essentially wrapping a call to agent.PostAndAsyncReply.
Notifications are members of type IEvent<'T> or IObservable<'T>, representing some sort of notification reported by the agent, e.g. when the agent finishes doing some work and wants to notify the caller (or other agents).
In your example, the processing agent does some work (asynchronously) and then returns the result, so I think it makes sense to use a "Blocking action". The operation of the saving agent is just an "Action", because it does not return anything. To demonstrate the last case, I'll add a "flushed" notification, which is triggered when the saving agent has saved all queued items to the actual storage:
// Two-way communication processing a string
type ProcessMessage =
  PM of string * AsyncReplyChannel<string>

type ProcessAgent() =
  let agent = MailboxProcessor.Start(fun inbox -> async {
    while true do
      let! (PM(s, repl)) = inbox.Receive()
      repl.Reply("Echo: " + s) })

  // Takes input, processes it and asynchronously returns the result
  member x.Process(input) =
    agent.PostAndAsyncReply(fun ch -> PM(input, ch))

type SaveAgent() =
  let flushed = Event<_>()
  let agent = (* ... *)

  // Action to be called to save a new processed value
  member x.Add(res) =
    agent.Post(res)

  // Notification triggered when the cache is flushed
  member x.Flushed = flushed.Publish
Then you can create both agents and connect them in various ways using the members:
let proc = ProcessAgent()
let save = SaveAgent()

// Process an item and then save the result
// (the workflow must be started explicitly, otherwise nothing runs)
async {
  let! r = proc.Process("Hi")
  save.Add(r) }
|> Async.Start

// Listen to flushed notifications
save.Flushed |> Event.add (fun () ->
  printfn "flushed..." )
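The body of the saving agent is left elided above. As a rough illustration only, here is one way a batching variant could look. The constructor parameters `store` and `batchSize` are hypothetical and do not come from the answer; they stand in for whatever storage and flushing policy you actually use.

```fsharp
// Sketch only: `store` and `batchSize` are hypothetical parameters,
// not part of the answer above.
type SaveAgent(store: string list -> unit, batchSize: int) =
    let flushed = Event<unit>()
    let agent =
        MailboxProcessor<string>.Start(fun inbox ->
            // `pending` holds the queued items, newest first
            let rec loop (pending: string list) = async {
                let! item = inbox.Receive()
                let pending = item :: pending
                if List.length pending >= batchSize then
                    store (List.rev pending)   // write the batch in arrival order
                    flushed.Trigger()          // notify listeners
                    return! loop []
                else
                    return! loop pending }
            loop [])

    // Action: fire-and-forget
    member x.Add(res: string) = agent.Post(res)

    // Notification: raised after each batch has been written
    member x.Flushed = flushed.Publish

// Usage: "store" a batch by printing its size, flush every two items
let save = SaveAgent((fun batch -> printfn "stored %d items" (List.length batch)), 2)
save.Flushed |> Event.add (fun () -> printfn "flushed...")
save.Add "a"
save.Add "b"
```

Keeping the pending items as an argument of the recursive loop means the agent needs no mutable fields; all state lives inside the mailbox loop. Note that the handlers run on whichever thread the agent happens to be executing on.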
Upvotes: 3
Reputation: 11372
You don't need to create a class for your agents. Why not just write a function that returns your MailboxProcessor?
let MakeSaveAgent() =
    MailboxProcessor<SaveMessageType>.Start(fun inbox ->
        (* etc... *))

let MakeProcessAgent (saveAgent: MailboxProcessor<SaveMessageType>) =
    MailboxProcessor<ProcessMessageType>.Start(fun inbox ->
        (* etc... you can send messages to saveAgent here *))
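To make the function-based approach concrete, here is a minimal sketch with the elided parts filled in by guesswork. The message types `Save` and `Process`, the "Echo: " processing step, and the `store` parameter on `MakeSaveAgent` are all assumptions made for illustration; the answer does not specify them.

```fsharp
// Hypothetical message types; the answer leaves them unspecified.
type SaveMessageType = Save of string
type ProcessMessageType = Process of string

// `store` is a hypothetical callback standing in for real storage.
let MakeSaveAgent (store: string -> unit) =
    MailboxProcessor<SaveMessageType>.Start(fun inbox ->
        let rec loop () = async {
            let! (Save s) = inbox.Receive()
            store s
            return! loop () }
        loop ())

let MakeProcessAgent (saveAgent: MailboxProcessor<SaveMessageType>) =
    MailboxProcessor<ProcessMessageType>.Start(fun inbox ->
        let rec loop () = async {
            let! (Process s) = inbox.Receive()
            // Do the (placeholder) work, then hand the result on
            saveAgent.Post(Save("Echo: " + s))
            return! loop () }
        loop ())

let saveAgent = MakeSaveAgent (fun s -> printfn "saved: %s" s)
let processAgent = MakeProcessAgent saveAgent
processAgent.Post(Process "Hi")
```

The trade-off compared with the class-based version is that callers see the raw MailboxProcessor and its message type, so the processing agent is tied directly to the saving agent's protocol.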
For your final question: no, not really; that would be adding unnecessary complication when a simple function returning Async<_> would suffice.
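As a small illustration of that last point, a stateless operation can be written as an ordinary function returning an async workflow. The name `processItem` and the "Echo: " step are hypothetical placeholders.

```fsharp
// Hypothetical stateless operation: no mailbox, no message type.
let processItem (input: string) : Async<string> = async {
    return "Echo: " + input }

// Callers compose it just like an agent's asynchronous reply.
let result = processItem "Hi" |> Async.RunSynchronously
// result = "Echo: Hi"
```

Because there is no state to protect, there is nothing for a mailbox to serialize, and several calls can run concurrently without queuing behind each other.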
Upvotes: 0