user1835010

Reputation: 11

How to use SparkContext.submitJob to call REST API

Can someone please provide an example of a submitJob method call?

Found reference here: How to execute async operations (i.e. returning a Future) from map/filter/etc.?

I believe I can implement it for my use case.

In my current implementation I am using partitions to invoke parallel calls, but they wait for the response before invoking the next call:

Dataframe.rdd.repartition(TPS allowed on API)
  .map(row => {
    val response = callApi(row)
    parse(response)
  })

But as there is latency at the API end, I wait up to 10 seconds for the response before parsing and making the next call. I am allowed 100 TPS, but with the current logic I see only 4-7 TPS.

If someone has used SparkContext.submitJob to make asynchronous calls, please provide an example, as I am new to Spark and Scala.

I want to invoke the calls without waiting for the response, ensuring 100 TPS, and then, once I receive the responses, parse them and create a DataFrame on top of them.

I had previously tried collecting the rows and invoking the API calls from the master node, but that seems to be limited by the hardware available for creating a large thread pool.

submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) ⇒ U, partitions: Seq[Int], resultHandler: (Int, U) ⇒ Unit, resultFunc: ⇒ R): SimpleFutureAction[R]

rdd - the RDD out of my DataFrame

partitions - my RDD is already partitioned; do I provide the range 0 to the number of partitions in my RDD?

processPartition - is it my callApi()?

resultHandler - not sure what is to be done here

resultFunc - I believe this would be parsing my response

How do I create a DataFrame after the SimpleFutureAction?

Can someone please assist

Upvotes: 1

Views: 743

Answers (1)

Hristo Iliev

Reputation: 74355

submitJob won't make your API calls automatically faster. It is part of the low-level implementation of Spark's parallel processing - Spark splits actions into jobs and then submits them to whatever cluster scheduler is in place. Calling submitJob is like starting a Java thread - the job will run asynchronously, but not faster than if you simply call the action on the dataframe/RDD.

IMHO your best option is to use mapPartitions which allows you to run a function within the context of each partition. You already have your data partitioned so to ensure maximum concurrency, just make sure you have enough Spark executors to actually have those partitions running simultaneously:

df.rdd.repartition(#concurrent API calls)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)

mapPartitions expects a function that maps Iterator[T] (all data in a single partition) to Iterator[U] (transformed partition) and returns RDD[U]. Converting back to a dataframe is a matter of chaining a call to toDF() with the appropriate column names.
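For completeness, here is a slightly more self-contained sketch of the same approach. ApiResult, callApiForAllRows and the stub bodies are hypothetical names, and callApi/parse are stand-ins for your own HTTP call and parser; the only real requirement is the spark.implicits._ import so that toDF works on an RDD of case class instances:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ApiCallExample extends Serializable {

  // Hypothetical result type for one parsed API response.
  case class ApiResult(id: String, payload: String)

  // Stand-ins for your own code: replace with the real HTTP call and parsing.
  def callApi(row: Row): String = ???
  def parse(response: String): ApiResult = ???

  def callApiForAllRows(spark: SparkSession, df: DataFrame, concurrency: Int): DataFrame = {
    import spark.implicits._               // needed for .toDF on an RDD of case classes

    df.rdd
      .repartition(concurrency)            // one partition per concurrent caller
      .mapPartitions { partition =>        // runs once per partition, on an executor
        partition.map { row =>
          val response = callApi(row)
          parse(response)
        }
      }
      .toDF("id", "payload")
  }
}

Because the mapped function runs on the executors, whatever HTTP client callApi uses must either be serializable or be created on the executor side; creating it once per partition inside mapPartitions is a common pattern.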

You may wish to implement some sort of per-thread rate limiting in callApi to make sure no single executor fires a large number of requests per second. Keep in mind that executors may run in both separate threads and/or separate JVMs.
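For illustration only, a very naive limiter along those lines could look like the sketch below (the class name and maxCallsPerSecond parameter are made up; a proper token bucket or a library such as Guava's RateLimiter would be more robust):

// Naive fixed-interval limiter: successive acquire() calls on the same instance
// are spaced at least 1000 / maxCallsPerSecond milliseconds apart.
class SimpleRateLimiter(maxCallsPerSecond: Int) {
  private val minIntervalMs = 1000L / maxCallsPerSecond
  private var lastCallAt = 0L

  def acquire(): Unit = synchronized {
    val waitMs = (lastCallAt + minIntervalMs) - System.currentTimeMillis()
    if (waitMs > 0) Thread.sleep(waitMs)
    lastCallAt = System.currentTimeMillis()
  }
}

callApi would then call limiter.acquire() before issuing each request, with one limiter instance created inside the partition function (or shared per JVM if several partitions run in the same executor).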

Of course, just calling mapPartitions does nothing by itself; transformations in Spark are lazy. You need to trigger an action on the resulting dataframe for the API calls to actually fire.
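For example, with the hypothetical callApiForAllRows sketch above, any action such as count or write will do:

val resultDF = callApiForAllRows(spark, df, concurrency = 100)
resultDF.cache()   // keep the parsed responses so the API is only hit once
resultDF.count()   // action: this is what actually triggers the API calls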

Upvotes: 1
