eljiwo

Reputation: 846

PySpark: explode an array column into sublists with a sliding window

I have a row in PySpark that I would like to break into smaller rows based on the values inside one of its columns.

Given a df:

input_df = spark.createDataFrame([
    (2,[1,2,3,4,5],),
    ], ("id", "list"))

+---+---------------+
| id|           list|
+---+---------------+
|  2|[1, 2, 3, 4, 5]|
+---+---------------+

I would like to break each row into multiple sublists using a fixed-size sliding window. The resulting df would look like this:

output_df = spark.createDataFrame([
    (2, [0,0], 1), (2, [0,1], 2), (2, [1,2], 3), (2, [2,3], 4), (2, [3,4], 5),
    ], ("id", "past", "future"))

+---+------+------+
| id|  past|future|
+---+------+------+
|  2|[0, 0]|     1|
|  2|[0, 1]|     2|
|  2|[1, 2]|     3|
|  2|[2, 3]|     4|
|  2|[3, 4]|     5|
+---+------+------+

The logic is to move a pointer over each element of the list, use the previous N elements (N=2 in this case) as past (padded with 0s when there are not enough elements), and use the element at the pointer as future. Doing this for every element produces the dataframe.
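The rule above can be sketched in plain Python to pin down the expected output before porting it to Spark. This is only an illustrative reference, not a PySpark solution; the function name `sliding_windows` and its `n` and `fill` parameters are made up for this example.

```python
def sliding_windows(values, n=2, fill=0):
    """Return (past, future) pairs, where past holds the n elements before each item."""
    # Left-pad with the fill value so the first items still get a full window.
    padded = [fill] * n + list(values)
    rows = []
    for i, future in enumerate(values):
        # padded[i:i + n] corresponds to the n items preceding position i in values.
        past = padded[i:i + n]
        rows.append((past, future))
    return rows


for past, future in sliding_windows([1, 2, 3, 4, 5], n=2):
    print(past, future)
```

For the list in the question this yields the same five (past, future) pairs as the desired output_df.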

I cannot think of a way to do this with PySpark; with a pandas dataframe I would use an iterative for loop over each row. Is there a way to do this with PySpark?

Upvotes: 1

Views: 481

Answers (1)

Kafels

Reputation: 4059

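The best way to handle array columns is with Spark SQL higher-order functions. Here TRANSFORM builds one (past, future) struct per element, and inline expands the resulting array of structs into rows.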

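import pyspark.sql.functions as f

# TRANSFORM maps each element (with its index i) to a struct holding the two
# previous elements (0 where list[i - k] is out of range and yields NULL) and
# the element itself; inline() then emits one row per struct.
output_df = (input_df
             .withColumn('list', f.expr('TRANSFORM(list, (element, i) -> STRUCT(ARRAY(COALESCE(list[i - 2], 0), COALESCE(list[i - 1], 0)) AS past, element AS future))'))
             .selectExpr('id', 'inline(list)'))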

+---+------+------+
|id |past  |future|
+---+------+------+
|2  |[0, 0]|1     |
|2  |[0, 1]|2     |
|2  |[1, 2]|3     |
|2  |[2, 3]|4     |
|2  |[3, 4]|5     |
+---+------+------+

UPDATE

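To pass N dynamically, format it into the expression string before creating the expression. sequence(N, 1) counts down from N to 1, so past keeps the elements in their original order: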

N = 2

expr = 'TRANSFORM(list, (element, i) -> STRUCT(TRANSFORM(sequence({N}, 1), k -> COALESCE(list[i - k], 0)) AS past, element AS future))'.format(N=N)
output_df = (input_df
             .withColumn('list', f.expr(expr))
             .selectExpr('id', 'inline(list)'))
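As a small variation on the snippet above, the string building can be wrapped in a helper so the column name is configurable too. This is a minimal sketch: `build_window_expr` and its `column` parameter are hypothetical names, and the guard on `n` is an added assumption, since `sequence(0, 1)` would count up to `[0, 1]` instead of producing an empty window.

```python
def build_window_expr(n, column="list"):
    """Build the Spark SQL expression string for a window of n past elements."""
    if n < 1:
        # sequence(0, 1) would yield [0, 1], which is not a valid look-back range.
        raise ValueError("n must be at least 1")
    return (
        "TRANSFORM({col}, (element, i) -> STRUCT("
        "TRANSFORM(sequence({n}, 1), k -> COALESCE({col}[i - k], 0)) AS past, "
        "element AS future))"
    ).format(col=column, n=n)


# Plain string output; no Spark session is needed to inspect it.
print(build_window_expr(3))
```

The result can be passed to f.expr in the same way as expr above. Both versions rely on an out-of-range index such as list[-1] evaluating to NULL; as far as I know, with ANSI mode enabled Spark may raise an error for such an index instead, so that setting is worth checking.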

Upvotes: 3
