Alessio Rossotti

Reputation: 314

Spark UDF function with a DataFrame as input

I have to develop a Spark script in Python that checks some logs and verifies whether a user's IP address changed country between two events. I have a CSV file with IP ranges and their associated countries saved on HDFS, like this:

startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany

And a log CSV file:

userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login

I load both files into Spark DataFrames, and I have already modified the log DataFrame with a lag function, adding a previousIp column (a sketch of that step is at the end of this question). The solution I thought of is to substitute ip and previousIp with their associated countries, so that I can compare them using dataFrame.filter("previousIp != ip"). My question is: is there a way to do that in Spark? Something like:

dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)

In order to have a Dataframe like this:

userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy

If not, how can I solve my problem? Thank you.
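For reference, this is roughly how I built the previousIp column (a sketch; logs is my log DataFrame, partitioned by userId and ordered by timestamp):

from pyspark.sql import Window
from pyspark.sql.functions import lag

# Previous IP per user, ordered by event time
w = Window.partitionBy('userId').orderBy('timestamp')
logs = logs.withColumn('previousIp', lag('ip').over(w))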

Upvotes: 1

Views: 949

Answers (1)

Mariusz

Reputation: 13926

It's actually quite easy if you convert the IP addresses to numbers first. You can write your own UDF or use the code from petrabarus and register the function like this:

spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'")

Then map countries csv to dataframe with numbers:

>>> from pyspark.sql.functions import expr, broadcast
>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
             expr('iptolong(startIp)').alias('ip_from'),
             expr('iptolong(endIp)').alias('ip_to'),
             'country')
>>> ipdb.show()
+---------+---------+-------+
|  ip_from|    ip_to|country|
+---------+---------+-------+
|        0|167772160|  Italy|
|167772161|335544320|England|
|335544321|503316480|Germany|
+---------+---------+-------+

Also, map your log dataframe to numbers:

>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \
            .withColumn('ip', expr('iptolong(ip)'))
>>> log.show()
+---------+
|       ip|
+---------+
|251658241|
+---------+

Then you can join this dataframe to the IP database using a between condition:

>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show()
+---------+---------+---------+-------+
|       ip|  ip_from|    ip_to|country|
+---------+---------+---------+-------+
|251658241|167772161|335544320|England|
+---------+---------+---------+-------+
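Coming back to the original question, one way to finish is to join twice, once per IP column, with the country table aliased differently, and then filter the rows where the country changed. A sketch, assuming your log dataframe also has a previousIp column already converted with iptolong:

from pyspark.sql.functions import broadcast, col

# Country lookup for the current IP
cur = ipdb.selectExpr('ip_from AS cur_from', 'ip_to AS cur_to', 'country')
# Country lookup for the previous IP
prev = ipdb.selectExpr('ip_from AS prev_from', 'ip_to AS prev_to',
                       'country AS previousCountry')

changed = log \
    .join(broadcast(cur), log.ip.between(col('cur_from'), col('cur_to'))) \
    .join(broadcast(prev),
          log.previousIp.between(col('prev_from'), col('prev_to'))) \
    .filter(col('country') != col('previousCountry'))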

Upvotes: 1
