Reputation: 2262
I have a long-running task (creating textures from depth images from the Kinect One) that is implemented using Reactive Extensions. The gist of it is listed below:
kinectWrapper.DepthFrames
.ObserveOn(new EventLoopScheduler())
.Select(f => do some CPU intensive data manipulation to create the color texture I want)
.Subscribe(colorFrame => fill texture on GPU)
The problem is that both the Select and the Subscribe are rather heavy on the system and won't run at full speed. I've managed to get it running at an acceptable speed on my development PC with a .Sample(TimeSpan.FromMilliseconds(100)), but I'd rather have it reduce the frame rate based on CPU usage.
I think there are two possibilities:
Upvotes: 3
Views: 540
Reputation: 101
A solution could be achieved by modifying the behaviour of the extension method found here: http://rxx.codeplex.com/workitem/20724
One example is below. In this case, I have modified the behaviour so that the extension method will limit the number of queued notifications by discarding the oldest until the queue size is acceptable.
To meet your requirements, you could modify this so that it discards certain notifications based on CPU metrics that you can read using the System.Diagnostics.PerformanceCounter class.
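As a rough sketch of the CPU-metric idea (not the extension method itself), the filter below drops a notification whenever a supplied load reading is above a threshold. The names `DropWhenCpuAbove`, `readCpuPercent` and `thresholdPercent` are made up for illustration, and it assumes the System.Reactive package is referenced. The load reading is passed in as a delegate so it can be backed by a `PerformanceCounter` on Windows or by anything else.

```csharp
using System;
using System.Reactive.Linq;

public static class CpuAwareExtensions
{
    // Hypothetical helper: passes an item through only when the current
    // CPU reading is at or below thresholdPercent; otherwise the item is dropped.
    public static IObservable<T> DropWhenCpuAbove<T>(
        this IObservable<T> source,
        Func<float> readCpuPercent,
        float thresholdPercent)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (readCpuPercent == null) throw new ArgumentNullException(nameof(readCpuPercent));

        // readCpuPercent is invoked once per notification, on the thread
        // that delivers the notification.
        return source.Where(_ => readCpuPercent() <= thresholdPercent);
    }
}

// Possible usage on Windows (the 80% threshold is an arbitrary assumption):
//
//   var cpu = new System.Diagnostics.PerformanceCounter(
//       "Processor", "% Processor Time", "_Total");
//   cpu.NextValue(); // the first reading of this counter is 0, so prime it once
//   var frames = kinectWrapper.DepthFrames
//       .DropWhenCpuAbove(() => cpu.NextValue(), 80f);
```

Reading the counter once per frame gives a value averaged over a very short interval, so it will probably be noisy. Sampling it on a timer and caching the latest value may work better.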
However, you could also abstract yourself away from such specific details. For example, you could use the extension method below with a scheduler that runs on a low-priority thread.
This would mean that notifications would be more likely to be discarded when the CPU was busy.
kinectWrapper.DepthFrames.ThrottledObserveOn(
    new EventLoopScheduler(start => new Thread(start) {Priority = ThreadPriority.Lowest, IsBackground = true}),
    5).Select(...
public static IObservable<TSource> ThrottledObserveOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler,
    int maximumQueuedNotifications)
{
    Contract.Requires(source != null);
    Contract.Requires(scheduler != null);
    Contract.Requires(maximumQueuedNotifications >= 0);
    return Observable.Create<TSource>(observer =>
    {
        var notificationsGate = new object();
        var acceptingNotification = false;
        var nextNotifications = new Queue<Notification<TSource>>();
        Notification<TSource> completionNotification = null;
        var schedulerDisposable = new MultipleAssignmentDisposable();
        var subscriptionDisposable = source.Materialize().Subscribe(notification =>
        {
            bool startAcceptingNotifications;
            lock (notificationsGate)
            {
                startAcceptingNotifications = !acceptingNotification;
                acceptingNotification = true;
                if (notification.Kind == NotificationKind.OnNext)
                {
                    nextNotifications.Enqueue(notification);
                }
                else
                {
                    completionNotification = notification;
                }
            }
            if (startAcceptingNotifications)
            {
                schedulerDisposable.Disposable = scheduler.Schedule(rescheduleAction =>
                {
                    Notification<TSource> notificationToAccept;
                    lock (notificationsGate)
                    {
                        if (nextNotifications.Any())
                        {
                            do
                            {
                                notificationToAccept = nextNotifications.Dequeue();
                            }
                            while (nextNotifications.Count > maximumQueuedNotifications);
                        }
                        else
                        {
                            notificationToAccept = completionNotification;
                            completionNotification = null;
                        }
                    }
                    notificationToAccept.Accept(observer);
                    bool continueAcceptingNotification;
                    lock (notificationsGate)
                    {
                        continueAcceptingNotification = acceptingNotification = nextNotifications.Any() || completionNotification != null;
                    }
                    if (continueAcceptingNotification)
                    {
                        rescheduleAction();
                    }
                });
            }
        });
        return new CompositeDisposable(subscriptionDisposable, schedulerDisposable);
    });
}
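To see what the do/while loop in the scheduled action does to a backlog, here is a small standalone sketch that mirrors it with a plain queue of integers. `DropOldestDemo` and `TakeNext` are illustrative names only, and the queue is assumed to be non-empty when `TakeNext` is called, as it is in the method above.

```csharp
using System;
using System.Collections.Generic;

public static class DropOldestDemo
{
    // Mirrors the do/while loop above: keep dequeuing until at most
    // maximumQueued items remain, and return the last item dequeued.
    // Everything dequeued before that item is effectively discarded.
    public static int TakeNext(Queue<int> pending, int maximumQueued)
    {
        int next;
        do
        {
            next = pending.Dequeue();
        }
        while (pending.Count > maximumQueued);
        return next;
    }

    public static void Main()
    {
        // Eight frames are waiting and the limit is five.
        var pending = new Queue<int>(new[] { 1, 2, 3, 4, 5, 6, 7, 8 });
        int delivered = TakeNext(pending, 5);

        Console.WriteLine(delivered);     // 3 (frames 1 and 2 were discarded)
        Console.WriteLine(pending.Count); // 5 (frames 4 to 8 are still queued)
    }
}
```

Note that nothing is discarded until the backlog exceeds the limit by more than one item, because the item being delivered is itself removed from the queue first.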
Upvotes: 2