Reputation: 846
I have a row in PySpark that I would like to break into multiple smaller rows based on the values inside one of its columns.
Given a df:
input_df = spark.createDataFrame([
    (2, [1, 2, 3, 4, 5]),
], ("id", "list"))
+---+---------------+
| id|           list|
+---+---------------+
|  2|[1, 2, 3, 4, 5]|
+---+---------------+
I would like to break each row into multiple sublists using a fixed-size sliding window. The resulting df would look like this:
output_df = spark.createDataFrame([
    (2, [0, 0], 1), (2, [0, 1], 2), (2, [1, 2], 3), (2, [2, 3], 4), (2, [3, 4], 5),
], ("id", "past", "future"))
+---+------+------+
| id| past|future|
+---+------+------+
| 2|[0, 0]| 1|
| 2|[0, 1]| 2|
| 2|[1, 2]| 3|
| 2|[2, 3]| 4|
| 2|[3, 4]| 5|
+---+------+------+
The logic for breaking the list is to have a pointer look at each element of the list, take the previous N elements (N=2 in this case) as past (padding with 0s if there are not enough elements), and take the element at the pointer as future. Doing this for every element produces the dataframe.
I cannot think of a way to do this with PySpark; with a pandas dataframe I would write an iterative for loop over each row, something like the plain-Python sketch below. Is there a way to do this with PySpark?
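For reference, this is the per-row logic I have in mind in plain Python (the helper name windows is just for illustration):
def windows(row_id, values, n=2):
    # Pad with n zeros on the left so every element has n predecessors.
    padded = [0] * n + values
    return [(row_id, padded[i:i + n], v) for i, v in enumerate(values)]

windows(2, [1, 2, 3, 4, 5])
# [(2, [0, 0], 1), (2, [0, 1], 2), (2, [1, 2], 3), (2, [2, 3], 4), (2, [3, 4], 5)]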
Upvotes: 1
Views: 481
Reputation: 4059
The best way to handle array columns is with higher-order functions. TRANSFORM exposes each element together with its 0-based index i; indexing the array out of range returns NULL, which COALESCE turns into the 0 padding.
import pyspark.sql.functions as f

output_df = (input_df
    .withColumn('list', f.expr('TRANSFORM(list, (element, i) -> STRUCT(ARRAY(COALESCE(list[i - 2], 0), COALESCE(list[i - 1], 0)) AS past, element AS future))'))
    # inline explodes the array of structs into one row per element
    .selectExpr('id', 'inline(list)'))
+---+------+------+
|id |past |future|
+---+------+------+
|2 |[0, 0]|1 |
|2 |[0, 1]|2 |
|2 |[1, 2]|3 |
|2 |[2, 3]|4 |
|2 |[3, 4]|5 |
+---+------+------+
Passing N dynamically before creating the expression:
N = 2

# sequence(N, 1) counts down, so past is ordered oldest to newest
expr = 'TRANSFORM(list, (element, i) -> STRUCT(TRANSFORM(sequence({N}, 1), k -> COALESCE(list[i - k], 0)) AS past, element AS future))'.format(N=N)

output_df = (input_df
    .withColumn('list', f.expr(expr))
    .selectExpr('id', 'inline(list)'))
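If you prefer the DataFrame API over SQL strings, roughly the same thing can be written with f.transform (Spark 3.1+, where the lambda can take the element and its index). This is just a sketch and, like the SQL version, it relies on out-of-range array indexing returning NULL (the default, non-ANSI behaviour):
import pyspark.sql.functions as f

output_df = (input_df
    .withColumn('list', f.transform(
        'list',
        # x is the current element, i its 0-based index
        lambda x, i: f.struct(
            f.array(
                f.coalesce(f.col('list')[i - 2], f.lit(0)),
                f.coalesce(f.col('list')[i - 1], f.lit(0)),
            ).alias('past'),
            x.alias('future'))))
    .selectExpr('id', 'inline(list)'))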
Upvotes: 3