abhigyan bhushan

Reputation: 11

Using dictionary in regexp_replace function in pyspark

I want to perform a regexp_replace operation on a PySpark dataframe column using a dictionary.

Dictionary: {'RD':'ROAD','DR':'DRIVE','AVE':'AVENUE',....} The dictionary will have around 270 key-value pairs.

Input Dataframe:

ID | Address
1  | 22, COLLINS RD
2  | 11, HEMINGWAY DR
3  | AVIATOR BUILDING
4  | 33, PARK AVE MULLOHAND DR

Desired Output Dataframe:

ID | Address                    | Address_Clean
1  | 22, COLLINS RD             | 22, COLLINS ROAD
2  | 11, HEMINGWAY DR           | 11, HEMINGWAY DRIVE
3  | AVIATOR BUILDING           | AVIATOR BUILDING
4  | 33, PARK AVE MULLOHAND DR  | 33, PARK AVENUE MULLOHAND DRIVE

I cannot find any documentation on the internet. And trying to pass the dictionary directly, as in the code below-

data=data.withColumn('Address_Clean',regexp_replace('Address',dict))

throws the error "regexp_replace takes 3 arguments, 2 given".

The dataset will be around 20 million rows in size. Hence, a UDF solution will be slow (due to its row-wise operation), and we don't have access to Spark 2.3.0, which supports pandas_udf. Is there any efficient method of doing it, other than perhaps using a loop?

Upvotes: 1

Views: 2728

Answers (1)

Nadia Tomova

Reputation: 101

It is throwing this error because regexp_replace() needs three arguments:

regexp_replace('column_to_change','pattern_to_be_changed','new_pattern')
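In plain Python, the same pattern-then-replacement order can be sketched with re.sub, which is a reasonable local stand-in for what regexp_replace does to each value in the column (the address value here is taken from the question's sample data):

```python
import re

# regexp_replace('Address', 'RD$', 'ROAD') applied to a single value,
# sketched with re.sub: pattern first, replacement second, then the input.
address = "22, COLLINS RD"
cleaned = re.sub(r"RD$", "ROAD", address)
print(cleaned)  # 22, COLLINS ROAD
```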

But you are right, you don't need a UDF or a loop here. You just need some more regex and a directory table that looks exactly like your original dictionary :)

Here is my solution for this:

# First you need to get rid of all the endings you want to replace.
# You can chain them in one pattern with the regex OR (|) operator.
# You could probably automate building that pattern from your dictionary
# and pass it in as a string, but I will leave that for you to decide.
import pyspark.sql.functions as sf

input_df = input_df.withColumn('start_address', sf.regexp_replace("original_address", "RD|DR|etc...", ""))
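The "automate that" step mentioned in the comment could be sketched like this, building the OR pattern from the dictionary keys (abbrev here is a small stand-in for the asker's 270-entry dictionary; the resulting string could then be passed to sf.regexp_replace):

```python
import re

# Stand-in for the full 270-entry dictionary from the question.
abbrev = {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE'}

# Escape each key and join with |, anchored at a word boundary and the end
# of the string so only the trailing abbreviation is stripped.
pattern = r"\b(" + "|".join(re.escape(k) for k in abbrev) + r")$"
stripped = re.sub(pattern, "", "22, COLLINS RD")
print(repr(stripped))  # '22, COLLINS ' (note the trailing space)
```

Anchoring with $ avoids accidentally deleting letter pairs like "DR" in the middle of a street name.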


# You will still need the old ending in a separate column,
# so you have something to join on with your directory table.

input_df = input_df.withColumn('end_of_address',sf.regexp_extract('original_address',"(.*) (.*)", 2))
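To see what the "(.*) (.*)" pattern captures, here is the same extraction in plain re: the greedy first group consumes everything up to the last space, so group 2 (what regexp_extract(..., 2) returns) is the final token of the address:

```python
import re

# Greedy first group eats up to the last space; group 2 is the last word.
m = re.match(r"(.*) (.*)", "33, PARK AVE MULLOHAND DR")
print(m.group(2))  # DR
```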


# Now we join in the directory table, which has two columns: the endings
# you want to replace and the endings you want to have instead.

input_df = directory_df.join(input_df,'end_of_address')
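The directory table itself can be built straight from the original dictionary. A minimal sketch (column names here are illustrative and must match whatever the join and concat expect; spark is assumed to be an existing SparkSession):

```python
# Turn the dictionary into two-column rows for the directory table.
abbrev = {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE'}  # stand-in dict
rows = [(end, correct) for end, correct in abbrev.items()]

# With an existing SparkSession this becomes the directory_df used above:
# directory_df = spark.createDataFrame(rows, ['end_of_address', 'correct_end'])
print(rows)
```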


# And now you just need to concatenate the stripped address with the correct ending.

input_df = input_df.withColumn('address_clean',sf.concat('start_address','correct_end'))
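For intuition, the four steps can be traced in plain Python on a single row (using a small stand-in dictionary; in Spark the same logic runs as column expressions, with the dictionary lookup done by the join):

```python
import re

# Plain-Python walk-through of the four steps above for one address.
abbrev = {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE'}  # stand-in dict
address = "11, HEMINGWAY DR"

start = re.sub(r"RD|DR|AVE", "", address)           # step 1: strip the ending
end = re.match(r"(.*) (.*)", address).group(2)      # step 2: extract the ending
correct = abbrev[end]                               # step 3: the "join" lookup
clean = start + correct                             # step 4: concatenate
print(clean)  # 11, HEMINGWAY DRIVE
```

Note that this only fixes the trailing abbreviation; an address with an abbreviation in the middle, like row 4 in the question, would need the pattern step applied more carefully.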

Upvotes: 2
