Reputation: 314
I have to develop a Spark script in Python that checks some logs and verifies whether a user has changed the country of their IP between two events. I have a csv file with IP ranges and associated countries saved on HDFS like this:
startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany
And a log csv file:
userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login
I load both files with a Spark Dataframe, and I've already modified the one that contains the logs with a lag function, adding a column with the previousIp. The solution I thought of is to substitute ip and previousIp with the associated countries so I can compare them using dataFrame.filter("previousIp != ip"). My question is: is there a way to do that in Spark? Something like:
dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)
In order to have a Dataframe like this:
userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy
If not, how can I solve my problem? Thank you.
Upvotes: 1
Views: 949
Reputation: 13926
It's actually quite easy if you convert the IP addresses to numbers first. You can write your own UDF or use code from petrabarus and register the function like this:
spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'")
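If you'd rather not depend on the Hive UDF jar, a minimal pure-Python equivalent is easy to write and register yourself (the `iptolong` name below just mirrors the Hive function; the registration line is a sketch and needs an active SparkSession):

```python
def ip_to_long(ip):
    """Convert a dotted IPv4 string to its integer value."""
    a, b, c, d = (int(part) for part in ip.split('.'))
    return (a << 24) | (b << 16) | (c << 8) | d

# Registered like this, it is usable from expr() and SQL just like the Hive UDF:
#   from pyspark.sql.types import LongType
#   spark.udf.register('iptolong', ip_to_long, LongType())
```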
Then map the countries CSV to a dataframe with numbers:
>>> from pyspark.sql.functions import expr
>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
...     expr('iptolong(startIp)').alias('ip_from'),
...     expr('iptolong(endIp)').alias('ip_to'),
...     'country')
>>> ipdb.show()
+---------+---------+-------+
| ip_from| ip_to|country|
+---------+---------+-------+
| 0|167772160| Italy|
|167772161|335544320|England|
|335544321|503316480|Germany|
+---------+---------+-------+
Also, map your log dataframe to numbers:
>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \
...     .withColumn('ip', expr('iptolong(ip)'))
>>> log.show()
+---------+
| ip|
+---------+
|251658241|
+---------+
Then you can join this dataframe using a between condition:
>>> from pyspark.sql.functions import broadcast
>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show()
+---------+---------+---------+-------+
| ip| ip_from| ip_to|country|
+---------+---------+---------+-------+
|251658241|167772161|335544320|England|
+---------+---------+---------+-------+
Upvotes: 1