lapsus

Reputation: 3035

Use Reactive Extensions to filter on "significant changes" in observable stream

I have a GeoLocationProvider (which implements IObservable<System.Device.Location.GeoCoordinate>) that outputs the current location every x milliseconds.

Now I want to use Rx to read all those GPS coordinates and notify subscribers of a location change only if it is significant (for example, distance traveled > 10 meters).

My "Main" code:

// [...]
IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();
LocationFeed locationFeed = new LocationFeed(locationProvider);

// register any interested observers on the locationFeed.
ConsoleLocationReporter c1 = new ConsoleLocationReporter("reporter0001");
locationFeed.Subscribe(c1);

My LocationFeed implementation looks like this:

using System;
using System.Device.Location;
using System.Reactive.Subjects;

namespace My.Namespace.Movement
{
    public class LocationFeed : ISubject<GeoCoordinate>, IDisposable
    {
        private readonly IDisposable _subscription;
        private readonly Subject<GeoCoordinate> _subject;

        public LocationFeed(IObservable<GeoCoordinate> observableSource)
        {
            _subject = new Subject<GeoCoordinate>();
            _subscription = observableSource.Subscribe(_subject); // TODO: Add logic to filter to only significant movement changes (> 10m)
        }

        public void Dispose()
        {
            _subscription?.Dispose();
            _subject?.Dispose();
        }

        public void OnNext(GeoCoordinate value)
        {
            _subject.OnNext(value);
        }

        public void OnError(Exception error)
        {
            _subject.OnError(error);
        }

        public void OnCompleted()
        {
            _subject.OnCompleted();
        }

        public IDisposable Subscribe(IObserver<GeoCoordinate> observer)
        {
            return _subject.Subscribe(observer);
        }
    }
}

Question 1: GeoCoordinate provides a method c1.DistanceTo(c2) to calculate the distance between two coordinates. I want to report (publish) a new GeoCoordinate only if its distance from the last one published is greater than a threshold x. How do I achieve that?

Question 2: Is my use of a subject OK, and is the way I implemented ISubject reasonable? I didn't want to put all the wiring in my "main" code, so I moved it into a separate class.

Upvotes: 1

Views: 662

Answers (1)

Lee Campbell

Reputation: 10783

I would strongly suggest not implementing ISubject<T> (or, for that matter, IObservable<T> or IObserver<T>). Instead, try to compose the existing factories and types, and expose them through "has a" relationships rather than "is a" relationships.

As it stands, your LocationFeed is purely a wrapper over the observableSource parameter, so it doesn't appear to solve any problem. I would suggest deleting it.
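If you do want to keep a class around the wiring, a "has a" version could look roughly like the sketch below. The class is made generic only so the snippet stands alone, the Locations property name is made up for illustration, and Publish().RefCount() is my assumption about how you would want to share one upstream subscription (which is what your Subject was doing).

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical "has a" wrapper: it owns a composed query and exposes it as a
// property instead of implementing ISubject<T> itself.
public sealed class LocationFeed<TLocation>
{
    public LocationFeed(IObservable<TLocation> source)
    {
        // Compose whatever operators you need here (filtering, throttling, ...).
        // Publish().RefCount() shares a single upstream subscription among all
        // subscribers; this is an assumption about the desired behaviour.
        Locations = source.Publish().RefCount();
    }

    // Consumers subscribe to this; the class itself is not an observable.
    public IObservable<TLocation> Locations { get; }
}
```

The calling code then becomes locationFeed.Locations.Subscribe(c1), and there is no Subject to dispose or forward calls to.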

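With regard to your posted problem, one solution is to use Buffer with a size of 2 and a skip of 1, which pairs each value with its predecessor. Note that this compares consecutive readings with each other, not each reading with the last one published.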

IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();

locationProvider
    .Buffer(2,1)
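    // On completion Buffer flushes a final single-item buffer, so check the count first.
    .Where(buffer => buffer.Count == 2 && buffer[0].DistanceTo(buffer[1]) > 10)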
    .Select(buffer=>buffer[1])
    .Subscribe(
        pos => Console.WriteLine(pos),
        ex => { },
        () => {});

Alternatively, you could use Scan to carry the previous value along with the current one:

IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();

locationProvider
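    // GeoCoordinate has no Zero member, so the pair is seeded with an explicit (0, 0) coordinate.
    .Scan(Tuple.Create(new GeoCoordinate(0, 0), new GeoCoordinate(0, 0)), (acc, cur) => Tuple.Create(acc.Item2, cur))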
    .Where(pair=>pair.Item1.DistanceTo(pair.Item2) > 10)
    .Select(pair=>pair.Item2)
    .Subscribe(
        pos => Console.WriteLine(pos),
        ex => { },
        () => {});

I am not sure what your requirements are for the first value produced. Should it be published or not?

EDIT: Here is a tested solution (using the Point type) that pushes a Unit whenever a "significant" change occurs, measured from the last significant position rather than from the previous reading. If this isn't exactly what you want, it should be enough for you to tinker with.

void Main()
{
    var zero = new System.Drawing.Point(0,0);
    var fenceDistance = 10;

    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable(
        ReactiveTest.OnNext(1, new System.Drawing.Point(0,0)),
        ReactiveTest.OnNext(2, new System.Drawing.Point(0,9)),  //Not far enough
        ReactiveTest.OnNext(3, new System.Drawing.Point(0,10)), //Touches the fence
        ReactiveTest.OnNext(4, new System.Drawing.Point(0,15)), //Not far enough
        ReactiveTest.OnNext(5, new System.Drawing.Point(0,40))  //Breaches the fence        
        );

    var observer = scheduler.CreateObserver<Unit>();

    source
        .Scan(Tuple.Create(zero, zero), (acc, cur) =>
        {
            if (DistanceBetween(acc.Item1, cur) >= fenceDistance)
            {
                return Tuple.Create(cur, cur);
            }
            else
            {
                return Tuple.Create(acc.Item1, cur);
            }
        })
        .Where(pair => pair.Item1 == pair.Item2)
        .Select(pair => Unit.Default)
        .Subscribe(observer);


    scheduler.Start();

    ReactiveAssert.AreElementsEqual(new[] {
        ReactiveTest.OnNext(1, Unit.Default),
        ReactiveTest.OnNext(3, Unit.Default),
        ReactiveTest.OnNext(5, Unit.Default)
    },observer.Messages);

}

// Euclidean distance between two points.
public static double DistanceBetween(System.Drawing.Point a, System.Drawing.Point b)
{
    var xDelta = a.X - b.X;
    var yDelta = a.Y - b.Y;

    var distanceSqr = (xDelta * xDelta) + (yDelta * yDelta);
    return Math.Sqrt(distanceSqr);
}
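If you want the values themselves rather than a Unit, the same idea can be packaged as a small operator. This is only a sketch: WhereMovedAtLeast is a hypothetical name (it is not part of Rx), the distance function is passed in so the snippet does not depend on GeoCoordinate, and it assumes the first value should always be published.

```csharp
using System;
using System.Reactive.Linq;

public static class SignificantChangeExtensions
{
    // Hypothetical helper, not part of Rx. Emits the first value, then only
    // values whose distance from the last *emitted* value is >= threshold.
    public static IObservable<T> WhereMovedAtLeast<T>(
        this IObservable<T> source,
        Func<T, T, double> distance,
        double threshold)
    {
        // Defer gives every subscriber its own anchor state.
        return Observable.Defer(() =>
        {
            var hasAnchor = false;
            var anchor = default(T);

            return source.Where(current =>
            {
                if (hasAnchor && distance(anchor, current) < threshold)
                {
                    return false; // still inside the fence around the anchor
                }

                anchor = current; // this value becomes the new reference point
                hasAnchor = true;
                return true;
            });
        });
    }
}
```

With your types, usage would be along the lines of locationProvider.WhereMovedAtLeast((a, b) => a.DistanceTo(b), 10).Subscribe(...). Use a strict comparison in the helper instead if exactly 10 meters should not count as significant.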

Upvotes: 4
