Reputation: 6091
There is a subscription to an observable that sends out log messages. Some of the log messages come from other threads because they are in F# async blocks. I need to be able to write out the messages from the main thread.
Here is the code that currently filters out many of the log messages because they are not on the main thread:
    member x.RegisterTrace() =
        Logging.verbose <- x.Verbose
        let id = Threading.Thread.CurrentThread.ManagedThreadId
        Logging.subscribe (fun trace ->
            if id = Threading.Thread.CurrentThread.ManagedThreadId then
                match trace.Level with
                | TraceLevel.Warning -> x.WriteWarning trace.Text
                | TraceLevel.Error -> x.WriteWarning trace.Text
                | _ -> x.WriteObject trace.Text
            else
                Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
        )
I have tried various forms of using System.Threading.SynchronizationContext: .Current, .SetSynchronizationContext, .Send, and .Post. I've also dabbled with System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext. I've also tried Async.SwitchToContext. No matter what I do, System.Threading.Thread.CurrentThread.ManagedThreadId ends up being different and PowerShell complains. Am I going about this wrong?
Here is the work-in-progress pull request and more details about the problem.
UPDATE 2015-06-16 Tue 11:45 AM PST
@RCH Thank you, but using Async.SwitchToContext to set the SynchronizationContext does not appear to work. Here is the code and debugging output when I do Paket-Restore -Force:
    member x.RegisterTrace() =
        let a = Thread.CurrentThread.ManagedThreadId
        Logging.verbose <- x.Verbose
        let ctx = SynchronizationContext.Current
        Logging.subscribe (fun trace ->
            let b = Thread.CurrentThread.ManagedThreadId
            async {
                let c = Thread.CurrentThread.ManagedThreadId
                do! Async.SwitchToContext ctx
                let d = Thread.CurrentThread.ManagedThreadId
                Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
            } |> Async.Start
        )
An expert at work recommended another solution that I'm going to try that involves passing in the context when subscribing.
UPDATE 2015-06-16 Tue 5:30 PM PST
I got help creating an IObservable.SubscribeOn that allows the SynchronizationContext to be passed in. Unfortunately, it doesn't solve the problem either, but it may be part of the solution. Maybe a custom SynchronizationContext is needed, like that SingleThreadSynchronizationContext. I would love help making one, but before I do that, I'm going to try out System.Reactive's Observable.ObserveOn(Scheduler.CurrentThread).
UPDATE 2015-06-16 Tue 8:30 PM PST
I haven't been able to get Rx to work either. Scheduler.CurrentThread doesn't behave the way I was hoping. I then tried out these changes and the callback doesn't get called.
    member x.RegisterTrace() =
        Logging.verbose <- x.Verbose
        let a = Threading.Thread.CurrentThread.ManagedThreadId
        let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
        let sch = SynchronizationContextScheduler ctx
        Logging.event.Publish.ObserveOn sch
        |> Observable.subscribe (fun trace ->
            let b = Threading.Thread.CurrentThread.ManagedThreadId
            Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)
        )
A custom SynchronizationContext may be what is needed. :/
Upvotes: 1
Views: 695
Reputation: 6091
I ended up creating an EventSink that has a queue of callbacks that are executed on the main PowerShell thread via Drain(). I put the main computation on another thread. The pull request has the full code and more details.
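The pull request code is not reproduced here, so the following is only a rough sketch of the idea, assuming a blocking queue is acceptable. EventSink and Drain are the names mentioned above; Fill, StopFill, and everything in the usage part are hypothetical names made up for illustration.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading

/// Sketch only: worker threads enqueue callbacks, and whichever thread
/// calls Drain() executes them.
type EventSink<'T>() =
    let queue = new BlockingCollection<unit -> unit>()

    /// Hypothetical helper: called from any thread, queues the callback
    /// instead of running it right away.
    member x.Fill (handler: 'T -> unit) (value: 'T) =
        queue.Add(fun () -> handler value)

    /// Hypothetical helper: signals that no more callbacks will arrive.
    member x.StopFill() = queue.CompleteAdding()

    /// Blocks the calling thread and runs queued callbacks on it
    /// until StopFill() has been called and the queue is empty.
    member x.Drain() =
        for callback in queue.GetConsumingEnumerable() do
            callback ()

    interface IDisposable with
        member x.Dispose() = queue.Dispose()

// Usage: the computation runs on a worker thread, the main thread drains.
let mainId = Thread.CurrentThread.ManagedThreadId
let sink = new EventSink<string>()
let seen = ResizeArray<int * string>()

// Stands in for x.WriteObject / x.WriteWarning; records the executing thread.
let write (text: string) =
    seen.Add((Thread.CurrentThread.ManagedThreadId, text))

let worker =
    Thread(ThreadStart(fun () ->
        for i in 1 .. 3 do
            sink.Fill write (sprintf "message %d" i)
        sink.StopFill()))

worker.Start()
sink.Drain()      // runs every queued callback on the main thread
worker.Join()
```

In a cmdlet, the call to Drain() would sit in the method that PowerShell invokes on its own thread, so the write calls happen where PowerShell expects them.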
Upvotes: 1
Reputation: 8551
For UI applications (Forms, WPF, F# interactive, ...) selecting SynchronizationContext.Current and Async.SwitchToContext from your trials as well as borrowing some code from Paket is enough.
For console applications, however, there is no SynchronizationContext and thus no thread the continuations can be Posted to, so they will end up on the thread pool. A possible workaround is found on MSDN Blogs.
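To illustrate what such a workaround could look like, here is a minimal sketch of a context that queues posted continuations and runs them on one pumping thread. It is not taken from the blog post; the type name echoes the SingleThreadSynchronizationContext mentioned in the question, and Complete, RunOnCurrentThread, and the usage names are assumptions for this example.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading

/// Sketch only: Post queues the work; the thread that calls
/// RunOnCurrentThread() executes it.
type SingleThreadSynchronizationContext() =
    inherit SynchronizationContext()
    let queue = new BlockingCollection<SendOrPostCallback * obj>()

    override x.Post(callback: SendOrPostCallback, state: obj) =
        queue.Add((callback, state))

    override x.Send(callback: SendOrPostCallback, state: obj) =
        raise (NotSupportedException "Send is not supported in this sketch")

    /// Hypothetical helper: signals that no more work will be posted.
    member x.Complete() = queue.CompleteAdding()

    /// Hypothetical helper: blocks the calling thread and runs posted
    /// work on it until Complete() has been called.
    member x.RunOnCurrentThread() =
        for (callback, state) in queue.GetConsumingEnumerable() do
            callback.Invoke state

// Usage from a console-style main thread.
let pump = SingleThreadSynchronizationContext()
let ctx = pump :> SynchronizationContext
let mainId = Thread.CurrentThread.ManagedThreadId
let observed = ResizeArray<int>()

async {
    do! Async.SwitchToThreadPool()   // simulate work on a pool thread
    do! Async.SwitchToContext ctx    // continuation is posted to the pump
    observed.Add Thread.CurrentThread.ManagedThreadId
    pump.Complete()                  // let the pump loop finish
} |> Async.Start

pump.RunOnCurrentThread()            // main thread executes the continuation
```

The price of this design is that the main thread has to block in the pump loop, which is why the actual computation needs to run somewhere else.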
Solution for UI applications only:
Having
    module Logging
    open System.Diagnostics
    type Trace = { Level: TraceLevel; Text: string }
    let public event = Event<Trace>()
    let subscribe callback = Observable.subscribe callback event.Publish
and
    [<AutoOpen>]
    module CmdletExt
    open System.Diagnostics
    open System.Threading
    type PSCmdletStandalone() =
        member x.RegisterTrace() =
            let syncContext = SynchronizationContext.Current
            Logging.subscribe (fun trace ->
                async {
                    do! Async.SwitchToContext syncContext
                    let threadId = Thread.CurrentThread.ManagedThreadId
                    match trace.Level with
                    | TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text
                    | TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text
                    | _ -> printfn "(on %i): %s" threadId trace.Text
                } |> Async.Start // or Async.StartImmediate
            )
then, registering on the main thread
    #load "Logging.fs"
    #load "SO.fs"
    open System.Diagnostics
    open System.Threading
    open System
    (new PSCmdletStandalone()).RegisterTrace()
    printfn "Main: %i" Thread.CurrentThread.ManagedThreadId
    for i in Enum.GetValues(typeof<TraceLevel>) do
        async {
            let workerId = Thread.CurrentThread.ManagedThreadId
            do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId }
        } |> Async.Start
yields e.g.
Main: 1
WARN (on 1): From 23
(on 1): From 25
ERROR (on 1): From 22
(on 1): From 13
(on 1): From 14
Upvotes: 1