Reputation: 1015
I am investigating how to develop a plugin framework for a project, and Rx seems like a good fit for what I am trying to achieve. Ultimately, the project will be a set of plugins (modular functionality) that can be configured via XML to do different things. The requirements are as follows
In my mind, a plugin is essentially a data transformation entity. This means a plugin either
Taking the concept further, a plugin can consist of any number of modules of all three types above. For example, within a plugin you can have an IntGenerator module that sends some data to a ConsoleWorkUnit module, and so on. So what I am trying to model in the main function is the wiring that a plugin would need to do its work.
To that end, I have the following base classes, which use the immutable collections NuGet package from Microsoft. What I am trying to achieve is to abstract away the Rx calls so they can be used in modules; the ultimate aim is to wrap calls to Buffer etc. in abstract classes that can be used to compose complex queries and modules. This way the code is more self-documenting: you do not have to read all the code within a module to find out that it subscribes to a buffer or window of type x.
public abstract class OutputBase<TOutput> : SendOutputBase<TOutput>
{
    public abstract void Work();
}

public interface IBufferedBase<TOutput>
{
    void Work(IList<ImmutableList<Data<TOutput>>> list);
}

public abstract class BufferedWorkBase<TInput> : IBufferedBase<TInput>
{
    public abstract void Work(IList<ImmutableList<Data<TInput>>> input);
}

public abstract class SendOutputBase<TOutput>
{
    private readonly ReplaySubject<ImmutableList<Data<TOutput>>> _outputNotifier;
    private readonly IObservable<ImmutableList<Data<TOutput>>> _observable;

    protected SendOutputBase()
    {
        _outputNotifier = new ReplaySubject<ImmutableList<Data<TOutput>>>(10);
        _observable = _outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance);
        _observable = _outputNotifier.ObserveOn(ThreadPoolScheduler.Instance);
    }

    protected void SetOutputTo(ImmutableList<Data<TOutput>> output)
    {
        _outputNotifier.OnNext(output);
    }

    public void ConnectOutputTo(IWorkBase<TOutput> unit)
    {
        _observable.Subscribe(unit.Work);
    }

    public void BufferOutputTo(int count, IBufferedBase<TOutput> unit)
    {
        _observable.Buffer(count).Subscribe(unit.Work);
    }
}

public abstract class WorkBase<TInput> : IWorkBase<TInput>
{
    public abstract void Work(ImmutableList<Data<TInput>> input);
}

public interface IWorkBase<TInput>
{
    void Work(ImmutableList<Data<TInput>> input);
}

public class Data<T>
{
    private readonly T _value;

    private Data(T value)
    {
        _value = value;
    }

    public static Data<TData> Create<TData>(TData value)
    {
        return new Data<TData>(value);
    }

    public T Value { get { return _value; } }
}
These base classes are used to create three classes: one that generates some int data, one that prints the data as it arrives, and one that buffers the incoming data and sums the values in threes.
public class IntGenerator : OutputBase<int>
{
    public override void Work()
    {
        var list = ImmutableList<Data<int>>.Empty;
        var builder = list.ToBuilder();
        for (var i = 0; i < 1000; i++)
        {
            builder.Add(Data<int>.Create(i));
        }
        SetOutputTo(builder.ToImmutable());
    }
}

public class ConsoleWorkUnit : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        foreach (var data in input)
        {
            Console.WriteLine("ConsoleWorkUnit printing {0}", data.Value);
        }
    }
}

public class SumPrinter : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        input.ToObservable().Buffer(2).Subscribe(PrintSum);
    }

    private void PrintSum(IList<Data<int>> obj)
    {
        Console.WriteLine("Sum of {0}, {1} is {2} ", obj.First().Value, obj.Last().Value, obj.Sum(x => x.Value));
    }
}
These are run in a main like this:
var intgen = new IntGenerator();
var cons = new ConsoleWorkUnit();
var sumPrinter = new SumPrinter();
intgen.ConnectOutputTo(cons);
intgen.BufferOutputTo(3, sumPrinter);
Task.Factory.StartNew(intgen.Work);
Console.ReadLine();
Is this architecture sound?
Upvotes: 1
Views: 133
Reputation: 39192
You are buffering your observable (.Buffer(count)) so that it only signals after count notifications arrive. However, your IntGenerator.Work only ever produces a single value (one OnNext carrying the whole list). Thus you never "fill" the buffer and trigger downstream notifications.
Either change Work so that it eventually produces more values, or have it complete the observable stream when it finishes its work. Buffer will release the remaining buffered values when the stream completes. To do this, IntGenerator.Work needs to cause a call to _outputNotifier.OnCompleted() somewhere.
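To make the point concrete, here is a minimal sketch of that behaviour in isolation. It is not the question's code: it assumes the System.Reactive package, uses a plain Subject<int> in place of the ReplaySubject so that everything runs synchronously, and BufferCompletionDemo and Run are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of the question's framework.
public static class BufferCompletionDemo
{
    // Records, in order, what a Buffer(3) subscriber sees when the source
    // pushes a single value and then completes.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Assumption: a plain Subject stands in for the question's ReplaySubject.
        var subject = new Subject<int>();

        subject.Buffer(3).Subscribe(
            batch => log.Add("batch of " + batch.Count),
            () => log.Add("completed"));

        // One notification is not enough to fill a buffer of three,
        // so nothing reaches the subscriber yet.
        subject.OnNext(42);
        log.Add("after OnNext");

        // Completing the source flushes the partially filled buffer.
        subject.OnCompleted();

        return log;
    }
}
```

Calling BufferCompletionDemo.Run() should yield "after OnNext", "batch of 1", "completed", in that order: the buffered value only shows up once the source completes. In the question's classes, the equivalent step would be for Work to trigger _outputNotifier.OnCompleted() after its call to SetOutputTo.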
Upvotes: 2