ELI

Reputation: 369

Spark: efficient way to search another dataframe

I have one dataframe (df) with IP addresses and their corresponding long values (ip_int), and I want to search another dataframe (ip2Country), which contains geolocation information, to find the corresponding country names. How should I do this in Scala? My current code doesn't work: it exceeds the memory limit.

  import scala.collection.mutable.ListBuffer
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.lit
  import spark.implicits._

  val ip_ints = df.select("ip_int").distinct.collect().flatMap(_.toSeq)
  val df_list = ListBuffer[DataFrame]()
  for (v <- ip_ints) {
    val ip_int = v.toString.toLong
    df_list += ip2Country
      .filter($"network_start_integer" <= ip_int && $"network_last_integer" >= ip_int)
      .select("country_name")
      .withColumn("ip_int", lit(ip_int))
  }
  val df1 = df_list.reduce(_ union _)
  df = df.join(df1, Seq("ip_int"), "left")

Basically, I try to iterate through every ip_int value, look each one up in ip2Country, and merge the results back into df.

Any help is much appreciated!

Upvotes: 0

Views: 164

Answers (3)

Robin

Reputation: 695

I feel puzzled here. Instead of the range condition

df1("network_start_integer") <= df("ip_int") && df1("network_last_integer") >= df("ip_int")

can we use the equality condition

df1("network_start_integer") === df("ip_int")

here?
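For reference, a minimal sketch of what that equality-based join would look like (assuming the same df and df1 as in the question; note it only matches when ip_int is exactly the start of a network range, so it is not equivalent to the range condition):

// Hypothetical equi-join variant: matches only when ip_int equals the exact
// start of a network range, unlike the original <= / >= range condition.
val equiJoined = df.join(
  df1,
  df1("network_start_integer") === df("ip_int"),
  "left")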

Upvotes: 0

Ramesh Maharjan

Reputation: 41957

A simple non-equi join should do the trick for you:

df.join(df1, df1("network_start_integer") <= df("ip_int") && df1("network_last_integer") >= df("ip_int"), "left")
    .select("ip", "ip_int", "country_name")

If you want to remove rows with a null country_name, you can add a filter too:

df.join(df1, df1("network_start_integer") <= df("ip_int") && df1("network_last_integer") >= df("ip_int"), "left")
    .select("ip", "ip_int", "country_name")
    .filter($"country_name".isNotNull)
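Since the original problem was a memory limit, one hedged note: if the lookup table (df1) is small enough to fit on each executor, broadcasting it may keep this range join cheap by avoiding a shuffle. A sketch under that assumption:

import org.apache.spark.sql.functions.broadcast

// Sketch: broadcast the (presumably small) lookup table so Spark can run the
// range join as a broadcast join instead of shuffling both sides.
df.join(broadcast(df1),
    df1("network_start_integer") <= df("ip_int") && df1("network_last_integer") >= df("ip_int"),
    "left")
  .select("ip", "ip_int", "country_name")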

I hope the answer is helpful

Upvotes: 1

TaylerJones

Reputation: 242

You want to do a non-equi join, which you can implement by cross joining and then filtering, though doing so is resource-heavy. Assuming you are using Spark 2.1:

df.createOrReplaceTempView("ip_int")
ip2Country.select("network_start_integer", "network_last_integer", "country_name").createOrReplaceTempView("ip_int_lookup")
// val spark: SparkSession
val result: DataFrame = spark.sql("select a.*, b.country_name from ip_int a, ip_int_lookup b where b.network_start_integer <= a.ip_int and b.network_last_integer >= a.ip_int")

Because the comma join above is an inner join, rows of df with no matching range are dropped; if you want to keep them (including a null ip_int), you will need to right join result to df.
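A sketch of that right join, assuming result and df as above (selecting only the join key and the looked-up value first is an illustrative choice to avoid duplicated columns):

// Sketch: keep every row of df, with a null country_name where no range matched.
val countries = result.select("ip_int", "country_name")
val withUnmatched = countries.join(df, Seq("ip_int"), "right")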

Upvotes: 0
