Tuomas Hietanen

Reputation: 5321

F# 3.0: System.Exception: multiple waiting reader continuations for mailbox

I'm testing MailboxProcessor, and it seems that the mailbox's Scan() fails with "System.Exception: multiple waiting reader continuations for mailbox". This happens with Async.Start, Async.StartImmediate, etc. (Async.RunSynchronously won't work either, because then only one processor runs and no customers are posted after the initial ones.)

Here is the demo code; it runs in F# Interactive:

#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----

type Customer() =
    let person = customerNames |> Seq.nth (random.Next nameAmount)
    member x.Id = Guid.NewGuid()
    member x.Name = person.Name
    member x.RequiredTime = random.Next(10000)

type Barber(name) =
    member x.Name = name

type ``Possible actions notified to barber`` = 
| CustomerWalksIn of Customer

let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })

let createBarber name = 
    Console.WriteLine("Barber " + name + " takes initial nap...")
    let rec cutSomeHairs () = 
        async{
            do! availableCustomers.Scan(function 
                | CustomerWalksIn customer ->
                    async {
                        Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
                        // exception also happen with Threading.Thread.Sleep()
                        do! Async.Sleep customer.RequiredTime
                        Console.WriteLine("Barber " + name + " finished cutting " + customer.Name + "'s hair. Going to sleep now...")
                    } |> Some)
            do! cutSomeHairs ()
            }
    cutSomeHairs() |> Async.StartImmediate

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

createBarber "Tuomas";
createBarber "Seppo";

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

...and the stacktrace I get after running a while is:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>[email protected](Object state)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error

or the same without threads:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>[email protected](AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>[email protected](Object state)
Stopped due to error

Upvotes: 0

Views: 158

Answers (2)

user3841986

Reputation:

As Tomas already pointed out, MailboxProcessor directly allows only a single reader, and one way to solve this within the async system is to write your own queue or mailbox type. One thing the article Tomas points to doesn't discuss, however, is that another way to implement new communication primitives is to use Async.FromContinuations rather than MailboxProcessor and AsyncReplyChannel.

The main advantage of using Async.FromContinuations is that you get much more direct access to the async mechanism and don't have to work within the limitations imposed by MailboxProcessor and AsyncReplyChannel. The main disadvantage is that you have to make your queue or mailbox thread-safe yourself.

As a concrete example, Anton Tayanovskyy's blog post, Making Async 5x Faster, contains a multiple reader-writer synchronous channel implemented using Async.FromContinuations.
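
To give a rough idea of the approach, here is a minimal sketch of a multi-reader queue built on Async.FromContinuations. It is not the code from Anton's post: it is a simplified, unbounded queue rather than a synchronous channel, the names AsyncQueue, Post and Receive are made up for illustration, and it ignores cancellation and exceptions. A lock provides the thread safety mentioned above.

```fsharp
open System.Collections.Generic

// Hypothetical multi-reader queue; names are illustrative, not from any library.
type AsyncQueue<'T>() =
    let gate = obj ()                    // guards both queues below
    let items = Queue<'T>()              // posted items nobody has taken yet
    let readers = Queue<'T -> unit>()    // continuations of waiting readers

    // Hand the item to a waiting reader if there is one, otherwise store it.
    member x.Post(item: 'T) =
        let reader =
            lock gate (fun () ->
                if readers.Count > 0 then
                    Some(readers.Dequeue())
                else
                    items.Enqueue item
                    None)
        // Resume the reader outside the lock (this runs on the poster's thread).
        match reader with
        | Some resume -> resume item
        | None -> ()

    // Any number of workflows may wait here at the same time.
    member x.Receive() : Async<'T> =
        Async.FromContinuations(fun (cont, _, _) ->
            let ready =
                lock gate (fun () ->
                    if items.Count > 0 then
                        Some(items.Dequeue())
                    else
                        readers.Enqueue cont
                        None)
            match ready with
            | Some item -> cont item
            | None -> ())
```

With something like this, each barber loop would wait on Receive() instead of calling Scan on a shared MailboxProcessor, so several barbers can wait concurrently.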

Upvotes: 1

Tomas Petricek

Reputation: 243096

The Receive and Scan methods of a MailboxProcessor should only be called from the body of the agent. To quote the MSDN documentation:

This method is for use within the body of the agent. For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active. The body of the scanner function is locked during its execution, but the lock is released before the execution of the asynchronous workflow.
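
As a small illustration of that rule (the message type and agent below are invented for this example), the function passed to Start is the only place that calls Receive, while outside code only uses Post and PostAndReply:

```fsharp
// Hypothetical message type for a counting agent.
type CounterMessage =
    | Increment
    | GetCount of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // The agent body is the single reader of its own mailbox.
        let rec loop count = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment ->
                return! loop (count + 1)
            | GetCount reply ->
                reply.Reply count
                return! loop count }
        loop 0)

// Callers never call Receive or Scan; they only send messages.
counter.Post Increment
counter.Post Increment
let total = counter.PostAndReply(fun reply -> GetCount reply)
```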

So, you'll need to structure your code differently. I do not have a detailed answer, but it sounds like my article on implementing a blocking queue using agents could help here.
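
A minimal sketch of that idea, assuming a simplified unbounded queue (this is not the code from the article; QueueAgent, Add and AsyncGet are illustrative names, and customers are plain strings to keep it self-contained). Only the agent body calls Receive, and barbers ask for the next customer through a reply channel:

```fsharp
open System.Collections.Generic

// Messages understood by the hypothetical queue agent.
type QueueMessage<'T> =
    | Add of 'T
    | Get of AsyncReplyChannel<'T>

type QueueAgent<'T>() =
    let agent =
        MailboxProcessor<QueueMessage<'T>>.Start(fun inbox ->
            let items = Queue<'T>()                       // customers waiting for a barber
            let waiting = Queue<AsyncReplyChannel<'T>>()  // barbers waiting for a customer
            let rec loop () = async {
                let! msg = inbox.Receive()                // the single reader
                match msg with
                | Add item ->
                    if waiting.Count > 0 then waiting.Dequeue().Reply(item)
                    else items.Enqueue(item)
                | Get reply ->
                    if items.Count > 0 then reply.Reply(items.Dequeue())
                    else waiting.Enqueue(reply)
                return! loop () }
            loop ())
    member x.Add(item: 'T) = agent.Post(Add item)
    member x.AsyncGet() : Async<'T> =
        agent.PostAndAsyncReply(fun reply -> Get reply)

let customers = QueueAgent<string>()

let createBarber name =
    let rec cutSomeHairs () = async {
        let! customer = customers.AsyncGet()   // many barbers may wait here
        printfn "Barber %s started cutting %s's hair." name customer
        do! Async.Sleep 100
        return! cutSomeHairs () }
    cutSomeHairs () |> Async.Start

createBarber "Tuomas"
createBarber "Seppo"
customers.Add "Customer 1"
customers.Add "Customer 2"
```

Each call to AsyncGet posts a Get message, so the barbers never read the mailbox themselves and the "multiple waiting reader continuations" exception should not arise.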

Upvotes: 1
