hackp0int

Reputation: 4161

Long Polling on HttpClient result and streaming into the CSV file

Question 1: How do I implement the same behavior, but with each call triggered by a callback instead of by Observable.interval?

For example: the interval is 5000 ms, but my server is extremely slow and often has not returned a result within 5000 ms. The next call is fired after 5000 ms anyway. I don't want that; I want the next call to be made only after the previous result has come back from the server.

Question 2: How do I stream the results straight into a single CSV file, without creating several files one after another? The current implementation uses FileSaver, which works in IE11, and I would like to keep using it. Is there a way to stream data to the file instead of collecting it in an array? I have large datasets, on the order of 1M rows. Example:

const progress = Observable.interval(1000)
  .switchMap(() => this.messageService.getResults(query))
  .map(messageResult => messageResult)
  .subscribe((data: MessagesResult) => {
    inProcess = true;
    if (!data.isMoreResults && data.auditMessageList.length === 0) {
      this.fileSaver.save(`result.csv`, csvData);
      inProcess = false;
      this.logger.info('Download file finished...');
      progress.unsubscribe();
    }
    const start = this.filterModel.offset * this.filterModel.limit;
    const rows = [...csvData];
    rows.splice(start, 0, ...data.auditMessageList);
    csvData = rows;
    if (inProcess) {
      this.logger.info('Exporting in progress...');
    }
    query.offset++;
  }, error => this.logger.error(error));

}

Upvotes: 4

Views: 1426

Answers (3)

hgiasac

Reputation: 2233

Question 1:

Use forkJoin. It waits until all of its source Observables have completed. Combined with delay(5000), each cycle takes at least 5 s; if the API response has not arrived by then, forkJoin keeps waiting until it does (demo).

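import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

// Timer that enforces the minimum of 5 s per cycle
const stream1$ = of(1).pipe(
  delay(5000)
);

// Simulated response time between 5 s and 10 s
const intervalTime = Math.random() * 5000 + 5000;

// replace with your API stream
const stream2$ = of(intervalTime).pipe(
  delay(intervalTime)
);

// Emits once, after both streams have completed
forkJoin(stream1$, stream2$)
  .subscribe(([_, s2]) => {
    console.log(s2);
  });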

Question 2:

If the file is large, let the browser handle the download: generate the file on the server, then return a link to it. For small files performance is not an issue; you can keep the data in memory and save the file once at the end.

Edit: The FileSaver developers recommend StreamSaver for large files. It is worth a look:

StreamSaver.js takes a different approach. Instead of saving data in client-side storage or in memory you could now actually create a writable stream directly to the file system (I'm not talking about chromes sandboxed file system)

StreamSaver.js is the solution to saving streams on the client-side. It is perfect for webapps that need to save really large amounts of data created on the client-side, where the RAM is really limited, like on mobile devices.
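
Whichever writer ends up receiving the data, streaming only works if each page of results can be turned into CSV text on its own, so that nothing has to accumulate in an array first. The following is a minimal sketch of that step. The names `CsvRow`, `escapeCsvValue`, and `toCsvChunk` are hypothetical, and the `;` delimiter and CRLF line endings are assumptions.

```typescript
// Hypothetical row shape; replace with the real audit message fields.
type CsvRow = Record<string, string | number | boolean | null | undefined>;

// Quote a value only when it contains the delimiter, a double quote, or a line break.
function escapeCsvValue(value: unknown, delimiter: string): string {
  const text = value === null || value === undefined ? '' : String(value);
  const needsQuoting = text.indexOf(delimiter) !== -1 || /["\r\n]/.test(text);
  return needsQuoting ? '"' + text.replace(/"/g, '""') + '"' : text;
}

// Convert one page of rows into a CSV text chunk.
// The header line is emitted only when includeHeader is true (first page).
function toCsvChunk(
  rows: CsvRow[],
  columns: string[],
  includeHeader: boolean,
  delimiter: string = ';',
): string {
  const lines: string[] = [];
  if (includeHeader) {
    lines.push(columns.map(name => escapeCsvValue(name, delimiter)).join(delimiter));
  }
  for (const row of rows) {
    lines.push(columns.map(name => escapeCsvValue(row[name], delimiter)).join(delimiter));
  }
  // Terminate every line so consecutive chunks can be appended back to back.
  return lines.map(line => line + '\r\n').join('');
}

// Usage: the first page carries the header, later pages do not.
const firstChunk = toCsvChunk([{ id: 1, text: 'a;b' }], ['id', 'text'], true);
const nextChunk = toCsvChunk([{ id: 2, text: 'say "hi"' }], ['id', 'text'], false);
```

Each chunk could then be encoded and handed to a stream writer as soon as its page arrives. The StreamSaver README describes obtaining such a writer from `createWriteStream(...).getWriter()`; treat that call as something to verify against the current documentation, not as a given.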

Upvotes: 2

madjaoue

Reputation: 5224

Question 1

Here's an example implementing a function that calls itself when it gets a response.

Backend:

  1. Simulate a slow backend that takes between 5s and 10s to respond
  2. At each response, the server gives the current request_number and a state
  3. For the first 3 responses the state is open; after that, the state is closed

code:

/* Mocked backend. I'm slow, like really slow */
class SlowBackend {
  MAX_ITERATIONS = 3; // suppose you're reading a table and you have pagination, with 3 pages
  currentIteration = 0;

  constructor() {}

  getStuff() {
    console.log(`**Request N. ${this.currentIteration}**\n[Back] : received a request from the front`);
    const responseDelay = Math.random() * 5000 + 5000; // response between 5s and 10s
    let state = "open";
    if(++this.currentIteration > this.MAX_ITERATIONS)
      state = "closed";

    return Observable
      .timer(responseDelay)
      .map(() => {
        console.log(`[Back] : Responding after ${responseDelay} ms`);
        return {
          request_number: this.currentIteration,
          state: state
        };
      });
  }
}

Front:

This is basically your component.

class Frontend {

  isPollingActivated = true;
  responses = [];


  constructor(private backendService) {
    this.backendService = new SlowBackend(); // connection to backend
    this.requestOnRegularBasis();
  }

  requestOnRegularBasis() {
    if (!this.isPollingActivated)
      return;

    this.backendService.getStuff()
      .subscribe(response => {
        console.log(`[Front] : received response from server. State : ${response.state}`);

        // Choose one of the following blocks, comment the other according to what you need

        // Block 1 : Sync processing example
        console.log(`[Front] : doing some sync processing`);
        this.doSomeSyncProcessing(response);
        this.requestOnRegularBasis();

        // Block 2 : Async processing example
        // console.log(`[Front] : doing some async processing`);
        // this.doSomeAsyncProcessing(response)
        //    .subscribe(() => this.requestOnRegularBasis()); // arrow function keeps `this` bound

      })
  }

  private doSomeSyncProcessing(response){
    if(response.state == 'closed'){
      this.isPollingActivated = false; // stop polling
      this.saveDataToCsv();
    }
    else
      this.responses.push(Object.values(response).join(';')) // csv line separated by ';'
  }

  private saveDataToCsv(){
    const headers = ['current_request;state']
    this.responses = headers.concat(this.responses)
    console.log('saving to csv : ', this.responses.join('\n'));

    // Uncomment this to use FileSaver API (this.responses already starts with the header line)
    /*
    const blob = new Blob([this.responses.join('\n')], {type: "text/csv;charset=utf-8"});
    saveAs(blob, "my_responses.csv");
    */
  }

  private doSomeAsyncProcessing(response){
    return Observable.timer(1000).map(() => this.doSomeSyncProcessing(response));
  }

}

output:

**Request N. 0**
[Back] : received a request from the front
[Back] : Responding after 5482 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 1**
[Back] : received a request from the front
[Back] : Responding after 7489 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 2**
[Back] : received a request from the front
[Back] : Responding after 9627 ms
[Front] : received response from server. State : open
[Front] : doing some sync processing
**Request N. 3**
[Back] : received a request from the front
[Back] : Responding after 5806 ms
[Front] : received response from server. State : closed
[Front] : doing some sync processing
saving to csv :
current_request;state
1;open
2;open
3;open
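
The same "request again only after the response arrives" behavior can also be expressed without a self-calling method, using the RxJS `expand` operator. This is a different technique from the recursive function above, shown as a sketch in RxJS 6+ pipeable syntax. `StuffResponse`, `pollUntilClosed`, and `createFakeBackend` are hypothetical names, and the fake backend answers immediately instead of after 5 to 10 s.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, takeWhile } from 'rxjs/operators';

// Response shape taken from the mocked backend above.
interface StuffResponse {
  request_number: number;
  state: 'open' | 'closed';
}

// Emits every "open" response. Each new request is issued only after the
// previous response has arrived; polling stops at the first "closed" response.
function pollUntilClosed(getStuff: () => Observable<StuffResponse>): Observable<StuffResponse> {
  return getStuff().pipe(
    // For each response, either stop (EMPTY) or issue the next request.
    expand(response => (response.state === 'closed' ? EMPTY : getStuff())),
    // Drop the final "closed" response and complete the stream.
    takeWhile(response => response.state !== 'closed'),
  );
}

// Hypothetical stand-in for the backend: responds at once, closes after maxOpen responses.
function createFakeBackend(maxOpen: number): () => Observable<StuffResponse> {
  let requestNumber = 0;
  return () => {
    requestNumber += 1;
    const state: StuffResponse['state'] = requestNumber > maxOpen ? 'closed' : 'open';
    return of({ request_number: requestNumber, state });
  };
}

// Usage: build CSV lines as responses arrive.
const csvLines: string[] = [];
pollUntilClosed(createFakeBackend(3)).subscribe(response => {
  csvLines.push(`${response.request_number};${response.state}`);
});
```

Because the stream completes on its own, the subscriber needs no polling flag like `isPollingActivated`.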

Question 2

You can't.

At least not with FileSaver, because it doesn't support writing chunk by chunk. When you instantiate your Blob, all of your data has to be ready. Some libraries do support chunks, but they are either meant for the server side (Node.js, for example) or are extremely browser-specific.

Check this : Save client generated data as file in JavaScript in chunks

Note :

If you are trying to store a 1M-row CSV on the client's machine using JS, then maybe something is wrong with the architecture, because this is not a common use case for browsers. Clients are assumed to have weak machines and should therefore receive processed, light, easy-to-parse information. You could, for example, build the CSV on the server side, which has full rights to write file streams and decent processing and memory capacity.

Demo : Question 1

http://jsbin.com/rojutudayu/2/edit?html,js,console

Demo : How to download the blob ?

    <script src="https://cdn.rawgit.com/eligrey/FileSaver.js/e9d941381475b5df8b7d7691013401e171014e89/FileSaver.min.js"> </script>

<script> 
var blob = new Blob(["Hello, world!"], {type: "text/plain;charset=utf-8"});
saveAs(blob, "hello world.txt");
</script>

Upvotes: 2

m1ch4ls

Reputation: 3435

As you have found out, Observable.interval won't 'wait' for the rest of the stream.

I generally use repeatWhen with delay:

const progress = Observable.defer(() => this.messageService.getResults(query))
  .repeatWhen(notifications => notifications.delay(1000)) 
  ...

Here is working example: https://jsfiddle.net/a0rz6nLv/19/

I don't understand the rest of your code very well.

Don't call progress.unsubscribe() inside the subscribe callback. Instead, consider using takeWhile or takeUntil; both will complete the observable for you.

.takeWhile(data => data.isMoreResults || data.auditMessageList.length > 0)

Buffering the results can also be done with, for example, reduce or toArray:

.reduce((accumulator, data) => accumulator.concat(data.auditMessageList), [])

Side effects are best handled by the do operator:

.do({
  next: () => {
    inProgress = true;
    this.logger.info('Exporting in progress...');
  },
  complete: () => {
    inProgress = false;
    this.logger.info('Download file finished...');
  }
})
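
Put together, the fragments above might look like the sketch below. It is written in RxJS 6+ pipeable syntax, where `do` is named `tap`. The `MessagesResult` shape is inferred from the question, and `collectAllRows`, `fetchNextPage`, `pauseMs`, and `log` are hypothetical names. `fetchNextPage` is assumed to advance the query offset itself on every call.

```typescript
import { defer, Observable, of } from 'rxjs';
import { delay, reduce, repeatWhen, takeWhile, tap } from 'rxjs/operators';

// Shape inferred from the question's code; the element type is an assumption.
interface MessagesResult {
  isMoreResults: boolean;
  auditMessageList: string[];
}

// Polls page by page and emits all rows once, when the server runs dry.
function collectAllRows(
  fetchNextPage: () => Observable<MessagesResult>,
  pauseMs: number,
  log: (message: string) => void = () => {},
): Observable<string[]> {
  return defer(fetchNextPage).pipe(
    // Issue the next request pauseMs after the previous one has completed.
    repeatWhen(completions => completions.pipe(delay(pauseMs))),
    // Complete once there are no more results and the page is empty.
    takeWhile(page => page.isMoreResults || page.auditMessageList.length > 0),
    // Side effects only; the data passes through unchanged.
    tap({
      next: () => log('Exporting in progress...'),
      complete: () => log('Download file finished...'),
    }),
    // Append pages in arrival order; emits a single array on completion.
    reduce((rows, page) => rows.concat(page.auditMessageList), [] as string[]),
  );
}

// Usage with canned pages standing in for messageService.getResults(query).
const demoPages: MessagesResult[] = [
  { isMoreResults: true, auditMessageList: ['row-1', 'row-2'] },
  { isMoreResults: false, auditMessageList: [] },
];
let demoIndex = 0;
collectAllRows(() => of(demoPages[demoIndex++]), 10)
  .subscribe(rows => console.log(`collected ${rows.length} rows`));
```

The subscriber receives one array at the end, which is the point where a single `fileSaver.save(...)` call would go. This still holds every row in memory, so it addresses the polling question only, not the streaming one.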

Regarding the second question: I don't know. You should be able to stream the CSV from the server. If you cannot modify the server, maybe someone else knows how to do it on the client.

Upvotes: 3
