Reputation: 700
Ok, so I'm new to RxJS and I can't figure something out. I need to implement image processing in which the user adds multiple images at a time, and for each image the following actions, among others, must occur:
But I don't want all thumbnails to be generated at once; I'd like them to be generated sequentially. Here's what I tried: https://codesandbox.io/s/funny-mountain-tm0jj?expanddevtools=1&fontsize=14
I'll also paste the code here:
import { of } from "rxjs";
import { map } from "rxjs/operators";
const imageObservable = of(1, 2, 3, 4, 5);
function generateThumbnail(image) {
console.log(`Generating thumbnail ${image}`);
return new Promise(resolve => {
setTimeout(() => {
console.log(`Finished generating thumbnail ${image}`);
resolve({
image,
thumbnail: `This is a thumbnail of image ${image}`
});
}, 1500);
});
}
async function upload(imagePromise) {
const image = await imagePromise;
console.log("Uploading", image);
return new Promise(resolve => {
setTimeout(() => {
console.log("Finished uploading", image);
resolve();
}, 1500);
});
}
imageObservable.pipe(map(generateThumbnail)).subscribe(upload);
I'd like RxJS to wait for the previous generateThumbnail call to finish before starting the current one (that's why it returns a promise), and also to wait for the previous upload to finish before starting the current one (so upload returns a promise too). I have no idea how to achieve that. Would any RxJS ninja like to help me with that?
Upvotes: 1
Views: 2070
Reputation: 21442
To achieve the following:
So the final code could be
imageObservable
.pipe(
// preserve order and wait for the previous thumbnail to finish
concatMap(generateThumbnail),
// upload each generated thumbnail, waiting for the previous upload to finish
concatMap(upload)
)
.subscribe();
Note: upload no longer needs to await the generateThumbnail promise, because concatMap resolves it and passes the resulting value downstream.
async function upload(image) {
console.log(`[upload] start ${image.image}`);
return new Promise(resolve => {
setTimeout(() => {
console.log(`[upload] done ${image.image}`);
resolve();
}, 1000);
});
}
Upvotes: 2
Reputation: 10127
Do you only want generating thumbnails to be sequential or does generation of a second thumbnail need to wait for upload of the first one to finish as well?
If you don't want to wait for upload, you just need to concat them:
import { of, from } from 'rxjs';
import { concatMap } from 'rxjs/operators';
const imageObservable = of(1, 2, 3, 4, 5);
function generateThumbnail(image) {
console.log(`Generating thumbnail ${image}`);
return new Promise(resolve => {
setTimeout(() => {
console.log(`Finished generating thumbnail ${image}`);
resolve({
image,
thumbnail: `This is a thumbnail of image ${image}`
});
}, 1500);
});
}
async function upload(imagePromise) {
const image = await imagePromise;
console.log('Uploading', image);
return new Promise(resolve => {
setTimeout(() => {
console.log('Finished uploading', image);
resolve();
}, 1500);
});
}
// imageObservable.pipe(map(generateThumbnail)).subscribe(upload);
imageObservable.pipe(concatMap(image => from(generateThumbnail(image)))).subscribe();
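concatMap does what you want. I just made an Observable out of your Promise with the from creation function, which I don't think is needed, since concatMap also accepts a function that returns a Promise. But it's usually easier to work with Observables instead of Promises when you use RxJS, so generateThumbnail could return an Observable directly: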
function generateThumbnail(image): Observable<any> {
console.log(`Generating thumbnail ${image}`);
return from(
new Promise(resolve => {
setTimeout(() => {
console.log(`Finished generating thumbnail ${image}`);
resolve({
image,
thumbnail: `This is a thumbnail of image ${image}`
});
}, 1500);
})
);
}
Edit: Can you try this?
imageObservable
.pipe(
concatMap(image => generateThumbnail(image)),
concatMap(image => upload(image))
)
.subscribe();
Upvotes: 1