pablodavila

Reputation: 69

RxJS - Executing a function after a stream has been idle for x seconds

I'm using Phoenix Channels with RxJS and a service to make a map react to location updates. My map component is subscribed to that observable (service) from a BehaviorSubject and executes a function to reset/update markers on the map.

My main issue is that whenever a user stops reporting their location, the marker stays on screen and the reset method never runs, because the subscription receives no event. I have 3 ideas on how to implement this, all of which involve an interval, but sadly my RxJS skills are very low:

  1. Create a different observable/subscription inside the map component and run the reset function every x seconds, regardless of any other events.
  2. Create a timer that executes the reset function every x seconds, but set it back to 0 whenever an event is received.
  3. Have the websocket service send a 'heartbeat' event every x seconds which would trigger the reset function on the map component.

I'm aware they are very similar, but I'm having a hard time implementing any of them with my current knowledge of RxJS.

Websocket Service

this.channel.on('driver_update', data => {
  this.ordersOnDelivery = Object.assign(this.ordersOnDelivery, [data]);
  this.orders.next(this.ordersOnDelivery);
  // this.driversUpdated.emit(this.ordersOnDelivery);
});

Map Component

this.subscription = this.websocket.orders$.subscribe(ordersOnDelivery => {
  if (this.markers.length !== ordersOnDelivery.length) {
    console.log("Resetting markers with new locations")
    this.resetMarkers(ordersOnDelivery);
  } else {
    console.log("Updating markers")
    this.updateMarkers(ordersOnDelivery);
  }
});

Upvotes: 0

Views: 1611

Answers (2)

Mrk Sef

Reputation: 8022

You could define a special "reset" value and emit it once the stream has been idle for a specified time.

import { timer } from 'rxjs';
import { switchMap, mapTo, startWith } from 'rxjs/operators';

this.subscription = this.websocket.orders$.pipe(
  // Each new emission cancels the pending timer and starts a fresh one
  switchMap(orders => timer(5000).pipe(
    mapTo(/*TimeoutData*/),
    startWith(orders)
  ))
).subscribe(orders => {
  if (isTimeoutData(orders)) {
    console.log("Removing all markers")
    this.resetMarkers();
  } else if (this.markers.length !== orders.length) {
    console.log("Resetting markers with new locations")
    this.resetMarkers(orders);
  } else {
    console.log("Updating markers")
    this.updateMarkers(orders);
  }
});

This will emit /*TimeoutData*/ if this.websocket.orders$ doesn't emit for 5 seconds. If orders$ does emit before the timer fires, switchMap unsubscribes from the pending timer and creates a new one. TimeoutData can be any value that you can tell apart from real orders. For example, a concrete implementation may look like this:

this.subscription = this.websocket.orders$.pipe(
  switchMap(orders => timer(5000).pipe(
    mapTo(null),
    startWith(orders)
  ))
).subscribe(orders => {
  if(orders == null){
    console.log("Removing all markers")
    this.resetMarkers();
  } else if (this.markers.length !== orders.length) {
    console.log("Resetting markers with new locations")
    this.resetMarkers(orders);
  } else {
    console.log("Updating markers")
    this.updateMarkers(orders);
  }
});
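
For comparison, the same "restart the timer on every event" idea (idea 2 in the question) can be sketched without RxJS. This is only a minimal illustration: IdleWatchdog, Scheduler and realScheduler are hypothetical names that do not come from RxJS or from the original code, and the scheduler is injectable purely so the behaviour can be checked without waiting on real timers.

```typescript
// Hypothetical helper (an assumption for illustration, not an RxJS API).
type Cancel = () => void;
type Scheduler = (fn: () => void, ms: number) => Cancel;

// Default scheduler backed by setTimeout; a fake one can be passed in tests.
const realScheduler: Scheduler = (fn, ms) => {
  const id = setTimeout(fn, ms);
  return () => clearTimeout(id);
};

class IdleWatchdog {
  private cancel: Cancel | null = null;
  private readonly idleMs: number;
  private readonly onIdle: () => void;
  private readonly schedule: Scheduler;

  constructor(idleMs: number, onIdle: () => void, schedule: Scheduler = realScheduler) {
    this.idleMs = idleMs;
    this.onIdle = onIdle;
    this.schedule = schedule;
  }

  // Call on every incoming event: drops the pending timer and starts a new one,
  // which is roughly what switchMap does with the inner timer(5000).
  touch(): void {
    this.stop();
    this.cancel = this.schedule(() => {
      this.cancel = null;
      this.onIdle();
    }, this.idleMs);
  }

  // Call on teardown so no callback fires after the component is gone.
  stop(): void {
    if (this.cancel !== null) {
      this.cancel();
      this.cancel = null;
    }
  }
}
```

In the map component this would mean creating one watchdog whose onIdle callback clears the markers, calling touch() inside the existing subscribe callback, and calling stop() wherever the subscription is torn down. The switchMap version above does the same thing in a single pipeline, which is why it is usually the more idiomatic choice when RxJS is already in use.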

Update: You can replace timer(x) with interval(x). I'm not sure this is what you want, but this will keep emitting TimeoutData every x milliseconds until websocket.orders$ emits and resets the interval.

Upvotes: 1

Rafi Henig

Reputation: 6424

Consider replacing your BehaviorSubject with a plain Subject, then use timeoutWith (an RxJS operator) to emit an empty value (null) when no value has been emitted within X seconds, as demonstrated below:

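import { of } from 'rxjs';
import { timeoutWith, repeat } from 'rxjs/operators';

this.websocket.orders$
    .pipe(
        // of(null) emits a single null on timeout; EMPTY would complete without emitting
        timeoutWith(X * 1000, of(null)),
        // resubscribe to the source after the fallback completes
        repeat()
    )
    .subscribe(orders => {
        if (!orders) {
            // <== reset markers here
        } else if (this.markers.length !== orders.length) {
            this.resetMarkers(orders);
        } else {
            this.updateMarkers(orders);
        }
    });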

Note: the above example cannot be used with a BehaviorSubject, because every resubscription triggered by repeat() would replay the value it buffers. Alternatively, if you insist on using a BehaviorSubject, you could follow the example below:

import { timer } from 'rxjs';
import { tap, switchMapTo } from 'rxjs/operators';

this.websocket.orders$
    .pipe(
        tap(orders => {
            if (this.markers.length !== orders.length) this.resetMarkers(orders);
            else this.updateMarkers(orders);
        }),
        switchMapTo(
            timer(X * 1000).pipe(
                tap(() => {
                    //  reset markers here
                    //  timer gets reset on each new emitted event
                })
            )
        )
    )
    .subscribe();
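
The difference between the two subject types mentioned in the note can be checked in isolation. The snippet below is only a small sketch with made-up values: the late subscribers stand in for the resubscription that repeat() performs after a timeout, and the variable names are hypothetical.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const seenBySubject: string[] = [];
const seenByBehavior: string[] = [];

const subject = new Subject<string>();
const behavior = new BehaviorSubject<string>('initial');

// Both sources receive one update before anyone subscribes.
subject.next('last known location');
behavior.next('last known location');

// A late subscriber to a plain Subject gets nothing until the next emission.
subject.subscribe(value => seenBySubject.push(value));

// A late subscriber to a BehaviorSubject immediately receives the buffered value,
// which is how a stale location would reappear right after each timeout.
behavior.subscribe(value => seenByBehavior.push(value));
```

After this runs, seenBySubject is still empty while seenByBehavior already contains the stale update, which is the reason the timeoutWith/repeat approach needs a plain Subject.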

Upvotes: 1
