Reputation: 1148
I'm trying to use the Async methods of Microsoft.Management.Infrastructure, but I'm having a hard time figuring out how to do session management with Reactive Extensions.
Here's some background:
- A session is created with CimSession.CreateAsync, which returns IObservable.
- CimSession.CreateAsync returns a CimAsyncResult<T>, which wraps CimAsyncDelegatedObservable<T>.
- CimAsyncDelegatedObservable<T> does not dispose CimSession at any point, and thus one must manually dispose CimSession when it is not needed anymore.
- With a CimSession, I'm trying to run a WQL query using QueryInstancesAsync, for example session.QueryInstancesAsync(@"root\cimv2", "WQL", "SELECT Username FROM Win32_ComputerSystem").
If CimSession returned Tasks instead of Observables, the code would be pretty straightforward:
    // Hypothetical Task-returning API, for comparison only.
    using var session = await CimSession.CreateAsync(".");
    var results = await session.QueryInstancesAsync(@"root\cimv2", "WQL", "SELECT Username FROM Win32_ComputerSystem");
But as the Async methods are implemented using Observables, I'm not sure how to translate this to idiomatic usage of Observables. Sure, I could translate my methods to Tasks using System.Reactive.Linq.Observable.ForEachAsync, but I'd like to learn how to use Observables properly.
To sum it up, my questions would be:
- [...] CimSession to Observable
- [...] CimSession gets disposed, if there are no subscribers? Should I force at least one subscriber by having a factory method?
- [...] CimSession observable when results for QueryInstancesAsync arrive?

Upvotes: 1
Views: 205
Reputation: 117064
It's a bit of a whacky interface, but give this a go and let me know if it works:
    IObservable<string> query =
        from session in CimSession.CreateAsync(".")
        from x in Observable.Using(
            () => session,
            s => s.QueryInstancesAsync(@"root\cimv2", "WQL", "SELECT Username FROM Win32_ComputerSystem"))
        from y in Observable.Using(
            () => x,
            z => Observable.Return(z.GetCimSessionComputerName()))
        select y;
If that works, I'd suggest wrapping it in a separate class:
    using System;
    using System.Reactive.Linq;
    using Microsoft.Management.Infrastructure;

    public static class CimSessionEx
    {
        // Observable.Using disposes the session when the query sequence ends,
        // and each CimInstance when the sequence returned by factory ends.
        public static IObservable<T> CreateObservable<T>(
            string computerName, string namespaceName, string queryDialect, string queryExpression,
            Func<CimInstance, IObservable<T>> factory) =>
            from session in CimSession.CreateAsync(computerName)
            from instance in Observable.Using(() => session, s => s.QueryInstancesAsync(namespaceName, queryDialect, queryExpression))
            from y in Observable.Using(() => instance, factory)
            select y;
    }
Then you could use it like this:
    IObservable<string> query =
        CimSessionEx.CreateObservable(
            ".",
            @"root\cimv2",
            "WQL",
            "SELECT Username FROM Win32_ComputerSystem",
            i => Observable.Return(i.GetCimSessionComputerName()));
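
On the "no subscribers" worry: Observable.Using is lazy, so the resource factory only runs when something subscribes, and the resource is disposed when the sequence terminates or the subscription is disposed. Here's a minimal sketch of that behaviour that runs without WMI. FakeSession, Query and UsingDemo are hypothetical stand-ins I made up for illustration, not part of Microsoft.Management.Infrastructure, and unlike the query above the sketch creates the resource inside the factory rather than receiving it from another observable. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for CimSession, so the sketch runs without WMI.
sealed class FakeSession : IDisposable
{
    private readonly List<string> _log;

    public FakeSession(List<string> log)
    {
        _log = log;
        _log.Add("session created");
    }

    // Plays the role of QueryInstancesAsync: a finite sequence of results.
    public IObservable<int> Query() => Observable.Range(1, 3);

    public void Dispose() => _log.Add("session disposed");
}

static class UsingDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Nothing is created here: Using only stores the two factories.
        IObservable<int> query = Observable.Using(
            () => new FakeSession(log),
            session => session.Query());
        log.Add("query defined");

        // Subscribing invokes the resource factory. Using disposes the
        // session once the sequence terminates or the subscription is disposed.
        using (query.Subscribe(x => log.Add($"value {x}")))
        {
        }

        return log;
    }

    static void Main() =>
        Console.WriteLine(string.Join(Environment.NewLine, Run()));
}
```

If you never subscribe, the factory never runs, so there is no session left over to clean up. Each new subscription gets its own session.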
Upvotes: 1