philMarius

Reputation: 694

Extracting URL parameters using Python and PySpark

Say I have a column filled with URLs like in the following:

+------------------------------------------+
|url                                       |
+------------------------------------------+
|https://www.example1.com?param1=1&param2=a|
|https://www.example2.com?param1=2&param2=b|
|https://www.example3.com?param1=3&param2=c|
+------------------------------------------+

What would be the best way of extracting the URL parameters from this column and adding them as columns to the dataframe to produce the below?

+-------------------------------------------+-------+-------+
|                                        url| param1| param2|
+-------------------------------------------+-------+-------+
|https://www.example1.com?param1=1&param2=a |      1|      a|
|https://www.example2.com?param1=2&param2=b |      2|      b|
|https://www.example3.com?param1=3&param2=c |      3|      c|
|etc...                                     | etc...| etc...|
+-------------------------------------------+-------+-------+

My Attempts

I can think of two possible methods of doing this: using functions.regexp_extract from the pyspark library, or using urllib.parse.parse_qs and urllib.parse.urlparse from the standard library. The former relies on regex, which is a finicky way to extract parameters from strings, while the latter would need to be wrapped in a UDF to be used.

from pyspark.sql import *
from pyspark.sql import functions as fn

df = spark.createDataFrame(
  [
    ("https://www.example.com?param1=1&param2=a",),
    ("https://www.example2.com?param1=2&param2=b",),
    ("https://www.example3.com?param1=3&param2=c",)
  ],
  ["url"]
)

Regex solution:

df2 = df.withColumn("param1", fn.regexp_extract('url', r'param1=(\d)', 1))
df2 = df2.withColumn("param2", fn.regexp_extract('url', r'param2=([a-z])', 1))
df2.show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

UDF solution:

from urllib.parse import urlparse, parse_qs
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

extract_params = udf(
    lambda x: {k: v[0] for k, v in parse_qs(urlparse(x).query).items()},
    MapType(StringType(), StringType())
)

df3 = df.withColumn(
  "params", extract_params(df.url)
)

df3.withColumn(
  "param1", df3.params['param1']
).withColumn(
  "param2", df3.params['param2']
).drop("params").show()

>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+

I'd like to use the versatility of a library like urllib but would also like the optimisability of writing it in pyspark functions. Is there a better method than the two I've tried so far?

Upvotes: 4

Views: 9505

Answers (4)

blackbishop

Reputation: 32700

You can use parse_url within a SQL expression via expr.

Extract specific query parameter

parse_url can take a third parameter to specify the key (param) we want to extract from the URL:

df.selectExpr("*", "parse_url(url, 'QUERY', 'param1') AS param1").show()

+------------------------------------------+------+
|url                                       |param1|
+------------------------------------------+------+
|https://www.example2.com?param1=2&param2=b|2     |
|https://www.example.com?param1=1&param2=a |1     |
|https://www.example3.com?param1=3&param2=c|3     |
+------------------------------------------+------+
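
If you need several known parameters at once, the same built-in can be applied once per key, each with its own alias (a small sketch along the same lines, not from the original answer; param1/param2 are the example keys from the question):

# one parse_url call per known parameter
df.selectExpr(
    "url",
    "parse_url(url, 'QUERY', 'param1') AS param1",
    "parse_url(url, 'QUERY', 'param2') AS param2"
).show(truncate=False)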

Extract all query parameters to columns

If you want to extract all query parameters as new columns without having to specify their names, you can first parse the URL, then split and explode to get the parameters and their values, and finally pivot to get each parameter as a column.

import pyspark.sql.functions as F

df.withColumn("parsed_url", F.explode(F.split(F.expr("parse_url(url, 'QUERY')"), "&"))) \
    .withColumn("parsed_url", F.split("parsed_url", "=")) \
    .select("url",
            F.col("parsed_url").getItem(0).alias("param_name"),
            F.col("parsed_url").getItem(1).alias("value")
            ) \
    .groupBy("url") \
    .pivot("param_name") \
    .agg(F.first("value")) \
    .show()

Gives:

+------------------------------------------+------+------+
|url                                       |param1|param2|
+------------------------------------------+------+------+
|https://www.example2.com?param1=2&param2=b|2     |b     |
|https://www.example.com?param1=1&param2=a |1     |a     |
|https://www.example3.com?param1=3&param2=c|3     |c     |
+------------------------------------------+------+------+
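
A small variant, not from the original answer: if the parameter names are already known (or have been collected once), passing them to pivot explicitly spares Spark the extra job it otherwise runs to determine the distinct values of param_name.

# same pipeline as above, but with explicit pivot values (param1/param2 assumed known)
df.withColumn("parsed_url", F.explode(F.split(F.expr("parse_url(url, 'QUERY')"), "&"))) \
    .withColumn("parsed_url", F.split("parsed_url", "=")) \
    .select("url",
            F.col("parsed_url").getItem(0).alias("param_name"),
            F.col("parsed_url").getItem(1).alias("value")
            ) \
    .groupBy("url") \
    .pivot("param_name", ["param1", "param2"]) \
    .agg(F.first("value")) \
    .show()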

Another solution, as suggested by @jxc in the comments, is to use the str_to_map function:

df.selectExpr("*", "explode(str_to_map(split(url,'[?]')[1],'&','='))") \
    .groupBy('url') \
    .pivot('key') \
    .agg(F.first('value'))
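
When the parameter names are known, the explode and pivot can be skipped by indexing the map directly (a sketch, not part of the original answer):

# str_to_map builds a map from the query string; map['key'] pulls out a single value
df.selectExpr(
    "*",
    "str_to_map(split(url,'[?]')[1], '&', '=')['param1'] AS param1",
    "str_to_map(split(url,'[?]')[1], '&', '=')['param2'] AS param2"
).show(truncate=False)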

Upvotes: 5

abiratsis

Reputation: 7336

Here is one more solution, which works for Spark >= 2.4 since it uses the higher-order function filter.

The next solution is based on the assumption that all the records have an identical number of query parameters:

from pyspark.sql.functions import expr, col

# get the query string for the first non null url
query = df.filter(df["url"].isNotNull()).first()["url"].split("?")[1]

# extract parameters (this should remain the same for all the records)
params = list(map(lambda p: p.split("=")[0], query.split("&")))

# you can also omit the two previous lines (query parameters autodiscovery)
# and replace them with: params = ['param1', 'param2']
# when you know beforehand the query parameters

cols = [col('url')] + [expr(f"split( \
                                    filter( \
                                          split(split(url,'\\\?')[1], '&'), \
                                          p -> p like '{qp}=%' \
                                    )[0], \
                            '=')[1]").alias(qp) 
                       for qp in params]

df.select(*cols).show(10, False)

# +------------------------------------------+------+------+
# |url                                       |param1|param2|
# +------------------------------------------+------+------+
# |https://www.example.com?param1=1&param2=a |1     |a     |
# |https://www.example2.com?param1=2&param2=b|2     |b     |
# |https://www.example3.com?param1=3&param2=c|3     |c     |
# +------------------------------------------+------+------+

Explanation:

  1. split(split(url,'\\\?')[1], '&') -> [param1=1,param2=a]: first split on ? to retrieve the query string, then on &. As a result we get the array [param1=1, param2=a].

  2. filter(... , p -> p like '{qp}=%')[0] -> param1=1, param2=a ...: apply the filter function to the items of the array from the previous step, using the predicate p -> p like '{qp}=%', where {qp} is the param name, i.e. param1=%. qp stands for the items of the params array. filter returns an array, so we just access the first item, since we know that the particular param should always exist. For the first parameter this returns param1=1, for the second param2=a, etc.

  3. split( ... , '=')[1] -> 1, a, ... : finally, split on = to retrieve the value of the query parameter. Here we take the second element, since the first one is the query parameter name.

The basic idea here is that we divide the problem into two sub-problems: first get all the possible query parameters, then extract their values for all the URLs.

Why is that? Well, you could indeed use pivot, as @blackbishop has already brilliantly implemented, although I believe that wouldn't work when the cardinality of the query parameters is very high, i.e. 500 or more unique params. That would require a big shuffle, which could consequently cause an OOM exception. On the other hand, if you already know that the cardinality of the data is low, then @blackbishop's solution should be considered the ideal one for all such cases.

To deal with the previous problem, it is better to first find all the query params (here I just made the assumption that all the queries have identical params, but the implementation for that part should be similar to the previous one) and then apply the above expression for each param to extract its value. This will generate a select expression that contains multiple expr expressions, although that shouldn't cause any performance issues, since select is a narrow transformation and will not cause any shuffle.
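
For the parameter-discovery step that is left open above, one option might be a small distinct pass over the map keys before building the select expression (a sketch under that assumption, not part of the original answer):

# Hypothetical auto-discovery of parameter names when they vary across rows:
# explode the query-string map and collect the distinct keys.
from pyspark.sql.functions import explode, expr

params = [
    row["key"]
    for row in df.select(explode(expr("str_to_map(split(url,'[?]')[1], '&', '=')")))
                 .select("key").distinct().collect()
]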

Upvotes: -1

pkatta

Reputation: 9

You can add the split function like the following.

from pyspark.sql import functions as f
df3 = df3.withColumn("param1", f.split(f.split(df3.url, "param1=")[1], "&")[0])
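
Presumably the same pattern would then be repeated for param2 (not shown in the original answer):

df3 = df3.withColumn("param2", f.split(f.split(df3.url, "param2=")[1], "&")[0])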

Upvotes: -1

Steven

Reputation: 15283

I'll go with a UDF and a more generic output format using a map type.

from urllib.parse import urlparse, parse_qs

from pyspark.sql import functions as F, types as T

@F.udf(T.MapType(T.StringType(), T.ArrayType(T.StringType())))
def url_param_pars(url):
    parsed = urlparse(url) 
    return parse_qs(parsed.query)

df_params = df.withColumn("params", url_param_pars(F.col('url')))

df_params.show(truncate=False)
+------------------------------------------+------------------------------+
|url                                       |params                        |
+------------------------------------------+------------------------------+
|https://www.example.com?param1=1&param2=a |[param1 -> [1], param2 -> [a]]|
|https://www.example2.com?param1=2&param2=b|[param1 -> [2], param2 -> [b]]|
|https://www.example3.com?param1=3&param2=c|[param1 -> [3], param2 -> [c]]|
+------------------------------------------+------------------------------+

df_params.printSchema()                                                                                                         
root
 |-- url: string (nullable = true)
 |-- params: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)

With this method, you can have any number of params.
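
If you then need the flat columns from the question, the map can be indexed per parameter afterwards (a sketch, not part of the original answer; parse_qs returns a list per key, hence the extra [0]):

# pull individual parameters out of the map column
df_params.select(
    "url",
    F.col("params")["param1"][0].alias("param1"),
    F.col("params")["param2"][0].alias("param2"),
).show(truncate=False)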

Upvotes: 1
