Reputation: 155
Short version: How can I convert each entry in the nested array to something different (for example a struct) ?
How to convert this:
+---------+
|column_a |
+---------+
|[A, B, C]|
|[D, E] |
|[F] |
|[] |
+---------+
To this:
+---------+---------------------------+
|column_a |column_b |
+---------+---------------------------+
|[A, B, C]|[[A, aa], [B, bb], [C, cc]]|
|[D, E] |[[D, dd], [E, ee]] |
|[F] |[[F, ff]] |
|[] |[] |
+---------+---------------------------+
Longer version:
Let's assume there is an existing DataFrame with a column_a containing an array of strings:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = Seq(
  StructField("column_a", ArrayType(StringType), true)
)
val data = Seq(
  Row(Array("A", "B", "C")),
  Row(Array("D", "E")),
  Row(Array("F")),
  Row(Array())
)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)
df.show(false)
+---------+
|column_a |
+---------+
|[A, B, C]|
|[D, E] |
|[F] |
|[] |
+---------+
I would like to end up with the following DataFrame:
val schema2 = List(
  StructField("column_a", ArrayType(StringType)),
  StructField("column_b", ArrayType(StructType(
    Array(StructField("colA", StringType), StructField("colB", StringType))
  )))
)
val data2 = Seq(
  Row(Array("A", "B", "C"),
    Array(Row("A", "aa"), Row("B", "bb"), Row("C", "cc"))),
  Row(Array("D", "E"),
    Array(Row("D", "dd"), Row("E", "ee"))),
  Row(Array("F"),
    Array(Row("F", "ff"))),
  Row(Array(), Array())
)
val df2 = spark.createDataFrame(
  spark.sparkContext.parallelize(data2),
  StructType(schema2)
)
df2.show(false)
+---------+---------------------------+
|column_a |column_b |
+---------+---------------------------+
|[A, B, C]|[[A, aa], [B, bb], [C, cc]]|
|[D, E] |[[D, dd], [E, ee]] |
|[F] |[[F, ff]] |
|[] |[] |
+---------+---------------------------+
The values in column_b should be generated from the column_a entries: for each entry in column_a there is a corresponding struct in column_b.
What is a valid way to achieve this in Spark 2.3.1? (I would prefer to use the DSL, although SQL is acceptable.)
Upvotes: 4
Views: 3050
Reputation: 21
Python Version
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Return an array of (colA, colB) structs so the result matches the desired schema
result_type = ArrayType(StructType([StructField("colA", StringType()),
                                    StructField("colB", StringType())]))

def my_func(arr):
    return [(s, s.lower() * 2) for s in arr]

transform = udf(my_func, result_type)
df.withColumn("column_b", transform("column_a"))
Upvotes: 0
Reputation: 28322
You can use a UDF to achieve this:
import org.apache.spark.sql.functions.udf
import spark.implicits._

val transform = udf((arr: Seq[String]) => {
  arr.map(v => (v, v.toLowerCase() * 2))
})
df.withColumn("column_b", transform($"column_a")).show(false)
This will give:
+---------+------------------------+
|column_a |column_b |
+---------+------------------------+
|[A, B, C]|[[A,aa], [B,bb], [C,cc]]|
|[D, E] |[[D,dd], [E,ee]] |
|[F] |[[F,ff]] |
|[] |[] |
+---------+------------------------+
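Note that the tuples returned by the UDF become structs with the default field names _1 and _2. If you want the nested fields named colA and colB as in schema2 above, one option is to return a case class instead of a tuple. A minimal sketch (Pair is a hypothetical helper, not part of the question's code):
import org.apache.spark.sql.functions.udf

// Hypothetical helper case class, used only to give the struct fields descriptive names
case class Pair(colA: String, colB: String)

val transformNamed = udf((arr: Seq[String]) => arr.map(v => Pair(v, v.toLowerCase() * 2)))
df.withColumn("column_b", transformNamed($"column_a")).printSchema()
Spark derives the struct schema from the case class fields, so the resulting column type should be array<struct<colA:string,colB:string>>.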
Upvotes: 4