Leor Greenberger

Reputation: 153

Streaming an IConnectableObservable after processing

I am trying to write a method that takes an IConnectableObservable, processes it, and returns a new IConnectableObservable that streams the processed data plus some additional items. The source sequence is finite, but it has side effects, so it must be run only once. I need to do two things with it:

  1. Transform each element in the stream using a Select query.
  2. Collect all elements of the stream into an array, process the array, and stream the results.

Below is my best attempt, but I suspect there is a better way that I haven't found.

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
    var obsResults = output.Select(o =>
    {
        var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
        return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
        {
            Component = "Variable Gain Module Input",
            Description = "Measurement Accuracy",
            Limits = limits,
            Output = o,
            Passed = _validationService.Validate(o.Result, limits)
        };
    });

    var observable = Observable.Create<ITestResult<ITestOutput<double, double>, ITestLimit<double>>>(obs =>
    {
        var resultTask = obsResults.ForEachAsync(obs.OnNext);
        var fitTask = output.ToArray().ForEachAsync(arr =>
        {
            resultTask.Wait();
            var fit = ComputeErrorFit(arr, testCase);
            obs.OnNext(GetGainErrorResult(fit.Item2, testCase));
        });
        output.Connect();
        Task.WaitAll(resultTask, fitTask);
        obs.OnCompleted();
        return Disposable.Empty;
    });

    return observable.Publish();
}

Edited 10/7/2015:

Here is the rest of the code:

private ITestResult<ITestOutput<double, double>, ITestLimit<double>> GetGainErrorResult(double gainError, InputVerificationTestCase testCase)
{
    var gainErrorLimit = GenerateDcAccuracyLimits.CalculateGainErrorLimits(testCase.FullScaleRange, testCase.Limits);
    return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
    {
        Component = "Variable Gain Module Input",
        Description = "Gain Error",
        Passed = _validationService.Validate(gainError, gainErrorLimit),
        Output = new TestOutput<double, double> { Input = 0, Result = gainError },
        Limits = gainErrorLimit
    };
}

private Tuple<double, double> ComputeErrorFit(ITestOutput<double, double>[] outputs, InputChannelTestCase testCase)
{
    var input = outputs.Select(o => o.Input);
    var vErr = outputs.Select(o => o.Result - o.Input);
    return Fit.Line(input.ToArray(), vErr.ToArray());
}

Also in an abstract base class, I have the following:

public IConnectableObservable<TOutput> RunSingleChannel(TCase testCase)
{
    dut.Acquisition.SampleRate.Value = SampleRate;
    dut.AnalogChannels[testCase.Channel].InputRange.Value = testCase.FullScaleRange;
    var testOutput = CreateTestProcedure(testCase.Channel).RunAsync(testCase.InputVoltages);
    return ProcessOutput(testOutput.Replay(), testCase);
}

protected abstract IConnectableObservable<TOutput> ProcessOutput(IConnectableObservable<ITestOutput<double, TAcq>> output, TCase testCase);

Upvotes: 2

Views: 120

Answers (1)

Enigmativity

Reputation: 117027

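It seems that you're doing things the hard way with Rx. You really need to avoid mixing Tasks with Observables: the combination makes your code hard to reason about and often leads to deadlocks and other concurrency issues. In your code, resultTask.Wait() and Task.WaitAll(...) block inside the Observable.Create delegate, so the subscription cannot return until the whole sequence has run.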

You should try something like this instead:

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
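    // RefCount connects `output` when the first subscriber arrives,
    // so both branches below share one connection to the source.
    var source = output.RefCount();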
    return
        source
            .Select(o =>
            {
                var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
                return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
                {
                    Component = "Variable Gain Module Input",
                    Description = "Measurement Accuracy",
                    Limits = limits,
                    Output = o,
                    Passed = _validationService.Validate(o.Result, limits)
                };
            })
            // Merge in one extra result, computed from the complete array once the source finishes.
            .Merge(
                source
                    .ToArray()
                    .Select(arr => GetGainErrorResult(ComputeErrorFit(arr, testCase).Item2, testCase)))
            .Publish();
}

It's a little odd that you're using connectable observables, but the above should do roughly what you need.
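
If the connectable wrapper isn't essential, one possible alternative is the Publish overload that takes a selector, which shares a single subscription to the source among every use inside the lambda. The sketch below is an assumption rather than part of the code above: it changes a hypothetical ProcessOutput to take and return a plain IObservable<int>, and the Defer counter (the `runs` variable) exists only to show how many times the source gets subscribed.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class PublishSelectorSketch
{
    // Hypothetical variant: accepts a plain observable instead of a connectable one.
    public static IObservable<int> ProcessOutput(IObservable<int> output)
    {
        // Publish(selector) subscribes to `output` once per downstream subscription
        // and shares that single run with every use of `shared` inside the lambda.
        return output.Publish(shared =>
            shared.Merge(shared.ToArray().Select(arr => arr.Sum())));
    }

    public static void Main()
    {
        var runs = 0;

        // Defer runs its factory on every subscription, so `runs` counts
        // how many times the underlying source is actually started.
        var output = Observable.Defer(() =>
        {
            runs++;
            return Observable.Range(1, 10);
        });

        ProcessOutput(output).Subscribe(x => Console.WriteLine(x));

        Console.WriteLine($"source subscriptions: {runs}");
    }
}
```

With this shape there is no separate Connect call: the caller's own subscription starts the source. Concat would not be a drop-in replacement for Merge here, because the ToArray branch has to be subscribed while the shared source is still emitting.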

I've tested the code using this sample:

public IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
{
    var source = output.RefCount();
    return
        source
            .Merge(source.ToArray().Select(arr => arr.Sum()))
            .Publish();
}

void Main()
{
    var output = Observable.Range(1, 10).Publish();

    var processed = ProcessOutput(output);

    processed.Subscribe(x => Console.WriteLine(x));

    processed.Connect();
}

This outputs:

1
2
3
4
5
6
7
8
9
10
55

I've also checked that the original observable values are only produced once.
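
One way to run that kind of check yourself is to count values with Do before the source is published. This is only a sketch of how such a check might look, not necessarily how it was done here; the `produced` counter and the SingleRunCheck class name are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SingleRunCheck
{
    // Same shape as the int sample above.
    public static IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
    {
        var source = output.RefCount();
        return source
            .Merge(source.ToArray().Select(arr => arr.Sum()))
            .Publish();
    }

    public static void Main()
    {
        var produced = 0;

        // Do runs its action once for every value the source emits,
        // before the value reaches the published subject.
        var output = Observable.Range(1, 10)
            .Do(_ => produced++)
            .Publish();

        var processed = ProcessOutput(output);
        processed.Subscribe(x => Console.WriteLine(x));
        processed.Connect();

        // A count equal to the number of source values (10 here) means
        // each value was produced once, not once per branch.
        Console.WriteLine($"values produced by source: {produced}");
    }
}
```

A count of 20 would mean the two branches each triggered their own run of the source, which is exactly what needs to be avoided when the source has side effects.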

Upvotes: 3
