knagesh

Reputation: 51

How to fit a kernel density estimate on a PySpark dataframe column and use it to create a new column with the estimates

My use case is the following. Consider I have a PySpark dataframe with the following columns:

1. hh: the hour of the day (type int)
2. userId: some unique identifier

What I want to do is figure out the list of userIds which have anomalous hits onto the page. So I first do a groupby:

df = df.groupBy("hh", "userId").count().withColumnRenamed("count", "LoginCounts")

(withColumnRenamed is used because alias would rename the dataframe rather than the generated count column.)

Now the dataframe has the columns:

1. hh
2. userId
3. LoginCounts: the number of times a specific user logs in at a particular hour
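For concreteness, here is a minimal sketch of this setup (the sample values are made up for illustration, and spark is assumed to be an active SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# made-up illustration data: (hour, userId) pairs
df = spark.createDataFrame(
    [(13, "u1"), (13, "u1"), (13, "u2"), (14, "u1")],
    ["hh", "userId"],
)

df = df.groupBy("hh", "userId").count().withColumnRenamed("count", "LoginCounts")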

I want to use the PySpark KernelDensity class as follows:

from pyspark.mllib.stat import KernelDensity
kd = KernelDensity()
kd.setSample(df.select("LoginCounts").rdd)
kd.estimate([13.0, 14.0])

I get the error:

Py4JJavaError: An error occurred while calling o647.estimateKernelDensity. : org.apache.spark.SparkException: Job aborted due to stage failure

Now my end goal is to fit a KDE on, say, one day's hourly data and then use the next day's data to get the probability estimates for each login count. E.g. I would like to achieve something of this nature:

df.withColumn("kdeProbs", kde.estimate(col("LoginCounts")))

So the column kdeProbs will contain P(LoginCount=x | estimated kde).

I have tried searching for an example of this but am always redirected to the standard KDE example on the spark.apache.org page, which does not solve my case.

Upvotes: 2

Views: 3291

Answers (1)

user2739472

Reputation: 1509

Selecting one column and taking .rdd is not enough: that gives you an RDD of Row objects, whereas setSample needs the actual numeric values, so you have to extract the value from each Row. Try this:

from pyspark.mllib.stat import KernelDensity

dat_rdd = df.select("LoginCounts").rdd

# extract the value from each Row object
dat_rdd_data = dat_rdd.map(lambda x: x[0])

kd = KernelDensity()
kd.setSample(dat_rdd_data)
kd.estimate([13.0, 14.0])
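One caveat, as a hedged note: count() produces integer values while the underlying Scala estimator works on doubles, so if you hit a ClassCastException it may help to cast explicitly when extracting the values:

# hedged precaution: cast to float, since the Scala backend expects doubles
dat_rdd_data = dat_rdd.map(lambda x: float(x[0]))

Also, estimate() returns a numpy array with the estimated density at each requested point, and it runs on the driver, so it cannot be called inside withColumn directly. One way to get the kdeProbs column the question asks about (a sketch, assuming day1_df and day2_df are the two days' aggregated dataframes and spark is the active SparkSession) is to score each distinct count once on the driver and join the densities back:

# fit on day 1's login counts
kd = KernelDensity()
kd.setSample(day1_df.select("LoginCounts").rdd.map(lambda x: float(x[0])))

# evaluate the KDE once per distinct day-2 count, on the driver
distinct_counts = [row[0] for row in day2_df.select("LoginCounts").distinct().collect()]
densities = kd.estimate([float(c) for c in distinct_counts])

# join the density estimates back as a new column
probs_df = spark.createDataFrame(
    [(c, float(d)) for c, d in zip(distinct_counts, densities)],
    ["LoginCounts", "kdeProbs"],
)
day2_scored = day2_df.join(probs_df, on="LoginCounts", how="left")

Note that these are density values rather than true probabilities, since the KDE estimates a continuous density over the counts.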

Upvotes: 0
