I've coded the "classical" bank account kata with an F# MailboxProcessor to be thread safe. But when I try to parallelize adding a transaction to an account, it gets very slow very quickly: 10 parallel calls are responsive (2ms), 20 are not (9 seconds)! (See the last test, Account can be updated from multiple threads, below.)
Since MailboxProcessor supports 30 million messages per second (see theburningmonk's article), where does the problem come from?
// -- Domain ----
type Message =
    | Open of AsyncReplyChannel<bool>
    | Close of AsyncReplyChannel<bool>
    | Balance of AsyncReplyChannel<decimal option>
    | Transaction of decimal * AsyncReplyChannel<bool>

type AccountState = { Opened: bool; Transactions: decimal list }

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: AccountState) =
            async {
                let! message = inbox.Receive()
                match message with
                | Close channel ->
                    channel.Reply state.Opened
                    return! loop { state with Opened = false }
                | Open channel ->
                    printfn $"Opening"
                    channel.Reply (not state.Opened)
                    return! loop { state with Opened = true }
                | Transaction (tran, channel) ->
                    printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
                    channel.Reply true
                    return! loop { state with Transactions = tran :: state.Transactions }
                | Balance channel ->
                    let balance =
                        if state.Opened then
                            state.Transactions |> List.sum |> Some
                        else
                            None
                    balance |> channel.Reply
                    return! loop state
            }
        loop { Opened = false; Transactions = [] }
    )
    member _.Open () = agent.PostAndReply(Open)
    member _.Close () = agent.PostAndReply(Close)
    member _.Balance () = agent.PostAndReply(Balance)
    member _.Transaction (transaction: decimal) =
        agent.PostAndReply(fun channel -> Transaction (transaction, channel))
// -- API ----
let mkBankAccount = Account

let openAccount (account: Account) =
    match account.Open() with
    | true -> Some account
    | false -> None

let closeAccount (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Close() with
        | true -> Some a
        | false -> None)

let updateBalance transaction (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Transaction(transaction) with
        | true -> Some a
        | false -> None)

let getBalance (account: Account option) =
    account |> Option.bind (fun a -> a.Balance())
// -- Tests ----
let should_equal expected actual =
    if expected = actual then
        Ok expected
    else
        Error (expected, actual)

let should_not_equal expected actual =
    if expected <> actual then
        Ok expected
    else
        Error (expected, actual)

let ``Returns empty balance after opening`` =
    let account = mkBankAccount() |> openAccount
    getBalance account |> should_equal (Some 0.0m)

let ``Check basic balance`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let updatedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    updatedBalance |> should_equal (Some 10.0m)

let ``Balance can increment or decrement`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let addedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    let subtractedBalance =
        account
        |> updateBalance -15.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    addedBalance |> should_equal (Some 10.0m),
    subtractedBalance |> should_equal (Some -5.0m)

let ``Account can be closed`` =
    let account =
        mkBankAccount()
        |> openAccount
        |> closeAccount
    getBalance account |> should_equal None,
    account |> should_not_equal None

#time
let ``Account can be updated from multiple threads`` =
    let account =
        mkBankAccount()
        |> openAccount
    let updateAccountAsync =
        async {
            account
            |> updateBalance 1.0m
            |> ignore
        }
    let nb = 10 // 👈 10 is quick (2ms), 20 is so long (9s)
    updateAccountAsync
    |> List.replicate nb
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    getBalance account |> should_equal (Some (decimal nb))
#time
Your problem is that your code doesn't use Async all the way up.
Your Account class has the methods Open, Close, Balance and Transaction, and you use an AsyncReplyChannel, but you use PostAndReply to send the message. This means you send a message to the MailboxProcessor with a channel to reply on, but the calling method then waits synchronously for that reply, blocking its thread until it arrives.
Even with Async.Parallel and multiple threads, this can mean a lot of threads block themselves. If you change all your methods to use PostAndAsyncReply, your problem goes away.
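As a minimal sketch of that change, assuming you want to keep the reply channel on Transaction: the trimmed-down Account below (only two messages, with a plain decimal as state, which is a simplification and not your original type) exposes its members as Async values through PostAndAsyncReply, and the caller awaits them with let! instead of blocking.

```fsharp
// Simplified, hypothetical version of the account agent: only two messages,
// and the state is just the running balance.
type Message =
    | Transaction of decimal * AsyncReplyChannel<bool>
    | Balance of AsyncReplyChannel<decimal>

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (balance: decimal) = async {
            match! inbox.Receive() with
            | Transaction (amount, channel) ->
                channel.Reply true
                return! loop (balance + amount)
            | Balance channel ->
                channel.Reply balance
                return! loop balance
        }
        loop 0m)

    // PostAndAsyncReply returns an Async<'Reply> instead of blocking the caller.
    member _.Transaction (amount: decimal) : Async<bool> =
        agent.PostAndAsyncReply(fun channel -> Transaction (amount, channel))

    member _.Balance () : Async<decimal> =
        agent.PostAndAsyncReply(Balance)

let account = Account()

let updateAccountAsync = async {
    // let! waits for the reply asynchronously, without holding a thread.
    let! accepted = account.Transaction 1.0m
    return accepted
}

let nb = 50

updateAccountAsync
|> List.replicate nb
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// Blocking once at the very top of a script is fine; the point is not to
// block inside the workflows that run in parallel.
let balance = account.Balance () |> Async.RunSynchronously
printfn "Balance is %M" balance
```

The only synchronous waits left are the Async.RunSynchronously calls at the top level of the script.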
There are two other optimizations that can speed things up, but they are not critical in your example.
Calling Length on a list is bad. To calculate the length of a list, you must walk the whole list. You only use this in Transaction to print the length, but consider what happens as the transaction list grows: you always have to walk the whole list whenever you add a transaction. This is O(N) in the size of your transaction list.
The same goes for calling List.sum. You have to calculate the current balance from scratch whenever you call Balance. Also O(N).
As you have a MailboxProcessor, you could also keep those two values in the state and update them with each transaction, instead of completely recalculating them again and again. Thus, they become O(1) operations.
On top of that, I would change the Open, Close and Transaction messages to return nothing, as in my opinion it doesn't make sense that they return anything. Your examples even leave me confused about what the bool return values mean.
In the Close message you return state.Opened before you set it to false. Why?
In the Open message you return the negated state.Opened. The way you use it later just looks wrong.
If there is more meaning behind the bool, please make a distinct Discriminated Union out of it that describes the purpose of what it returns.
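A rough sketch of what such a union could look like, assuming the bool was meant to say whether the call actually changed the account's state. The case names (AccountOpened, AlreadyOpen, AccountClosed, AlreadyClosed) are made up for illustration and are not from your code.

```fsharp
// Hypothetical result types replacing the bare bool replies.
type OpenResult =
    | AccountOpened
    | AlreadyOpen

type CloseResult =
    | AccountClosed
    | AlreadyClosed

type Message =
    | Open of AsyncReplyChannel<OpenResult>
    | Close of AsyncReplyChannel<CloseResult>

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        // The state is reduced to the Opened flag to keep the sketch short.
        let rec loop (opened: bool) = async {
            match! inbox.Receive() with
            | Open channel ->
                channel.Reply (if opened then AlreadyOpen else AccountOpened)
                return! loop true
            | Close channel ->
                channel.Reply (if opened then AccountClosed else AlreadyClosed)
                return! loop false
        }
        loop false)

    member _.Open () = agent.PostAndAsyncReply(Open)
    member _.Close () = agent.PostAndAsyncReply(Close)

let account = Account()
let first = account.Open () |> Async.RunSynchronously
let second = account.Open () |> Async.RunSynchronously
printfn "%A, then %A" first second
```

A caller can then pattern match on the named cases instead of guessing what true or false stands for.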
You used an option<Account> throughout your code. I removed it, as I don't see any purpose for it.
Anyway, here is a full example of how I would write your code so that it doesn't have the speed problems.
type Message =
    | Open
    | Close
    | Balance of AsyncReplyChannel<decimal option>
    | Transaction of decimal

type AccountState = {
    Opened: bool
    Transactions: decimal list
    TransactionsLength: int
    CurrentBalance: decimal
}

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: AccountState) = async {
            match! inbox.Receive() with
            | Close ->
                printfn "Closing"
                return! loop { state with Opened = false }
            | Open ->
                printfn "Opening"
                return! loop { state with Opened = true }
            | Transaction tran ->
                let l = state.TransactionsLength + 1
                printfn $"Adding transaction {tran}, nb = {l}"
                if state.Opened then
                    return! loop {
                        state with
                            Transactions = tran :: state.Transactions
                            TransactionsLength = l
                            CurrentBalance = state.CurrentBalance + tran
                    }
                else
                    return! loop state
            | Balance channel ->
                if state.Opened
                then channel.Reply (Some state.CurrentBalance)
                else channel.Reply None
                return! loop state
        }
        let defaultAccount = {
            Opened = false
            Transactions = []
            TransactionsLength = 0
            CurrentBalance = 0m
        }
        loop defaultAccount
    )
    member _.Open () = agent.Post(Open)
    member _.Close () = agent.Post(Close)
    member _.Balance () = agent.PostAndAsyncReply(Balance)
    member _.Transaction transaction = agent.Post(Transaction transaction)
(* Test *)
let should_equal expected actual =
    if expected = actual then
        Ok expected
    else
        Error (expected, actual)

(* --- API --- *)
let mkBankAccount = Account

(* Opens the Account *)
let openAccount (account: Account) =
    account.Open ()

(* Closes the Account *)
let closeAccount (account: Account) =
    account.Close ()

(* Updates Account *)
let updateBalance transaction (account: Account) =
    account.Transaction(transaction)

(* Gets the current Balance *)
let getBalance (account: Account) =
    account.Balance ()
#time
let ``Account can be updated from multiple threads`` =
    let account = mkBankAccount ()
    openAccount account
    let updateBalanceAsync = async {
        updateBalance 1.0m account
    }
    let nb = 50
    List.replicate nb updateBalanceAsync
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    Async.RunSynchronously (async {
        let! balance = getBalance account
        printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
    })
#time