MrFoh

Reputation: 2701

Reduce stream into value with Rx scan operator

I have a stream of location data from a real-time data source, and I am trying to use the RxJS scan operator to reduce the stream and compute the total distance traveled as new locations are emitted.

The emitted values are in this format:

{
  lat: 3.4646343,
  lng: 6.4343234,
  speed: 1.3353,
  heading: 279
}

And this is the processor:

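// Great-circle distance between two points (spherical law of cosines).
// unit: 'M' = meters (default), 'KM' = kilometers, anything else = statute miles.
function distanceBetweenPoints (point, point2, unit: string = 'M') {
    const radlat1 = Math.PI * (point.lat / 180);
    const radlat2 = Math.PI * (point2.lat / 180);
    const theta = point.lng - point2.lng;
    const radtheta = Math.PI * theta / 180;
    let dist = Math.sin(radlat1) * Math.sin(radlat2) + Math.cos(radlat1) * Math.cos(radlat2) * Math.cos(radtheta);
    // Clamp to [-1, 1]: rounding can push the value just past 1, which makes Math.acos return NaN.
    dist = Math.acos(Math.min(1, Math.max(-1, dist)));
    dist = dist * 180 / Math.PI;   // radians to degrees
    dist = dist * 60 * 1.1515;     // degrees to statute miles

    if (unit === 'KM') { dist = dist * 1.609344; }
    if (unit === 'M') { dist = dist * 1.609344 * 1000; }
    return dist;
}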

const location$: Subject<LocationUpdate> = new Subject();

location$
    .pipe(
        map(location => {
            return { lat: location.lat, lng: location.lng };
        }),
        scan((x, y) => {
            return distanceBetweenPoints(x, y);
        }),
        takeUntil(locationUpdateEnd$)
    )
    .subscribe(distance => console.info('distance', distance));

I'm pretty sure that I am using the scan operator wrong, because I'm seeing NaN and a numeric value in the output. Any help would be appreciated.

Upvotes: 1

Views: 65

Answers (2)

MrFoh

Reputation: 2701

I solved it with the pairwise operator, keeping scan (seeded with 0) to accumulate the total distance:

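import { map, pairwise, scan, takeUntil } from 'rxjs/operators';

location$
    .pipe(
        map(coords => {
            return { lat: coords.lat, lng: coords.lng };
        }),
        // Emit [previous, current] once two locations have arrived
        pairwise(),
        // Seeded with 0, so the accumulator is always the running total
        scan((acc, coords) => {
            return acc + distanceBetweenPoints(coords[0], coords[1]);
        }, 0),
        takeUntil(endtrip$)
    )
    .subscribe(console.log);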
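
For anyone wondering why the scan in the question printed a number and then NaN: without a seed, scan emits the first value unchanged and uses it as the accumulator, so by the third location the accumulator is already a number and has no lat or lng. The sketch below imitates that behaviour on a plain array, with no RxJS involved. The names `unseededScan` and `fakeDistance` are made up for illustration, and `fakeDistance` is a stand-in rather than the real distance formula.

```typescript
type Point = { lat: number; lng: number };

// Stand-in for distanceBetweenPoints (hypothetical, not the real formula):
// any function that reads .lat and .lng fails in the same way.
function fakeDistance(a: Point, b: Point): number {
  return Math.abs(a.lat - b.lat) + Math.abs(a.lng - b.lng);
}

// Imitates scan() without a seed on a finite list: the first value is emitted
// unchanged and becomes the accumulator for the next call.
function unseededScan(values: Point[]): unknown[] {
  const emitted: unknown[] = [];
  let acc: any = undefined;
  values.forEach((value, index) => {
    acc = index === 0 ? value : fakeDistance(acc, value);
    emitted.push(acc);
  });
  return emitted;
}

const scanOutput = unseededScan([
  { lat: 0, lng: 0 },
  { lat: 1, lng: 1 },
  { lat: 2, lng: 2 },
]);

// First entry is the point itself, second is a number, third is NaN
// because the accumulator passed in as "a" is a number by then.
console.log(scanOutput);
```

Seeding scan with 0 and feeding it pairs, as in the code above, keeps the accumulator a number from the start.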

Upvotes: 0

miedwar

Reputation: 7278

Use pairwise() instead of scan().

pairwise() emits each value together with the one before it, as [last_value, current_value]; in your case, [last_location, current_location]. Nothing is emitted until the second value arrives.

Then map() that to your distance function.
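
As a rough illustration of what those two steps compute, here is a plain-array sketch with no RxJS dependency. `pairwiseArray` and `haversineMeters` are hypothetical names, and the haversine formula with a mean Earth radius of 6371000 m is swapped in for the asker's function, so the numbers will differ slightly from theirs. In the real pipeline the equivalent would be pairwise() followed by map(([prev, curr]) => distanceBetweenPoints(prev, curr)).

```typescript
type LatLng = { lat: number; lng: number };

// Array analogue of pairwise(): [a, b, c] becomes [[a, b], [b, c]].
function pairwiseArray<T>(values: T[]): [T, T][] {
  const pairs: [T, T][] = [];
  let prev: T | undefined;
  values.forEach((curr, index) => {
    if (index > 0) {
      pairs.push([prev as T, curr]);
    }
    prev = curr;
  });
  return pairs;
}

// Haversine great-circle distance in meters (assumes a spherical Earth).
function haversineMeters(a: LatLng, b: LatLng): number {
  const toRad = (deg: number): number => (deg * Math.PI) / 180;
  const earthRadiusM = 6371000; // mean Earth radius, an approximation
  const sinLat = Math.sin(toRad(b.lat - a.lat) / 2);
  const sinLng = Math.sin(toRad(b.lng - a.lng) / 2);
  const h =
    sinLat * sinLat +
    Math.cos(toRad(a.lat)) * Math.cos(toRad(b.lat)) * sinLng * sinLng;
  return 2 * earthRadiusM * Math.atan2(Math.sqrt(h), Math.sqrt(1 - h));
}

// Three sample fixes along the equator, one degree of longitude apart.
const track: LatLng[] = [
  { lat: 0, lng: 0 },
  { lat: 0, lng: 1 },
  { lat: 0, lng: 2 },
];

// "pairwise, then map": one distance per consecutive pair of locations.
const legs = pairwiseArray(track).map(([prev, curr]) => haversineMeters(prev, curr));

// Summing the legs gives the total; in the stream, a seeded scan does this incrementally.
const totalMeters = legs.reduce((sum, leg) => sum + leg, 0);
console.log(legs, totalMeters);
```

Note that map alone yields the distance of each leg, not a running total; to get the total as the stream progresses, the per-leg distances still need to be accumulated, for example with scan seeded with 0.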

Upvotes: 2
