decebal

Reputation: 1211

RxJs - Push data into an observable

I want to implement an infinite table in Angular, so when I scroll down into the existing table, I want to get the next set of data from the API. I am trying to do this with a BehaviorSubject, like this:

page: number = 0;
size: number = 10;
clientsData$: BehaviorSubject<any> = new BehaviorSubject<any>([]);

onTableScroll(e): void {
    // get the next set of data
    this.getData();
}

getData(): void {
    this.clientsService.getAllClients().pipe(
      tap((data) => {
        this.clientsData$.next(data);
        console.log('this.clientsData$: ', this.clientsData$);    // it never reaches this line
      })
    );
}

ngOnInit(): void {
    this.getData();
}

But the problem is that, even though I use the async pipe in the template, it doesn't seem to subscribe to the BehaviorSubject. So on init this doesn't work at all (I don't even get the initial data). I need to get a new set of data each time I call getData() and update the table. I don't want to use subscribe; I want to use the async pipe in the template. Please advise, thanks!

Upvotes: 1

Views: 5359

Answers (3)

Jonathan Stellwag

Reputation: 4267

Your approach can work, but it's much easier to implement observables in the following way:

  • Think about what is an event (here onTableScroll)
  • Write a function for your event and just next the value or void/nothing into a subject
  • Use your subject in the observables and combine it with your datasource
  • Only subscribe to the data that is needed in the UI or to communicate with some service (here clientData$)

const { Subject, Observable, merge } = rxjs;
const { first, switchMapTo } = rxjs.operators;

const tableScroll$$ = new Subject();
const clientData$ = getClientData$(tableScroll$$)

function onTableScroll(e) {
  tableScroll$$.next()
}

function getClientData$(scroll$) {
  // Observable for the ngOnInit getData$() call
  const initialData$ = getData$().pipe(first())

  // Observable for the scroll$ getData$() calls
  const scrollData$ = scroll$.pipe(
    switchMapTo(getData$())
  )

  return merge(
    initialData$,
    scrollData$
  )
}

function getData$() {
  return new Observable(obs => obs.next('foo'))
}

clientData$.subscribe(v => console.log('clientData$: ', v))

onTableScroll()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

Upvotes: 0

Mrk Sef

Reputation: 8022

You subscribe to clientsData$ via Angular's async pipe. This means that new values emitted by clientsData$ are automatically picked up. But you never send your BehaviorSubject a new value (next(value)), so of course you won't see any data from it.

In your getData() function you create an observable (you don't run it) and then you return. So getData() doesn't do anything. You could rewrite it as getData():void{} and this should all run roughly the same. It should be rather clear, then, that the call to clientsData$.next(data) is never executed.
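A minimal sketch of why the body of tap never runs. This is not RxJS: LazySource is a hypothetical, hand-rolled stand-in that only mimics the lazy behaviour of an observable, so the example needs no library. Building the source merely stores the producer function; nothing executes until subscribe is called.

```typescript
// Hypothetical stand-in for an observable (NOT the RxJS implementation):
// it stores a producer function and runs it only on subscribe.
type Observer<T> = (value: T) => void;

class LazySource<T> {
  private readonly producer: (emit: Observer<T>) => void;

  constructor(producer: (emit: Observer<T>) => void) {
    this.producer = producer;
  }

  // Subscribing is the moment the producer actually runs.
  subscribe(observer: Observer<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;
const received: string[] = [];

// Creating the source: the producer is stored, not executed.
const clients = new LazySource<string>((emit) => {
  producerRuns += 1;
  emit('client page');
});

// Snapshot taken before anyone subscribes.
const runsBeforeSubscribe = producerRuns;

// Subscribing: the producer runs and the value is delivered.
clients.subscribe((value) => received.push(value));

console.log(runsBeforeSubscribe, producerRuns, received);
```

The getData() in the question stops at the "creating the source" step, which is why neither next(data) nor the console.log is ever reached.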

So how do you "fix" this? Creating an observable is like defining a function, and subscribing to it is like calling that function: you need to 'run' the observable you create in getData(). Because subjects are both observables and observers, you can subscribe your subject to your observable in getData():

page: number = 0;
size: number = 10;
clientsData$: BehaviorSubject<any> = new BehaviorSubject<any>([]);

onTableScroll(e): void {
    // get the next set of data
    this.getData();
}

getData(): void {
    this.clientsService.getAllClients().pipe(
      tap(data => 
        console.log('this.clientsData$: ', this.clientsData$)
      )
    ).subscribe(this.clientsData$);
}

ngOnInit(): void {
    this.getData();
}

This should at least be running now. It hasn't accounted for error handling. I'm also assuming clientsService.getAllClients() returns an observable that will complete (or error) - it won't just stay running forever - otherwise you'd need a way to unsubscribe or risk a memory leak.
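One caveat worth checking when getData() is called more than once, as it is for an infinite table: passing the subject itself to subscribe forwards the source's completion as well as its values, and a completed subject ignores later next calls. The sketch below compares the two approaches. It assumes getAllClients() emits once and then completes (as an Angular HttpClient request typically does); of([...]) is used here as a stand-in for that call, and the variable names are illustrative only.

```typescript
import { BehaviorSubject, of } from 'rxjs';

// Variant 1: the subject is passed as the observer.
const viaObserver$ = new BehaviorSubject<number[]>([]);
const seenViaObserver: number[][] = [];
viaObserver$.subscribe((rows) => seenViaObserver.push(rows));

// next AND complete are forwarded, so the subject completes after page 1.
of([1, 2]).subscribe(viaObserver$);
// The second page is ignored because the subject has already completed.
of([3, 4]).subscribe(viaObserver$);

// Variant 2: only the values are forwarded.
const viaNext$ = new BehaviorSubject<number[]>([]);
const seenViaNext: number[][] = [];
viaNext$.subscribe((rows) => seenViaNext.push(rows));

// The subject stays open, so every page reaches its subscribers.
of([1, 2]).subscribe((rows) => viaNext$.next(rows));
of([3, 4]).subscribe((rows) => viaNext$.next(rows));

console.log(seenViaObserver.length, seenViaNext.length);
```

If the second variant fits your case, the last line of getData() would become .subscribe(data => this.clientsData$.next(data)) instead of .subscribe(this.clientsData$).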

Upvotes: 1

AliF50

Reputation: 18809

The reason it never reaches that line is that you haven't subscribed to the observable. An observable only "takes flight" when you subscribe to it.

destructionSubject: Subject<void> = new Subject<void>(); // void, so next() can be called without an argument
getData(): void {
    this.clientsService.getAllClients().pipe(
      tap((data) => {
        this.clientsData$.next(data);
        console.log('this.clientsData$: ', this.clientsData$);    // reached now, because of the subscribe() below
      }),
      takeUntil(this.destructionSubject), // take until this destructionSubject emits
    ).subscribe(data => {
      console.log(data);
    });
}
...

ngOnDestroy(): void {
  this.destructionSubject.next(); // emit so that takeUntil unsubscribes from the source
  this.destructionSubject.complete(); // then complete the subject itself
}

Upvotes: 1
