philologon

Reputation: 2105

Approach to consume stream, transform, then hand to other consumers (without state)

New to Rx; trying to figure it out.

I am taking data from an accelerometer via events, adapting the data to my own format, then providing a data stream to other consumers via Reactive Extensions.

My approach is partially working, in that the AccelerometerFrame_raw constructor is being called. Can someone look at this and help me understand what I should do differently?

Hopefully, this snippet is sufficient:

public class accel_raw_producer : IObservable<AccelerometerFrame_raw>
{
   private static Spatial spatial = null;
   private IObservable<EventPattern<SpatialDataEventArgs>> spatialEvents;
   private IDisposable subscription;

   public accel_raw_producer ()
   {
      spatial = new Spatial();
      spatial.close();
      spatialEvents = System.Reactive.Linq.Observable.FromEventPattern
         <SpatialDataEventHandler, SpatialDataEventArgs>(
         handler => handler.Invoke,
         h => spatial.SpatialData += h,
         h => spatial.SpatialData -= h);
      subscription = spatialEvents.Subscribe();
      spatial.open(-1);
   }


   public IDisposable Subscribe(IObserver<AccelerometerFrame_raw> observer)
   {
      return
         (from evt in spatialEvents
         let e = evt.EventArgs
         select new AccelerometerFrame_raw
         (
            e.spatialData[0].Acceleration[0],
            e.spatialData[0].Acceleration[1],
            e.spatialData[0].Acceleration[2],
            e.spatialData[0].AngularRate[0],
            e.spatialData[0].AngularRate[1],
            e.spatialData[0].AngularRate[2]
         )).Subscribe();
   }
}

public class consumerClass : IObserver<AccelerometerFrame_raw>
{
   accel_raw_producer accelStream;
   IDisposable Unsubscriber;

   public consumerClass()
   {
      accelStream = new accel_raw_producer();
      Unsubscriber = accelStream.Subscribe(this);
   }

   public void OnCompleted()
   {
      throw new NotImplementedException();
   }

   public void OnError(Exception error)
   {
      throw new NotImplementedException();
   }

   public void OnNext(AccelerometerFrame_raw accelFrame)
   {
      if (null == accelFrame) return;
      AccelX = accelFrame.Acceleration.X;
      AccelY = accelFrame.Acceleration.Y;
      AccelZ = accelFrame.Acceleration.Z;
      GyroX = accelFrame.Rotation.X;
      GyroY = accelFrame.Rotation.Y;
      GyroZ = accelFrame.Rotation.Z;
   }    
}

When I set a breakpoint in the AccelerometerFrame_raw constructor, it gets hit, but the values are not propagating up to the consumer. So something in my Subscribe method needs to be different.

Upvotes: 0

Views: 71

Answers (1)

Jim Wooley

Reputation: 10418

I'm not sure if I'm missing some details here, but you don't appear to be using the observer input parameter: your Subscribe method ends with a parameterless Subscribe() call, which discards every OnNext value, so nothing ever reaches your consumer. Personally, I would consider swinging this around and just passing out the IObservable, letting the caller subscribe to it themselves:

public IObservable<AccelerometerFrame_raw> AccelerometerFrames()
{
     return
        from evt in spatialEvents
        let e = evt.EventArgs
        select new AccelerometerFrame_raw
        (
           e.spatialData[0].Acceleration[0],
           e.spatialData[0].Acceleration[1],
           e.spatialData[0].Acceleration[2],
           e.spatialData[0].AngularRate[0],
           e.spatialData[0].AngularRate[1],
           e.spatialData[0].AngularRate[2]
        );
}
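
To make the calling side concrete, here is a rough, self-contained sketch of how a consumer might subscribe to a method like this. It assumes the System.Reactive NuGet package is referenced. FakeSpatial, FakeSpatialEventArgs, Frame, and AccelProducer are hypothetical stand-ins for the sensor and frame types in the question, not the real device API.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

// Hypothetical event args standing in for SpatialDataEventArgs.
public class FakeSpatialEventArgs : EventArgs
{
    public double[] Acceleration { get; }
    public double[] AngularRate { get; }

    public FakeSpatialEventArgs(double[] acceleration, double[] angularRate)
    {
        Acceleration = acceleration;
        AngularRate = angularRate;
    }
}

// Hypothetical event source standing in for the Spatial device class.
public class FakeSpatial
{
    public event EventHandler<FakeSpatialEventArgs> SpatialData;

    public void Raise(double[] acceleration, double[] angularRate) =>
        SpatialData?.Invoke(this, new FakeSpatialEventArgs(acceleration, angularRate));
}

// Hypothetical frame type standing in for AccelerometerFrame_raw.
public class Frame
{
    public double AccelX { get; }
    public double AccelY { get; }
    public double AccelZ { get; }
    public double GyroX { get; }
    public double GyroY { get; }
    public double GyroZ { get; }

    public Frame(double ax, double ay, double az, double gx, double gy, double gz)
    {
        AccelX = ax; AccelY = ay; AccelZ = az;
        GyroX = gx; GyroY = gy; GyroZ = gz;
    }
}

public class AccelProducer
{
    private readonly IObservable<Frame> frames;

    public AccelProducer(FakeSpatial spatial)
    {
        // Wrap the event, then project each event into a Frame.
        // Nothing is subscribed here; the caller decides when to listen.
        frames = Observable
            .FromEventPattern<FakeSpatialEventArgs>(
                h => spatial.SpatialData += h,
                h => spatial.SpatialData -= h)
            .Select(evt => new Frame(
                evt.EventArgs.Acceleration[0],
                evt.EventArgs.Acceleration[1],
                evt.EventArgs.Acceleration[2],
                evt.EventArgs.AngularRate[0],
                evt.EventArgs.AngularRate[1],
                evt.EventArgs.AngularRate[2]));
    }

    public IObservable<Frame> AccelerometerFrames() => frames;
}

public static class Program
{
    public static void Main()
    {
        var spatial = new FakeSpatial();
        var producer = new AccelProducer(spatial);
        var received = new List<Frame>();

        // The caller supplies the OnNext callback, so values actually arrive.
        using (producer.AccelerometerFrames().Subscribe(frame => received.Add(frame)))
        {
            spatial.Raise(new[] { 0.1, 0.2, 0.3 }, new[] { 1.0, 2.0, 3.0 });
        }

        Console.WriteLine($"Frames received: {received.Count}");
    }
}
```

If you would rather keep the class implementing IObservable, the smallest change to the original code is probably to pass the observer along, that is, end the query with Subscribe(observer) instead of Subscribe().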

Upvotes: 3
