Steven

Reputation: 15258

Join RDD using python conditions

I have two RDDs. The first one contains information related to IP addresses (see column c_ip):

[Row(unic_key=1608422, idx=18, s_date='2016-12-31', s_time='15:00:07', c_ip='119.228.181.78', c_session='3hyj0tb434o23uxegpnmvzr0', origine_file='inFile', process_date='2017-03-13'),
 Row(unic_key=1608423, idx=19, s_date='2016-12-31', s_time='15:00:08', c_ip='119.228.181.78', c_session='3hyj0tb434o23uxegpnmvzr0', origine_file='inFile', process_date='2017-03-13'),
]

And another RDD containing IP geolocation data:

network,geoname_id,registered_country_geoname_id,represented_country_geoname_id,is_anonymous_proxy,is_satellite_provider,postal_code,latitude,longitude,accuracy_radius
1.0.0.0/24,2077456,2077456,,0,0,,-33.4940,143.2104,1000
1.0.1.0/24,1810821,1814991,,0,0,,26.0614,119.3061,50
1.0.2.0/23,1810821,1814991,,0,0,,26.0614,119.3061,50
1.0.4.0/22,2077456,2077456,,0,0,,-33.4940,143.2104,1000

I would like to match the two, but the problem is that there is no strict equivalence between the columns of the two RDDs.

I would like to use the Python 3 package ipaddress and do a check like this:

>>> import ipaddress
>>> ipaddress.IPv4Address('1.0.0.5') in ipaddress.ip_network('1.0.0.0/24')
True

Is it possible to use a Python function to perform the join (a left outer join, so as not to exclude any rows from my first RDD)? How can I do that?

Upvotes: 1

Views: 436

Answers (1)

Alex

Reputation: 21766

When using Apache Spark 1.6, you can still use a UDF as the join predicate. After generating some test data:

import ipaddress
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

sessions = sc.parallelize([(1608422,'119.228.181.78'),(1608423, '119.228.181.78')]).toDF(['unic_key','c_ip'])

geo_ip = sc.parallelize([('1.0.0.0/24',2077456,2077456),
                        ('1.0.1.0/24',1810821,1814991),
                        ('1.0.2.0/23',1810821,1814991),
                        ('1.0.4.0/22',2077456,2077456)]).toDF(['network','geoname_id','registered_country_geoname_id'])

You can create the UDF predicate as follows:

def ip_range(ip, network_range):
    # On Python 3 Spark hands the UDF str objects, which ipaddress accepts
    # directly; on Python 2 you would need to wrap both arguments in unicode().
    return ipaddress.IPv4Address(ip) in ipaddress.ip_network(network_range)

pred = udf(ip_range, BooleanType())

And then you can use the UDF in the following join:

sessions.join(geo_ip).where(pred(sessions.c_ip, geo_ip.network))
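Since the question asks for a left outer join, the same predicate can in principle be passed directly as the join condition, so that sessions with no matching network are kept with null geolocation columns (a sketch under the same Spark 1.6 assumption):

sessions.join(geo_ip, pred(sessions.c_ip, geo_ip.network), 'left_outer')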

Unfortunately, this currently doesn't work in Spark 2.x; see https://issues.apache.org/jira/browse/SPARK-19728
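A possible workaround for Spark 2.x (not from the original answer, just a sketch): since the geolocation table is small, you can broadcast it and resolve the matching network inside a single UDF, so no UDF appears in the join condition at all. The names networks, bc_networks, lookup_geoname, geoname_udf and result below are illustrative:

import ipaddress
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Pre-parse the (small) geolocation table on the driver and broadcast it.
networks = [(ipaddress.ip_network(row['network']), row['geoname_id'])
            for row in geo_ip.collect()]
bc_networks = sc.broadcast(networks)

def lookup_geoname(ip):
    # Linear scan over the broadcast networks; returns None when no
    # network matches, which behaves like a left outer join.
    addr = ipaddress.IPv4Address(ip)
    for net, geoname_id in bc_networks.value:
        if addr in net:
            return geoname_id
    return None

geoname_udf = udf(lookup_geoname, IntegerType())
result = sessions.withColumn('geoname_id', geoname_udf(sessions.c_ip))

Once geoname_id is an ordinary column, you can attach the remaining geolocation columns with a regular equi-join on geo_ip.geoname_id. Note the linear scan is O(n) per row; for a full GeoIP table you would want a sorted or interval-based lookup structure instead.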

Upvotes: 1
