Rustam-Z

Reputation: 113

Handle null values with PySpark for each row differently

I have NULL values in Latitude and Longitude. I need to look up each address via an API and then replace the NULL values with the result.

How could I do that? How do I iterate through each row, extract the city name, and pass it to the API?

+-------------+--------------------+-------+------------+--------------------+--------+---------+
|           Id|                Name|Country|        City|             Address|Latitude|Longitude|
+-------------+--------------------+-------+------------+--------------------+--------+---------+
|  42949672960|Americana Resort ...|     US|      Dillon|         135 Main St|    null|     null|
|  60129542147|Ubaa Old Crawford...|     US| Des Plaines|     5460 N River Rd|    null|     null|
| 455266533383|        Busy B Ranch|     US|   Jefferson|  1100 W Prospect Rd|    null|     null|
|1108101562370|             Motel 6|     US|    Rockport|       106 W 11th St|    null|     null|
|1382979469315|           La Quinta|     US|  Twin Falls|    539 Pole Line Rd|    null|     null|
| 292057776132|        Hyatt Dulles|     US|     Herndon|2300 Dulles Corne...|    null|     null|
| 987842478080|      Dead Broke Inn|     US|       Young|47893 N Arizona H...|    null|     null|
| 300647710720|The Miner's Inn M...|     US|    Viburnum|Highway 49 Saint ...|    null|     null|
| 489626271746|      Alyssa's Motel|     US|       Casco|              Rr 302|    null|     null|
+-------------+--------------------+-------+------------+--------------------+--------+---------+

Upvotes: 1

Views: 226

Answers (2)

pltc

Reputation: 6082

Agree with @SCouto on the UDF, but I'd suggest returning a tuple instead of a comma-separated string. This will save two extra split transformations later.

def get_latitude_longitude(address):
    # call your API with the address as a parameter, then return
    # the latitude and longitude from the response as a tuple
    return (lat, lon)  # lat/lon parsed from the API response
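For illustration, the body might look something like this against an HTTP geocoding service. The URL and the response field names here are hypothetical placeholders, not a specific API:

import requests

def get_latitude_longitude(address):
    # hypothetical geocoding endpoint; substitute your actual service
    resp = requests.get('https://example.com/geocode', params={'q': address})
    data = resp.json()
    # 'lat'/'lon' field names are assumptions; adjust to your API's schema
    return (float(data['lat']), float(data['lon']))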

from pyspark.sql import functions as F
from pyspark.sql import types as T

get_latitude_longitude_UDF = F.udf(get_latitude_longitude, T.ArrayType(T.DoubleType()))

(df
    .withColumn('latlon', get_latitude_longitude_UDF('Address'))
    .withColumn('lat', F.col('latlon')[0])
    .withColumn('lon', F.col('latlon')[1])
)
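To replace only the missing values, as the question asks, you can coalesce the existing columns with the API result. A minimal sketch, assuming the UDF above and the df from the question:

result = (df
    .withColumn('latlon', get_latitude_longitude_UDF('Address'))
    # keep an existing coordinate where present, fall back to the API result
    .withColumn('Latitude', F.coalesce(F.col('Latitude'), F.col('latlon')[0]))
    .withColumn('Longitude', F.coalesce(F.col('Longitude'), F.col('latlon')[1]))
    .drop('latlon')
)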

Upvotes: 2

SCouto

Reputation: 7928

You can create a UDF:

First, define a function that, given an address, returns its latitude and longitude (for instance, concatenated as a string: "latitude,longitude")

def get_latitude_longitude(address):
    # call your API with the address as a parameter, then concatenate
    # the latitude and longitude from the response as "latitude,longitude"
    return lat_longitude_concat  # e.g. f"{lat},{lon}" from the API response

Then register that function as a user-defined function

from pyspark.sql.functions import udf
get_latitude_longitude_UDF = udf(lambda z: get_latitude_longitude(z))
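As a side note, the lambda wrapper isn't strictly needed; you can pass the function directly and declare the return type explicitly. A minimal alternative, assuming the same function as above:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

get_latitude_longitude_UDF = udf(get_latitude_longitude, StringType())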

Finally call the UDF, and split the output in two columns

from pyspark.sql.functions import col, split

df = (df.withColumn('tmp', get_latitude_longitude_UDF('Address'))
        .withColumn('Latitude', split(col('tmp'), ',').getItem(0))
        .withColumn('Longitude', split(col('tmp'), ',').getItem(1))
        .drop('tmp'))
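If most rows already have coordinates, you can also guard the call so the API is only hit for rows that need it. A sketch using when/otherwise, assuming the UDF above:

from pyspark.sql.functions import col, split, when

df = (df
    # only geocode rows whose Latitude is NULL
    .withColumn('tmp', when(col('Latitude').isNull(),
                            get_latitude_longitude_UDF('Address')))
    .withColumn('Latitude', when(col('Latitude').isNull(),
                                 split(col('tmp'), ',').getItem(0))
                            .otherwise(col('Latitude')))
    .withColumn('Longitude', when(col('Longitude').isNull(),
                                  split(col('tmp'), ',').getItem(1))
                             .otherwise(col('Longitude')))
    .drop('tmp'))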

Upvotes: 2
