I am using a Python function to calculate the distance between two points given their longitude and latitude:
import numpy as np

def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    newlon = lon2 - lon1
    newlat = lat2 - lat1
    haver_formula = np.sin(newlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(newlon/2.0)**2
    dist = 2 * np.arcsin(np.sqrt(haver_formula))
    miles = 3958 * dist
    return miles
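For reference, the function implements the standard haversine formula. Writing $\varphi$ for latitude and $\lambda$ for longitude (both converted to radians) and $R = 3958$ miles for the Earth's radius, as in the code:

$$
a = \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right),
\qquad
d = 2R \arcsin\sqrt{a}
$$

Here haver_formula corresponds to $a$ and miles to $d$.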
My DataFrame has four columns: lat, long, merch_lat and merch_long.
When I create a UDF like this, it throws an error, and I don't know where I am going wrong:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

udf_haversine = udf(haversine, FloatType())
data = data.withColumn("distance", udf_haversine("long", "lat", "merch_long", "merch_lat"))
The error is:
An error occurred while calling o1499.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:
How do I create a UDF that takes multiple columns and returns a single value?
It's possible that you are having problems with numpy.dtype and serialization. Since miles is of type numpy.float64 rather than a plain Python float, try returning float(miles).
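To see the type mismatch without starting Spark, you can call the function directly and inspect what it returns. This is a minimal sketch that needs only NumPy; haversine_raw is a hypothetical copy of the question's function that keeps the NumPy return type. Both float() and the NumPy scalar's .item() method turn the result into a native Python float, which is the kind of value this answer suggests returning from the UDF.

```python
import numpy as np

# Hypothetical helper: same math as the question's haversine,
# returning whatever type NumPy produces (no float() conversion)
def haversine_raw(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    newlon = lon2 - lon1
    newlat = lat2 - lat1
    haver_formula = (
        np.sin(newlat / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(newlon / 2.0) ** 2
    )
    return 3958 * 2 * np.arcsin(np.sqrt(haver_formula))

raw = haversine_raw(18.427238, 19.510083, 93.710735, 52.182011)
print(type(raw))         # <class 'numpy.float64'>
print(type(float(raw)))  # <class 'float'>
print(type(raw.item()))  # <class 'float'>, .item() also gives a native scalar
```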
Full example that works:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import numpy as np

def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    newlon = lon2 - lon1
    newlat = lat2 - lat1
    haver_formula = (
        np.sin(newlat / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(newlon / 2.0) ** 2
    )
    dist = 2 * np.arcsin(np.sqrt(haver_formula))
    miles = 3958 * dist
    # Convert numpy.float64 to a plain Python float before returning it to Spark
    return float(miles)

spark = SparkSession.builder.getOrCreate()
data = [
    {
        "long": 18.427238,
        "lat": 19.510083,
        "merch_long": 93.710735,
        "merch_lat": 52.182011,
    }
]
df = spark.createDataFrame(data)
udf_haversine = F.udf(haversine, DoubleType())
df = df.withColumn("distance", udf_haversine("long", "lat", "merch_long", "merch_lat"))
# Trigger evaluation (the original error surfaced when showing the DataFrame)
df.show()
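Because the UDF wraps an ordinary Python function, you can sanity-check it on the sample row before handing it to Spark. This is a minimal sketch assuming only NumPy and the 3958-mile radius used above; the two checks rely on identical points being 0 miles apart and on one degree of latitude along a meridian being 3958 * pi / 180 miles.

```python
import math
import numpy as np

def haversine(lon1, lat1, lon2, lat2):
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    newlon = lon2 - lon1
    newlat = lat2 - lat1
    haver_formula = (
        np.sin(newlat / 2.0) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin(newlon / 2.0) ** 2
    )
    dist = 2 * np.arcsin(np.sqrt(haver_formula))
    miles = 3958 * dist
    return float(miles)

# Sample row from the example DataFrame, passed in the UDF's argument order
row = {"long": 18.427238, "lat": 19.510083, "merch_long": 93.710735, "merch_lat": 52.182011}
d = haversine(row["long"], row["lat"], row["merch_long"], row["merch_lat"])
print(type(d).__name__, d)

# Identical points are 0 miles apart
assert haversine(0.0, 0.0, 0.0, 0.0) == 0.0
# One degree of latitude along a meridian is 3958 * pi / 180 (about 69.08) miles
assert math.isclose(haversine(0.0, 0.0, 0.0, 1.0), 3958 * math.pi / 180)
```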