Reputation: 11
Can someone please provide an example of a submitJob method call?
Found reference here: How to execute async operations (i.e. returning a Future) from map/filter/etc.?
I believe I can implement it for my use case.
In my current implementation I am using partitions to invoke parallel calls, but they wait for the response before invoking the next call:
Dataframe.rdd.repartition(/* TPS allowed on API */)
  .map(row => {
    val response = callApi(row)
    parse(response)
  })
But as there is latency at the API end, I am waiting 10 seconds for the response before parsing and making the next call. I am allowed 100 TPS, but with the current logic I see only 4-7 TPS.
If someone has used SparkContext.submitJob to make asynchronous calls, please provide an example, as I am new to Spark and Scala.
I want to invoke the calls without waiting for the response, ensuring 100 TPS, and then once I receive the responses I want to parse them and create a DataFrame on top of the results.
I had previously tried collecting the rows and invoking the API calls from the master node, but that seems to be limited by the hardware when creating a large thread pool.
submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) ⇒ U, partitions: Seq[Int], resultHandler: (Int, U) ⇒ Unit, resultFunc: ⇒ R): SimpleFutureAction[R]
rdd - the RDD out of my DataFrame
partitions - my RDD is already partitioned; do I provide the range 0 to the number of partitions in my RDD?
processPartition - is it my callApi()?
resultHandler - not sure what is to be done here
resultFunc - I believe this would be parsing my response
How do I create a DataFrame after the SimpleFutureAction?
Can someone please assist?
Upvotes: 1
Views: 743
Reputation: 74355
submitJob won't make your API calls automatically faster. It is part of the low-level implementation of Spark's parallel processing - Spark splits actions into jobs and then submits them to whatever cluster scheduler is in place. Calling submitJob is like starting a Java thread - the job will run asynchronously, but not faster than if you simply call the action on the dataframe/RDD.
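For the record, here is a minimal, self-contained sketch of what a submitJob call can look like, mapped onto the signature quoted in the question. Everything in it (the RDD contents, the per-partition work, the result types) is a made-up stand-in, not your callApi/parse logic:

import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("submitJob-sketch").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100, numSlices = 10)

// one result slot per partition; resultHandler fills them in on the driver
val perPartition = new Array[Seq[String]](rdd.getNumPartitions)

val futureAction = sc.submitJob(
  rdd,
  // processPartition: runs on the executors, once per partition
  (it: Iterator[Int]) => it.map(x => s"processed-$x").toSeq,
  // partitions: the partition indices to run on
  0 until rdd.getNumPartitions,
  // resultHandler: called on the driver with (partitionIndex, partitionResult)
  (index: Int, result: Seq[String]) => perPartition(index) = result,
  // resultFunc: evaluated on the driver once all partitions have finished
  perPartition.flatten.toSeq
)

// the job runs asynchronously; block only when the result is actually needed
val allResults: Seq[String] = Await.result(futureAction, 10.minutes)

This is essentially what the built-in asynchronous actions (countAsync, foreachAsync, etc.) do internally, which is why it does not speed up the per-partition work itself.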
IMHO your best option is to use mapPartitions, which allows you to run a function within the context of each partition. You already have your data partitioned, so to ensure maximum concurrency just make sure you have enough Spark executors to actually have those partitions running simultaneously:
df.rdd.repartition(/* # of concurrent API calls */)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)
mapPartitions expects a function that maps Iterator[T] (all data in a single partition) to Iterator[U] (the transformed partition) and returns RDD[U]. Converting back to a dataframe is a matter of chaining a call to toDF() with the appropriate column names.
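To make that concrete, here is a minimal runnable sketch along those lines (callApi and parse are stand-ins for your own functions; parse returns a tuple here so that toDF can derive a schema, and spark.implicits._ has to be in scope for toDF to work on an RDD):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("mapPartitions-sketch").getOrCreate()
import spark.implicits._          // required for .toDF on an RDD of tuples/case classes

// stand-ins for the real HTTP call and parser
def callApi(id: Long): String = s"""{"id": $id, "status": "ok"}"""
def parse(response: String): (Long, String) = (response.length.toLong, response)

val df = spark.range(0, 1000).toDF("id")

val parsed = df.rdd
  .repartition(10)                // e.g. 10 partitions ~ 10 concurrent callers
  .mapPartitions { partition =>
    partition.map { row =>
      val response = callApi(row.getLong(0))
      parse(response)             // Iterator[Row] => Iterator[(Long, String)]
    }
  }
  .toDF("length", "body")         // column names matching the tuple arity

parsed.count()                    // nothing is called until an action runs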
You may wish to implement some sort of per-thread rate limiting in callApi to make sure no single executor fires a large number of requests per second. Keep in mind that executors may run in separate threads and/or separate JVMs.
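For example, a very simple per-JVM throttle could look like the following sketch (the 10 requests/second budget is made up for illustration; a production setup might use a proper rate-limiting library instead):

import org.apache.spark.sql.Row

// one throttle per executor JVM; all task threads in that JVM share it
object ApiThrottle {
  private val minIntervalMs = 1000L / 10   // cap this JVM at roughly 10 requests/second
  private var lastCallAt = 0L

  def acquire(): Unit = synchronized {
    val sleepMs = lastCallAt + minIntervalMs - System.currentTimeMillis()
    if (sleepMs > 0) Thread.sleep(sleepMs)
    lastCallAt = System.currentTimeMillis()
  }
}

def callApi(row: Row): String = {
  ApiThrottle.acquire()                    // block until this JVM is under its request budget
  // ... the actual HTTP request goes here ...
  "response"                               // placeholder return value
}

Because every executor JVM gets its own copy of the throttle, the cluster-wide rate is roughly the per-JVM budget times the number of executor JVMs, so the budget needs to be chosen with the executor count in mind.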
Of course, just calling mapPartitions does nothing. You need to trigger an action on the resulting dataframe for the API calls to actually fire.
Upvotes: 1