Reputation: 1437
I'm attempting to use the Python port of the Google phonenumbers
library to normalize 50 million phone numbers. I'm reading a Parquet file on S3 into a Spark DataFrame and then running operations on that DataFrame. The following function, parsePhoneNumber
, is expressed as a UDF:
import phonenumbers

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except phonenumbers.NumberParseException:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)
        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        return (False, None, None, None, None)
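For reference, the UDF registration looks roughly like this (the return schema and field names here are illustrative, chosen to match the tuple returned above):

from pyspark.sql.functions import udf
from pyspark.sql.types import (BooleanType, IntegerType, LongType,
                               StringType, StructField, StructType)

# Illustrative struct schema for the tuple returned by parsePhoneNumber
phone_schema = StructType([
    StructField("is_valid", BooleanType()),
    StructField("country_code", IntegerType()),
    StructField("e164_number", StringType()),
    StructField("national_number", LongType()),
    StructField("extension", StringType()),
])

parsePhoneNumber_udf = udf(parsePhoneNumber, phone_schema)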
And below is a sample of how I use the UDF to derive new columns:
newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column.national_number")
Executing the UDF by running display(newDataFrame)
or newDataFrame.show(5)
or something similar only uses one executor in the cluster, yet nothing in the UDF looks like it should force it to run on a single worker.
If I'm doing anything that would prevent this from running in parallel, can you provide some insight?
The execution environment is a cloud cluster managed by Databricks.
Edit: Below is the output of oldDataFrame.explain(True):
== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...
Upvotes: 6
Views: 8542
Reputation: 35249
You are all good. display
, with default arguments, shows at most the first 1000 rows. Similarly, newDataFrame.show(5)
shows only the first five rows.
At the same time, the execution plan (oldDataFrame.explain
) shows no shuffles, so in both cases Spark will evaluate only the minimum number of partitions needed to produce the required number of rows - for these values it is probably one partition.
If you want to be sure:
- Check that oldDataFrame.rdd.getNumPartitions() is larger than one.
- Force evaluation of the full dataset, for example with df.foreach(lambda _: None) or newDataFrame.foreach(lambda _: None).
You should see more active executors.
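Putting those checks together, a minimal sketch (using the variable names from the question):

# 1. Confirm the input is split across more than one partition
print(oldDataFrame.rdd.getNumPartitions())

# 2. Force evaluation of the whole dataset so every partition is processed;
#    while this runs, the Spark UI should show more than one active executor
newDataFrame.foreach(lambda _: None)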
Upvotes: 8