Greg

Reputation: 2179

Get from observable no more than every 100ms, but always get the final value

I have a worker that exposes a Subject<string>, which publishes log messages very quickly. Writing to the console is slow, so I want to write to the console at most once every 100ms. When the task finishes, I want to write out the most recently published string, to avoid output such as "Doing work 2312/2400 ...done." (or even just "...done." if the task takes less than 100ms).

I'm new to reactive extensions, and though I've heard talks about how awesome they are, this is the first time I've noticed a situation where they could help me.

So, in summary: 1) don't give me an event more than once every 100ms; 2) I need to know about the final event, regardless of when it arrives.

I'll put my code in an answer below, but please suggest something better. Maybe I've missed a standard call which achieves this?

Upvotes: 1

Views: 738

Answers (3)

Ankur

Reputation: 33637

You can use the BufferWithTime function.
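
As a rough sketch of how that could look: in later Rx.NET releases the time-based buffering operator is, as far as I know, exposed as Buffer with a TimeSpan argument rather than BufferWithTime. The helper name LatestPerWindow is made up for illustration, and the sketch assumes the System.Reactive package. It batches messages per window, drops empty batches, and keeps only the newest message of each batch. It relies on Buffer passing on its partial batch when the source completes, which is what would deliver the final value.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper, not part of Rx itself.
public static class LogThrottle
{
    // Emits at most one message per window: the newest one seen in that window.
    public static IObservable<string> LatestPerWindow(
        this IObservable<string> source, TimeSpan window)
    {
        return source
            .Buffer(window)                            // IObservable<IList<string>>
            .Where(batch => batch.Count > 0)           // ignore windows with no messages
            .Select(batch => batch[batch.Count - 1]);  // newest message in the batch
    }
}
```

With the names from the question, usage would be along the lines of observable.LatestPerWindow(TimeSpan.FromMilliseconds(100)).Subscribe(log);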

Upvotes: 0

yamen

Reputation: 15618

It's actually simpler than you make out, provided the observable that is the source of your events calls OnCompleted() when it's done. This will do the trick by itself:

observable.Sample(TimeSpan.FromMilliseconds(100)).Subscribe(log);

This is because Sample will fire one last time when the source of its events completes. Here's a full test:

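var sub = new Subject<string>();
// Push an incrementing counter into the subject every 50ms.
var gen = Observable.Interval(TimeSpan.FromMilliseconds(50)).Subscribe(i => sub.OnNext(i.ToString()));

// Sample once per second and print the most recent value.
sub.Sample(TimeSpan.FromSeconds(1))
   .Subscribe(Console.WriteLine);

Thread.Sleep(3500);
gen.Dispose();      // stop the generator
sub.OnCompleted();  // complete the source so Sample emits the last value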

Even though I sleep for only 3.5 seconds, 4 events fire; the last one fires when I call OnCompleted().

Another point to note: it's bad form for Worker.GetObservable() to actually return a Subject<string>. Even if that is what the Worker class uses internally, it should return just the IObservable<string> interface. This is more than mere style; it is about separation of concerns and returning the minimal functional interface needed.
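
A minimal sketch of that last point, assuming the worker keeps its subject private. The members shown here (the Log property, the private field, and the body of DoWorkWithLogs) are invented for illustration and are not the asker's actual Worker. AsObservable() wraps the subject so that callers cannot cast the result back to Subject<string> and publish values themselves.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical worker; only the shape of the public surface matters here.
public sealed class Worker
{
    // The subject stays private, so only the worker can publish.
    private readonly Subject<string> _log = new Subject<string>();

    // Callers see IObservable<string>; AsObservable() hides the Subject identity.
    public IObservable<string> Log
    {
        get { return _log.AsObservable(); }
    }

    public void DoWorkWithLogs()
    {
        const int total = 2400; // illustrative amount of work
        for (var i = 1; i <= total; i++)
        {
            _log.OnNext("Doing work " + i + "/" + total);
        }
        _log.OnCompleted(); // signal the end so downstream operators can finish
    }
}
```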

Upvotes: 4

Greg

Reputation: 2179

This is the code I've come up with, which seems to do the job. I'm not sure how correct or idiomatic it is, though.

var observable = Worker.GetObservable(); //Actually a Subject<string>
Action<string> log = x => Console.Write("\r" + x);
string latest = "";
using(observable.Subscribe(x => latest = x))
using(observable.Sample(TimeSpan.FromMilliseconds(100)).Subscribe(log))
{
    Worker.DoWorkWithLogs();
    log(latest);
}
Console.WriteLine(" ...done.");

Upvotes: 0
