Reputation: 745
I need some help to write two IObservables which perform similar tasks (described below as 'Normal Trigger' and 'Dwell Trigger'). I have working input streams, but I am not sure how to combine them, perform the analysis (e.g. holding on to state variables for the 'Dwell Trigger' to store the 1st interesting coord, as well as our progress percentage) and then produce a new output of a different type, all described within an observable. Let me describe what I am attempting to write:
Normal Trigger
INPUT = two streams:
1. An unreliable hot stream of Points (x,y coordinates), e.g. the mouse position. The number of points per second could fluctuate slightly, or the stream could produce no values for periods of time.
2. A hot stream of boolean values (the trigger).
OUTPUT: A combination of the data (a point from stream 1 together with a value derived from stream 2). The derived value is a double representing a percentage (only 100% or 0%). An output is produced only if both of these conditions are met: a signal is received on the trigger stream, and the last value on the coords stream arrived within a configured timespan. In other words, we have received a trigger and we hold a coord that is not stale.
Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Trigger stream:----------x------------x-------x--------
Result stream: ----------x--------------------x--------
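To make the staleness condition concrete, here is a minimal sketch of it as a plain predicate, independent of Rx. The names `StalenessCheck`, `IsFresh` and `maxAge` are hypothetical and chosen only for illustration; the configured timespan is assumed to be an inclusive upper bound.

```csharp
using System;

public static class StalenessCheck
{
    // Hypothetical helper: returns true when the most recent coord is recent
    // enough to be paired with a trigger that arrived at triggerTime.
    public static bool IsFresh(DateTimeOffset lastCoordTime, DateTimeOffset triggerTime, TimeSpan maxAge)
    {
        // The age of the coord is measured at the moment the trigger arrives.
        return triggerTime - lastCoordTime <= maxAge;
    }
}
```

In the diagram above, the middle trigger falls into a gap in the coords stream, so a check like this would reject it and nothing would appear on the result stream.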
Dwell Trigger
INPUT = one stream:
1. An unreliable hot stream of Points (x,y coordinates), e.g. the mouse position. The number of points per second could fluctuate slightly, or the stream could produce no values for periods of time.
OUTPUT: Same output type as in the 'Normal Trigger' (a point from stream 1 together with a new derived value). An output is produced only if these conditions are met: the coords stream has supplied coords over a certain timespan, the coords are all within a small area, and there are no big gaps between the values. For example, I've received consistent, unbroken coords for 0.1 second that all relate to the same small area, with no gap of more than 0.01 second between any two coords. I then wish to produce an output which includes the first coord from the unbroken chain and a double indicating that we are, say, 10% of the way to our goal. If we then receive another 0.1 second of coords in the same area with no big gaps, I wish to output the SAME first coord from the unbroken chain, with another double to show that we are 20% of the way to our goal. If a big gap is encountered, or the coords move away from the initial area, then we report 0%, discard our progress and initial coord, and continue looking for sufficiently long (0.1 sec) unbroken chains. If we reach 100% of our goal then this is output and we start looking again (beginning again at 0%).
Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Result stream: ---------x------------------------x-----
This may be difficult to understand so let me explain what these are for. The coordinates are locations on the screen, and the first 'Normal Trigger' is attempting to capture the location when a button is pressed (the button press being the trigger). In the second scenario (the 'Dwell Trigger') there is no button press, so we want to capture the position of the mouse pointer when it stays ('dwells') in the same small area. This is, however, gradual, so we want to recognise when the mouse position has been consistent for a small period of time, register that it has been in the same position for 10% of the time needed, then if it stays in the same area register that we are 20% of the way, etc, until we are happy that the mouse has been kept in the same area for long enough to output 100%, which means the user has 'dwelled' in the same position long enough to register their interest. We then start looking again to see where they dwell again, which may be in the same spot.
The 'Dwell Trigger' seems much more challenging to me, as I presume you would need to do all of the following;
I have working Rx observables for the coord and trigger streams. Pseudo code/descriptions of where I should be looking would be most appreciated.
Thank you for reading my enormous post! Julius
Following Christopher's suggestion of using Observable.Create I came up with the code below. I do not know whether this is considered the "hacky" way of creating streams, or whether it would be better to describe what I'm doing using another set of Rx methods, but this works. Please note that the functionality is slightly different from what I described:
The output signal is not 100% or 0%, but 1 (for 100%) and -1 (for -100% which occurs when the opposite trigger is encountered). For example pressing the mouse button DOWN might be 1, but releasing it might be -1.
public static IObservable<TriggerSignalWithPoint> CombineWithPointIfRelevent(
    this IObservable<bool> triggerSource,
    IObservable<Timestamped<Point>> pointsSource,
    TimeSpan pointsMaxReleventAge)
{
    return Observable.Create<TriggerSignalWithPoint>(subj =>
    {
        bool disposed = false;
        Timestamped<Point>? latestPoint = null;
        Action disposeChildSubscriptions = null;

        // Remember the most recent point; points never produce output on their own.
        var pointsSubscription = pointsSource.Subscribe(
            timestampedPoint =>
            {
                latestPoint = timestampedPoint;
            },
            ex =>
            {
                subj.OnError(ex);
                disposeChildSubscriptions();
            });

        // Every trigger produces an output; the point is attached only if it is recent enough.
        var triggerSubscription = triggerSource
            .Where(_ => disposed == false)
            .Subscribe(
                b =>
                {
                    Point? latestUsefulPoint =
                        latestPoint.HasValue &&
                        DateTimeOffset.Now.Subtract(latestPoint.Value.Timestamp) <= pointsMaxReleventAge
                            ? latestPoint.Value.Value
                            : (Point?) null;
                    float signal = b ? 1 : -1;
                    subj.OnNext(new TriggerSignalWithPoint(signal, latestUsefulPoint));
                },
                ex =>
                {
                    subj.OnError(ex);
                    disposeChildSubscriptions();
                },
                () =>
                {
                    subj.OnCompleted();
                    disposeChildSubscriptions();
                });

        // Tear down both child subscriptions; safe to call more than once.
        disposeChildSubscriptions = () =>
        {
            disposed = true;
            if (triggerSubscription != null)
            {
                triggerSubscription.Dispose();
                triggerSubscription = null;
            }
            if (pointsSubscription != null)
            {
                pointsSubscription.Dispose();
                pointsSubscription = null;
            }
        };
        return disposeChildSubscriptions;
    });
}
N.B. This solution is adapted from Paul Betts' answer to the question "CombineLatest, but only push for the left".
Any criticism or help would be hugely appreciated, as I am only just scratching the surface of Rx.
Upvotes: 4
Views: 1919
Reputation: 10783
I have several questions.
I have made some assumptions about the questions above, and assumed that you need at least two events in a region (I have called it a fence) to generate a percentage. I have also assumed that we will only react to cursor move events and timestamp them to do any temporal analysis.
Having said that, I have put together a set of Rx unit tests and a model that could be a useful starting point for building a solution.
First I start with the classic Point class with X/Y properties:
public class Point
{
    public int X { get; set; }
    public int Y { get; set; }
}
Then I created a model that hopefully encapsulates the kind of things we are trying to solve. I am not sure if this is the best name for this class:
class Dweller
{
    private static readonly TimeSpan BigGapPeriod = TimeSpan.FromSeconds(0.5);
    private readonly Point _startLocation;
    private readonly DateTimeOffset _startTime;
    private readonly DateTimeOffset _currentTime;
    private readonly TimeSpan _durationInFence;
    private static readonly TimeSpan CompleteTime = TimeSpan.FromSeconds(1);

    public Dweller()
        : this(new Point(), DateTimeOffset.MinValue, DateTimeOffset.MinValue)
    { }

    private Dweller(Point startLocation, DateTimeOffset startTime, DateTimeOffset currentTime)
    {
        _startLocation = startLocation;
        _startTime = startTime;
        _currentTime = currentTime;
        _durationInFence = currentTime - _startTime;
    }

    public TimeSpan DurationInFence
    {
        get { return _durationInFence; }
    }

    public double Percentage
    {
        get { return RoundDown(Math.Min(_durationInFence.Ticks / (double)CompleteTime.Ticks, 1.0), 1); }
    }

    public Dweller CreateNext(Point location, DateTimeOffset now)
    {
        if (IsInitialValue() || !IsWithinFence(location) || HasCompleted() || IsNewSequence(now))
        {
            return new Dweller(location, now, now);
        }
        return new Dweller(_startLocation, _startTime, now);
    }

    private bool IsNewSequence(DateTimeOffset now)
    {
        return now > (_currentTime + BigGapPeriod);
    }

    private bool HasCompleted()
    {
        return Percentage == 1.0;
    }

    private bool IsInitialValue()
    {
        return _startTime == DateTimeOffset.MinValue;
    }

    private bool IsWithinFence(Point point)
    {
        //Put your own logic here
        return Math.Abs(point.X - _startLocation.X) < 100
            && Math.Abs(point.Y - _startLocation.Y) < 100;
    }

    private static double RoundDown(double i, double decimalPlaces)
    {
        var power = Math.Pow(10, decimalPlaces);
        return Math.Floor(i * power) / power;
    }
}
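To make the Percentage arithmetic concrete, here is a standalone sketch of the same calculation outside the class. `PercentageSketch` and its parameter names are made up for illustration; the rounding is meant to mirror RoundDown with one decimal place, so progress only ever moves in 10% steps and is capped at 100%.

```csharp
using System;

public static class PercentageSketch
{
    // Hypothetical standalone version of Dweller.Percentage:
    // the fraction of the goal reached, capped at 1.0 and rounded DOWN to one decimal place.
    public static double Percentage(TimeSpan durationInFence, TimeSpan completeTime)
    {
        var fraction = Math.Min(durationInFence.Ticks / (double)completeTime.Ticks, 1.0);
        var power = Math.Pow(10, 1); // one decimal place
        return Math.Floor(fraction * power) / power;
    }
}
```

With a one second goal, 250 ms in the fence reports 0.2 rather than 0.25, and anything under 100 ms reports 0.0, which is why the query below can filter on `percentage > 0.0`.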
I built this up from these unit tests. Note that I use the TestScheduler to fake time. This means that the tests can run very quickly and I don't need any Thread.Sleep or WaitHandle primitives.
[TestFixture]
public class DwellTriggerTests : ReactiveTest
{
    //Need a predicate to break the fence. Maybe we actually want to know how long we have been within the fence?
    // Scan-> home cord, start time, current duration
    //
    public IObservable<double> Query(IObservable<Point> coords, IScheduler scheduler)
    {
        return coords.Scan(
                new Dweller(),
                (acc, cur) => acc.CreateNext(cur, scheduler.Now))
            .Select(dweller => dweller.Percentage)
            .DistinctUntilChanged()
            .Where(percentage => percentage > 0.0);
    }

    [Test]
    public void Trigger_10Percent_after_100ms_of_mouse_position_within_fence()
    {
        //Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<double>();
        var coords = testScheduler.CreateColdObservable(
            OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(040.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(060.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(080.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(100.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 })
            );
        Query(coords, testScheduler).Subscribe(observer);
        testScheduler.Start();
        observer.Messages.AssertEqual(
            OnNext(120.Milliseconds(), 0.1)
            );
    }

    [Test]
    public void Trigger_20Percent_after_200ms_of_mouse_position_within_fence()
    {
        //Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<double>();
        var coords = testScheduler.CreateColdObservable(
            OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 })
            );
        Query(coords, testScheduler).Subscribe(observer);
        testScheduler.Start();
        observer.Messages.AssertEqual(
            OnNext(120.Milliseconds(), 0.1),
            OnNext(220.Milliseconds(), 0.2)
            );
    }

    [Test]
    public void Trigger_100Percent_after_1000ms_of_mouse_position_within_fence()
    {
        //Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<double>();
        var coords = testScheduler.CreateColdObservable(
            OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(420.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(620.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(820.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(1020.Milliseconds(), new Point { X = 100, Y = 100 })
            );
        Query(coords, testScheduler).Subscribe(observer);
        testScheduler.Start();
        observer.Messages.AssertEqual(
            OnNext(220.Milliseconds(), 0.2),
            OnNext(420.Milliseconds(), 0.4),
            OnNext(620.Milliseconds(), 0.6),
            OnNext(820.Milliseconds(), 0.8),
            OnNext(1020.Milliseconds(), 1.0)
            );
    }

    [Test]
    public void Reset_after_sequence_hits_100Percent()
    {
        //Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<double>();
        var coords = testScheduler.CreateColdObservable(
            OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(420.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(620.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(820.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(1020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(1120.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(1220.Milliseconds(), new Point { X = 100, Y = 100 })
            );
        Query(coords, testScheduler).Subscribe(observer);
        testScheduler.Start();
        observer.Messages.AssertEqual(
            OnNext(220.Milliseconds(), 0.2),
            OnNext(420.Milliseconds(), 0.4),
            OnNext(620.Milliseconds(), 0.6),
            OnNext(820.Milliseconds(), 0.8),
            OnNext(1020.Milliseconds(), 1.0),
            OnNext(1220.Milliseconds(), 0.1)
            );
    }

    [Test]
    public void Reset_if_period_of_500ms_of_silence_occurs()
    {
        //Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<double>();
        var coords = testScheduler.CreateColdObservable(
            OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(621.Milliseconds(), new Point { X = 100, Y = 100 }),
            OnNext(721.Milliseconds(), new Point { X = 100, Y = 100 })
            );
        Query(coords, testScheduler).Subscribe(observer);
        testScheduler.Start();
        observer.Messages.AssertEqual(
            OnNext(120.Milliseconds(), 0.1),
            OnNext(721.Milliseconds(), 0.1)
            );
    }
}

public static class TestExtentions
{
    public static long Milliseconds(this int input)
    {
        return TimeSpan.FromMilliseconds(input).Ticks;
    }

    public static long Seconds(this int input)
    {
        return TimeSpan.FromSeconds(input).Ticks;
    }
}
If it helps, add these using statements:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;
Upvotes: 2
Reputation: 3464
This should work for the normal trigger:
public static IObservable<double> NormalTrigger<T>(this IObservable<T> source, IObservable<bool> trigger, TimeSpan window)
{
    return source.Select(s =>
            trigger
                .Take(window)
                .Take(1)
                .Select(t => t ? 100.0 : 0.0))
        .Switch();
}
EDIT: Test code, works as expected
var coords = new Subject<Point>();
var trigger = new Subject<bool>();
coords.NormalTrigger(trigger, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
coords.OnNext(new Point(0, 0));
System.Threading.Thread.Sleep(1100);
trigger.OnNext(true); // Shouldn't trigger
coords.OnNext(new Point(1.01, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.02, 1));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.03, 1));
System.Threading.Thread.Sleep(50);
trigger.OnNext(false); // Should trigger 0
coords.OnNext(new Point(0.5, 0.5));
System.Threading.Thread.Sleep(50);
trigger.OnNext(true); // Should trigger 100
coords.OnNext(new Point(1.04, 1));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(2.05, 40));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(0.06, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.07, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(0.08, 0));
System.Threading.Thread.Sleep(1100);
trigger.OnNext(true); // Shouldn't trigger: the last coord is older than the 1 second window
Console.ReadLine();
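The question also asks for the point itself to be part of the output, which NormalTrigger drops. A possible variation, only a sketch and not run against the real streams, pairs each source value with the derived percentage. `NormalTriggerWithSource` and `NormalTriggerSketch` are hypothetical names, and Tuple is used only to keep the example short. As with the original, it assumes the trigger stream is hot.

```csharp
using System;
using System.Reactive.Linq;

public static class NormalTriggerSketch
{
    // For each source value, wait up to `window` for the first trigger and
    // pair that source value with 100.0 (true) or 0.0 (false).
    public static IObservable<Tuple<T, double>> NormalTriggerWithSource<T>(
        this IObservable<T> source, IObservable<bool> trigger, TimeSpan window)
    {
        return source.Select(s =>
                trigger
                    .Take(window)   // stop listening once the source value is stale
                    .Take(1)        // only the first trigger after this value counts
                    .Select(t => Tuple.Create(s, t ? 100.0 : 0.0)))
            .Switch();              // a newer source value replaces the older one
    }
}
```

The only change from NormalTrigger is that the inner Select closes over `s`, so the value that opened the window travels with the result.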
Upvotes: 0