Shubhank Gupta
Shubhank Gupta

Reputation: 825

Observable in angular is not working parallel

I have a code in which i am calling an observable function using subscribe. I am expecting it to run parallel but it is running in sequence.

makedownloadData() is function to store all variables of "showData" list variable in different variable let's say "downloadData". function takes lots of time to copy all the variables to downloadData and further processing. So I thought of calling this function using observable/subscribe technique so that it should run parallerly without causing delay in current sequence but it is running in sequence only. No enhancement.

First method (without observable)

Calling the function

this.downloadData=this.makeDownloadData() //This step is taking lot's of time as function respond late. 
console.log("print after function call")

Function to be called

public makeDownloadData() {
var main_data = this.showData;
var down_data:any = [];
for (var i=0; i<main_data.length; i++){
  var element: String = "";
  var newDate = new Date(main_data[i]["@timestamp"]);
  element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
  element = element.concat(String(main_data[i]["cu_hostname"])+" ");
  element = element.concat(String(main_data[i]["log_agent"])+".");
  element = element.concat(String(main_data[i]["log_level"])+" ");
  element = element.concat(String(main_data[i]["app_name"])+": ");
  element = element.concat(String(main_data[i]["log_message"])+" ");
  down_data.push(element.concat("\n"));
}
return down_data;

}

Output:

//Execution of function
"print after function call"

Second method (With observable)

Importing the requirements

import { Observable } from 'rxjs';
import 'rxjs/add/observable/of'

Calling an observable function.

this.makeDownloadData().subscribe(response => {
   console.log("Expected to print after") //This should be run in parallel and must printed after upcoming statement as this function is taking time to respond.
   console.log(response); //Expected to print after 
},
error => {
  console.log("Did not got response")
});

console.log("Expected to print before")

Function to be called

public makeDownloadData(): Observable<any> {
var main_data = this.showData;
var down_data:any = [];
for (var i=0; i<main_data.length; i++){
  var element: String = "";
  var newDate = new Date(main_data[i]["@timestamp"]);
  element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
  element = element.concat(String(main_data[i]["cu_hostname"])+" ");
  element = element.concat(String(main_data[i]["log_agent"])+".");
  element = element.concat(String(main_data[i]["log_level"])+" ");
  element = element.concat(String(main_data[i]["app_name"])+": ");
  element = element.concat(String(main_data[i]["log_message"])+" ");
  down_data.push(element.concat("\n"));
}
return Observable.of(down_data)

}

Output :

"Expected to print after"
Printing response
"Expected to print before"

Expected Output :

"Expected to print before"
"Expected to print after"
Printing response

I want to use observable for parallel execution. Kindly help. if anything unclear than i will modify the question to make it more clear. Thanks

Upvotes: 0

Views: 329

Answers (3)

julianobrasil
julianobrasil

Reputation: 9377

There are two problems:

  1. your function makeDownloadData() is not an observable, it just returns one observable (the code that is taking time is before the return statement)
  2. not all observables are asynchronous by default (in fact, just a few of them are really asynchronous by default)

The following code has a completely asynchronous observable (thanks to the asyncScheduler parameter on the of function - without it of is a synchronous function that returns an observable):

const source = of('Hello, subscriber', asyncScheduler).pipe(
  tap(() => console.log('Inside observable')),
  map(x => {
    for(let i = 0; i < 1000000000; i++)
      for(let j = 0; j < 1; j++);

    return x;
  })
);
console.log('After observable')

source.subscribe(x => console.log(x));

By saying this is asynchronous, I mean that After observable will be in the console immediately. After some time you will see 'Inside observable' followed by 'Hello, subscriber'.

If you remove the asyncScheduler of the observable, you will wait for some time and will see the same sequence ('After observable', 'Inside observable', 'Hello, subscriber'), but all the code would be blocked until the loop inside the observable finished and, just after that, you would see the three strings being printed on the console almost at the same time. Take a look at this demo: https://stackblitz.com/edit/rxjs-stackoverflow-556297976433166?file=index.ts

So you can try this:

makeDownloadData(): any { // <== doesn't need to return an observable
  var main_data = this.showData;
  var down_data:any = [];
  for (var i=0; i<main_data.length; i++){
    var element: String = "";
    var newDate = new Date(main_data[i]["@timestamp"]);
    element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
    element = element.concat(String(main_data[i]["cu_hostname"])+" ");
    element = element.concat(String(main_data[i]["log_agent"])+".");
    element = element.concat(String(main_data[i]["log_level"])+" ");
    element = element.concat(String(main_data[i]["app_name"])+": ");
    element = element.concat(String(main_data[i]["log_message"])+" ");
    down_data.push(element.concat("\n"));
  }
  return down_data // <== this doesn't need to return an observable
}

Update: I replaced of(this.makeDownloadData()) by of(1), otherwise it'll take the same time to create the operator that was needed to run the function in the original problem. Then you can map it to the desired function.

import { of, asyncScheduler } from 'rxjs'; 
import { map } from 'rxjs/operators';

...

this.downloadData=of(1, asyncScheduler)
    .pipe(map(() => this.makeDownloadData());

Using of here is not the best approach for creating an observable, but it's the most clear, by far, as I need the asyncScheduler to make it asynchronous. The '1' as the first parameter is a dumb data. You can use anything. The of function needs a parameter.

Upvotes: 1

Sangar Lal
Sangar Lal

Reputation: 50

first you need to call the subscribe method that is your before print after the subscribe get data then call the normal method that is your after print method so now you can get the correct answer i thing it will help you any clarification please rely this answer

Upvotes: 1

mbojko
mbojko

Reputation: 14679

Unlike with promises, where code like

Promise.resolve("one").then(console.log);
console.log("two");

results in "two" being printed before "one", observables aren't async in nature, just like callbacks aren't. So of("one").subscribe(console.log) will print immediately, not after the end of the event loop tick.

Here's a longer lecture: https://www.syntaxsuccess.com/viewarticle/rxjs-subjects-emit-synchronous-values

Upvotes: 0

Related Questions