Reputation: 15471
Say I wanted to separately handle the first event emitted by an Observable, but then continue my subscription.
For example, if I had the following chain:
observable.throttleTime(2000)
.takeFirst(() => console.log('potato'))
.switchMap((event:MouseEvent) => {
const element:Element = event.target as Element;
return this.innerObservable(('tomato'));
})
.subscribe(({result}) => console.log(result))
That fails, but how would I go about making such behaviour work?
Upvotes: 6
Views: 2136
Reputation: 544
You can use two subscribers, one for the default case to be applied to all events and the other for the special case of the first event:
import { of } from 'rxjs';
import { first } from 'rxjs/operators';
//emit 4 values
const source = of(1, 2, 3, 4);
/*
Subscriber One - only first value: 1
Subscriber Two - all values: 1
Subscriber Two - all values: 2
Subscriber Two - all values: 3
Subscriber Two - all values: 4
*/
const subscriberOne = source
.pipe(first())
.subscribe((val) => console.log(`Subscriber One - only first value: ${val}`));
const subscriberTwo = source.subscribe((val) =>
console.log(`Subscriber Two - all values: ${val}`)
);
see it live on stackblitz
And to answer your question, here is a version with a special handler for the first event and a common handler for the rest of the events, excluding the first one:
import { of } from 'rxjs';
import { first, publish, skip } from 'rxjs/operators';
// emit 4 values
const source = of(1, 2, 3, 4);
// hold `others` until `connect()` is called; skip the first event of `source`
const others = publish()(source.pipe(skip(1)));
// marker to make sure first event handler ran before the others
let onceRan = false;
// general case: has to be subscribed first, so that `others.subscribe()`
// runs before `others.connect()` is called
const subscriberTwo = others.subscribe((val) =>
console.log(`Subscriber Two - all values: ${val}, onceRan: ${onceRan}`)
);
// first event handler, `first()` => stops after the first event
const subscriberOne = source.pipe(first()).subscribe((val) => {
console.log(`Subscriber One - only first value: ${val}, onceRan: ${onceRan}`);
// toggle the marker
onceRan = true;
// release `others`
others.connect();
});
/*
Output:
Subscriber One - only first value: 1, onceRan: false
Subscriber Two - all values: 2, onceRan: true
Subscriber Two - all values: 3, onceRan: true
Subscriber Two - all values: 4, onceRan: true
*/
see it live on stackblitz
Upvotes: 2
Reputation: 2765
let isFirstCame = false;
observable
  .subscribe(({result}) => {
    if (!isFirstCame) {
      // here is our first one
      doSpecialWithTheFirst(result);
      isFirstCame = true;
    } else {
      // all the remaining
      doOtherWithRemaining();
    }
  });
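If you go with the flag approach, one way to avoid a shared outer variable is to keep the flag inside a closure. This is only a minimal sketch: `firstThenRest` is a hypothetical helper name, not an RxJS API, and the array below just stands in for values emitted by an Observable.

```typescript
// Hypothetical helper (not part of RxJS): builds a `next` handler that
// routes the first value to `onFirst` and every later value to `onRest`.
function firstThenRest<T>(
  onFirst: (value: T) => void,
  onRest: (value: T) => void
): (value: T) => void {
  let seenFirst = false; // private to this handler, not a shared variable
  return (value: T): void => {
    if (!seenFirst) {
      // flip the flag before calling out, so a throwing or re-entrant
      // `onFirst` cannot be treated as "first" twice
      seenFirst = true;
      onFirst(value);
    } else {
      onRest(value);
    }
  };
}

// Plain calls standing in for emissions from an Observable.
const log: string[] = [];
const handler = firstThenRest<number>(
  (v) => log.push(`first: ${v}`),
  (v) => log.push(`rest: ${v}`)
);
[1, 2, 3].forEach((v) => handler(v));
console.log(log);
```

The returned function can be passed wherever a `next` callback is expected, e.g. `observable.subscribe(firstThenRest(a, b))`. Each call to `firstThenRest` gets its own flag, so two subscriptions do not interfere with each other.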
Upvotes: 1
Reputation: 682
You can create an operator that subscribes to the source observable, processes the first element and then removes itself from the chain. Let's call it onFirst, because takeFirst is an existing operator in Java. RxJS 5 example:
Observable.prototype.onFirst = function(callback) {
  return Observable.create(observer => {
    let source = this.publish()
    let subs = source.subscribe({
      next(v) {
        subs.unsubscribe()
        source.subscribe(observer)
        callback(v)
      },
      error(e) { observer.error(e) },
      complete() { observer.complete() }
    })
    source.connect()
  })
}
// This prints
// first: 1
// 2
// 3
Observable.of(1, 2, 3)
.onFirst(v => console.log("first: " + v))
.subscribe(console.log)
Upvotes: 0
Reputation: 96891
Many RxJS 5 operators that take a callback function also pass along the index of the item going through. This means you could write, for example, the following:
observable.throttleTime(2000)
.map((value, index) => index === 0 ? 'potato' : value)
...
.subscribe(({result}) => console.log(result))
The index is passed, for example, to the filter(), switchMap() and takeWhile() operators, among others.
Upvotes: 8
Reputation: 17859
Use publish along with first and concat operators. Here is an example:
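Rx.Observable.interval(1000)
  .publish(shared => {
    // `shared` does not replay: concat subscribes to it only after the
    // first value has gone by, so an extra skip(1) would drop the second value
    return shared.first().mapTo('FIRST')
      .concat(shared);
  })
  .subscribe(x => console.log(x))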
Upvotes: 0