Reputation: 55
I have created a Python function for translating short strings using the GCP Translate API. The code does something like this:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

def translateString(inputString, targetLanguage, apiKey):
    baseUrl = "https://translation.googleapis.com/language/translate/v2?key="
    q = "&q="
    gcpKey = apiKey
    target = "&target="
    sentence = str(inputString)
    # Finalize request url (the target language is appended at the end)
    url = baseUrl + gcpKey + q + sentence + target + targetLanguage
    # Send request with retries / exponential back-off in case API quota limits are exceeded
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=100)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    response = session.get(url, timeout=120)
    if response.status_code == 200:
        data = response.json()
        translatedStr = data["data"]["translations"][0]["translatedText"]
        return str(translatedStr)
    else:
        return "Error with code: " + str(response.status_code)

udfTrans = F.udf(translateString, StringType())
apiKey = "*********"  # redacted
dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey)))
This works great when looping through a pd.DataFrame and storing the return values as we go! But now I need to apply this function to a spark.DataFrame so the work can be distributed, and I have created the following udfTrans = F.udf(translateString, StringType()) so that it can be applied to a string column in the spark.DataFrame.
When I run the UDF with

dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey)))

it returns no errors but takes forever to run on a dfToProcess with more than 1 row.

I am unsure whether I have misunderstood how UDFs are applied to columns in a spark.DataFrame. Is it even possible to apply a function like this to a spark.DataFrame using a UDF, or will I be better off doing this in Python/Pandas?
Upvotes: 1
Views: 1255
Reputation: 1962
Python udfs cannot be parallelised like this, because your executor needs to call back to the driver for the execution of your udf. This unfortunately means that your udf is going to be blocking for each row and is essentially serial in its execution.
This can be solved more efficiently using different approaches. As your function is heavily IO bound (more specifically network bound), you could look at something like a ThreadPool implementation, storing your output in a dict, then calling SparkContext.parallelize() on your dict and going from there.
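As a rough sketch of that idea (not a definitive implementation), something like the following could work. It assumes translateString, dfToProcess and apiKey from the question, plus an active SparkSession named spark; the pool size of 16 and the join back onto the original DataFrame are purely illustrative choices.

from multiprocessing.pool import ThreadPool

# Collect the strings to translate onto the driver (assumes the column fits in driver memory)
texts = [row["FIELD_TO_PROCESS"]
         for row in dfToProcess.select("FIELD_TO_PROCESS").distinct().collect()]

# Run the network-bound calls concurrently; the pool size is an assumption, tune it to your quota
pool = ThreadPool(16)
try:
    translations = pool.map(lambda s: translateString(s, "no", apiKey), texts)
finally:
    pool.close()
    pool.join()

# Store input -> output pairs in a dict, then hand them back to Spark
lookup = dict(zip(texts, translations))
dfTranslated = spark.sparkContext.parallelize(list(lookup.items())) \
    .toDF(["FIELD_TO_PROCESS", "TRANSLATED_FIELD"])

# Join the translations back onto the original DataFrame
dfTempNo = dfToProcess.join(dfTranslated, on="FIELD_TO_PROCESS", how="left")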
Alternatively, you could write your udf in Scala, as it will be automatically parallel in execution.
Alternatively alternatively, have a look at https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf as pandas udfs can be vectorized. Hope this helps!
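As a minimal sketch of what that could look like here (Spark 2.4-style scalar pandas_udf, assuming translateString and apiKey from the question are available on the executors):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType, lit
from pyspark.sql.types import StringType

# Scalar pandas_udf: receives the column as a pandas.Series per Arrow batch.
# The HTTP call is still made once per value; the gain is in batched data transfer.
@pandas_udf(StringType(), PandasUDFType.SCALAR)
def translateSeries(texts, targets):
    return pd.Series([translateString(t, tgt, apiKey) for t, tgt in zip(texts, targets)])

dfTempNo = dfToProcess.withColumn(
    "TRANSLATED_FIELD",
    translateSeries(dfToProcess.FIELD_TO_PROCESS, lit("no")),
)

Note that unless translateString is also changed to batch several strings per request, the pandas_udf mainly cuts per-row serialization overhead; the network round-trips remain the dominant cost.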
Upvotes: 2