Mohan

Reputation: 473

Issue with inferring the datatype of a complex struct field in Spark

I have a Spark DataFrame like the one below. The zipped_feature column appears to hold an array of arrays.

+--------------------+
|zipped_feature      |
+--------------------+
|[[A, 1], [ABC, 33]] |
|[[A, 1], [ABS, 24]] |
|[[B, 2], [ABE, 17]] |
|[[C, 3], [ABC, 33]] |
+--------------------+

I tried to get an item (which is also an array) from this column using an index. I wrote the UDF below to fetch the value at a given index; if the index is 0 for the first row, I should get back [A, 1] as an array.

val getValueUdf = udf { (zippedFeature: Seq[Seq[String]], index: Int) => zippedFeature(index) }

But I am getting the error below:

data type mismatch: argument 1 requires array<array<string>> type, however, '`zipped_feature`' is of array<struct<_1:string,_2:string>> type.

When I print the schema, it shows:

root
 |-- zipped_feature: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

Could someone help me identify what I am doing wrong here? I want to get the value (again, an array) based on the index.

Upvotes: 0

Views: 1101

Answers (4)

mck

Reputation: 42422

You can try using the Dataset API's map method:

def getValue(zippedFeature: Seq[(String, String)], index: Int): Seq[String] = {
  // pick the tuple at `index` and turn its fields into a Seq of strings
  zippedFeature(index).productIterator.map(_.toString).toSeq
}

df.as[Seq[(String, String)]].map(x => (x, getValue(x, 0))).show
+-------------------+------+
|                 _1|    _2|
+-------------------+------+
|[[A, 1], [ABC, 33]]|[A, 1]|
|[[A, 1], [ABS, 24]]|[A, 1]|
|[[B, 2], [ABE, 17]]|[B, 2]|
|[[C, 3], [ABC, 33]]|[C, 3]|
+-------------------+------+
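If you prefer named columns over the default _1/_2, you can rename after the map. A small sketch along the same lines (the column names here are just illustrative):

df.as[Seq[(String, String)]]
  .map(x => (x, getValue(x, 0)))
  .toDF("zipped_feature", "first_element")
  .show(false)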

Upvotes: 1

blackbishop

Reputation: 32710

From the error message, the column zipped_feature is of type array of structs, not array of arrays. You don't need a UDF to access array elements by index; you can use one of these options:

col("zipped_feature")(idx) // opt1
col("zipped_feature").getItem(idx)  // opt2
element_at(col("zipped_feature"), idx) // opt3
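
For example, to grab the first struct from the asker's df (a quick sketch; note that element_at uses 1-based indexing while getItem is 0-based):

import org.apache.spark.sql.functions.{col, element_at}

df.select(
  col("zipped_feature")(0).as("via_index"),
  element_at(col("zipped_feature"), 1).as("via_element_at")
).show(false)
// both columns hold the first struct, e.g. [A, 1]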

To transform the array of structs into an array of arrays, you can use the transform function:

import org.apache.spark.sql.functions._

val df1 = df.withColumn(
    "zipped_feature",
    expr("transform(zipped_feature, x -> array(x._1, x._2))")
  ).select(
    col("zipped_feature")(0).as("idx0"),
    col("zipped_feature")(1).as("idx1")
  )

df1.show
//+------+---------+
//|  idx0|     idx1|
//+------+---------+
//|[A, 1]|[ABC, 33]|
//|[A, 1]|[ABS, 24]|
//|[B, 2]|[ABE, 17]|
//|[C, 3]|[ABC, 33]|
//+------+---------+

df1.printSchema
//root
// |-- idx0: array (nullable = true)
// |    |-- element: string (containsNull = true)
// |-- idx1: array (nullable = true)
// |    |-- element: string (containsNull = true)

Or directly, without transforming the array:

val df1 = df.select(
  expr("array(zipped_feature[0]._1, zipped_feature[0]._2)").as("idx0"),
  expr("array(zipped_feature[1]._1, zipped_feature[1]._2)").as("idx1")
)
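
The same selection can also be written with the Column API instead of a SQL expression string; a minimal equivalent sketch, assuming the same df:

import org.apache.spark.sql.functions.{array, col}

val df2 = df.select(
  array(col("zipped_feature")(0)("_1"), col("zipped_feature")(0)("_2")).as("idx0"),
  array(col("zipped_feature")(1)("_1"), col("zipped_feature")(1)("_2")).as("idx1")
)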

Upvotes: 1

Nikunj Kakadiya

Reputation: 3008

In my opinion, you don't need a user-defined function for this use case. You can easily get the job done with withColumn and select statements.

// Source data
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  Seq(Array("A", "1"), Array("ABC", "33")),
  Seq(Array("A", "1"), Array("ABS", "24"))
).toDF("zipped_feature")

// 1) Getting the values using select
val df1 = df.select($"zipped_feature"(0).as("ArrayZero"), $"zipped_feature"(1).as("ArrayOne"))
// 2) Getting the values using withColumn
val df2 = df.withColumn("Array_Zero", $"zipped_feature"(0)).withColumn("Array_One", $"zipped_feature"(1))
// 3) Getting the value of the inner array
val df3 = df1.select($"ArrayZero"(0).as("InnerArrayZero"))
// 4) Getting the first element as a plain String
val value = df1.select($"ArrayZero"(0)).first.getString(0)

The outputs for steps 1-4 were attached as screenshots in the original answer.
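Since the screenshots don't carry over as text, here is roughly what each step produces with the sample data above (a sketch; the exact show formatting may differ):

df1.show(false)
// +---------+---------+
// |ArrayZero|ArrayOne |
// +---------+---------+
// |[A, 1]   |[ABC, 33]|
// |[A, 1]   |[ABS, 24]|
// +---------+---------+

df3.show(false)
// +--------------+
// |InnerArrayZero|
// +--------------+
// |A             |
// +--------------+

println(value) // A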

Upvotes: 1

Mohana B C

Reputation: 5487

zipped_feature is a column of type array&lt;struct&gt;. If you want each nested struct's values as an array, you need to modify the UDF as shown below.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("OFF")
import spark.implicits._

// constructing a sample dataframe
val rows = List(
  Row(Array(Row("A", "1"), Row("ABC", "33"))),
  Row(Array(Row("A", "1"), Row("ABS", "24"))),
  Row(Array(Row("B", "2"), Row("ABE", "17"))),
  Row(Array(Row("C", "3"), Row("ABC", "33"))))
val rdd = spark.sparkContext.parallelize(rows)

val schema = new StructType()
  .add("zipped_feature", ArrayType(new StructType().add("_1", StringType).add("_2", StringType)))
val df = spark.createDataFrame(rdd, schema)
df.show()
/*
+-------------------+
|     zipped_feature|
+-------------------+
|[[A, 1], [ABC, 33]]|
|[[A, 1], [ABS, 24]]|
|[[B, 2], [ABE, 17]]|
|[[C, 3], [ABC, 33]]|
+-------------------+
*/
df.printSchema()
/*
root
|-- zipped_feature: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- _1: string (nullable = true)
|    |    |-- _2: string (nullable = true)
*/

// UDF: take the struct at `index` and return its fields as an array of strings
val getValueUdf = udf { (zippedFeature: Seq[Row], index: Int) =>
  zippedFeature(index).toSeq.map(_.toString)
}

df.withColumn("first_column", getValueUdf('zipped_feature, lit(0)))
  .withColumn("second_column", getValueUdf('zipped_feature, lit(1)))
  .show(false)
 
/* output
+-------------------+------------+-------------+
|zipped_feature     |first_column|second_column|
+-------------------+------------+-------------+
|[[A, 1], [ABC, 33]]|[A, 1]      |[ABC, 33]    |
|[[A, 1], [ABS, 24]]|[A, 1]      |[ABS, 24]    |
|[[B, 2], [ABE, 17]]|[B, 2]      |[ABE, 17]    |
|[[C, 3], [ABC, 33]]|[C, 3]      |[ABC, 33]    |
+-------------------+------------+-------------+
*/

Upvotes: 2
