Can you help me optimize this code and make it work? This is the original data:
+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|         null|
|         Venlafaxine|         null|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|
+--------------------+-------------+
I expect to get data like this:
+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|         Venlafaxine|  Venlafaxine|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|
+--------------------+-------------+
This is the code:
distinct_df = spark.sql("select distinct medicine_name as medicine_name from medicine where medicine_name is not null")
distinct_df.createOrReplaceTempView("distinctDF")

def getMax(num1, num2):
    pmax = (num1>=num2)*num1+(num2>num1)*num2
    return pmax

def editDistance(s1, s2):
    ed = (getMax(length(s1), length(s2)) - levenshtein(s1,s2))/
        getMax(length(s1), length(s2))
    return ed

editDistanceUdf = udf(lambda x,y: editDistance(x,y), FloatType())

def getSimilarity(str):
    res = spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
    res['medicine_name'].take(1)
    return res

getSimilarityUdf = udf(lambda x: getSimilarity(x), StringType())
res_df = df.withColumn('m_name', when((df.medicine_name.isNull)|(df.medicine_name.=="null")),getSimilarityUdf(df.original_name)
    .otherwise(df.medicine_name)).show()
Now I'm getting this error:
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'function' object has no attribute '_get_object_id'
There are a bunch of problems with your code:
You cannot use SparkSession or distributed objects in a udf, so getSimilarity just cannot work. If you want to compare objects like this, you have to use a join.
length and levenshtein come from pyspark.sql.functions and cannot be used inside a UserDefinedFunction. They are designed to generate SQL expressions, mapping *Column to Column.
Column.isNull is a method, not a property, so it should be called:
df.medicine_name.isNull()
The following:
df.medicine_name.=="null"
is not syntactically valid Python (it looks like a Scala calque) and would raise a SyntaxError before anything runs. The Python equivalent is df.medicine_name == "null".
Even if SparkSession access were allowed in a UserDefinedFunction, this wouldn't be a valid substitution, because 'str' is just the SQL string literal str, not the value of your str variable:
spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
You should use string formatting methods instead, keeping the quotes so the value still ends up as a SQL string literal:
spark.sql("select medicine_name, editDistanceUdf('{str}', medicine_name) from distinctDf where editDistanceUdf('{str}', medicine_name)>=0.85 order by 2".format(str=str))
There may be other problems, but since you didn't provide an MCVE, anything else would be pure guessing.
Once you fix the smaller mistakes, you have two choices. The first is to use crossJoin:
combined = df.alias("left").crossJoin(spark.table("distinctDf").alias("right"))
Then apply the udf, filter, and use one of the methods listed in Find maximum row per group in Spark DataFrame to pick the closest match in each group.
The second is to use built-in approximate matching tools, as explained in Efficient string matching in Apache Spark.