eran otzap
eran otzap

Reputation: 12533

C# Reactive Extensions - Subscribe to stream of aggregation

I Have a stream of points and would like to combine each 2 points in order to draw a line .

public class MyPoint
{
    public int X { get; set; }
    public int Y { get; set; }
}

I am looking for something that would combine the functionality of Aggregate and Select , Meaning that i would like to later Subscribe to ether a complex type combining the 2 points , or to receive an aggregation as a parameter to my Observer's OnNext delegate :

Something like :

    pointObservable.Subscribe((prev, curr) => { }); 

or

    pointObservable.Subscribe((myLineStruct) => { }); 

Sample to build on :

  List<MyPoint> points = new List<MyPoint>();

  for (int i = 0; i < 10; i++)
  {
       points.Add(new MyPoint{ X = i , Y = i * 10});
  }

  IObservable<MyPoint> pointObservable = points.ToObservable();

After trying 2 solutions , i came across some issues :

First of here is my actual Stream :

observable  // Stream of 250 points arriving every interval 
     .Take(_xmax + 10)   // for test purposes take only the Graph + 10 
     .Select(NormalizeSampleByX) // Nomalize X  ( override Real X with display X (
     .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)  // which returns current only if current.X > prev.X 
     .DistinctUntilChanged() // remove all redundant previous points elements 

     // here i end up with a stream of normalized points 

     .Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr})
             // Dmitry Ledentsov 's addition  
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

with Dmitry's addition i get the following result .

0 862  , 252 -21 
1 888  , 253 -24 
2 908  , 254 -28 
3 931  , 255 -31 
4 941  , 256 -35 
5 890  , 257 -38 
6 802  , 258 -41 
7 676  , 259 -44 
8 491  , 260 -48 
9 289  , 261 -51 
10 231  , 262 -55 

@Enigmativity's suggestion :

 observable.Take(_xmax + 10)
            .Select(NormalizeSample)
            .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)
            .DistinctUntilChanged()
            .Publish(obs => obs.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

results in :

 59 862  , 1 -21 
 60 867  , 2 -24 
 61 893  , 3 -28 
 62 912  , 4 -31 
 63 937  , 5 -35 
 64 937  , 6 -38 
 65 870  , 7 -41 
 66 777  , 8 -44 
 67 632  , 9 -48 
 68 444  , 10 -51 
 69 289  , 11 -55  
 ...
 ...

Upvotes: 2

Views: 648

Answers (3)

Dmitry Ledentsov
Dmitry Ledentsov

Reputation: 3660

The easiest way is probably zipping the sequence with a shifted original sequence:

var res = pointObservable.Zip(
    pointObservable.Skip(1),
    (p1, p2) => new { A = p1, B = p2 }
);

res.Subscribe(Console.WriteLine);

resulting in

{ A = (0,0), B = (1,10) }
{ A = (1,10), B = (2,20) }
{ A = (2,20), B = (3,30) }
{ A = (3,30), B = (4,40) }
{ A = (4,40), B = (5,50) }
{ A = (5,50), B = (6,60) }
{ A = (6,60), B = (7,70) }
{ A = (7,70), B = (8,80) }
{ A = (8,80), B = (9,90) }

given a ToString Method for MyPoint

Update:

as of the comments, in order to avoid unwanted side effects on subscriptions, the original sequence has to be Published before zipping. Thus, James World's answer using Scan is probably what you should use.

Using James' CombineWithPrevious:

var res = pointObservable
    .CombineWithPrevious((p1, p2) => new { A = p1, B = p2 })
    .Skip(1);

gives the same result

or a more succint version by Engimativity:

var res = pointObservable
    .Publish(po => 
        po.Zip(
            po.Skip(1),
            (p1, p2) => new { A = p1, B = p2 }
        )
    );

Upvotes: 2

James World
James World

Reputation: 29786

Observable.Scan is the easiest way to fold or compare current and previous items. I blogged on this here with some nice diagrams. Here is the code from that article, with an example specifically with points. The extension method is very flexible though, and it works with any source and result types:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}

So if you have a complex type Delta like this:

public class Delta
{
    public Point P1 { get;set; }
    public Point P2 { get;set; }

    public static Delta Create(Point P1, Point P2)
    {
        return new Delta {
            P1 = P1,
            P2 = P2
        };
    }

    public override string ToString()
    {
        return string.Format("Delta is (" + (P2.X - P1.X)
            + "," + (P2.Y - P1.Y) + ")");
    }
}

You can use as follows:

Subject<Point> ps = new Subject<Point>();

ps.CombineWithPrevious(Delta.Create)
  .Subscribe(d => Console.WriteLine(d));

ps.OnNext(new Point(1,1));
ps.OnNext(new Point(2,2));
ps.OnNext(new Point(2,3));

Your output will be:

Delta is (0,0)
Delta is (1,1)
Delta is (1,1)
Delta is (2,3)

Note that default(TSource) is used to set an initial default - you can easily modify this to specify an initial default value, or handle that in the result selector, or skip the first element etc (.Skip(1)) - there are lots of options.

Upvotes: 3

Brandon
Brandon

Reputation: 39192

You want Scan:

points
    .Scan((LineSegment)null, (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
    .Skip(1) // skip the first line segment which will not be valid
    .Subscribe(lineSegment => ... );

Upvotes: 1

Related Questions