user13598286

Reputation: 11

Select array of struct in Spark

This question is part of a complex problem I am working on, and I am stuck at a particular point. To minimize the problem statement, let's say I have a DataFrame created from JSON with a simplified structure.

The raw data looks something like this:

{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email":"[email protected]"}]}

You can save this as person.json and create a Dataset as follows:

Dataset<Row> df = spark.read().json("person.json");

The output of printSchema() is:

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- email: string (nullable = true)


df.show(false);

+------------------------------------------------------------+
|       person                                               |
+------------------------------------------------------------+
|[[david, [email protected]],[steve, [email protected]]]         |
+------------------------------------------------------------+

Now the problem: as part of the code, I have to do

df.select(array(struct(col("person.name"), reverse(col("person.email")))));

which gives output like:

+------------------------------------------------------------+
|       array(named_struct(person.name as `name`, person.e...|
+------------------------------------------------------------+
|[[[david, steve],[[email protected], [email protected]]]]       |
+------------------------------------------------------------+

and the schema gets updated to:

root
 |-- array(named_struct(name, person.name as `name`, email, person.email as `email`)): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- email: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I do not want the schema and data to change (each struct field has become an array, presumably because person.name on an array-of-struct column selects all the name values at once). What should I change in the above df.select?

I am using Spark 2.3.0 with Scala 2.11.

On the suggestion of user Someshwar, I tried using transform on it, but it is not available in lower versions (transform is a higher-order function introduced in Spark 2.4, so the 2.3 SQL parser rejects the x -> lambda syntax):

df = df.withColumn("person_processed", expr("transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))"));

Below is the stack trace:

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '>' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))
---------------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:239)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:115)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseExpression(ParseDriver.scala:44)
    at org.apache.spark.sql.functions$.expr(functions.scala:1308)
    at org.apache.spark.sql.functions.expr(functions.scala)
    at com.mywork.jspark.JSparkMain1.main(JSparkMain1.java:43)

Upvotes: 1

Views: 7497

Answers (2)

Som

Reputation: 6338

I approached this problem as follows:

1. Load the data

    val spark = sqlContext.sparkSession
    val implicits = spark.implicits
    import implicits._

    val data =
      """
        |{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email": "[email protected]"}]}
      """.stripMargin

    val df = spark.read
      .json(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

Result:

+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
2. Process the array<struct>

This is tested on Spark 2.4:

    val answer1 = df.withColumn("person_processed",
      expr("transform(person, x -> named_struct('email', reverse(x.email), 'name', x.name))"))
    answer1.show(false)
    answer1.printSchema()

Result:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

Note that the input "person" column and "person_processed" have the same element type.
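
A quick way to check this claim programmatically (a minimal sketch; simpleString is compared because it ignores nullability, which does differ here: containsNull is true for person and false for person_processed):

    // Compare only field names and types; both sides print
    // array<struct<email:string,name:string>>
    assert(answer1.schema("person").dataType.simpleString ==
      answer1.schema("person_processed").dataType.simpleString)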

Edit-1 (As per comments, with case class)

The user is on Spark 2.3, where the higher-order functions for map and array are not available. The solution below is for Spark 2.3:

    // Spark < 2.4: higher-order functions are unavailable, so use a UDF
    import scala.collection.mutable
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.sql.types._

    case class InfoData(name: String, email: String)
    val infoDataSchema =
      ArrayType(StructType(Array(StructField("name", StringType), StructField("email", StringType))))

    val reverseEmailUDF = udf((arr1: mutable.WrappedArray[String], arr2: mutable.WrappedArray[String]) => {
      if (arr1.length != arr2.length) null
      else arr1.zipWithIndex.map(t => InfoData(t._1, arr2(t._2).reverse))
    }, infoDataSchema)

    val spark2_3Processed = df
      .withColumn("person_processed",
        reverseEmailUDF(
          col("person.name").cast("array<string>"),
          col("person.email").cast("array<string>")
        )
      )

    spark2_3Processed.show(false)
    spark2_3Processed.printSchema()

Output:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- email: string (nullable = true)
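
Since the question asks for the schema to stay unchanged, the same UDF can also write the result back into the person column instead of adding a new one. A minimal sketch, assuming the reverseEmailUDF defined above is in scope (note the resulting field order follows infoDataSchema, i.e. name before email):

    // withColumn replaces an existing column when the name already exists
    val samePersonDF = df.withColumn("person",
      reverseEmailUDF(
        col("person.name").cast("array<string>"),
        col("person.email").cast("array<string>")
      )
    )
    samePersonDF.printSchema()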

Edit-2 (As per comments, without case class)

The user is on Spark 2.3, where the higher-order functions for map and array are not available, and creating a case class is difficult. The solution below is for Spark 2.3:

    val subSchema = df.schema("person").dataType

    val reverseEmailUDF_withoutCaseClass =
      udf((nameArray: mutable.WrappedArray[String], emailArray: mutable.WrappedArray[String]) => {
        if (nameArray.length != emailArray.length) null
        else nameArray.zipWithIndex.map(t => (t._1, emailArray(t._2).reverse))
      }, subSchema)

    val withoutCaseClasDF = df
      .withColumn("person_processed",
        reverseEmailUDF_withoutCaseClass(
          col("person.name").cast("array<string>"),
          col("person.email").cast("array<string>")
        )
      )

    withoutCaseClasDF.show(false)
    withoutCaseClasDF.printSchema()
    withoutCaseClasDF.select("person_processed.email").show(false)

Output:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

+--------------+
|email         |
+--------------+
|[david, steve]|
+--------------+
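
One caveat, visible in the last select above: subSchema's field order is (email, name) as inferred from the JSON, while the UDF builds each tuple as (name, reversed email), so the values land under swapped field names; that is why the email column shows [david, steve]. A sketch of the fix, assuming the same setup as above, is to reorder the tuple to match subSchema:

    // put the reversed email first so it lands under subSchema's first field, "email"
    val reverseEmailUDF_fixed =
      udf((nameArray: mutable.WrappedArray[String], emailArray: mutable.WrappedArray[String]) => {
        if (nameArray.length != emailArray.length) null
        else nameArray.zipWithIndex.map(t => (emailArray(t._2).reverse, t._1))
      }, subSchema)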


Upvotes: 2

s.polam

Reputation: 10382

Try the code below.

scala> df.show(false)
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+

scala> df.printSchema
root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)


scala> val finalDF = df
     |   .select(explode($"person").as("person"))
     |   .groupBy(lit(1).as("id"))
     |   .agg(
     |     collect_list(
     |       struct(
     |         reverse($"person.email").as("email"),
     |         $"person.name").as("person")
     |     ).as("person")
     |   )
     |   .drop("id")

finalDF: org.apache.spark.sql.DataFrame = [person: array<struct<email:string,name:string>>]

scala> finalDF.show(false)
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+

scala> finalDF.printSchema
root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

scala>
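
Note that groupBy(lit(1)) collapses all rows into a single group, which works here only because the input has a single row. With multiple input rows, the same idea can be keyed per row; a sketch (monotonically_increasing_id is a standard Spark function, and the id column name is an assumption):

    // tag each row, explode, transform, then reassemble per original row;
    // note collect_list does not guarantee element order within a group
    val finalDF2 = df
      .withColumn("id", monotonically_increasing_id())
      .select($"id", explode($"person").as("person"))
      .groupBy($"id")
      .agg(collect_list(struct(reverse($"person.email").as("email"), $"person.name")).as("person"))
      .drop("id")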

Upvotes: 0
