Reputation: 467
I am running the code below on my 8 GB laptop in IntelliJ. I am calling 3 APIs in parallel with the map function and the scalaj library, and measuring the time of each call as follows:
val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
// for each API call, execute it on a different executor and collate the data
val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x)))
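For reference, HTTPRequestParallel.ds does a blocking HTTP call via scalaj; a simplified, illustrative sketch of such a helper with scalaj-http (not the exact implementation):

import scalaj.http.{Http, HttpResponse}

// illustrative helper -- the real HTTPRequestParallel.ds is not shown here
object HTTPRequestParallel {
  def ds(url: String): String = {
    // scalaj-http issues a blocking GET and returns the response
    val response: HttpResponse[String] = Http(url).asString
    response.body
  }
}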
When spark.time is executed, I expected 3 timings but it gives me 6:
Time taken: 14945 ms
Time taken: 21773 ms
Time taken: 22446 ms
Time taken: 6438 ms
Time taken: 6877 ms
Time taken: 7107 ms
What am I missing here, and are the API calls really made in parallel?
Upvotes: 0
Views: 2144
Reputation: 3863
Actually, that piece of code alone won't execute any spark.time at all: the map function is lazy, so it won't be evaluated until you perform an action on the RDD. You should also consider that if you don't persist the transformed RDD, every action will re-compute all of its transformations. What this means is that if you are doing something like this:
val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
// for each API call, execute it on a different executor and collate the data
val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x)))
val c = actual_data.count()
actual_data.collect()
There will be 6 executions of what is defined inside the map (two for each element in the RDD: one for the count and one for the collect). To avoid this re-computation, you can cache or persist the RDD as follows:
val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"))
// for each API call, execute it on a different executor and collate the data
val actual_data = urls.map(x => spark.time(HTTPRequestParallel.ds(x))).cache()
val c = actual_data.count()
actual_data.collect()
In this second example you will see only 3 timing logs instead of 6.
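Regarding whether the calls are actually parallel: that depends on how many partitions the RDD has and how many cores the master offers. A quick, illustrative check (assuming you can set the master and the partition count yourself):

// force 3 partitions so each URL can get its own task
val urls = spark.sparkContext.parallelize(Seq("url1", "url2", "url3"), 3)
println(s"partitions: ${urls.getNumPartitions}") // prints 3

With at least 3 cores available (e.g. master local[3] or more), the three map tasks, and therefore the three HTTP calls, run concurrently; with fewer cores they queue up.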
Upvotes: 1