Reputation: 473
I have a Spark DataFrame like the one below. It has an array of array structs in the zipped_feature column.
+--------------------+
|zipped_feature |
+--------------------+
|[[A, 1], [ABC, 33]] |
|[[A, 1], [ABS, 24]] |
|[[B, 2], [ABE, 17]] |
|[[C, 3], [ABC, 33]] |
+--------------------+
I tried to get an item (which is itself an array) from this array of array structs by index, using the UDF below. For example, with index 0 on the first row I should get back "[A, 1]" as an array.
val getValueUdf = udf { (zippedFeature: Seq[Seq[String]], index: Int) => zippedFeature(index) }
But I am getting the error below:
data type mismatch: argument 1 requires array<array<string>> type, however, '`zipped_feature`' is of array<struct<_1:string,_2:string>> type.
When I printed the schema it shows like below
|-- zipped_feature: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
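For what it's worth, that schema is what Scala tuples produce: here is a minimal sketch (assuming the sample values shown above) that reproduces the DataFrame. Tuples become structs with fields _1 and _2, which is why Spark reports array<struct<...>> rather than array<array<string>>.
import spark.implicits._
val df = Seq(
  Seq(("A", "1"), ("ABC", "33")),
  Seq(("A", "1"), ("ABS", "24")),
  Seq(("B", "2"), ("ABE", "17")),
  Seq(("C", "3"), ("ABC", "33"))
).toDF("zipped_feature")
df.printSchema() // matches the schema printed above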
Could someone help me identify what I am doing wrong here? I want to get the value (again, an array) based on the index.
Upvotes: 0
Views: 1101
Reputation: 42422
You can try using the Dataset API's map method:
def getValue(zippedFeature: Seq[(String, String)], index: Int): Seq[String] = {
  // turn the tuple at the given index into a Seq of its string values
  zippedFeature(index).productIterator.map(_.toString).toSeq
}
df.as[Seq[(String, String)]].map(x => (x, getValue(x, 0))).show
+-------------------+------+
| _1| _2|
+-------------------+------+
|[[A, 1], [ABC, 33]]|[A, 1]|
|[[A, 1], [ABS, 24]]|[A, 1]|
|[[B, 2], [ABE, 17]]|[B, 2]|
|[[C, 3], [ABC, 33]]|[C, 3]|
+-------------------+------+
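If you only need the extracted element, you can map to it directly (a sketch reusing getValue from above; a Dataset of Seq[String] shows up as a single column named value):
df.as[Seq[(String, String)]].map(getValue(_, 1)).show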
Upvotes: 1
Reputation: 32710
From the error message, the column zipped_feature is of type array of structs, not array of arrays. You don't need a UDF to access array elements by index; you can use one of these options:
col("zipped_feature")(idx) // opt1
col("zipped_feature").getItem(idx) // opt2
element_at(col("zipped_feature"), idx) // opt3
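For example, all three options below select the first struct of each row (a quick sketch against the question's DataFrame; note the 1-based index for element_at):
import org.apache.spark.sql.functions.{col, element_at}
df.select(
  col("zipped_feature")(0).as("opt1"),
  col("zipped_feature").getItem(0).as("opt2"),
  element_at(col("zipped_feature"), 1).as("opt3")
).show()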
To transform the array of structs into an array of arrays, you can use the transform function:
val df1 = df.withColumn(
  "zipped_feature",
  expr("transform(zipped_feature, x -> array(x._1, x._2))")
).select(
  col("zipped_feature")(0).as("idx0"),
  col("zipped_feature")(1).as("idx1")
)
df1.show
//+------+---------+
//| idx0| idx1|
//+------+---------+
//|[A, 1]|[ABC, 33]|
//|[A, 1]|[ABS, 24]|
//|[B, 2]|[ABE, 17]|
//|[C, 3]|[ABC, 33]|
//+------+---------+
df1.printSchema
//root
// |-- idx0: array (nullable = true)
// | |-- element: string (containsNull = true)
// |-- idx1: array (nullable = true)
// | |-- element: string (containsNull = true)
Or directly, without transforming the array:
val df1 = df.select(
  expr("array(zipped_feature[0]._1, zipped_feature[0]._2)").as("idx0"),
  expr("array(zipped_feature[1]._1, zipped_feature[1]._2)").as("idx1")
)
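The same result can also be written with the Column API instead of SQL expressions (a sketch; df2 is just an illustrative name):
import org.apache.spark.sql.functions.array
val df2 = df.select(
  array(col("zipped_feature")(0)("_1"), col("zipped_feature")(0)("_2")).as("idx0"),
  array(col("zipped_feature")(1)("_1"), col("zipped_feature")(1)("_2")).as("idx1")
)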
Upvotes: 1
Reputation: 3008
In my opinion, you don't need a user-defined function for this use case. You can easily use withColumn and select statements to get the job done.
// Source data
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
// note: this builds array<array<string>>, unlike the array<struct> in the question
val df = Seq(
  Seq(Array("A", "1"), Array("ABC", "33")),
  Seq(Array("A", "1"), Array("ABS", "24"))
).toDF("zipped_feature")
// 1) getting the values using a select statement
val df1 = df.select($"zipped_feature"(0).as("ArrayZero"), $"zipped_feature"(1).as("ArrayOne"))
// 2) getting the values using withColumn
val df2 = df.withColumn("Array_Zero", $"zipped_feature"(0)).withColumn("Array_One", $"zipped_feature"(1))
// 3) getting the value of the inner array
val df3 = df1.select($"ArrayZero"(0).as("InnerArrayZero"))
// 4) getting the value of the first element
val value = df1.select($"ArrayZero"(0)).first.getString(0)
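To pull that element back for every row rather than just the first, one option (a sketch, relying on the spark.implicits._ import above) is a typed collect:
val values = df1.select($"ArrayZero"(0)).as[String].collect // Array("A", "A") for the two sample rows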
Upvotes: 1
Reputation: 5487
zipped_feature is a column of type array<struct>. If you want to get each nested column value as an array, you need to modify the UDF as shown below.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("OFF")
import spark.implicits._

// constructing a sample dataframe
val rows = List(
  Row(Array(Row("A", "1"), Row("ABC", "33"))),
  Row(Array(Row("A", "1"), Row("ABS", "24"))),
  Row(Array(Row("B", "2"), Row("ABE", "17"))),
  Row(Array(Row("C", "3"), Row("ABC", "33")))
)
val rdd = spark.sparkContext.parallelize(rows)
val schema = new StructType()
  .add("zipped_feature", ArrayType(new StructType().add("_1", StringType).add("_2", StringType)))
val df = spark.createDataFrame(rdd, schema)
df.show()
/*
+-------------------+
| zipped_feature|
+-------------------+
|[[A, 1], [ABC, 33]]|
|[[A, 1], [ABS, 24]]|
|[[B, 2], [ABE, 17]]|
|[[C, 3], [ABC, 33]]|
+-------------------+
*/
df.printSchema()
/*
root
|-- zipped_feature: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
*/
// udf that returns the struct at the given index as a Seq of its string values
val getValueUdf = udf { (zippedFeature: Seq[Row], index: Int) =>
  zippedFeature(index).toSeq.map(_.toString)
}
df.withColumn("first_column",getValueUdf('zipped_feature,lit(0)))
.withColumn("second_column",getValueUdf('zipped_feature,lit(1)))
.show(false)
/* output
+-------------------+------------+-------------+
|zipped_feature |first_column|second_column|
+-------------------+------------+-------------+
|[[A, 1], [ABC, 33]]|[A, 1] |[ABC, 33] |
|[[A, 1], [ABS, 24]]|[A, 1] |[ABS, 24] |
|[[B, 2], [ABE, 17]]|[B, 2] |[ABE, 17] |
|[[C, 3], [ABC, 33]]|[C, 3] |[ABC, 33] |
+-------------------+------------+-------------+
*/
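If you prefer to read the struct fields by name rather than positionally, the UDF can also be written with Row.getAs (a hypothetical variant of the same idea):
val getValueByNameUdf = udf { (zippedFeature: Seq[Row], index: Int) =>
  val row = zippedFeature(index)
  Seq(row.getAs[String]("_1"), row.getAs[String]("_2"))
}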
Upvotes: 2