Wojciech Szabowicz

Reputation: 4198

F# and mailbox processor does not execute

I am using an F# MailboxProcessor to synchronize data exchange. Some operations set two flags (send and finalized) to true, and sometimes I need to reset them. My API has a special command for this, which I call system purge; it sets those flags back to false. The problem is that it does not fire.

As the API engine I am using Nancy (but I don't think that is the source of the problem), so the module that resets the system flag status looks like:

let purgeHandler = 

    Func<obj, Response> 
        (fun _ -> 
            try
                HandshakeProcessor.Post(HandshakeService.Purge)
                let jsonBytes = System.Text.Encoding.UTF8.GetBytes(""" {} """)
                new Response 
                        (ContentType = "application/json", 
                            StatusCode = HttpStatusCode.OK,
                            Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously)
            with
                ex -> 
                    let jsonBytes = System.Text.Encoding.UTF8.GetBytes(sprintf """ { "result": "false", "error": "%s" } """ ex.Message)
                    new Response 
                        (ContentType = "application/json", 
                            StatusCode = HttpStatusCode.InternalServerError,
                            Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously))

this.Put("Purge", purgeHandler)

It is pretty simple: as soon as I receive the request, I post the Purge command to the mailbox to set the flags to false.

So my Mailbox looks like:

    type HandshakeService =
      | Finalized of (bool * bool)
      | Purge
      | Get of AsyncReplyChannel<HandshakePools>

    let HandshakeProcessor : MailboxProcessor<HandshakeService> = 
      let handshake = HandshakePools()
      MailboxProcessor.Start(fun inbox ->
        let rec registerMessagePoint (responseType : ResponseType, message : DatabaseMessage) =
            async{
                let! msg = inbox.Receive()
                match msg with 
                | Finalized (send, finalized) -> 
                    handshake.Finalized <- finalized
                    handshake.Send <- send
                | Purge ->
                    handshake.Send <- false
                    handshake.Finalized <- false
                    return! registerMessagePoint(responseType, message)
                | Get replyChannel ->
                    handshake |> replyChannel.Reply
                    return! registerMessagePoint (responseType, message)  
            }
        registerMessagePoint(ResponseType.Unknown, DatabaseMessage (0us, 0us, String.Empty)))
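One thing worth checking in the loop above: the Finalized branch has no return!, so once a Finalized message has been handled the async workflow completes and the mailbox stops reading. Later Purge posts would then stay queued, and a PostAndReply would run into its timeout. Below is a minimal, self-contained sketch in which every branch falls through to the recursive call. HandshakePools here is a hypothetical stand-in for the real type, and the unused responseType/message parameters are dropped for brevity, so treat it as an illustration rather than a drop-in fix.

```fsharp
// Hypothetical stand-in for the HandshakePools type from the question.
type HandshakePools() =
    member val Send = false with get, set
    member val Finalized = false with get, set

type HandshakeService =
    | Finalized of (bool * bool)
    | Purge
    | Get of AsyncReplyChannel<HandshakePools>

let handshakeProcessor : MailboxProcessor<HandshakeService> =
    let handshake = HandshakePools()
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Finalized (send, finalized) ->
                    handshake.Send <- send
                    handshake.Finalized <- finalized
                | Purge ->
                    handshake.Send <- false
                    handshake.Finalized <- false
                | Get replyChannel ->
                    replyChannel.Reply handshake
                // Recurse after every message, whichever branch ran,
                // so the mailbox keeps reading.
                return! loop ()
            }
        loop ())

// Usage: set the flags, purge them, then read them back.
handshakeProcessor.Post(Finalized (true, true))
handshakeProcessor.Post Purge
let pools = handshakeProcessor.PostAndReply((fun reply -> Get reply), timeout = 10000)
printfn "Send=%b Finalized=%b" pools.Send pools.Finalized
```

Since the mailbox handles messages in the order they were posted, the Get at the end should see both flags as false after the Purge.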

Furthermore, when I try to read the flag status inside the Func, like:

let pools = HandshakeProcessor.PostAndReply((fun reply -> HandshakeService.Get reply), timeout = 10000)
printfn "\n\n\n\n  FINALIZED %b  \n\n\n\n" pools.Finalized

I am getting timeout exception.

Also, if I move the Post before the Func, it fires, but only once during module initialization; when I call this method again it won't budge, like:

let purgeHandler (handshakePool : HandshakePools) = 

    HandshakeProcessor.Post(HandshakeService.Purge) 
    Func<obj, Response> 

So the problem is that when I call the API purge method, the post is skipped. Execution does not enter the Purge branch in the mailbox and the flags remain the same; I have put a breakpoint in the Purge part of the mailbox. Does anyone know how to solve this problem?

Upvotes: 1

Views: 97

Answers (1)

Wojciech Szabowicz

Reputation: 4198

It seems there is no easy way to do that; all I have found are usages of SignalR with NancyFx for this case. So I have given up on MailboxProcessor and created a mediator instead.

Mediator:

type HttpCommunicationMediator private () =   

  static member val private _instance = lazy HttpCommunicationMediator()
  static member Instance = HttpCommunicationMediator._instance.Value

  member val private purgeSettings = Event<unit>()
  [<CLIEvent>]
  member this.PurgeSetting = this.purgeSettings.Publish
  member this.InvokePurgeSetting = this.purgeSettings.Trigger

Invoke when the API request comes in:

HttpCommunicationMediator.Instance.InvokePurgeSetting()

Handling the event (put it where it can be registered):

HttpCommunicationMediator.Instance.PurgeSetting.AddHandler (fun _ _-> (* resets flags *))  

Well it is not perfect, but it works.

Upvotes: 1
