Babu

Reputation: 5260

Python Spark DataFrames: how to update a column based on conditions from a different column

I would like to do a very simple thing, but I cannot figure out how to do it with Python/Spark (1.5) DataFrames (it's all new to me).

original dataset:

code| ISO | country
1   | AFG | Afghanistan state
2   | BOL | Bolivia Plurinational State

new dataset:

code| ISO | country
1   | AFG | Afghanistan
2   | BOL | Bolivia

I would like to do something like this (in pseudo Python?):

iso_to_country_dict = {'AFG': 'Afghanistan', 'BOL': 'Bolivia'}

def mapCountry(iso,country):
    if(iso_to_country_dict[iso] is not empty):
        return iso_to_country_dict[iso]
    return country

dfg = df.select(mapCountry(df['ISO'],df['country']))

For simplicity, mapCountry could look like this:

def mapCountry(iso,country):
    if(iso=='AFG'):
        return 'Afghanistan'
    return country

But this raises an error: ValueError: Cannot convert column into bool

Upvotes: 1

Views: 302

Answers (2)

Elior Malul

Reputation: 691

I would like to offer a different approach. UDFs are always an option, but they are somewhat inefficient and cumbersome, IMO. The when/otherwise paradigm can solve this issue. First, for efficiency, represent the dictionary as a DataFrame:

df_iso = spark.createDataFrame([('bol', 'Bolivia'),
                                ('hti', 'Cape-Verde'),
                                ('fra', 'France')], ['iso', 'country'])

Then let's consider the following data:

df_data = spark.createDataFrame(
    [(x,) for x in ['fra', 'esp', 'eng', 'usa', 'bol']], ['data'])

Then we do the ISO lookup with a join:

from pyspark.sql import functions as F  # the F alias used throughout this answer

df_data = df_data.join(df_iso, F.col('data') == F.col('iso'),
                       'left_outer')

And finally, we add the desired column (I named it result) based on the match:

df_data = df_data.select(
    F.col('data'),
    F.when(F.col('iso').isNull(), F.col('data'))
    .otherwise(F.col('country')).alias('result'))

The result would then be:

+----+-------+
|data| result|
+----+-------+
| esp|    esp|
| bol|Bolivia|
| eng|    eng|
| fra| France|
| usa|    usa|
+----+-------+
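For reference, the join and fallback steps above can be collected into one function. This is only a sketch: the names lookup_with_fallback and expected_result are mine, not part of any Spark API, and I swapped when/otherwise for F.coalesce, which behaves the same as long as the lookup table has no null country values. The plain-Python helper mirrors the intended semantics, so you can check expected values on a small sample without a cluster.

```python
def expected_result(values, iso_to_country):
    """Plain-Python reference (hypothetical helper): the mapped name when the
    code is known, otherwise the code itself."""
    return [(v, iso_to_country.get(v, v)) for v in values]


def lookup_with_fallback(df_data, df_iso):
    """Left-join df_data to df_iso and fall back to 'data' when nothing matches.

    Assumes df_data has a 'data' column and df_iso has 'iso' and 'country'
    columns, as in the snippets above. pyspark is only needed at call time.
    """
    from pyspark.sql import functions as F  # imported lazily on purpose

    joined = df_data.join(df_iso, F.col('data') == F.col('iso'), 'left_outer')
    return joined.select(
        F.col('data'),
        # coalesce returns its first non-null argument, so rows without a
        # match (country is null after the left join) keep the 'data' value
        F.coalesce(F.col('country'), F.col('data')).alias('result'))
```

With the DataFrames from this answer, lookup_with_fallback(df_data, df_iso) is meant to be called on df_data before the join step, since the function does the join itself.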

Upvotes: 0

Babu

Reputation: 5260

Well, I found a solution, but I don't know if this is the cleanest way to do it. Any other ideas?

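from pyspark.sql.functions import udf

iso_to_country_dict = {'BOL': 'Bolivia', 'HTI': 'Cape Verde', 'COD': 'Congo', 'PRK': 'Korea', 'LAO': 'Laos'}

def mapCountry(iso, country):
    # Use the short name when the ISO code is in the dict, else keep the original
    if iso in iso_to_country_dict:
        return iso_to_country_dict[iso]
    return country

# Wrap the Python function as a Spark UDF (returns a string column by default)
mapCountry = udf(mapCountry)

dfg = df.select(df['iso'], mapCountry(df['iso'], df['country']).alias('country'), df['C2'], df['C3'], df['C4'], df['C5'])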

Note: C1, C2, ..., C5 are the names of all the other columns.
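One possible way to avoid listing every other column by hand is withColumn, which (as far as I can tell from the docs) replaces an existing column of the same name and leaves the rest alone. A minimal sketch, assuming the DataFrame has 'iso' and 'country' columns as above; map_country and replace_country are names I made up for this example, and the dict is just a subset of the one above:

```python
# Subset of the mapping above, repeated so the sketch is self-contained
iso_to_country_dict = {'BOL': 'Bolivia', 'HTI': 'Cape Verde'}


def map_country(iso, country):
    """Return the short name for a known ISO code, else the original country."""
    return iso_to_country_dict.get(iso, country)


def replace_country(df):
    """Overwrite 'country' and keep every other column untouched.

    pyspark is imported here so the mapping logic above can be tested
    without a Spark installation.
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    map_country_udf = udf(map_country, StringType())
    # withColumn with an existing name replaces that column
    return df.withColumn('country', map_country_udf(df['iso'], df['country']))
```

Usage would be dfg = replace_country(df). Keeping map_country as a plain function also means it can be unit-tested on its own before it is wrapped as a UDF.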

Upvotes: 1
