Reputation: 766
I'm struggling with my first simple "hello world" RX application. I'm using VS2010 RC, plus the latest RX download.
The following is the simple console app:
class Program
{
    static void Main(string[] args)
    {
        var channel = new MessageChannel()
            .Where(m => m.process)
            .Subscribe((MyMessage m) => Console.WriteLine(m.subject));
        //channel.GenerateMsgs();
    }
}
public class MyMessage
{
    public string subject;
    public bool process;
}
public class MessageChannel: IObservable<MyMessage>
{
    List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();
    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return observer as IDisposable;
    }
    public void GenerateMsgs()
    {
        foreach (IObserver<MyMessage> observer in observers)
        {
            observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
        }
    }
}
I get an ArgumentNullException at the Where clause. Here's the stack trace:
System.ArgumentNullException was unhandled
Message=Value cannot be null.
Parameter name: disposable
Source=System.Reactive
ParamName=disposable
StackTrace:
at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
InnerException:
Upvotes: 3
Views: 1109
Reputation: 10783
!!Red flag!!
I would strongly suggest that you don't implement IObserver<T> or IObservable<T> yourself. Favor the use of Observable.Create<T> or, as a last resort, use the Subject types. There are lots of things you need to consider to implement these interfaces correctly, all of which are handled for you by the built-in Rx types and operators.
In this example I would urge you to drop the MessageChannel type and swap it for:
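class Program
{
    static void Main(string[] args)
    {
        var channel = GenerateMsgs()
            .Where(m => m.process)
            .Subscribe((MyMessage m) => Console.WriteLine(m.subject));
    }
    // Must be static so that it can be called from the static Main method.
    public static IObservable<MyMessage> GenerateMsgs()
    {
        return Observable.Create<MyMessage>(observer =>
        {
            observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
            // Observable.Create expects the delegate to return the unsubscribe logic;
            // there is nothing to clean up here, so return an empty action.
            return () => { };
        });
    }
}
public class MyMessage
{
    public string subject;
    public bool process;
}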
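If you do need to push messages imperatively (the "last resort" case mentioned above), a Subject-based channel might look roughly like the sketch below. It assumes the current System.Reactive NuGet package and its namespaces (System.Reactive.Linq, System.Reactive.Subjects), which differ from the 2010 preview build used in the question. The Messages property and the second "Skipped" message are names I made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class MyMessage
{
    public string subject;
    public bool process;
}

public class MessageChannel
{
    // The subject stays private so that only this class can publish.
    private readonly Subject<MyMessage> messages = new Subject<MyMessage>();

    // Hypothetical property: exposes the sequence without exposing OnNext.
    public IObservable<MyMessage> Messages
    {
        get { return messages.AsObservable(); }
    }

    public void GenerateMsgs()
    {
        messages.OnNext(new MyMessage { subject = "Hello!", process = true });
        messages.OnNext(new MyMessage { subject = "Skipped", process = false });
    }
}

public static class Program
{
    public static void Main()
    {
        var channel = new MessageChannel();

        // Subscribe before publishing; a Subject does not replay earlier values.
        using (channel.Messages
            .Where(m => m.process)
            .Subscribe(m => Console.WriteLine(m.subject)))
        {
            channel.GenerateMsgs(); // prints "Hello!" only
        }
    }
}
```

Here Rx supplies the IDisposable returned by Subscribe, so there is no hand-written subscription bookkeeping to get wrong.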
On further inspection of the system design, you may find that you have some sort of service that exposes "channels" as observable sequences:
public interface OrderService
{
    IObservable<OrderRequest> OrderRequests();
    IObservable<Order> ProcessedOrders();
    IObservable<OrderRejection> OrdersRejections();
}
This negates the need for these custom implementations of IObserver<T> or IObservable<T>.
Upvotes: 1
Reputation: 70983
This line seems to be causing the fuss:
return observer as IDisposable;
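You are not supposed to assume that the observer is disposable. The cast yields null when the observer does not implement IDisposable, which is the null that Rx rejects here. Instead, you are supposed to return a disposable object that knows how to unsubscribe that observer. From the documentation for Subscribe: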
The method returns a reference to an IDisposable interface. This enables the observer to unsubscribe (that is, to stop receiving notifications) before the provider has finished sending them and called the subscriber's OnCompleted method.
You can make it work by doing something like:
public class MessageChannel: IObservable<MyMessage>
{
    List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();
    // Returned from Subscribe; disposing it removes the observer from the channel.
    class Subscription : IDisposable {
        MessageChannel _c;
        IObserver<MyMessage> _obs;
        public Subscription(MessageChannel c, IObserver<MyMessage> obs) {
            _c = c; _obs = obs;
        }
        public void Dispose() {
            _c.Unsubscribe(_obs);
        }
    }
    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer);
    }
    void Unsubscribe(IObserver<MyMessage> obs) {
        observers.Remove(obs);
    }
    // GenerateMsgs() is unchanged from the question.
}
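To check the unsubscribe behaviour end to end, here is a self-contained sketch that uses only the BCL interfaces (no Rx operators). RecordingObserver and Demo are hypothetical helpers I added for the illustration, and iterating over a copy of the list is my own precaution rather than something from the question.

```csharp
using System;
using System.Collections.Generic;

public class MyMessage
{
    public string subject;
    public bool process;
}

public class MessageChannel : IObservable<MyMessage>
{
    private readonly List<IObserver<MyMessage>> observers =
        new List<IObserver<MyMessage>>();

    // Handle returned to the caller; disposing it removes the observer.
    private sealed class Subscription : IDisposable
    {
        private readonly MessageChannel channel;
        private readonly IObserver<MyMessage> observer;

        public Subscription(MessageChannel channel, IObserver<MyMessage> observer)
        {
            this.channel = channel;
            this.observer = observer;
        }

        public void Dispose()
        {
            channel.observers.Remove(observer);
        }
    }

    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer);
    }

    public void GenerateMsgs()
    {
        // Iterate over a copy so an observer may unsubscribe inside OnNext.
        foreach (IObserver<MyMessage> observer in observers.ToArray())
        {
            observer.OnNext(new MyMessage { subject = "Hello!", process = true });
        }
    }
}

// Hypothetical observer that just records the subjects it receives.
public class RecordingObserver : IObserver<MyMessage>
{
    public readonly List<string> Received = new List<string>();

    public void OnNext(MyMessage value) { Received.Add(value.subject); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    // Returns how many messages the observer saw.
    public static int Run()
    {
        var channel = new MessageChannel();
        var observer = new RecordingObserver();

        IDisposable subscription = channel.Subscribe(observer);
        channel.GenerateMsgs();   // delivered
        subscription.Dispose();   // unsubscribe
        channel.GenerateMsgs();   // no longer delivered

        return observer.Received.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 1
    }
}
```

This sketch is not thread-safe and does not handle OnError or OnCompleted, which is part of why the built-in Rx types are usually the better choice.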
Upvotes: 1