Reputation: 69
I'm using Phoenix Channels with RxJS and a service to make a map react to location updates. My map component is subscribed to that observable (service) from a BehaviorSubject and executes a function to reset/update markers on the map.
My main issue is that whenever a user stops reporting their location, the marker stays on screen and the reset method never runs, because the subscription receives no event. I have 3 ideas on how to implement this, but sadly my RxJS skills are very limited. They all involve using an interval:
I'm aware they are very similar, but I'm having a hard time implementing any of them with my current knowledge of RxJS.
Websocket Service
this.channel.on('driver_update', data => {
  this.ordersOnDelivery = Object.assign(this.ordersOnDelivery, [data]);
  this.orders.next(this.ordersOnDelivery);
  // this.driversUpdated.emit(this.ordersOnDelivery);
});
Map Component
this.subscription = this.websocket.orders$.subscribe(ordersOnDelivery => {
  if (this.markers.length !== ordersOnDelivery.length) {
    console.log("Resetting markers with new locations");
    this.resetMarkers(ordersOnDelivery);
  } else {
    console.log("Updating markers");
    this.updateMarkers(ordersOnDelivery);
  }
});
Upvotes: 0
Views: 1611
Reputation: 8022
You could create a special reset data and send it after a specified time.
this.subscription = this.websocket.orders$.pipe(
  // Each new emission cancels the previous timer and starts a fresh one
  switchMap(orders => timer(5000).pipe(
    mapTo(/*TimeoutData*/),
    startWith(orders)
  ))
).subscribe(orders => {
  if (isTimeoutData(orders)) {
    console.log("Removing all markers");
    this.resetMarkers();
  } else if (this.markers.length !== orders.length) {
    console.log("Resetting markers with new locations");
    this.resetMarkers(orders);
  } else {
    console.log("Updating markers");
    this.updateMarkers(orders);
  }
});
This will send /*TimeoutData*/ if this.websocket.orders$ doesn't emit for 5 seconds. If orders$ does emit before the timer fires, switchMap unsubscribes from the timer and creates a new one. TimeoutData can be anything. For example, a concrete implementation may look like this:
this.subscription = this.websocket.orders$.pipe(
  switchMap(orders => timer(5000).pipe(
    mapTo(null), // null marks "no update for 5 seconds"
    startWith(orders)
  ))
).subscribe(orders => {
  if (orders == null) {
    console.log("Removing all markers");
    this.resetMarkers();
  } else if (this.markers.length !== orders.length) {
    console.log("Resetting markers with new locations");
    this.resetMarkers(orders);
  } else {
    console.log("Updating markers");
    this.updateMarkers(orders);
  }
});
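If it helps to see the timing in isolation, the same switchMap/timer pattern can be wrapped in a small helper and driven with RxJS's VirtualTimeScheduler, so the 5 seconds elapse instantly. This is only a sketch: withInactivityReset, the scheduler parameter, and the driver ids are hypothetical names introduced for illustration, not part of RxJS or of the code above.

```typescript
import { Observable, SchedulerLike, Subject, VirtualTimeScheduler, asyncScheduler, timer } from 'rxjs';
import { mapTo, startWith, switchMap } from 'rxjs/operators';

// Hypothetical helper (not an RxJS API): forwards every value from source$,
// then emits null if source$ stays silent for `ms` milliseconds.
function withInactivityReset<T>(
  source$: Observable<T>,
  ms: number,
  scheduler: SchedulerLike = asyncScheduler
): Observable<T | null> {
  return source$.pipe(
    // switchMap drops the previous timer whenever a new value arrives
    switchMap(value =>
      timer(ms, scheduler).pipe(
        mapTo(null),
        startWith(value)
      )
    )
  );
}

// Demo on virtual time: two updates 3 s apart, then silence.
const scheduler = new VirtualTimeScheduler();
const orders$ = new Subject<string[]>();
const seen: Array<string[] | null> = [];

withInactivityReset(orders$, 5000, scheduler).subscribe(v => seen.push(v));

scheduler.schedule(() => orders$.next(['driver-1']), 0);
scheduler.schedule(() => orders$.next(['driver-1', 'driver-2']), 3000);
scheduler.flush(); // runs all scheduled work without real waiting

console.log(seen); // [['driver-1'], ['driver-1', 'driver-2'], null]
```

The first timer (due at 5000 ms) never fires because the second update at 3000 ms replaces it; only the second timer (due at 8000 ms) produces the null.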
Update: You can replace timer(x) with interval(x). I'm not sure this is what you want, but this will keep emitting TimeoutData every x milliseconds until websocket.orders$ emits and resets the interval.
Upvotes: 1
Reputation: 6424
Consider replacing your BehaviorSubject with a simple Subject, then use timeoutWith (an RxJS operator) to emit an empty value (null) when no value has been emitted within X seconds, as demonstrated below:
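// needs: of (from 'rxjs'), timeoutWith and repeat (from 'rxjs/operators')
this.websocket.orders$
  .pipe(
    // of(null) emits the null marker; EMPTY would complete without emitting anything
    timeoutWith(X * 1000, of(null)),
    repeat()
  )
  .subscribe(orders => {
    if (!orders) {
      // <== reset markers here
    } else if (this.markers.length !== orders.length) this.resetMarkers(orders);
    else this.updateMarkers(orders);
  })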
Note: the above example cannot be used with a BehaviorSubject, because each time repeat() resubscribes, the BehaviorSubject replays the value it buffers. Alternatively, you could follow the example below if you insist on using BehaviorSubject:
this.websocket.orders$
  .pipe(
    tap(orders => {
      if (this.markers.length !== orders.length) this.resetMarkers(orders);
      else this.updateMarkers(orders);
    }),
    switchMapTo(
      // pipe() only accepts operators, so the side effect is wrapped in tap
      timer(X * 1000).pipe(
        tap(() => {
          // reset markers here
          // timer gets reset on each new emitted event
        })
      )
    )
  )
  .subscribe()
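The note about BehaviorSubject comes down to what each subject does on subscription, and repeat() subscribes again after every timeout. A minimal sketch of that difference, where the variable names are made up for illustration:

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const plain$ = new Subject<number>();
const behavior$ = new BehaviorSubject<number>(0);

// Both subjects receive a value before anyone subscribes.
plain$.next(1);
behavior$.next(1);

const fromPlain: number[] = [];
const fromBehavior: number[] = [];

// A late subscriber (which is what repeat() creates after each timeout)
// gets nothing from the Subject but immediately gets the buffered value
// from the BehaviorSubject.
plain$.subscribe(v => fromPlain.push(v));
behavior$.subscribe(v => fromBehavior.push(v));

console.log(fromPlain);    // []
console.log(fromBehavior); // [1]
```

With timeoutWith plus repeat(), that replayed value would look like a fresh location update on every resubscription, which is why the tap/switchMapTo variant avoids resubscribing to the source.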
Upvotes: 1