Reputation: 84
I have a dataframe that looks like this:
+---+--------------------+
| id|               items|
+---+--------------------+
|  1|[a, b, .... x, y, z]|
|  1|[q, z, .... x, b, 5]|
|  2|[q, z, .... x, b, 5]|
+---+--------------------+
I want to split the rows so that the array in the items column is at most length 20. If an array has length greater than 20, I want to create new rows and split the array up so that each array has length 20 or less. For the first row in my example dataframe, if we assume the array has length 10 and I want at most length 3 per row, it should be split like this:
+---+---------+
| id|    items|
+---+---------+
|  1|[a, b, c]|
|  1|[d, e, f]|
|  1|[g, h, i]|
|  1|      [j]|
+---+---------+
Ideally, all rows should have arrays of length 3, except the last row when the array's length is not evenly divisible by the maximum desired length. Note: the id column is not unique.
Upvotes: 1
Views: 1453
Reputation: 792
Since this requires a more complex transformation, I've used the Dataset API. It might not be as performant, but it will get you what you want.
First, create some sample data to mimic yours:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val arrayData = Seq(
  Row(1, List(1, 2, 3, 4, 5, 6, 7)),
  Row(2, List(1, 2, 3, 4)),
  Row(3, List(1, 2)),
  Row(4, List(1, 2, 3))
)

val arraySchema = new StructType().add("id", IntegerType).add("values", ArrayType(IntegerType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
/*
+---+---------------------+
|id |values |
+---+---------------------+
|1 |[1, 2, 3, 4, 5, 6, 7]|
|2 |[1, 2, 3, 4] |
|3 |[1, 2] |
|4 |[1, 2, 3] |
+---+---------------------+
*/
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Encoder for the custom result type of the transformation.
implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]

// Here we are using a sliding window of size 3 and step 3.
// This can be made into a generic function for a window of size k
// (a sketch of that generic version is at the end of this answer).
val df2 = df.map(r => {
  val id = r.getInt(0)
  val a = r.getSeq[Int](1).toArray
  val arrays = a.sliding(3, 3).toArray
  (id, arrays)
})
/*
+---+---------------------------------------------------------------+
|_1 |_2 |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
val df3 = df2
  .withColumnRenamed("_1", "id")
  .withColumnRenamed("_2", "values")
/*
+---+---------------------------------------------------------------+
|id |values |
+---+---------------------------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6), WrappedArray(7)]|
|2 |[WrappedArray(1, 2, 3), WrappedArray(4)] |
|3 |[WrappedArray(1, 2)] |
|4 |[WrappedArray(1, 2, 3)] |
+---+---------------------------------------------------------------+
*/
explode will create a new row for each array entry in the second column.
import org.apache.spark.sql.functions.{col, explode}

val df4 = df3.withColumn("values", explode(col("values")))
/*
+---+---------+
|id |values |
+---+---------+
|1 |[1, 2, 3]|
|1 |[4, 5, 6]|
|1 |[7] |
|2 |[1, 2, 3]|
|2 |[4] |
|3 |[1, 2] |
|4 |[1, 2, 3]|
+---+---------+
*/
This approach is not without limitations.
Primarily, it will not be as performant on larger datasets, since this code no longer benefits from the DataFrame API's built-in optimizations. A pure DataFrame solution, however, might require window functions, which can also perform poorly depending on the size of the data. If it's possible to alter this data at the source, that would be the recommended approach.
This approach also requires defining an encoder for a more complex type. If the data schema changes, a different encoder will have to be used.
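As a rough sketch of the generic version mentioned in the comment above, the window size k can be passed in as a parameter (chunkIt is a hypothetical helper name, not part of any Spark API):
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical helper: split each row's array into chunks of at most k elements.
def chunkIt(df: DataFrame, k: Int): Dataset[(Int, Array[Array[Int]])] = {
  implicit val encoder = ExpressionEncoder[(Int, Array[Array[Int]])]
  df.map(r => (r.getInt(0), r.getSeq[Int](1).toArray.sliding(k, k).toArray))
}

// Usage, equivalent to df2 above:
// val df2 = chunkIt(df, 3)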
Upvotes: 1
Reputation: 32660
Using the higher-order functions transform + filter along with slice, you can split the array into sub-arrays of size 20 and then explode it:
import org.apache.spark.sql.functions.{explode, expr}

val l = 20

val df1 = df.withColumn(
  "items",
  explode(
    expr(
      s"filter(transform(items, (x, i) -> IF(i % $l = 0, slice(items, i + 1, $l), null)), x -> x is not null)"
    )
  )
)
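For reference, here's a minimal sketch of the same expression written with the Column-based higher-order functions instead of a SQL string. This assumes Spark 3.1+, where transform, filter, and the Column-argument overload of slice are all available:
import org.apache.spark.sql.functions.{col, explode, filter, lit, slice, transform, when}

val l = 20

val df1 = df.withColumn(
  "items",
  explode(
    filter(
      // Emit a slice starting at every l-th index, null elsewhere ...
      transform(col("items"), (_, i) => when(i % l === 0, slice(col("items"), i + 1, lit(l)))),
      // ... then drop the nulls, keeping one sub-array per chunk.
      x => x.isNotNull
    )
  )
)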
Upvotes: 1
Reputation: 86
You could try this:
import pandas as pd

max_item_length = 3

df = pd.DataFrame(
    {"fake_index": [1, 2, 3],
     "items": [["a", "b", "c", "d", "e"], ["f", "g", "h", "i", "j"], ["k", "l"]]}
)

# Build one output row per chunk of at most max_item_length items.
rows = []
for _, row in df.iterrows():
    items = row["items"]
    for start in range(0, len(items), max_item_length):
        rows.append({"fake_index": row["fake_index"],
                     "items": items[start:start + max_item_length]})

df = pd.DataFrame(rows)
print(df)
Input:
fake_index items
0 1 [a, b, c, d, e]
1 2 [f, g, h, i, j]
2 3 [k, l]
Output:
fake_index items
0 1 [a, b, c]
1 1 [d, e]
2 2 [f, g, h]
3 2 [i, j]
4 3 [k, l]
Upvotes: 1
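A shorter alternative is to chunk each list and let DataFrame.explode emit one row per chunk. A minimal sketch, assuming the original df and the max_item_length defined above, and pandas 1.1+ for the ignore_index argument:
# Chunk each list into pieces of at most max_item_length ...
df["items"] = df["items"].apply(
    lambda xs: [xs[i:i + max_item_length] for i in range(0, len(xs), max_item_length)]
)
# ... then emit one row per chunk with DataFrame.explode.
df = df.explode("items", ignore_index=True)
print(df)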