Alejandro Fuentes

Reputation: 55

Spark UDF on Python function

I have created a Python function for translating short strings using the GCP Translate API. The code does something like this.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def translateString(inputString, targetLanguage, apiKey):
    baseUrl = "https://translation.googleapis.com/language/translate/v2?key="
    q = "&q="
    gcpKey = apiKey
    target = "&target="
    sentence = str(inputString)

    # Finalize request url
    url = baseUrl + gcpKey + q + sentence + target + targetLanguage

    # Send request with exponential back-off in case the API quota limits are exceeded
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=100)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    response = session.get(url, timeout=120)

    if response.status_code == 200:
        data = response.json()
        translatedStr = data["data"]["translations"][0]["translatedText"]
        return str(translatedStr)
    else:
        return "Error with code: " + str(response.status_code)

from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

udfTrans = F.udf(translateString, StringType())

apiKey = *********

dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey)))

This works great when looping through a pd.DataFrame and storing the return values as we go! But now I need to apply this function to a spark.DataFrame so the work can be distributed, so I created udfTrans = F.udf(translateString, StringType()) so that it can be applied to a string column of the spark.DataFrame.

When I run the UDF with dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey))) it returns no errors, but it takes forever to run when dfToProcess has more than one row.

I am unsure whether I have misunderstood how UDFs are applied to columns of a spark.DataFrame. Is it even possible to apply a function like this to a spark.DataFrame using a UDF, or am I better off doing this in Python/Pandas?

Upvotes: 1

Views: 1255

Answers (1)

Napoleon Borntoparty

Reputation: 1962

Python UDFs cannot be parallelised like this: each executor has to ship rows out to a separate Python worker process, which then runs your function on them one value at a time. This unfortunately means that your UDF blocks on the HTTP request for every single row and is essentially serial in its execution.

This can be solved more efficiently with a different approach. As your function is heavily IO-bound (more specifically network-bound), you could look at something like a ThreadPool: issue the requests concurrently, store the output in a dict, then call SparkContext.parallelize() on the results and go from there.
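A minimal sketch of that idea, assuming the translateString function, apiKey and dfToProcess from your question plus a SparkSession named spark; the max_workers value is an arbitrary choice you would tune to your API quota, and collecting the input column to the driver is fine as long as the row count is modest (here spark.createDataFrame parallelises the local list for you):

from concurrent.futures import ThreadPoolExecutor

# Pull the strings to translate back to the driver.
inputs = [row.FIELD_TO_PROCESS for row in dfToProcess.select("FIELD_TO_PROCESS").collect()]

def translate(s):
    # Each call blocks on the network, so running many of them in threads overlaps the waits.
    return s, translateString(s, "no", apiKey)

# Fire the requests concurrently and collect the results in a dict.
with ThreadPoolExecutor(max_workers=16) as pool:
    translations = dict(pool.map(translate, inputs))

# Hand the results back to Spark and join them onto the original DataFrame.
dfTranslated = spark.createDataFrame(list(translations.items()),
                                     ["FIELD_TO_PROCESS", "TRANSLATED_FIELD"])
dfTempNo = dfToProcess.join(dfTranslated, on="FIELD_TO_PROCESS", how="left")

The threads only overlap the network waits; the Translate calls themselves are unchanged, which is exactly what you want for an IO-bound function.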

Alternatively, you could write your udf in Scala; it then runs natively on the executors inside the JVM and avoids the Python worker overhead entirely.

As yet another alternative, have a look at pandas_udf (https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf), as pandas UDFs are vectorised and receive whole batches of rows per call; a rough sketch of that route is below. Hope this helps!
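A rough sketch of the scalar pandas UDF route (Spark 2.4 style, which needs pyarrow installed), again assuming translateString, apiKey and dfToProcess from the question; note that the HTTP requests are still issued one per value inside each batch:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def translateSeries(inputs):
    # inputs arrives as a whole pandas Series per batch instead of one value per call,
    # which removes the per-row serialisation overhead between the JVM and Python.
    return inputs.apply(lambda s: translateString(s, "no", apiKey))

dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", translateSeries(dfToProcess.FIELD_TO_PROCESS))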

Upvotes: 2
