Reputation: 1765
I am trying to get the 20 latest values of an observable and exposing it as a property without blocking occurring. At the moment, my code looks like:
class Foo
{
private IObservable<int> observable;
public Foo(IObservable<int> bar)
{
this.observable = bar;
}
public IEnumerable<int> MostRecentBars
{
get
{
return this.observable.TakeLast(20).ToEnumerable();
}
}
}
However, when the MostRecentBars getter is called, this is blocking, presumably because ToEnumerable will not return until there are at least 20 observed values.
Is there a built-in way to expose up to a maximum of 20 most recent values of the observable without blocking? If there are less than 20 observed values then it should just return all of them.
Upvotes: 2
Views: 706
Reputation: 708
Seriously, doesn't this all depend on how long you willing to wait? How do you know the observable has completed when you have 19 items in hand? One second later you are sure it's done? ten years later? You are sure it's done? How do you know? It's Observable, so you have to keep observing it until your operators or whatever monadic transforms you apply do some useful transform with the incoming stream.
I would think the (possibly new addition) Window or Buffer with the TimeSpan overload would work. Especially Window, it releases an Observable of Observable so once the outer Observable is created, you can actually listen for the First of 20 items, and then you need to listen carefully for the OnCompleted, or you lose the whole point of the Window operator, but you get the idea.
Upvotes: -1
Reputation: 12403
I have a few extensions I tend to attach to any project I build with the reactive extensions, one of them is a sliding window:
public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length)
{
Queue<T> window = new Queue<T>();
return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) =>
{
window.Enqueue(b);
if (window.Count > length)
window.Dequeue();
return window.ToArray();
});
}
This returns an array of the most recent N items (or less, if there have not been N items yet).
For your case, you should be able to do:
class Foo
{
private IObservable<int> observable;
private int[] latestWindow = new int[0];
IDisposable slidingWindowSubscription;
public Foo(IObservable<int> bar)
{
this.observable = bar;
slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a =>
{
latestWindow = a;
});
}
public IEnumerable<int> MostRecentBars
{
get
{
return latestWindow;
}
}
}
Upvotes: 1
Reputation: 33637
Although you have already got your answer, I was thinking of solving this using Replay Subject with buffer and came up with something like:
class Foo
{
private ReplaySubject<int> replay = new ReplaySubject<int>(20);
public Foo(IObservable<int> bar)
{
bar.Subscribe(replay);
}
public IEnumerable<int> MostRecentBars
{
get
{
var result = new List<int>();
replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread
return result;
}
}
}
Let me know if this fits into your problem.
Upvotes: 1
Reputation: 117027
I'll give you two choices. One uses the Rx Scan
operator, but I think that one makes it a little more complicated to read. The other uses a standard Queue
with locking. You can choose.
(1)
class Foo
{
private int[] bars = new int[] { };
public Foo(IObservable<int> bar)
{
bar
.Scan<int, int[]>(
new int[] { },
(ns, n) =>
ns
.Concat(new [] { n, })
.TakeLast(20)
.ToArray())
.Subscribe(ns => bars = ns);
}
public IEnumerable<int> MostRecentBars
{
get
{
return bars;
}
}
}
(2)
class Foo
{
private Queue<int> queue = new Queue<int>();
public Foo(IObservable<int> bar)
{
bar.Subscribe(n =>
{
lock (queue)
{
queue.Enqueue(n);
if (queue.Count > 20)
{
queue.Dequeue();
}
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (queue)
{
return queue.ToArray();
}
}
}
}
I hope these help.
Upvotes: 2
Reputation: 5355
I can't think of a built-in Rx operator(s) that fits your requirements. You could implement it this way:
class Foo
{
private IObservable<int> observable;
private Queue<int> buffer = new Queue<int>();
public Foo(IObservable<int> bar)
{
this.observable = bar;
this.observable
.Subscribe(item =>
{
lock (buffer)
{
if (buffer.Count == 20) buffer.Dequeue();
buffer.Enqueue(item);
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (buffer)
{
return buffer.ToList(); // Create a copy.
}
}
}
}
Upvotes: 1