Leemosh

Reputation: 905

Pyspark filtering items in column of lists

I'm trying to filter data in a DataFrame. The DataFrame df has 2 columns, query and href. In each row, query is a random string and href is a list of strings. I have another list of strings called urls.

I'm looking to find the URLs from the list urls inside the href column lists, plus the position of each URL in the href list. I tried df.filter(col("href")).isin(urls), but PySpark complains about the list. I'm also not able to do .collect() because of the amount of data.

Thanks in advance!

Basically it should look like this, but I'm not really sure how to do it in PySpark:

for url in urls:
    if url in href_item_list:  # the "href" list for a given row
        print(query, url, href_item_list.index(url))  # doesn't matter if index or position

Example:

urls = [url1, url2, url3, url4, url5, url6, url7, url8]

query | href
------------
q1    | [url7, url11, url12, url13, url14]
q2    | [url1, url3, url5, url6]
q3    | [url1, url2, url8]

Output should look like this:

q2 - url1 - 0
q3 - url1 - 0
q3 - url2 - 1
q2 - url3 - 1
q2 - url5 - 2
q2 - url6 - 3
q1 - url7 - 0
q3 - url8 - 2

Upvotes: 2

Views: 1303

Answers (2)

CPak

Reputation: 13581

I suggest 1) making a single-column DataFrame of your urls using explode, 2) using posexplode to make a three-column DataFrame of your query, the href elements, and each element's position within href, and then 3) inner joining the two.

  1. Create a DataFrame of urls
from pyspark.sql.functions import explode, posexplode

urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
        select(
            explode('ref')
        )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+
  2. Create the example data you provided
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href                              |
# +-----+----------------------------------+
# |q1   |[url7, url11, url12, url13, url14]|
# |q2   |[url1, url3, url5, url6]          |
# |q3   |[url1, url2, url8]                |
# +-----+----------------------------------+
  3. Solution
(
    df.
        select(
            'query',
            posexplode('href')
        ).
        join(
            refs,
            'col',
            'inner'
        ).
        orderBy('col', 'query').
        show(truncate=False)
)
# +----+-----+---+                                                                
# |col |query|pos|
# +----+-----+---+
# |url1|q2   |0  |
# |url1|q3   |0  |
# |url2|q3   |1  |
# |url3|q2   |1  |
# |url5|q2   |2  |
# |url6|q2   |3  |
# |url7|q1   |0  |
# |url8|q3   |2  |
# +----+-----+---+
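
If your list of URLs is small enough to ship as a literal, a minimal alternative sketch (reusing df from step 2, with url_list standing in for a plain Python list of the URLs, introduced here for illustration): filter the posexploded rows with isin instead of joining against refs.

from pyspark.sql.functions import col, posexplode

# Plain Python list of the known URLs (an assumption for this sketch).
url_list = ['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8']

(
    df.
        select(
            'query',
            posexplode('href')  # yields columns "pos" and "col"
        ).
        # Keep only the rows whose exploded href value is a known URL.
        where(col('col').isin(url_list)).
        orderBy('col', 'query').
        show(truncate=False)
)

This avoids materializing and joining a second DataFrame, though for a very large URL list the join approach above should scale better.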

Upvotes: 1

boechat107

Reputation: 1724

The steps in words:

  1. explode the column href
  2. filter those rows with a known URL
  3. collect the results and look up each URL in urls

The code below is broken into small steps to make it easier to inspect the intermediate DataFrames.

Assuming you already have a SparkSession object called ss, we can recreate your original DataFrame like this:

df = ss.createDataFrame(
    [
        ("q1", ["url7", "url11", "url12", "url13", "url14"]),
        ("q2", ["url1", "url3", "url5", "url6"]),
        ("q3", ["url1", "url2", "url8"]),
    ],
    ["query", "href"],
)
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

Now we apply the steps described before:

import pyspark.sql.functions as sf

# Exploding the column "href".
exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
# Checking if the URL in the DataFrame exists in "urls".
# I suggest converting "urls" into a "set" before this step: "set(urls)". It might
# improve the performance of "isin", but this is just an optional optimization.
known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
# Discard unknown URLs.
true_df = known_df.filter("is_known = True")
# The final results.
res = [
    (r["query"], r["href_sing"], urls.index(r["href_sing"]))
    for r in true_df.collect()
]

Inspecting some values:

In [18]: df.show()      
+-----+--------------------+
|query|                href|
+-----+--------------------+
|   q1|[url7, url11, url...|
|   q2|[url1, url3, url5...|
|   q3|  [url1, url2, url8]|
+-----+--------------------+

In [19]: exp_df.show()                                                                    
+-----+---------+
|query|href_sing|
+-----+---------+
|   q1|     url7|
|   q1|    url11|
|   q1|    url12|
|   q1|    url13|
|   q1|    url14|
|   q2|     url1|
|   q2|     url3|
|   q2|     url5|
|   q2|     url6|
|   q3|     url1|
|   q3|     url2|
|   q3|     url8|
+-----+---------+

In [20]: true_df.show()                                                                   
+-----+---------+--------+
|query|href_sing|is_known|
+-----+---------+--------+
|   q1|     url7|    true|
|   q2|     url1|    true|
|   q2|     url3|    true|
|   q2|     url5|    true|
|   q2|     url6|    true|
|   q3|     url1|    true|
|   q3|     url2|    true|
|   q3|     url8|    true|
+-----+---------+--------+

In [23]: res                                                                              
Out[23]: 
[('q1', 'url7', 6),
 ('q2', 'url1', 0),
 ('q2', 'url3', 2),
 ('q2', 'url5', 4),
 ('q2', 'url6', 5),
 ('q3', 'url1', 0),
 ('q3', 'url2', 1),
 ('q3', 'url8', 7)]
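
Since you mentioned that collect() is not feasible at your data size, here is a minimal sketch of keeping this last step in Spark as well (the lookup DataFrame urls_df and the column name url_idx are introduced here for illustration):

# Small lookup DataFrame: each URL paired with its index in "urls".
urls_df = ss.createDataFrame(
    [(u, i) for i, u in enumerate(urls)],
    ["href_sing", "url_idx"],
)
# The inner join keeps only known URLs and attaches their index, so the
# "isin" filter and the Python-side "urls.index" lookup both go away.
res_df = exp_df.join(urls_df, "href_sing", "inner")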

Upvotes: 1
