Reputation: 8732
I am building a logger object that asynchronously obtains the IP address and then logs all values with this IP address. It must start collecting logs as early as it is instantiated, but emit them only after the IP address has been obtained; and after that it should emit as normal.
Here is my class:
class LoggerService {
constructor() {
let thisIp;
const getIp = Observable.create(function(observer) {
// doing it with a timeout to emulate bad network
setTimeout(() => {
fetch('https://api.ipify.org?format=json').then(response => response.json()).then(response => {
thisIp = response.ip;
console.log('fetched IP: ', thisIp);
observer.next(response.ip);
observer.complete();
});
}, 5000)
});
// this is where I plan to buffer logs until IP is obtained
this.logStream = new Subject().pipe(buffer(getIp));
// for starters - just log to the console with the IP address
this.logStream.subscribe((value) => console.log(thisIp, value));
}
emit = (message) => this.logStream.next(message);
}
But it does not work as I need; it does output all buffered values as an array but stops emitting them after the IP has been obtained:
const logger = new LoggerService();
setInterval(() => {
logger.emit('Hey ' + Math.random())
}, 1000);
// I get five messages and that's it
How do I make it emit my values even after buffering?
Upvotes: 1
Views: 236
Reputation: 14750
Looking back at this a year and a half later, I notice combineLatest/of aren't necessary; we can simply pipe the ip observable and map it to the desired shape. Here's how I would do it today:
export class LoggerService {
private messages$ = new Subject<string>();
private formattedMessages$ = this.messages$.pipe(
mergeMap(message => this.service.ipAddress$.pipe(
map(ip => `[${ip}] ${message}`)
))
);
constructor(private service: GenericService) {
this.formattedMessages$.subscribe(
message => console.log(message) // actual logging logic goes here...
);
}
public log(message: string) {
this.messages$.next(message);
}
}
You don't need to "buffer" the values per se, but you can rather create a stream that depends on the async ipAddress$
, so the value won't get emitted until the ip address had been emitted. combineLatest
will work well for this purpose.
Let's give the LoggerService
a message stream called message$
and a simple log()
method that pushes the provided string through this stream.
We can construct a stream of messagesWithIpAddresses$
that use combineLatest
to create an observable that emits the provided message along with the ipAddress$
, but only after both have actually emitted a value.
export class LoggerService {
private messages$ = new Subject<string>();
public log(message: string): void {
this.messages$.next(message);
}
constructor(service: GenericService) {
const messagesWithIpAddresses$ = this.messages$.pipe(
mergeMap(message => combineLatest(service.ipAddress$, of(message)))
);
messagesWithIpAddresses$.subscribe(
([ip, message]) => {
// actual logging logic would go here...
console.log(`[${ip}] ${message}`);
}
);
}
}
Since of(message)
will emit immediately, we will just be waiting for ipAddress$
. But, if a value has already been emitted, then it too will be immediate.
Check out this working StackBlitz
Upvotes: 1
Reputation: 840
Update: below answer won't work. After viewing buffer documentation your logger Subject() still won't emit anymore messages because it fires only when getIp fires. In your code, getIp fires only once, after the http request.
I think we need more details of what you want to achieve in order to propose a correct rxjs pipeline.
I would remove this line
observer.complete(); // remove it
That signals the subscriptor the stream is over, that's why you are not receiving any messages more.
Upvotes: 0