Shivam Raj

Reputation: 64

Search strings from one pyspark dataframe in another pyspark dataframe

I have a PySpark dataframe with a column url that contains some URLs, and another PySpark dataframe that contains url and id columns, but its URLs are inlinks, e.g. abc.com in the first one and abc.com/contact in the second. I want to collect the ids of all inlinks related to a particular domain into a new column in the first dataframe. I am currently doing this:

url_list = df1.select('url').collect()
all_rows = df2.collect()
ids = list()
urls = list()
for row in all_rows:
  ids.append(row.id)
  urls.append(row.url)

# df1 was selected by its 'url' column, so each row exposes .url
dict_ids = {row.url: "" for row in url_list}

for url, id in zip(urls, ids):
  res = [ele.url for ele in url_list if ele.url in url]
  if len(res) > 0:
    print(res)
    dict_ids[res[0]] += ('\n\n\n' + str(id) + '\n\n\n')

This takes a lot of time. I wanted to use Spark's processing, so I also tried this:

def add_id(url, id):
  for i in url_list:
    if i.url in url:
      dict_ids[i.url] += id

add_id_udf = udf(add_id, StringType())

test = df2.withColumn("Test", add_id_udf(df2['url'], df2['id']))
display(test)

input:
df1::
url
http://example.com
http://example2.com/index.html


df2::
url,id
http://example.com/contact, 12
http://example2.com/index.html/pif, 45
http://example.com/about, 68
http://example2.com/index.html/juk/er, 96

expected output:
df1::
url,id
http://example.com, [12,68]
http://example2.com/index.html, [45,96]

A dictionary with URLs as keys and ids as values would also be fine.

But dict_ids remained empty in the second case. Can somebody please help me out here?

Upvotes: 0

Views: 188

Answers (1)

user238607

Reputation: 2468

I could get your example working, but under two conditions: I use crossJoin on the two dataframes, and then contains to check whether each URL in the second dataframe includes a URL from the first.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

# Local session; the crossJoin setting is kept from the original configuration.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

# First dataframe: the base URLs to search for.
data1 = [("http://example.com",),
         ("http://example2.com/index.html",)]
df1 = spark.createDataFrame(data=data1, schema=["url_first"])
df1.show(truncate=False)

# Second dataframe: the inlinks and their ids.
data2 = [
    ("http://example.com/contact", 12),
    ("http://example2.com/index.html/pif", 45),
    ("http://example.com/about", 68),
    ("http://example2.com/index.html/juk/er", 96),
]
df2 = spark.createDataFrame(data=data2, schema=["url_second", "id"])
df2.show(truncate=False)

# Pair every inlink with every base URL.
joined_df = df2.crossJoin(df1)
joined_df.show(truncate=False)

# Flag the pairs where the inlink contains the base URL.
inter_result = joined_df.withColumn("myjoin", col("url_second").contains(col("url_first")))
inter_result.show(n=200, truncate=False)

# Keep the matching pairs and collect the ids per base URL.
final_result = (
    inter_result.filter(col("myjoin"))
    .groupBy("url_first")
    .agg(collect_list(col("id")).alias("id_list"))
)
final_result.show(n=200, truncate=False)

Output is as follows.

+------------------------------+
|url_first                     |
+------------------------------+
|http://example.com            |
|http://example2.com/index.html|
+------------------------------+

+-------------------------------------+---+
|url_second                           |id |
+-------------------------------------+---+
|http://example.com/contact           |12 |
|http://example2.com/index.html/pif   |45 |
|http://example.com/about             |68 |
|http://example2.com/index.html/juk/er|96 |
+-------------------------------------+---+

+-------------------------------------+---+------------------------------+
|url_second                           |id |url_first                     |
+-------------------------------------+---+------------------------------+
|http://example.com/contact           |12 |http://example.com            |
|http://example.com/contact           |12 |http://example2.com/index.html|
|http://example2.com/index.html/pif   |45 |http://example.com            |
|http://example2.com/index.html/pif   |45 |http://example2.com/index.html|
|http://example.com/about             |68 |http://example.com            |
|http://example.com/about             |68 |http://example2.com/index.html|
|http://example2.com/index.html/juk/er|96 |http://example.com            |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|
+-------------------------------------+---+------------------------------+

+-------------------------------------+---+------------------------------+------+
|url_second                           |id |url_first                     |myjoin|
+-------------------------------------+---+------------------------------+------+
|http://example.com/contact           |12 |http://example.com            |true  |
|http://example.com/contact           |12 |http://example2.com/index.html|false |
|http://example2.com/index.html/pif   |45 |http://example.com            |false |
|http://example2.com/index.html/pif   |45 |http://example2.com/index.html|true  |
|http://example.com/about             |68 |http://example.com            |true  |
|http://example.com/about             |68 |http://example2.com/index.html|false |
|http://example2.com/index.html/juk/er|96 |http://example.com            |false |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|true  |
+-------------------------------------+---+------------------------------+------+

+------------------------------+--------+
|url_first                     |id_list |
+------------------------------+--------+
|http://example.com            |[12, 68]|
|http://example2.com/index.html|[45, 96]|
+------------------------------+--------+

Upvotes: 1
