sandbar

Reputation: 84

Split row into multiple rows to limit length of array in column (spark / scala)

I have a dataframe that looks like this:

+--------------+--------------------+
|id            |           items    |
+--------------+--------------------+
|             1|[a, b, .... x, y, z]|
+--------------+--------------------+
|             1|[q, z, .... x, b, 5]|
+--------------+--------------------+
|             2|[q, z, .... x, b, 5]|
+--------------+--------------------+

I want to split the rows so that the array in the items column has at most 20 elements. If an array is longer than 20, I want to create new rows and split the array so that each piece has length 20 or less. For example, if the first row of my dataframe had an array of length 10 and I wanted at most 3 elements per row, I would like it to be split like this:

+--------------+--------------------+
|id            |           items    |
+--------------+--------------------+
|             1|[a, b, c]           |
+--------------+--------------------+
|             1|[z, y, z]           |
+--------------+--------------------+
|             1|[e, f, g]           |
+--------------+--------------------+
|             1|[q]                 |
+--------------+--------------------+

Ideally, every resulting array should have length 3, except the last one when the array length is not evenly divisible by the maximum desired length. Note: the id column is not unique.

Upvotes: 1

Views: 1453

Answers (3)

m_vemuri

Reputation: 792

Since this requires a more complex transformation, I've used the Dataset API (a typed map). This might not be as performant, but it produces the result you want.

Setup

Creating some sample data to mimic your data.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val arrayData = Seq(
  Row(1, List(1, 2, 3, 4, 5, 6, 7)),
  Row(2, List(1, 2, 3, 4)),
  Row(3, List(1, 2)),
  Row(4, List(1, 2, 3))
)

val arraySchema = new StructType().add("id", IntegerType).add("values", ArrayType(IntegerType))

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
 +---+---------------------+
 |id |values               |
 +---+---------------------+
 |1  |[1, 2, 3, 4, 5, 6, 7]|
 |2  |[1, 2, 3, 4]         |
 |3  |[1, 2]               |
 |4  |[1, 2, 3]            |
 +---+---------------------+
*/


Transformations

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// encoder for the custom (id, chunks) tuple produced by the map below
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]

// Here we are using a sliding window of size 3 and step 3. 
// This can be made into a generic function for a window of size k.
val df2 = df.map(r => {
  val id = r.getInt(0)
  val a = r.getSeq[Int](1).toArray
  val arrays = a.sliding(3, 3).toArray
  (id, arrays)
})
/*
 +---+---------------------------------------------------------------+
 |_1 |_2                                                             |
 +---+---------------------------------------------------------------+
 |1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
 |2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
 |3  |[WrappedArray(1, 2)]                                           |
 |4  |[WrappedArray(1, 2, 3)]                                        |
 +---+---------------------------------------------------------------+
*/

val df3 = df2
  .withColumnRenamed("_1", "id")
  .withColumnRenamed("_2", "values")
/*
 +---+---------------------------------------------------------------+
 |id |values                                                         |
 +---+---------------------------------------------------------------+
 |1  |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
 |2  |[WrappedArray(1, 2, 3), WrappedArray(4)]                       |
 |3  |[WrappedArray(1, 2)]                                           |
 |4  |[WrappedArray(1, 2, 3)]                                        |
 +---+---------------------------------------------------------------+
*/

Use explode

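explode creates a new row for each entry of the array in the second column, so every chunk ends up in its own row.

import org.apache.spark.sql.functions
// the $"..." column syntax needs spark.implicits._ (already imported in spark-shell)

val df4 = df3.withColumn("values", functions.explode($"values"))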
/*
 +---+---------+
 |id |values   |
 +---+---------+
 |1  |[1, 2, 3]|
 |1  |[4, 5, 6]|
 |1  |[7]      |
 |2  |[1, 2, 3]|
 |2  |[4]      |
 |3  |[1, 2]   |
 |4  |[1, 2, 3]|
 +---+---------+
*/
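The two steps above (chunk each array, then explode) can be checked outside Spark for an arbitrary chunk size k. The following is a minimal plain-Python sketch of the same logic, not Spark code. The helper names chunk and explode_rows are hypothetical, and chunk(values, k) is assumed to mirror what a.sliding(k, k) yields for a non-empty array.

```python
def chunk(values, k):
    """Split values into consecutive pieces of at most k elements,
    mirroring Scala's a.sliding(k, k) for a non-empty array (assumption)."""
    if k <= 0:
        raise ValueError("k must be positive")
    return [values[i:i + k] for i in range(0, len(values), k)]


def explode_rows(rows):
    """Turn each (id, [piece, ...]) pair into one (id, piece) row per piece."""
    return [(row_id, piece) for row_id, pieces in rows for piece in pieces]


# In-memory stand-in for the sample data used in the Spark example.
data = [(1, [1, 2, 3, 4, 5, 6, 7]), (2, [1, 2, 3, 4]), (3, [1, 2]), (4, [1, 2, 3])]

chunked = [(row_id, chunk(values, 3)) for row_id, values in data]
result = explode_rows(chunked)
for row in result:
    print(row)
```

Only the last piece of each id can be shorter than k, which matches the requirement in the question.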


Limitations

This approach is not without limitations.

Primarily, it will not be as performant on larger datasets, because the function passed to map is opaque to Spark's optimizer and the built-in dataframe optimizations no longer apply. However, the dataframe API might require the use of window functions, which can also have limited performance depending on the size of the data. If it's possible to alter this data at the source, that would be the recommended route.

This approach also requires defining an encoder for something more complex. If the data schema changes, then different encoders will have to be used.

Upvotes: 1

blackbishop

Reputation: 32660

Using the higher-order functions transform + filter along with slice, you can split the array into subarrays of at most 20 elements and then explode the result:

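import org.apache.spark.sql.functions.{explode, expr}

val l = 20 // maximum number of elements per row

// transform's index i is 0-based, while slice takes a 1-based start position.
// Every l-th index yields a chunk of up to l elements; all other indices yield
// null and are removed by filter.
val df1 = df.withColumn(
  "items",
  explode(
    expr(
      s"filter(transform(items, (x,i)-> IF(i%$l=0, slice(items,i+1,$l), null)), x-> x is not null)"
    )
  )
)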
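To see why the expression yields the right chunks, it may help to trace the index arithmetic outside Spark. Below is a rough plain-Python model of the transform / filter / slice combination. The function name split_like_expr is hypothetical, and the model assumes a non-null input array.

```python
def split_like_expr(items, l):
    # transform(items, (x, i) -> IF(i % l = 0, slice(items, i + 1, l), null))
    # i is 0-based; Spark's slice uses a 1-based start (hence i + 1 in SQL),
    # which corresponds to plain i in Python's 0-based slicing.
    transformed = [items[i:i + l] if i % l == 0 else None for i in range(len(items))]
    # filter(..., x -> x is not null)
    return [piece for piece in transformed if piece is not None]


pieces = split_like_expr(list("abcdefghij"), 3)
print(pieces)  # [['a', 'b', 'c'], ['d', 'e', 'f'], ['g', 'h', 'i'], ['j']]
```

One thing to keep in mind: explode emits no row for an empty or null array, so such rows would disappear from the result. If they need to be kept, explode_outer is the usual alternative.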

Upvotes: 1

Daniel Seger

Reputation: 86

You could try this with pandas (Python rather than Spark/Scala):

import pandas as pd

max_item_length = 3

df = pd.DataFrame(
    {"fake_index": [1, 2, 3],
     "items": [["a", "b", "c", "d", "e"], ["f", "g", "h", "i", "j"], ["k", "l"]]}
)

# Split each list into consecutive chunks of at most max_item_length elements.
# This handles lists of any length, and DataFrame.append is avoided because it
# was removed in pandas 2.0.
df["items"] = df["items"].apply(
    lambda items: [items[i:i + max_item_length]
                   for i in range(0, len(items), max_item_length)]
)

# One row per chunk; ignore_index requires pandas >= 1.1.
df = df.explode("items", ignore_index=True)

print(df)

Input:

   fake_index            items
0           1  [a, b, c, d, e]
1           2  [f, g, h, i, j]
2           3           [k, l]

Output:

   fake_index      items
0           1  [a, b, c]
1           1     [d, e]
2           2  [f, g, h]
3           2     [i, j]
4           3     [k, l]

Upvotes: 1
