I have a PySpark DataFrame with a column url that contains some URLs, and another PySpark DataFrame that also contains URLs and ids, but these URLs are inlinks, e.g. abc.com in the first one and abc.com/contact in the second one. I want to collect the ids of all inlinks related to a particular domain into a new column of the first DataFrame. I am currently doing this:
url_list = df1.select('url').collect()
all_rows = df2.collect()
ids = list()
urls = list()
for row in all_rows:
    ids.append(row.id)
    urls.append(row.url)
# One entry per base URL; the ids of its inlinks are accumulated as a string
dict_ids = dict([(i.url, "") for i in url_list])
for url, id in zip(urls, ids):
    # Base URLs that occur inside this inlink
    res = [ele.url for ele in url_list if ele.url in url]
    if len(res) > 0:
        print(res)
        dict_ids[res[0]] += ('\n\n\n' + str(id) + '\n\n\n')
This takes a lot of time. I wanted to use Spark's distributed processing, so I also tried this:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def add_id(url, id):
    for i in url_list:
        if i.url in url:
            dict_ids[i.url] += str(id)
add_id_udf = udf(add_id, StringType())
test = df2.withColumn("Test", add_id_udf(df2['url'], df2['id']))
display(test)
Input:

df1:
url
http://example.com
http://example2.com/index.html

df2:
url,id
http://example.com/contact, 12
http://example2.com/index.html/pif, 45
http://example.com/about, 68
http://example2.com/index.html/juk/er, 96

Expected output:

df1:
url,id
http://example.com, [12,68]
http://example2.com/index.html, [45,96]
A dictionary with the URLs as keys and the lists of ids as values would also be fine.

However, dict_ids in the second attempt remained empty. Can somebody please help me out here?
I got your example working by doing two things: cross-joining the two DataFrames with crossJoin, and matching the rows with contains.
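To make the logic of the pipeline explicit, here is a plain-Python sketch of what the cross join, the contains test and the collect_list aggregation compute, run on the sample data from the question. It is only an illustration on a small in-memory list (the names base_urls, inlinks and id_lists are made up for it); the Spark code below does the same work on DataFrames.

```python
from collections import defaultdict
from itertools import product

# Sample data from the question: base URLs and (inlink, id) pairs
base_urls = ["http://example.com", "http://example2.com/index.html"]
inlinks = [
    ("http://example.com/contact", 12),
    ("http://example2.com/index.html/pif", 45),
    ("http://example.com/about", 68),
    ("http://example2.com/index.html/juk/er", 96),
]

id_lists = defaultdict(list)
# product() plays the role of crossJoin: every inlink is paired with every base URL
for (url_second, row_id), url_first in product(inlinks, base_urls):
    # Substring test, the counterpart of Column.contains in the Spark version
    if url_first in url_second:
        # Appending per base URL mirrors groupBy("url_first") + collect_list("id")
        id_lists[url_first].append(row_id)

print(dict(id_lists))
# {'http://example.com': [12, 68], 'http://example2.com/index.html': [45, 96]}
```

The number of comparisons is len(inlinks) * len(base_urls), which is also why the cross join grows quickly on large inputs.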
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list
# Permits implicit cartesian products on Spark 2.x (the default is already true from Spark 3.0)
spark = (SparkSession.builder
         .master("local")
         .config("spark.sql.crossJoin.enabled", "true")
         .getOrCreate())
data1 = [("http://example.com",),
         ("http://example2.com/index.html",)]
df1 = spark.createDataFrame(data1, ["url_first"])
df1.show(truncate=False)
data2 = [
    ("http://example.com/contact", 12),
    ("http://example2.com/index.html/pif", 45),
    ("http://example.com/about", 68),
    ("http://example2.com/index.html/juk/er", 96),
]
df2 = spark.createDataFrame(data2, ["url_second", "id"])
df2.show(truncate=False)
# Pair every inlink with every base URL
joined_df = df2.crossJoin(df1)
joined_df.show(truncate=False)
# Flag the pairs where the inlink contains the base URL
inter_result = joined_df.withColumn("myjoin", col("url_second").contains(col("url_first")))
inter_result.show(n=200, truncate=False)
# Keep the matching pairs and collect their ids per base URL
final_result = (inter_result.filter(col("myjoin"))
                .groupBy("url_first")
                .agg(collect_list(col("id")).alias("id_list")))
final_result.show(n=200, truncate=False)
The output is as follows:
+------------------------------+
|url_first |
+------------------------------+
|http://example.com |
|http://example2.com/index.html|
+------------------------------+
+-------------------------------------+---+
|url_second |id |
+-------------------------------------+---+
|http://example.com/contact |12 |
|http://example2.com/index.html/pif |45 |
|http://example.com/about |68 |
|http://example2.com/index.html/juk/er|96 |
+-------------------------------------+---+
+-------------------------------------+---+------------------------------+
|url_second |id |url_first |
+-------------------------------------+---+------------------------------+
|http://example.com/contact |12 |http://example.com |
|http://example.com/contact |12 |http://example2.com/index.html|
|http://example2.com/index.html/pif |45 |http://example.com |
|http://example2.com/index.html/pif |45 |http://example2.com/index.html|
|http://example.com/about |68 |http://example.com |
|http://example.com/about |68 |http://example2.com/index.html|
|http://example2.com/index.html/juk/er|96 |http://example.com |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|
+-------------------------------------+---+------------------------------+
+-------------------------------------+---+------------------------------+------+
|url_second |id |url_first |myjoin|
+-------------------------------------+---+------------------------------+------+
|http://example.com/contact |12 |http://example.com |true |
|http://example.com/contact |12 |http://example2.com/index.html|false |
|http://example2.com/index.html/pif |45 |http://example.com |false |
|http://example2.com/index.html/pif |45 |http://example2.com/index.html|true |
|http://example.com/about |68 |http://example.com |true |
|http://example.com/about |68 |http://example2.com/index.html|false |
|http://example2.com/index.html/juk/er|96 |http://example.com |false |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|true |
+-------------------------------------+---+------------------------------+------+
+------------------------------+--------+
|url_first |id_list |
+------------------------------+--------+
|http://example.com |[12, 68]|
|http://example2.com/index.html|[45, 96]|
+------------------------------+--------+