Reputation: 694
Say I have a column filled with URLs like in the following:
+------------------------------------------+
|url                                       |
+------------------------------------------+
|https://www.example1.com?param1=1&param2=a|
|https://www.example2.com?param1=2&param2=b|
|https://www.example3.com?param1=3&param2=c|
+------------------------------------------+
What would be the best way of extracting the URL parameters from this column and adding them as columns to the dataframe to produce the below?
+------------------------------------------+------+------+
|url                                       |param1|param2|
+------------------------------------------+------+------+
|https://www.example1.com?param1=1&param2=a|1     |a     |
|https://www.example2.com?param1=2&param2=b|2     |b     |
|https://www.example3.com?param1=3&param2=c|3     |c     |
|etc...                                    |etc...|etc...|
+------------------------------------------+------+------+
I can think of two possible methods of doing this: using functions.regexp_extract from the pyspark library, or using urllib.parse.parse_qs and urllib.parse.urlparse from the standard library. The former relies on regex, which is a finicky way of extracting parameters from strings, while the latter would need to be wrapped in a UDF to be used.
from pyspark.sql import *
from pyspark.sql import functions as fn
df = spark.createDataFrame(
    [
        ("https://www.example.com?param1=1&param2=a",),
        ("https://www.example2.com?param1=2&param2=b",),
        ("https://www.example3.com?param1=3&param2=c",)
    ],
    ["url"]
)
Regex solution:
df2 = df.withColumn("param1", fn.regexp_extract('url', r'param1=(\d)', 1))
df2 = df2.withColumn("param2", fn.regexp_extract('url', r'param2=([a-z])', 1))
df2.show()
>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+
UDF solution:
from urllib.parse import urlparse, parse_qs
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

extract_params = udf(lambda x: {k: v[0] for k, v in parse_qs(urlparse(x).query).items()}, MapType(StringType(), StringType()))
df3 = df.withColumn(
    "params", extract_params(df.url)
)
df3.withColumn(
    "param1", df3.params['param1']
).withColumn(
    "param2", df3.params['param2']
).drop("params").show()
>> +------------------------------------------+------+------+
>> |url                                       |param1|param2|
>> +------------------------------------------+------+------+
>> |https://www.example1.com?param1=1&param2=a|1     |a     |
>> |https://www.example2.com?param1=2&param2=b|2     |b     |
>> |https://www.example3.com?param1=3&param2=c|3     |c     |
>> +------------------------------------------+------+------+
I'd like to use the versatility of a library like urllib, but would also like the optimisations that come with native pyspark functions. Is there a better method than the two I've tried so far?
Upvotes: 4
Views: 9505
Reputation: 32700
You can use parse_url within a SQL expression (expr). parse_url can take a third parameter to specify the key (param) we want to extract from the URL:
df.selectExpr("*", "parse_url(url, 'QUERY', 'param1') as param1").show()
+------------------------------------------+------+
|url                                       |param1|
+------------------------------------------+------+
|https://www.example2.com?param1=2&param2=b|2     |
|https://www.example.com?param1=1&param2=a |1     |
|https://www.example3.com?param1=3&param2=c|3     |
+------------------------------------------+------+
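For reference, a minimal sketch (using the same df as in the question) that pulls out both parameters in one go, aliased so the columns match the desired output:
df.selectExpr(
    "url",
    "parse_url(url, 'QUERY', 'param1') AS param1",
    "parse_url(url, 'QUERY', 'param2') AS param2"
).show(truncate=False)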
If you want to extract all query parameters as new columns without having to specify their names, you can first parse the URL, then split and explode to get the parameter names and values, and finally pivot to get each parameter as a column.
import pyspark.sql.functions as F
df.withColumn("parsed_url", F.explode(F.split(F.expr("parse_url(url, 'QUERY')"), "&"))) \
.withColumn("parsed_url", F.split("parsed_url", "=")) \
.select("url",
F.col("parsed_url").getItem(0).alias("param_name"),
F.col("parsed_url").getItem(1).alias("value")
) \
.groupBy("url") \
.pivot("param_name") \
.agg(F.first("value")) \
.show()
Gives:
+------------------------------------------+------+------+
|url                                       |param1|param2|
+------------------------------------------+------+------+
|https://www.example2.com?param1=2&param2=b|2     |b     |
|https://www.example.com?param1=1&param2=a |1     |a     |
|https://www.example3.com?param1=3&param2=c|3     |c     |
+------------------------------------------+------+------+
Another solution, as suggested by @jxc in the comments, is to use the str_to_map function:
df.selectExpr("*", "explode(str_to_map(split(url,'[?]')[1],'&','='))") \
  .groupBy('url') \
  .pivot('key') \
  .agg(F.first('value'))
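If you only need a couple of known parameters, a small sketch (assuming the same df; the qmap column name is arbitrary) that keeps the parsed query string as a map column and picks the keys directly, without exploding and pivoting:
df.withColumn("qmap", F.expr("str_to_map(split(url, '[?]')[1], '&', '=')")) \
  .select("url",
          F.col("qmap")["param1"].alias("param1"),
          F.col("qmap")["param2"].alias("param2")) \
  .show(truncate=False)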
Upvotes: 5
Reputation: 7336
Here is one more solution, which works for Spark >= 2.4 since it uses the higher-order function filter.
The solution below is based on the assumption that all the records have an identical number of query parameters:
from pyspark.sql.functions import expr, col
# get the query string for the first non null url
query = df.filter(df["url"].isNotNull()).first()["url"].split("?")[1]
# extract parameters (this should remain the same for all the records)
params = list(map(lambda p: p.split("=")[0], query.split("&")))
# you can also omit the two previous lines (query parameters autodiscovery)
# and replace them with: params = ['param1', 'param2']
# when you know beforehand the query parameters
cols = [col('url')] + [
    expr(f"split( \
              filter( \
                  split(split(url,'\\\?')[1], '&'), \
                  p -> p like '{qp}=%' \
              )[0], \
          '=')[1]").alias(qp)
    for qp in params
]
df.select(*cols).show(10, False)
# +------------------------------------------+------+------+
# |url                                       |param1|param2|
# +------------------------------------------+------+------+
# |https://www.example.com?param1=1&param2=a |1     |a     |
# |https://www.example2.com?param1=2&param2=b|2     |b     |
# |https://www.example3.com?param1=3&param2=c|3     |c     |
# +------------------------------------------+------+------+
Explanation:
- split(split(url,'\\\?')[1], '&') -> [param1=1, param2=a]: first split by ? to retrieve the query string, then by &. As a result we get the array [param1=1, param2=a].
- filter(..., p -> p like '{qp}=%')[0] -> param1=1, param2=a, ...: apply the filter function to the items of the array from the previous step, with the predicate p -> p like '{qp}=%', where {qp} is the param name, i.e. param1=%. qp stands for the items of the params array. filter returns an array, hence we just access the first item, since we know that the particular param should always exist. For the first parameter this returns param1=1, for the second param2=a, etc.
- split(..., '=')[1] -> 1, a, ...: finally split by = to retrieve the value of the query parameter. Here we take the second item, since the first one is the query parameter name.
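To make the three steps concrete, here is a quick illustrative sketch (same df, Spark >= 2.4; the pairs/pair1 column names are just for demonstration) that materialises each intermediate result for param1:
from pyspark.sql.functions import expr

df.withColumn("pairs", expr(r"split(split(url, '\\?')[1], '&')")) \
  .withColumn("pair1", expr("filter(pairs, p -> p like 'param1=%')[0]")) \
  .withColumn("param1", expr("split(pair1, '=')[1]")) \
  .select("url", "pairs", "pair1", "param1") \
  .show(truncate=False)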
The basic idea here is to divide the problem into two sub-problems: first get all the possible query parameters, then extract the values for all the urls.
Why is that? You could indeed use pivot, as @blackbishop brilliantly implemented, although I believe that wouldn't work when the cardinality of the query parameters is very high, i.e. 500 or more unique params. That would require a big shuffle, which could consequently cause an OOM exception. On the other hand, if you already know that the cardinality of the data is low, then @blackbishop's solution should be considered the ideal one for all cases.
To face that problem, it is better to first find all the query params (here I just made the assumption that all the queries have identical params, but the implementation for this part should be similar to the previous one) and then apply the above expression for each param to extract its value. This generates a select expression containing multiple expr expressions, although that shouldn't cause any performance issues, since select is a narrow transformation and will not cause any shuffle.
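For completeness, roughly the same select expression can be built with the DataFrame API's higher-order functions instead of a SQL string. This is only a sketch: it assumes Spark >= 3.1 (for F.filter with a Python lambda) and reuses the params list computed above.
from pyspark.sql import functions as F

# array of "key=value" pairs, e.g. ["param1=1", "param2=a"]
pairs = F.split(F.split(F.col("url"), r"\?")[1], "&")

cols = [F.col("url")] + [
    F.split(F.filter(pairs, lambda p: p.like(f"{qp}=%"))[0], "=")[1].alias(qp)
    for qp in params
]
df.select(*cols).show(truncate=False)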
Upvotes: -1
Reputation: 9
You can use the split function as follows.
from pyspark.sql import functions as f
df3 = df3.withColumn("param1", f.split(f.split(df3.url, "param1=")[1], "&")[0])
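The same pattern works for param2 (a sketch, assuming the df3 and the f alias from above):
df3 = df3.withColumn("param2", f.split(f.split(df3.url, "param2=")[1], "&")[0])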
Upvotes: -1
Reputation: 15283
I'll go with a UDF and a more generic output format using a map type.
from urllib.parse import urlparse, parse_qs
from pyspark.sql import functions as F, types as T

@F.udf(T.MapType(T.StringType(), T.ArrayType(T.StringType())))
def url_param_pars(url):
    parsed = urlparse(url)
    return parse_qs(parsed.query)
df_params = df.withColumn("params", url_param_pars(F.col('url')))
df_params.show(truncate=False)
+------------------------------------------+------------------------------+
|url                                       |params                        |
+------------------------------------------+------------------------------+
|https://www.example.com?param1=1&param2=a |[param1 -> [1], param2 -> [a]]|
|https://www.example2.com?param1=2&param2=b|[param1 -> [2], param2 -> [b]]|
|https://www.example3.com?param1=3&param2=c|[param1 -> [3], param2 -> [c]]|
+------------------------------------------+------------------------------+
df_params.printSchema()
root
|-- url: string (nullable = true)
|-- params: map (nullable = true)
| |-- key: string
| |-- value: array (valueContainsNull = true)
| | |-- element: string (containsNull = true)
With this method, you can have any number of params.
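If you then want individual columns for specific parameters, you can pull them out of the map afterwards (a sketch; each map value is an array, so take its first element):
df_params.select(
    "url",
    F.col("params")["param1"][0].alias("param1"),
    F.col("params")["param2"][0].alias("param2")
).show(truncate=False)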
Upvotes: 1