Reputation: 1307
I need to pre-process and post-process an event every time it gets observed. Here is the thing I came up with, that works:
var subj = new Subject<Event>();
var observable = Observable.Create<Event>(obs =>
{
    subj.Finally(obs.OnCompleted);
    return subj.Subscribe(e =>
    {
        try
        {
            Preprocess(e);
            obs.OnNext(e);
            Postprocess(e);
        }
        catch (Exception ex) { obs.OnError(ex); }
    });
});
My question: is this the right way to do it, or maybe there's a better template/extension method for this?
Upvotes: 3
Views: 498
Reputation: 29786
There are a few subtleties here.
The title of your question mentions modifying events, but you shouldn't really do this. I am assuming that the pre- and post-processing steps report on the event but don't modify it. If you do need to modify it, it's probably better to change the Do below to a Select and have Preprocess return a modified copy of the value; this immutable approach is more explicit and less error-prone. It's not clear how post-modification of an event makes sense in an observable stream, so I would avoid it and have the subscriber report observation instead. Nothing in the solution that follows technically prevents you from mutating the events in the processing methods, but the behaviour when you mutate and have multiple downstream subscribers will need to be handled very carefully indeed. Such mutation is not idiomatic Rx, or generally good programming practice.
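As a rough illustration of the immutable approach, here is a minimal sketch that uses Select to hand downstream a modified copy. The Event record, its Id and Status properties, and this Preprocess signature are hypothetical stand-ins rather than anything from the question, and the record syntax assumes C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical immutable event type, used only for this illustration.
public sealed record Event(int Id, string Status);

public static class SelectDemo
{
    // Hypothetical pre-processing step: returns a modified copy and
    // leaves the original instance untouched.
    public static Event Preprocess(Event e) => e with { Status = "preprocessed" };

    public static List<string> Run()
    {
        var seen = new List<string>();
        var subj = new Subject<Event>();
        var original = new Event(1, "raw");

        // Select projects each event through Preprocess before it reaches the subscriber.
        using (subj.Select(Preprocess).Subscribe(e => seen.Add($"{e.Id}:{e.Status}")))
        {
            subj.OnNext(original);
        }

        // The instance that was pushed into the subject is unchanged.
        seen.Add($"{original.Id}:{original.Status}");
        return seen; // "1:preprocessed", then "1:raw"
    }
}
```

Because each subscriber receives its own projected value, multiple downstream subscribers cannot observe each other's changes.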
There are many downstream operators that introduce asynchronicity, so if you are trying to record observation by an ultimate subscriber, there's no guarantee it will have occurred at the point your Postprocess is called. For example, inserting a simple Delay before a subscriber could scupper you, since Postprocess will be called before the subscriber has seen the event. The only guarantee you have is that Postprocess is called after the immediate downstream subscriber has returned from OnNext - that's it. You can't say anything about the timing beyond that point, so it's only post-processing in a very narrow sense. This is why I would put "post-processing" in the subscriber which carries out what you deem as your "final" action or (if that subscriber is synchronous) immediately before it.
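To make the Delay point concrete, here is a small sketch under stated assumptions: it uses TestScheduler from the Microsoft.Reactive.Testing package for deterministic virtual time, and an inline pre/post wrapper that is only a stand-in for whatever processing you actually do. The log messages are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class DelayDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var scheduler = new TestScheduler();
        var subj = new Subject<int>();

        // Minimal stand-in for a pre/post-processing wrapper around OnNext.
        var processed = Observable.Create<int>(o => subj.Subscribe(
            x =>
            {
                log.Add($"Preprocess({x})");
                o.OnNext(x); // Returns as soon as Delay has queued the value.
                log.Add($"Postprocess({x})");
            },
            o.OnError,
            o.OnCompleted));

        using (processed
            .Delay(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(x => log.Add($"Subscriber saw {x}")))
        {
            subj.OnNext(1);                                   // Pre- and post-processing both run here.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks); // Only now is the delayed value delivered.
        }

        return log;
    }
}
```

The log should end with the subscriber's entry, after Postprocess(1) has already been recorded, which is the narrow sense of "post" described above.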
Your call to Finally isn't going to do anything: it doesn't modify the observable it is applied to. It returns a new observable that incorporates its behaviour, and you are discarding that result.
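A short sketch of the difference, assuming nothing beyond System.Reactive itself; the log strings are invented for the illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FinallyDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subj = new Subject<int>();

        // Result discarded: nothing ever subscribes to the returned observable,
        // so this action is never run.
        subj.Finally(() => log.Add("discarded Finally ran"));

        // Result kept and subscribed: the action runs when this subscription terminates.
        var withFinally = subj.Finally(() => log.Add("kept Finally ran"));
        withFinally.Subscribe(_ => { }, () => log.Add("completed"));

        subj.OnCompleted();
        return log;
    }
}
```

Only the Finally whose return value was actually subscribed to contributes to the log.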
You are catching exceptions thrown by the subscriber. This is particularly subtle - you should not do this, or send OnError to the subscriber afterwards, because the subscriber is now in an unknown error state. I talk about this at length in How to handle exceptions thrown by observer's onNext in RxJava? (the answer is applicable to .NET). Whether or not you should post-process in the event of subscriber failure isn't clear from your question, but as your implementation does not attempt this, I haven't either.
Your implementation isn't passing through OnCompleted and OnError events from the upstream source (subj in your case).
Here's an approach that might work for you, the above caveats notwithstanding.
You can use Do (or, for mutation, Select, changing preprocess to a Func<T, T> as discussed above) to handle the pre-processing side of things, which makes things easier. Here is a convenient custom operator to manage this:
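using System;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<T> Process<T>(
        this IObservable<T> source,
        Action<T> preprocess,
        Action<T> postprocess)
    {
        return Observable.Create<T>(o =>
            // Do runs preprocess first and turns any exception it throws into OnError.
            source.Do(preprocess).Subscribe(x =>
                {
                    // Deliberately outside the try: subscriber exceptions are not caught.
                    o.OnNext(x);
                    try
                    {
                        postprocess(x);
                    }
                    catch (Exception e)
                    {
                        // Only post-processing failures are reported downstream.
                        o.OnError(e);
                    }
                },
                // Pass upstream termination through unchanged.
                o.OnError,
                o.OnCompleted)
        );
    }
}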
The Do will correctly propagate a pre-processing error as an OnError, and the try-catch will handle post-processing errors. We deliberately do not handle errors in the subscriber, as discussed above. The Create method will enforce the rest of the Rx grammar correctly.
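To illustrate what "enforcing the grammar" means in practice, here is a minimal sketch with a deliberately misbehaving producer that keeps emitting after OnError. The producer and the log strings are made up for the example; the point is only that the observer handed out by Create stops forwarding once a terminal notification has been sent.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class GrammarDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Deliberately misbehaving producer: it keeps emitting after OnError.
        var source = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnError(new InvalidOperationException("boom"));
            o.OnNext(2);     // Violates the grammar; not forwarded to the subscriber.
            o.OnCompleted(); // Likewise not forwarded.
            return Disposable.Empty;
        });

        source.Subscribe(
            x => log.Add($"OnNext({x})"),
            e => log.Add($"OnError({e.Message})"),
            () => log.Add("OnCompleted"));

        return log; // "OnNext(1)", then "OnError(boom)"
    }
}
```

This is why the Process operator can call o.OnError from the catch block without separately guarding against later upstream notifications.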
Use it like so:
subj.Process(Preprocess, Postprocess)
    .Subscribe(/* observer or handlers etc. */);
Here are some unit tests for this operator, using the reactive testing framework (nuget rx-testing) and the assertion library Shouldly (nuget shouldly):
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework; // Assumption: [Test] is NUnit's attribute.
using Shouldly;

public class TestProcess : ReactiveTest
{
    [Test]
    public void ErrorFreeStreamProcessedCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)", "2", "Postprocess(2)"
        };

        var actual = new List<string>();
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add($"Postprocess({x})"));

        var result = scheduler.CreateObserver<int>();
        sut.Do(x => actual.Add(x.ToString())).Subscribe(result);
        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );
        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInPreprocessHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Error"
        };

        var expectedException = new ApplicationException("Error");
        var actual = new List<string>();
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        // Preprocess throws on the second value.
        var sut = xs.Process(
            x => actual.Add(x == 1 ? $"Preprocess({x})" : throw expectedException),
            x => actual.Add($"Postprocess({x})"));

        var result = scheduler.CreateObserver<int>();
        sut.Do(x => actual.Add(x.ToString()),
               e => actual.Add(e.Message)).Subscribe(result);
        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnError<int>(200, expectedException)
        );
        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInPostprocessHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)", "2", "Error"
        };

        var expectedException = new ApplicationException("Error");
        var actual = new List<string>();
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        // Postprocess throws on the second value, after the subscriber has seen it.
        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add(x == 1 ? $"Postprocess({x})" : throw expectedException));

        var result = scheduler.CreateObserver<int>();
        sut.Do(x => actual.Add(x.ToString()),
               e => actual.Add(e.Message)).Subscribe(result);
        scheduler.Start();

        result.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnError<int>(200, expectedException)
        );
        actual.ShouldBe(expected);
    }

    [Test]
    public void ErrorInSubscriberHandledCorrectly()
    {
        var expected = new List<string>
        {
            "Preprocess(1)", "1", "Postprocess(1)",
            "Preprocess(2)"
        };

        var expectedException = new ApplicationException("Error");
        var actual = new List<string>();
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable<int>(
            OnNext(100, 1),
            OnNext(200, 2),
            OnCompleted<int>(300)
        );

        var sut = xs.Process(
            x => actual.Add($"Preprocess({x})"),
            x => actual.Add($"Postprocess({x})"));

        var result = scheduler.CreateObserver<int>();

        // The subscriber itself throws on the second value.
        sut.Subscribe(
            x => { if (x != 1) throw expectedException; else actual.Add(x.ToString()); result.OnNext(x); },
            result.OnError,
            result.OnCompleted);

        try
        {
            scheduler.Start();
        }
        catch
        {
            // The subscriber's exception is not caught by the operator, so it surfaces here.
        }

        result.Messages.AssertEqual(
            OnNext(100, 1)
        );
        actual.ShouldBe(expected);
    }
}
Upvotes: 5