Reputation: 4198
I am using F# mailbox processor to synchronize data exchange so there are some operation and flags (send and finalized) are set to true, but sometimes I need to reset then so I am using api to do this, so my api has a special command I called system purge this operation sets some system flags to back to false, but problem is that it does not fire.
As an api engine I am using Nancy (but I don't think that is the source of the problem) so module that resets system flags styatuis would look like:
let purgeHandler =
Func<obj, Response>
(fun _ ->
try
HandshakeProcessor.Post(HandshakeService.Purge)
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(""" {} """)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.OK,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously)
with
ex ->
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(sprintf """ { "result": "false", "error": "%s" } """ ex.Message)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.InternalServerError,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously))
this.Put("Purge", purgeHandler)
Is pretty simple as soon I receive request I am posting purge command to mailbox to set flags to false.
So my Mailbox looks like:
type HandshakeService =
| Finalized of (bool * bool)
| Purge
| Get of AsyncReplyChannel<HandshakePools>
let HandshakeProcessor : MailboxProcessor<HandshakeService> =
let handshake = HandshakePools()
MailboxProcessor.Start(fun inbox ->
let rec registerMessagePoint (responseType : ResponseType, message : DatabaseMessage) =
async{
let! msg = inbox.Receive()
match msg with
| Finalized (send, finalized) ->
handshake.Finalized <- finalized
handshake.Send <- send
| Purge ->
handshake.Send <- false
handshake.Finalized <- false
return! registerMessagePoint(responseType, message)
| Get replyChannel ->
handshake |> replyChannel.Reply
return! registerMessagePoint (responseType, message)
}
registerMessagePoint(ResponseType.Unknown, DatabaseMessage (0us, 0us, String.Empty)))
Furthermore when I use get flag status inside FUNC like :
let pools = HandshakeProcessor.PostAndReply((fun reply -> HandshakeService.Get reply), timeout = 10000)
printfn "\n\n\n\n FINALIZED %b \n\n\n\n" pools.Finalized
I am getting timeout exception.
Also if I move Post before Func it is firing up, but only once during module initialization then when i call this method again it wont bugde, like:
let purgeHandler (handshakePool : HandshakePools) =
HandshakeProcessor.Post(HandshakeService.Purge)
Func<obj, Response>
So problem is when I fire up API method purge post is skipped. It does not enter purge method in mailbox and flags remain same, I have put breakpoint in purge part of the mailbox. Does anyone know to solve this problem ?
Upvotes: 1
Views: 97
Reputation: 4198
It seems there is no easy way to do that, all I have found are usages of SignalR with nancyFx in this case. So have resigned from MailboxProcessor and create mediator instead.
Mediator:
type HttpCommunicationMediator private () =
static member val private _instance = lazy HttpCommunicationMediator()
static member Instance = HttpCommunicationMediator._instance.Value
member val private purgeSettings = Event<unit>()
[<CLIEvent>]
member this.PurgeSetting = this.purgeSettings.Publish
member this.InvokePurgeSetting = this.purgeSettings.Trigger
Invoke when API request comes:
HttpCommunicationMediator.Instance.InvokePurgeSetting()
Execution of event (put where it can be registered):
HttpCommunicationMediator.Instance.PurgeSetting.AddHandler (fun _ _-> (* resets flags *))
Well it is not perfect, but it works.
Upvotes: 1