Ryan Weiss

Reputation: 1318

RxJs - What is the proper pattern/way to create an Observable array of Observables?

I am fairly new to RxJs and am trying to wrap my head around the proper pattern for creating an Observable array of Observables.

I want to retrieve a list of a User's Posts. The Posts themselves should be Observables, and I want to keep them all in an Observable array, so that when the array changes, the calling code is notified and anything subscribed to the post "list" is updated. This is simple enough, but I would also like each of the Posts to be an Observable, so that if I retrieve a specific posts[i] from the array, I can subscribe to that individual object as well. What is the proper way to do this?

I am using Angular 9, and I have:

public getPosts(): Observable<Array<Post>> {
    return new Promise((resolve, reject) => {
        let posts: Observable<Array<Post>> = new Observable<Array<Post>>();
        this.get<Array<Post>>('posts').subscribe(r => {
            posts = from(r);
            return resolve(posts);
        });
    });
}

This gives me an Observable<Array<Post>>, but how should I create an Observable<Array<Observable<Post>>>? Is this an anti-pattern?

Upvotes: 2

Views: 404

Answers (3)

Jonathan Stellwag

Reputation: 4287

Given information:

If any of these statements is wrong, please tell me and I will update the answer!

  1. The get function returns an observable that emits one array filled with posts
  2. The get observable emits whenever the posts change
  3. The value inside the observable (Array<Post>) is not itself an observable and does not change over time
this.get<Array<Post>>('posts')

Possible functions

  1. () => getPostById$
// This function returns you an observable with the post related to your id.
// If the id is not found the observable will not emit
// If the id is found the observable will only emit if the interface values have been changed
function getPostById$(id: string): Observable<Post> {
  // Returns you either the post or undefined if not found
  const findId = (id: string) => (posts: Array<Post>): Post | undefined =>
     posts.find(post => post.id === id);

  // Type guard: lets the observable emit only if the id has been found
  const existingPost = (post: Post | undefined): post is Post => post != null;

  // Lets the observable emit only if the post's values have changed
  const postComparator = (prevPost: Post, currPost: Post): boolean =>
    prevPost.value === currPost.value && prevPost.name === currPost.name;

  return this.get<Array<Post>>('posts').pipe(
    map(findId(id)),
    filter(existingPost),
    distinctUntilChanged(postComparator)
  );
}
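
To see how getPostById$ above behaves over several emissions, here is a minimal self-contained sketch. It assumes a hypothetical Post shape with id, name and value, and it swaps this.get('posts') for a hand-driven Subject called source$; both are stand-ins for illustration, not part of the original code.

```typescript
import { Observable, Subject } from 'rxjs';
import { distinctUntilChanged, filter, map } from 'rxjs/operators';

// Hypothetical Post shape, assumed for this sketch only.
interface Post { id: string; name: string; value: string; }

// Stand-in for this.get('posts'): emits the full list on every change.
const source$ = new Subject<Post[]>();

function getPostById$(id: string): Observable<Post> {
  return source$.pipe(
    map(posts => posts.find(post => post.id === id)),
    // Type guard: drop emissions where the id was not found.
    filter((post): post is Post => post != null),
    // Skip emissions where the selected post has the same name and value as before.
    distinctUntilChanged((prev, curr) => prev.value === curr.value && prev.name === curr.name)
  );
}

const seen: Post[] = [];
getPostById$('a').subscribe(post => seen.push(post));

// Emitted: first time post 'a' is seen.
source$.next([{ id: 'a', name: 'first', value: '1' }]);
// Skipped: the list changed, but post 'a' did not.
source$.next([{ id: 'a', name: 'first', value: '1' }, { id: 'b', name: 'other', value: '9' }]);
// Skipped: id 'a' is not in the list.
source$.next([{ id: 'b', name: 'other', value: '9' }]);
// Emitted: post 'a' has a new name.
source$.next([{ id: 'a', name: 'renamed', value: '1' }]);
```

Only the first and last emissions reach the subscriber, so a component watching a single post is not re-rendered when unrelated posts change.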

  2. () => getPosts$
function getPosts$(): Observable<Array<Post>> {
  return this.get('posts');
}
  3. () => getStatePosts$
// This function allows you to manage your own state
// 1. posts$: overwrites all posts
// 2. clear$: empties the posts array
// 3. add$: adds one post to the end of your posts
function getStatePosts$(posts$: Observable<Array<Post>>, clear$: Observable<void>, add$: Observable<Post>): Observable<Array<Post>> {
  const updatePosts = (newPosts: Array<Post>) => (oldPosts: Array<Post>) => newPosts;

  const clearPosts = () => (oldPosts: Array<Post>): Array<Post> => [];

  const addPost = (post: Post) => (oldPosts: Array<Post>) => [...oldPosts, post];

  return merge(
    // You can add as many update functions as you need/want (e.g. deleteId, addPostAtStart, sortPosts, ...)
    posts$.pipe(map(updatePosts)),
    clear$.pipe(map(clearPosts)),
    add$.pipe(map(addPost))
  ).pipe(
    // The fn in your scan is the (oldPosts: Array<Post>) function returned by one of your three update functions (updatePosts, clearPosts and addPost).
    // Whenever one of those three observables emits, map first calls the outer function (e.g. (post: Post)) and returns a new function.
    // When that function reaches the scan, it receives oldPosts and returns the updated array.
    scan((oldPosts: Array<Post>, fn: (oldPosts: Array<Post>) => Array<Post>) => fn(oldPosts), [] as Array<Post>)
  )
}

// Usage
private posts$: Observable<Array<Post>> = this.get('posts');
private clear$: Subject<void> = new Subject();
private add$: Subject<Post> = new Subject();

public statePosts$ = getStatePosts$(this.posts$, this.clear$, this.add$);

Hint: Try to read the functions from the return statement first. And then check what is happening in the mapping/filtering or other operations. Hopefully I did not confuse you too much. If you have questions, feel free to ask.
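
If it helps, the merge + scan state pattern described above can be tried in isolation. This is only a sketch under assumptions: the Post shape is reduced to a hypothetical id field, the Update type alias is introduced here for readability, and all three inputs are plain Subjects driven by hand instead of a real get call.

```typescript
import { merge, Observable, Subject } from 'rxjs';
import { map, scan } from 'rxjs/operators';

// Hypothetical minimal Post shape, assumed for this sketch only.
interface Post { id: string; }

// Each input is mapped to a function that turns the old state into the new state.
type Update = (oldPosts: Post[]) => Post[];

const posts$ = new Subject<Post[]>();
const clear$ = new Subject<void>();
const add$ = new Subject<Post>();

const state$: Observable<Post[]> = merge(
  posts$.pipe(map((newPosts): Update => () => newPosts)),
  clear$.pipe(map((): Update => () => [])),
  add$.pipe(map((post): Update => oldPosts => [...oldPosts, post]))
).pipe(
  // scan keeps the current array and applies each incoming update function to it.
  scan((oldPosts: Post[], update: Update) => update(oldPosts), [] as Post[])
);

// Record the ids after every state change.
const history: string[][] = [];
state$.subscribe(posts => history.push(posts.map(p => p.id)));

posts$.next([{ id: 'a' }, { id: 'b' }]); // overwrite
add$.next({ id: 'c' });                  // append
clear$.next();                           // empty
add$.next({ id: 'd' });                  // append to the emptied list
```

After these four calls, history holds the four successive states: the overwritten list, the list with 'c' appended, the empty list, and a list containing only 'd'.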

Upvotes: 1

bryan60

Reputation: 29355

It's unclear what you're trying to accomplish here, but you might want something more like this:

import { Injectable } from '@angular/core';
import { ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';

@Injectable({providedIn:'root'})
export class PostService {
  // private replay subject will cache one value
  private postSource = new ReplaySubject<Post[]>(1);
  // public list of posts observable
  posts$ = this.postSource.asObservable();
  // function to select item by id out of list
  post$ = (id: string) => this.posts$.pipe(map(posts => posts.find(p => p.id === id)));

  getPosts() {
    // function to get remote posts
    return this.get<Post[]>('posts');
  }

  loadPosts() {
    // function to load posts and set the subject value
    this.getPosts().subscribe(posts => this.postSource.next(posts));
  }
}

You'll have to define that get function and call loadPosts every time you want to update the list.
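
As a rough sketch of what the ReplaySubject(1) buys you, the version below drops the Angular decorator and replaces the remote call with a hypothetical in-memory fetchPosts stub, so it can run on its own. The Post shape (id, title) is also an assumption. The point is that a subscriber arriving after loadPosts still receives the cached list.

```typescript
import { Observable, of, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical Post shape, assumed for this sketch only.
interface Post { id: string; title: string; }

class PostService {
  // Caches the most recent list for late subscribers.
  private postSource = new ReplaySubject<Post[]>(1);
  posts$ = this.postSource.asObservable();
  post$ = (id: string) => this.posts$.pipe(map(posts => posts.find(p => p.id === id)));

  // Hypothetical stand-in for the remote get call.
  private fetchPosts(): Observable<Post[]> {
    const remote: Post[] = [{ id: '1', title: 'first' }, { id: '2', title: 'second' }];
    return of(remote);
  }

  loadPosts(): void {
    this.fetchPosts().subscribe(posts => this.postSource.next(posts));
  }
}

const service = new PostService();
service.loadPosts();

// Both subscriptions start after the load, yet both get the cached value.
let listLength = 0;
service.posts$.subscribe(posts => { listLength = posts.length; });

const titles: Array<string | undefined> = [];
service.post$('2').subscribe(post => titles.push(post ? post.title : undefined));
```

Note that post$ emits undefined when the id is not in the list, so callers may want to filter that case out.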

Upvotes: 1

Akxe

Reputation: 11595

It all comes down to convenience. If your server sends you differential data about what changed in a post, then go ahead and create Observable<Observable<Post>[]>.


In your post, however, there are multiple problems. You cannot mix Observables with Promises. The method getPosts will return only the first post you get from the API.
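
For the plain Observable<Array<Post>> case, a minimal sketch without the Promise wrapper could look like this. The PostApi class, the Post shape and the stubbed get method are hypothetical stand-ins for the code in the question.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical Post shape, assumed for this sketch only.
interface Post { id: string; }

class PostApi {
  // Hypothetical stand-in for the HTTP helper used in the question.
  private get(_path: string): Observable<Post[]> {
    return of([{ id: '1' }, { id: '2' }]);
  }

  public getPosts(): Observable<Post[]> {
    // Return the observable itself: no Promise wrapper and no inner subscribe.
    return this.get('posts');
  }
}

// The caller decides when to subscribe.
const received: Post[][] = [];
new PostApi().getPosts().subscribe(posts => received.push(posts));
```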


This is the solution you asked for, but I am not sure it is what you actually wanted...

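public getPosts(): Observable<Array<Observable<Post>>> {
  return this.get<Array<Post>>('posts').pipe(
    // Map each post to its own observable; the inner observables stay unsubscribed
    // until the caller subscribes to them. (Wrapping the array in switchMap + combineLatest
    // would flatten the result to Observable<Array<Post>> instead.)
    map(posts => posts.map(post => this.get<Post>('post', post.id))),
  );
}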
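
The difference between the nested type and a flattened list can be sketched as follows. Everything here is a stand-in: list$ replaces the posts request, postById$ is a hypothetical per-post lookup, and the Post shape (id, title) is assumed.

```typescript
import { combineLatest, Observable, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Hypothetical Post shape, assumed for this sketch only.
interface Post { id: string; title: string; }

// Stand-in for the list request.
const list$: Observable<Post[]> = of([{ id: '1', title: 'first' }, { id: '2', title: 'second' }]);

// Hypothetical per-post lookup.
const postById$ = (id: string): Observable<Post> => of({ id, title: `post ${id}` });

// Nested: the outer observable emits an array of still-unsubscribed observables.
const nested$: Observable<Array<Observable<Post>>> = list$.pipe(
  map(posts => posts.map(post => postById$(post.id)))
);

// Flattened: combineLatest subscribes to every inner observable and emits plain posts.
const flat$: Observable<Post[]> = list$.pipe(
  switchMap(posts => combineLatest(posts.map(post => postById$(post.id))))
);

// With the nested shape, the caller has to subscribe to each inner observable.
const nestedTitles: string[] = [];
nested$.subscribe(inner => inner.forEach(post$ => post$.subscribe(p => nestedTitles.push(p.title))));

// With the flattened shape, one subscription yields the whole list.
let flatTitles: string[] = [];
flat$.subscribe(posts => { flatTitles = posts.map(p => p.title); });
```

The nested form leaves it to each consumer to manage the inner subscriptions, while the flattened form re-emits the entire array whenever any single post emits.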

Upvotes: 1
