Reputation: 33
I have a pyspark.sql.dataframe.DataFrame
, where one of the columns has an array of Row
objects:
+------------------------------------------------------------------------------------------------+
|column |
+------------------------------------------------------------------------------------------------+
|[Row(arrival='2019-12-25 19:55', departure='2019-12-25 18:22'), |
| Row(arrival='2019-12-26 14:56', departure='2019-12-26 08:52')] |
+------------------------------------------------------------------------------------------------+
Not all the rows in the column have the same quantity of elements (in this case, we have 2 but we could have more).
What I am trying to do is to generate a concatenation of the hours of each date, to have something like this:
18:22_19:55_08:52_14:56
This means, the departure time of the first element, concatenated with the arrival time of the first element, concatenated again with the departure time of the second element and once again with the arrival time of the second element.
Is there a simple way to do so using pyspark
?
Upvotes: 0
Views: 388
Reputation: 13998
Assume the column name is col1
which is an array of structs:
df.printSchema()
root
|-- col1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- arrival: string (nullable = true)
| | |-- departure: string (nullable = true)
Method-1: for spark 2.4+, use array_join + transform
from pyspark.sql.functions import expr
df.withColumn('new_list', expr("""
array_join(
transform(col1, x -> concat(right(x.departure,5), '_', right(x.arrival,5)))
, '_'
)
""")
).show(truncate=False)
+----------------------------------------------------------------------------+-----------------------+
|col1 |new_list |
+----------------------------------------------------------------------------+-----------------------+
|[[2019-12-25 19:55, 2019-12-25 18:22], [2019-12-26 14:56, 2019-12-26 08:52]]|18:22_19:55_08:52_14:56|
+----------------------------------------------------------------------------+-----------------------+
Method-2: Use udf:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def arrays_join(arr):
return '_'.join('{}_{}'.format(x.departure[-5:], x.arrival[-5:]) for x in arr) if isinstance(arr, list) else arr
udf_array_join = udf(arrays_join, StringType())
df.select(udf_array_join('col1')).show(truncate=False)
Method-3: use posexplode + groupby + collect_list:
from pyspark.sql.functions import monotonically_increasing_id, posexplode, regexp_replace, expr
(df.withColumn('id', monotonically_increasing_id())
.select('*', posexplode('col1').alias('pos', 'col2'))
.select('id', 'pos', 'col2.*')
.selectExpr('id', "concat(pos, '+', right(departure,5), '_', right(arrival,5)) as dt")
.groupby('id')
.agg(expr("concat_ws('_', sort_array(collect_list(dt))) as new_list"))
.select(regexp_replace('new_list', r'(?:^|(?<=_))\d+\+', '').alias('new_list'))
.show(truncate=False))
Method-4: use string operations:
for this particular problem only, convert the array into string and then do a bunch of string operations (split + concat_ws + regexp_replace + trim) to get desired sub-strings:
from pyspark.sql.functions import regexp_replace, concat_ws, split, col
(df.select(
regexp_replace(
concat_ws('_', split(col('col1').astype('string'), r'[^0-9 :-]+'))
, r'[_ ]+\d\d\d\d-\d\d-\d\d '
, '_'
).alias('new_list')
).selectExpr('trim(both "_" from new_list) as new_list')
.show(truncate=False))
Upvotes: 2