Reputation: 2179
I have a worker which exposes a Subject<string>, which publishes log messages very quickly. Writing to the console is slow, so I want to write to the console every 100ms at most. When the task is finished I want to write out the most recently published string, to avoid having things like Doing work 2312/2400 ...done. (or even just ...done if the task takes <100ms.)
I'm new to reactive extensions, and though I've heard talks about how awesome they are, this is the first time I've noticed a situation where they could help me.
So, in summary: 1) don't give me an event more than once every 100ms; 2) I need to know about the final event, regardless of when it arrives.
I'll put my code in an answer below, but please suggest something better. Maybe I've missed a standard call which achieves this?
Upvotes: 1
Views: 738
Reputation: 15618
It's actually simpler than you make out, provided the observable that is the source of your events calls OnCompleted() when it's done. This will do the trick by itself:
observable.Sample(TimeSpan.FromMilliseconds(100)).Subscribe(log);
This is because Sample will fire one last time when the source of its events completes. Here's a full test:
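// Requires System.Reactive (System.Reactive.Linq, System.Reactive.Subjects) and System.Threading.
var sub = new Subject<string>();
// Push a new value into the subject every 50ms.
var gen = Observable.Interval(TimeSpan.FromMilliseconds(50)).Select((_,i) => i).Subscribe(i => sub.OnNext(i.ToString()));
// Print only the latest value seen in each one-second window.
sub.Sample(TimeSpan.FromSeconds(1))
.Subscribe(Console.WriteLine);
Thread.Sleep(3500);
// Completing the source lets Sample emit the last pending value.
sub.OnCompleted();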
Even though I sleep for 3.5 seconds, 4 events fire, the last one firing when I call OnCompleted().
Another point to note: it's bad form if Worker.GetObservable() actually returns a Subject<string>. Even if it is one inside the Worker class, the method should return just the IObservable<string> interface. This is more than mere style; it separates concerns and returns the minimum functional interface the caller needs.
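As a rough sketch of that last point (the Worker class body isn't shown in the question, so everything inside it here is a hypothetical stand-in), the subject can stay private and be exposed through AsObservable() from System.Reactive.Linq, which wraps it so callers can't cast the result back to a Subject<string> and publish into it themselves:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical Worker for illustration only; the real class is not shown in the question.
public class Worker
{
    // The subject stays private, so only Worker can publish or complete.
    private readonly Subject<string> _log = new Subject<string>();

    // AsObservable() hides the subject behind a plain IObservable<string> wrapper.
    public IObservable<string> GetObservable() => _log.AsObservable();

    public void DoWorkWithLogs()
    {
        for (var i = 1; i <= 3; i++)
        {
            _log.OnNext($"Doing work {i}/3");
        }
        // Signal that no more messages will arrive.
        _log.OnCompleted();
    }
}

public static class Program
{
    public static void Main()
    {
        var worker = new Worker();
        var observable = worker.GetObservable();

        // The wrapper is not a Subject<string>, so this cast yields null.
        var asSubject = observable as Subject<string>;
        Console.WriteLine(asSubject == null);

        using (observable.Subscribe(Console.WriteLine))
        {
            worker.DoWorkWithLogs();
        }
    }
}
```

Simply declaring the return type as IObservable<string> while returning the subject itself would also satisfy the signature, but a caller could still downcast it; AsObservable() closes that gap.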
Upvotes: 4
Reputation: 2179
This is the code I've come up with, which seems to do the job. I'm not sure how correct or idiomatic it is though.
var observable = Worker.GetObservable(); // Actually a Subject<string>
Action<string> log = x => Console.Write("\r" + x);
string latest = "";
using(observable.Subscribe(x => latest = x))
using(observable.Sample(TimeSpan.FromMilliseconds(100)).Subscribe(log))
{
    Worker.DoWorkWithLogs();
    // Write the most recent message once more so the final state is shown.
    log(latest);
}
Console.WriteLine(" ...done.");
Upvotes: 0