Reputation: 66555
In Firestore, there are multiple documents that I would like to retrieve. Each document has a unique sourceAddressValue, so for a list of N strings I would like to retrieve up to N documents.
I tried to do the following:
    getLocationAddresses(addresses: string[]) {
      const chunkSize = 10;
      let addressesChunks = [];
      if (addresses.length < chunkSize) {
        addressesChunks.push(addresses);
      } else {
        addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
      }
      console.log(addressesChunks);
      return of(...addressesChunks).pipe(
        mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
          ref.where('sourceLocation', 'array-contains', x)).valueChanges()),
        toArray() // when this is removed, the code inside getOrders is triggered multiple times
      );
    }

    public getOrders() {
      this.getJSON().subscribe(data => {
        this.orders = data.orders;
        const addresses = this.orders.map(item => `${item.address}, ${item.postalCode}`);
        this.dbService.getLocationAddresses(addresses).subscribe(data => {
          console.log('data retrieved');
          console.log(data);
        });
        this.ordersRefreshed.next(this.orders);
      });
    }
While trying to execute the code above, it seems that the execution is not completed. When I comment out toArray() inside getLocationAddresses, however, the subscribed function is fired multiple times, for each chunk separately.
Does anyone know how to group the completions of multiple observables so that the observer fires only once?
Upvotes: 0
Views: 633
Reputation: 14740
Let's first explain the behavior you are seeing:
"When I comment out toArray() inside getLocationAddresses, however, the subscribed function is fired multiple times, for each chunk separately."
The code inside the subscribe is fired any time an emission is received. When you use mergeMap you are creating an observable that has multiple "inner observables". Whenever any of these inner observables emits, the observable created by mergeMap will emit.
So, if you pass n emissions to mergeMap, you can expect at least n emissions (it's possible that these inner observables emit more than one time).
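As a minimal sketch of this, the snippet below replaces the Firestore query with a hypothetical fakeQuery function (an assumption for illustration only) whose inner observable emits twice per chunk. Two chunks passed through mergeMap then reach the subscriber four times:

```typescript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const chunks: string[][] = [['a', 'b'], ['c']];
const emissions: string[] = [];

// Hypothetical stand-in for a query: each inner observable emits twice, then completes.
const fakeQuery = (chunk: string[]) =>
  of(`first:${chunk.join()}`, `second:${chunk.join()}`);

// Every emission of every inner observable is forwarded to the subscriber.
of(...chunks)
  .pipe(mergeMap(fakeQuery))
  .subscribe(value => emissions.push(value));

console.log(emissions.length); // one entry per inner emission, not one per chunk
```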
"[with toArray] it seems that the execution is not completed."
When you use toArray(), it will not allow any emissions until its source observable completes; then it emits an array of all received emissions. In this case, the source is the observable created by mergeMap, which is composed of multiple .valueChanges() observables.
However, observables created by the Firestore .valueChanges() will emit whenever any document in the returned collection changes, but will never complete. Since these observables are long-lived, toArray() will never emit anything.
This StackBlitz illustrates the problem.
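The same behavior can be reproduced without Firestore. In the sketch below, liveQuery is a hypothetical stand-in for valueChanges(), built from concat(of(...), NEVER) so that it emits one snapshot and then stays open. It is an assumption for illustration, not AngularFire's implementation.

```typescript
import { NEVER, Observable, concat, of } from 'rxjs';
import { mergeMap, take, toArray } from 'rxjs/operators';

// Hypothetical stand-in for valueChanges(): emits one snapshot, then never completes.
const liveQuery = (chunk: string[]): Observable<string[]> =>
  concat(of(chunk.map(address => `doc for ${address}`)), NEVER);

const chunks: string[][] = [['a', 'b'], ['c']];
const withoutTake: string[][][] = [];
const withTake: string[][][] = [];

// Inner observables stay open, so toArray() keeps waiting and emits nothing.
const pending = of(...chunks)
  .pipe(mergeMap(liveQuery), toArray())
  .subscribe(result => withoutTake.push(result));

// take(1) completes each inner observable, so toArray() can emit once.
of(...chunks)
  .pipe(mergeMap(chunk => liveQuery(chunk).pipe(take(1))), toArray())
  .subscribe(result => withTake.push(result));

pending.unsubscribe(); // clean up the stream that would otherwise stay open
console.log(withoutTake.length, withTake.length);
```

The first array stays empty because its source never completes, while the second receives a single array containing one result per chunk.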
The solution depends on your desired behavior. Is your intention to call each of these queries once and return the results (one and done) OR is your intention to maintain a reactive stream that emits the most up to date representation of your query?
For the one-and-done case, you can use take(1) to force an observable to complete after receiving 1 emission, thus allowing toArray() to also complete (Example 1A):

    return of(...addressesChunks).pipe(
      mergeMap(x =>
        this.db.collection('locations', ref =>
          ref.where('sourceLocation', 'array-contains', x)
        ).valueChanges().pipe(take(1)) // <-- take(1) forces completion of inner observables
      ),
      toArray()
    );
Instead of using of/mergeMap/toArray, you could use forkJoin (Example 1B):

    return forkJoin(
      addressesChunks.map(
        x => this.db.collection('locations', ref =>
          ref.where('sourceLocation', 'array-contains', x)
        ).valueChanges().pipe(take(1))
      )
    );
For the reactive case, you can use combineLatest to create an observable from multiple sources that emits whenever any of the sources emits:

    return combineLatest(
      addressesChunks.map(
        x => this.db.collection('locations', ref =>
          ref.where('sourceLocation', 'array-contains', x)
        ).valueChanges()
      )
    );
However, I believe that's pretty much what the Firestore .valueChanges() is doing for you already. I understand that you are chunking your query, but I'm curious as to why.
It looks like you are issuing multiple queries only to combine the results back together when you get the return values.
I believe you can simply pass all your addresses to a single ref.where('sourceLocation', 'array-contains', addresses) call and get the results at once. Was there some performance hit in doing it that way?
Upvotes: 2
Reputation: 17752
I think the key point here is that valueChanges() returns an Observable that will notify any time the document it refers to changes. This basically means that the Observable returned does not complete, hence the streams generated via mergeMap do not complete, hence toArray does not fire, since it would fire when the source stream completes.
combineLatest, on the other hand, fires any time one of the observables passed in as parameter fires, after all have fired at least once.
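A small sketch of that combineLatest timing, using two Subjects as hypothetical stand-ins for two long-lived chunk queries (the names chunkA and chunkB are assumptions for illustration). Nothing is emitted until both sources have produced a value, and after that every change produces a new merged list:

```typescript
import { Subject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical stand-ins for two long-lived chunk queries.
const chunkA = new Subject<string[]>();
const chunkB = new Subject<string[]>();
const seen: string[][] = [];

combineLatest([chunkA, chunkB])
  .pipe(
    // Merge the latest result of every chunk into one flat list.
    map(results => results.reduce((acc, cur) => acc.concat(cur), [] as string[]))
  )
  .subscribe(flat => seen.push(flat));

chunkA.next(['a1']);       // nothing yet: chunkB has not emitted
chunkB.next(['b1']);       // first combined emission
chunkA.next(['a1', 'a2']); // any later change emits again

console.log(seen);
```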
Now, if you just want to retrieve the documents and are not interested in having an Observable that emits any time such documents change, then you could try to pipe a take(1) operator onto each Observable returned inside mergeMap.
The code would look like this:

    getLocationAddresses(addresses: string[]) {
      const chunkSize = 10;
      let addressesChunks = [];
      if (addresses.length < chunkSize) {
        addressesChunks.push(addresses);
      } else {
        addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
      }
      console.log(addressesChunks);
      return of(...addressesChunks).pipe(
        mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
          ref.where('sourceLocation', 'array-contains', x)).valueChanges().pipe(
            take(1)
          )
        ),
        toArray() // when this is removed, the code inside getOrders is triggered multiple times
      );
    }
take(1) fires the first notification and then completes the Observable. Since the upstream observables will eventually complete, toArray can fire.
I am not familiar with the Firebase RxJS library, and maybe there are methods/functions that encompass the take(1) behavior I have tried to describe above, but I hope I have been able to convey the core concept.
Upvotes: 0
Reputation: 66555
By using combineLatest, the function getLocationAddresses() returns merged results:
    getLocationAddresses(addresses: string[]) {
      const chunkSize = 10;
      let addressesChunks: string[][] = [];
      if (addresses.length < chunkSize) {
        addressesChunks.push(addresses);
      } else {
        addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0, chunkSize));
      }
      console.log(addressesChunks);
      // One long-lived query per chunk
      const observables = addressesChunks.map(chunk => this.db.collection('locations', ref =>
        ref.where('sourceLocation', 'in', chunk)).valueChanges());
      // Merge the latest result of every chunk into a single array
      return combineLatest(observables)
        .pipe(map(arr => arr.reduce((acc, cur) => acc.concat(cur))));
    }
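One side effect worth noting in the chunking step: splice removes elements from the array passed in, so a caller's list is modified whenever it has chunkSize or more entries. A possible alternative is sketched below with slice. The helper names chunk and flatten are hypothetical and not part of the original code.

```typescript
// Hypothetical helper: split a list into chunks without modifying the input.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size)); // slice copies; splice would mutate
  }
  return chunks;
}

// Hypothetical helper: flatten per-chunk results.
// The [] seed keeps reduce from throwing when the list of parts is empty.
function flatten<T>(parts: readonly T[][]): T[] {
  return parts.reduce<T[]>((acc, cur) => acc.concat(cur), []);
}

const addresses = Array.from({ length: 23 }, (_, i) => `address ${i}`);
const addressChunks = chunk(addresses, 10);

console.log(addressChunks.map(c => c.length)); // chunk sizes
console.log(addresses.length);                 // input is left untouched
```

With these helpers, the chunks could be passed to the same combineLatest pipeline, using flatten inside map.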
Upvotes: 0