ssobocik

Reputation: 155

How to map/convert each element of an ArrayType column in Apache Spark 2.3.1

Short version: How can I convert each entry in an array column to something different (for example, a struct)?

How to convert this:

+---------+
|column_a |
+---------+
|[A, B, C]|
|[D, E]   |
|[F]      |
|[]       |
+---------+

To this:

+---------+---------------------------+
|column_a |column_b                   |
+---------+---------------------------+
|[A, B, C]|[[A, aa], [B, bb], [C, cc]]|
|[D, E]   |[[D, dd], [E, ee]]         |
|[F]      |[[F, ff]]                  |
|[]       |[]                         |
+---------+---------------------------+

Longer version:

Let's assume there is an existing DataFrame with a column column_a containing an array of strings:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = Seq(
  StructField("column_a", ArrayType(StringType), true)
)
val data = Seq(
  Row(Array("A", "B", "C")),
  Row(Array("D", "E")),
  Row(Array("F")),
  Row(Array.empty[String])
)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)
df.show(false)

+---------+
|column_a |
+---------+
|[A, B, C]|
|[D, E]   |
|[F]      |
|[]       |
+---------+

I would like to end up with following DataFrame:

val schema2 = List(
  StructField("column_a", ArrayType(StringType)),
  StructField("column_b", ArrayType(StructType(Array(
    StructField("colA", StringType),
    StructField("colB", StringType)
  ))))
)

val data2 = Seq(
  Row(Array("A", "B", "C"),
    Array(Row("A", "aa"), Row("B", "bb"), Row("C", "cc"))),
  Row(Array("D", "E"),
    Array(Row("D", "dd"), Row("E", "ee"))),
  Row(Array("F"),
    Array(Row("F", "ff"))),
  Row(Array.empty[String], Array.empty[Row])
)

val df2 = spark.createDataFrame(
  spark.sparkContext.parallelize(data2),
  StructType(schema2)
)

df2.show(false)

+---------+---------------------------+
|column_a |column_b                   |
+---------+---------------------------+
|[A, B, C]|[[A, aa], [B, bb], [C, cc]]|
|[D, E]   |[[D, dd], [E, ee]]         |
|[F]      |[[F, ff]]                  |
|[]       |[]                         |
+---------+---------------------------+

Values in column_b should be generated from the entries in column_a: for each entry in column_a there is a corresponding struct in column_b.

What is a valid way to achieve this in Spark 2.3.1?

(I would prefer to use the DataFrame DSL, although SQL is acceptable.)

Upvotes: 4

Views: 3050

Answers (2)

Zhangrong.Huang

Reputation: 21

Python version, using a UDF that returns an array of structs:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

def my_func(arr):
    # Pair each entry with its derived value (here: lowercased and doubled)
    return [(s, s.lower() * 2) for s in arr]

# Declare the return type as an array of structs so Spark builds nested rows
result_type = ArrayType(StructType([
    StructField("colA", StringType()),
    StructField("colB", StringType())
]))

transform = udf(my_func, result_type)
df.withColumn("column_b", transform("column_a"))
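
Each returned Python tuple is matched positionally against the fields declared in the StructType, which is how the colA/colB names end up on the nested structs.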

Upvotes: 0

Shaido

Reputation: 28322

You can use a UDF to achieve this:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Pair each array element with its derived value
val transform = udf((arr: Seq[String]) => {
  arr.map(v => (v, v.toLowerCase() * 2))
})

df.withColumn("column_b", transform($"column_a")).show(false)

This will give:

+---------+------------------------+
|column_a |column_b                |
+---------+------------------------+
|[A, B, C]|[[A,aa], [B,bb], [C,cc]]|
|[D, E]   |[[D,dd], [E,ee]]        |
|[F]      |[[F,ff]]                |
|[]       |[]                      |
+---------+------------------------+
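
Note that a Scala tuple becomes a struct with the default field names _1 and _2. If the colA/colB names from the target schema matter, one option (a sketch, using a hypothetical Entry case class) is to return a case class instead:

// Hypothetical case class so the struct fields are named colA/colB
// instead of the tuple defaults _1/_2
case class Entry(colA: String, colB: String)

val transformNamed = udf((arr: Seq[String]) => arr.map(v => Entry(v, v.toLowerCase() * 2)))

df.withColumn("column_b", transformNamed($"column_a")).printSchema()

For reference, Spark 2.4 later added the transform higher-order function, which can build the same structs without a UDF; it is not available in 2.3.1.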

Upvotes: 4
