Mark Glass
Mark Glass

Reputation: 386

How can I Use RxJs 6 GroupBy to organize stream? Need to concatenate all emissions from multiple Grouped Observables

Input Observable stream: The data is obtained from an observable stream that is the result of a REST request for projects. The data is obtained as Observable<Project[]>.

     const project1: Project = {
        id: 1,
        title: 'zebra',
        rootId: 1,
      }
    
      const project2: Project = {
        id: 2,
        title: 'algebra',
        rootId: 2,
      }
    
      const project3: Project = {
        id: 3,
        title: 'Bobcats',
        rootId: 1,
      }
    
      const project4: Project = {
        id: 4,
        rootId: 2,
      }
    
      const project5: Project = {
        id: 5,
        title: 'Marigolds',
        rootId: 1,
      }
    
      const project6: Project = {
        id: 6,
        title: 'whatever',
        rootId: null,
      }
    
      const project7: Project = {
        id: 7,
        title: 'peppercorns',
        rootId: null,
      }
    
    let groupProjects: Observable<ProjectSummary[]> 
= getGroupProjects(of([project1, project2, project3, project4, project5, project6, project7]]));
    
      getGroupProjects(projects$: Observable<ProjectSummary[]>): Observable<ProjectSummary[]> {
        const timer$ = timer(5000);
        const data = projects$.pipe(takeUntil(timer$), flatMap(projects => projects));
        const groupedObservables = data.pipe(
          groupBy(projects => projects.rootId),
          tap( a => console.log('groupBy:' + a.key))
        );
        const merged = groupedObservables.pipe(
          mergeMap(a => a.pipe(toArray())),
          shareReplay(1),
          tap( a => console.log('final:' + JSON.stringify(a)))
        );
        return merged;
      }

The desired output is:

Object{  //Root of 1
  id: 1,
  title: 'zebra',
  rootId: null
}
Object{
  id: 3, //child of 1
  title: 'Bobcats',
  rootId: 1
} 
Object{
  id: 5, //child of 1
  title: 'Marigolds',
  rootId: 1
}
Object{
  id: 2, //root of 2
  title: 'algebra',
  rootId: 2
}
Object{
  id: 4,  //child of 2
  title: 'dogs',
  rootId: 2
}
Object{
  id: 6,  //unaffiliated
  title: 'whatever',
  rootId: null
}
Object{
  id: 7, //unaffiliated
  title: 'peppercorns',
  rootId: null
}

The requirement is that groups identified by rootId appear in sequence before their children (children appear after their root) and unaffiliated are listed together. roots are identified when id = rootId, children are identified when rootId != null && id != rootId. Unaffiliated are identified by null root id.

Currently only the last group is emitted. How can I return an observable that emits all groups and in correct sequence? --thanks

Upvotes: 2

Views: 462

Answers (1)

Adrian Brand
Adrian Brand

Reputation: 21638

groupBy takes a stream of objects and emits a single group when the stream completes, it doesn't work with streams of arrays. What you want is a scan. Scan is like a reduce but it emits each time the source stream emits rather than once at the end.

I am not quite understanding what you are trying to achieve from you question but this should get you started

sourceThatEmitsArrays.pipe(
  scan(
   (results, emittedArray) => functionThatAddsEmittedArrayToResults(results, emittedArray),
   [] // Start with an empty array
  )
)

This is the same as a normal reduce function on arrays but emits results each time the source emits.

functionThatAddsEmittedArrayToResults would looks something like

(results, array) => array.reduce(
  (newResults, current) => {
    const group = findCurrentGroupInNewResultsOrCreateNewGroup(newResults, current);
    replacePreviousGroupInResultsOrAddTheNewOne(newResults, group);
    return newResults;
  },
  results // Start with previous results
)

Upvotes: 2

Related Questions