Reputation: 905
I'm trying to filter data in a DataFrame. The DataFrame df has 2 columns: query and href. In each row, query is a random string and href is a list of strings. I have another list called urls, which also contains strings.
I want to find every URL from the list urls inside the href column lists, along with the position of that URL in the href list. I tried df.filter(col("href")).isin(urls) but PySpark complains about the list. I also can't call .collect() because of the amount of data.
Thanks in advance!
Basically it should look like this, but I'm not really sure how to do it in PySpark:
for url in urls:
    if url in href_list:  # href_list stands for the href list of one row
        print(query, url, href_list.index(url))  # doesn't matter if index or position
Example:
urls = [url1, url2, url3, url4, url5, url6, url7, url8]
query | href
------------
q1 | [url7, url11, url12, url13, url14]
q2 | [url1, url3, url5, url6]
q3 | [url1, url2, url8]
Output should look like
q2 - url1 - 0
q3 - url1 - 0
q3 - url2 - 1
q2 - url3 - 1
q2 - url5 - 2
q2 - url6 - 3
q1 - url7 - 0
q3 - url8 - 2
Upvotes: 2
Views: 1303
Reputation: 13581
I suggest 1) making a single-column DataFrame of your urls using explode, 2) using posexplode to make a 3-column DataFrame of your query, href, and the index position within href, then 3) inner joining the two.
from pyspark.sql.functions import explode, posexplode
urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
    select(
        explode('ref')
    )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href |
# +-----+----------------------------------+
# |q1 |[url7, url11, url12, url13, url14]|
# |q2 |[url1, url3, url5, url6] |
# |q3 |[url1, url2, url8] |
# +-----+----------------------------------+
(
    df.
    select(
        'query',
        posexplode('href')
    ).
    join(
        refs,
        'col',
        'inner'
    ).
    orderBy('col', 'query').
    show(truncate=False)
)
# +----+-----+---+
# |col |query|pos|
# +----+-----+---+
# |url1|q2 |0 |
# |url1|q3 |0 |
# |url2|q3 |1 |
# |url3|q2 |1 |
# |url5|q2 |2 |
# |url6|q2 |3 |
# |url7|q1 |0 |
# |url8|q3 |2 |
# +----+-----+---+
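To sanity-check the join on a small sample, the same logic can be mirrored in plain Python. This is only a local sketch, not a replacement for the Spark job: the helper name match_positions is hypothetical, the data is the example from the question, and it assumes urls contains no duplicates (a duplicated URL would produce extra rows in the join but not here).

```python
def match_positions(rows, urls):
    """Return (url, query, pos) for each known URL found in a row's href list."""
    known = set(urls)  # set membership avoids rescanning urls for every element
    matches = []
    for query, href in rows:
        # enumerate plays the role of posexplode: one (pos, url) pair per element
        for pos, url in enumerate(href):
            if url in known:  # same effect as the inner join against refs
                matches.append((url, query, pos))
    return sorted(matches)  # mirrors orderBy('col', 'query')


rows = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

expected = match_positions(rows, urls)
for row in expected:
    print(row)
```

The printed tuples should line up with the rows of the joined DataFrame shown above, in the same column order (col, query, pos).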
Upvotes: 1
Reputation: 1724
The steps in words:
1) explode the column href
2) filter those rows with a known URL
3) collect the results and look up each URL in urls
The code below is broken into small steps to make it easier to inspect the intermediate DataFrames.
Assuming you already have a SparkSession object called ss, we can recreate your original DataFrame like this:
df = ss.createDataFrame(
    [
        ("q1", ["url7", "url11", "url12", "url13", "url14"]),
        ("q2", ["url1", "url3", "url5", "url6"]),
        ("q3", ["url1", "url2", "url8"]),
    ],
    ["query", "href"],
)
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]
Now we apply the steps described before:
import pyspark.sql.functions as sf
# Exploding the column "href".
exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
# Checking if the URL in the DataFrame exists in "urls".
# I suggest converting "urls" into a "set" before this step: "set(urls)". It might
# improve the performance of "isin", but this is just an optional optimization.
known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
# Discard unknown URLs.
true_df = known_df.filter("is_known = True")
# The final results.
res = [
    (r["query"], r["href_sing"], urls.index(r["href_sing"]))
    for r in true_df.collect()
]
Inspecting some values:
In [18]: df.show()
+-----+--------------------+
|query| href|
+-----+--------------------+
| q1|[url7, url11, url...|
| q2|[url1, url3, url5...|
| q3| [url1, url2, url8]|
+-----+--------------------+
In [19]: exp_df.show()
+-----+---------+
|query|href_sing|
+-----+---------+
| q1| url7|
| q1| url11|
| q1| url12|
| q1| url13|
| q1| url14|
| q2| url1|
| q2| url3|
| q2| url5|
| q2| url6|
| q3| url1|
| q3| url2|
| q3| url8|
+-----+---------+
In [20]: true_df.show()
+-----+---------+--------+
|query|href_sing|is_known|
+-----+---------+--------+
| q1| url7| true|
| q2| url1| true|
| q2| url3| true|
| q2| url5| true|
| q2| url6| true|
| q3| url1| true|
| q3| url2| true|
| q3| url8| true|
+-----+---------+--------+
In [23]: res
Out[23]:
[('q1', 'url7', 6),
('q2', 'url1', 0),
('q2', 'url3', 2),
('q2', 'url5', 4),
('q2', 'url6', 5),
('q3', 'url1', 0),
('q3', 'url2', 1),
('q3', 'url8', 7)]
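Note that urls.index(...) reports where the URL sits in urls (url7 gives 6), while the expected output in the question uses the position inside each row's href list (url7 gives 0). A minimal plain-Python sketch of the difference, using the example data; the helper name both_positions is hypothetical and this runs locally rather than in Spark:

```python
def both_positions(rows, urls):
    """Return (query, url, index_in_urls, index_in_href) for each known URL."""
    out = []
    for query, href in rows:
        for pos_in_href, url in enumerate(href):
            if url in urls:
                # urls.index(url): position in the lookup list (what res contains)
                # pos_in_href: position in the row's own href list (what was asked for)
                out.append((query, url, urls.index(url), pos_in_href))
    return out


rows = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

comparison = both_positions(rows, urls)
for row in comparison:
    print(row)
```

If the position within href is what you need, sf.posexplode in place of sf.explode in the first step would carry that index through the filter, so no lookup in urls is required afterwards.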
Upvotes: 1