Reputation: 11
This question is part of a complex problem I am working on, and I am stuck at a particular point. To minimize the problem statement, let's say I have a DataFrame created from JSON. The raw data looks somewhat like this:
{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email":"[email protected]"}]}
You can save this as person.json and create a Dataset from it as follows:
Dataset<Row> df = spark.read().json("person.json");
df.printSchema() gives the following output:
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- email: string (nullable = true)
df.show(false);
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[david, [email protected]], [steve, [email protected]]]|
+----------------------------------------------------+
Now the problem: as part of the code, I have to do
df.select(array(struct(col("person.name"), reverse(col("person.email")))));
which gives output like:
+-----------------------------------------------------+
|array(named_struct(person.name as `name`, person.e...|
+-----------------------------------------------------+
|[[[david, steve],[[email protected], [email protected]]]]|
+-----------------------------------------------------+
The schema gets updated to:
root
|-- array(named_struct(name, person.name as `name`, email, person.email as `email`)): array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- email: array (nullable = true)
| | | |-- element: string (containsNull = true)
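For what it's worth, the reason for the shape change seems to be that selecting person.name on an array-of-structs column yields an array of all the name values, so struct(...) then wraps two whole arrays instead of being applied per element. A minimal check (a sketch, not part of my actual code):

import org.apache.spark.sql.functions.col

// Extracting a field through array<struct> resolves to an array of
// that field, one entry per element of person:
df.select(col("person.name")).printSchema()
// root
//  |-- name: array (nullable = true)
//  |    |-- element: string (containsNull = true)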
I do not want the schema or the data to change. What should I change in the above df.select?
I am using Spark 2.3.0 with Scala 2.11.
At the suggestion of user Someshwar, I tried using transform on it, but that higher-order function is not available before Spark 2.4:
df = df.withColumn("person_processed", expr("transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))"));
Below is the stack trace for the same:
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '>' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)
== SQL ==
transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))
---------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:239)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:115)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseExpression(ParseDriver.scala:44)
at org.apache.spark.sql.functions$.expr(functions.scala:1308)
at org.apache.spark.sql.functions.expr(functions.scala)
at com.mywork.jspark.JSparkMain1.main(JSparkMain1.java:43)
Upvotes: 1
Views: 7497
Reputation: 6338
I tried to approach this problem as follows.
Load the data:
val spark = sqlContext.sparkSession
val implicits = spark.implicits
import implicits._
val data =
"""
|{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email": "[email protected]"}]}
""".stripMargin
val df = spark.read
.json(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
Result:
+----------------------------------------------------+
|person |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
Process the array<struct>
This is tested on Spark 2.4:
val answer1 = df.withColumn("person_processed",
expr("transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))"))
answer1.show(false)
answer1.printSchema()
Result:
+----------------------------------------------------+----------------------------------------------------+
|person |person_processed |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+----------------------------------------------------+
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
|-- person_processed: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
Please observe that both the input "person" column and the "person_processed" column are of the same element type.
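A quick way to check that, as a sketch against the answer1 value above (comparing element types only, since containsNull on the array itself differs):

import org.apache.spark.sql.types.ArrayType

// Both columns are array<struct<email:string,name:string>>
val inType  = answer1.schema("person").dataType.asInstanceOf[ArrayType]
val outType = answer1.schema("person_processed").dataType.asInstanceOf[ArrayType]
println(inType.elementType == outType.elementType) // true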
The user is on Spark 2.3, where the higher-order functions for map and array are not available. The solution below is for Spark 2.3:
// spark < 2.4: emulate transform with a UDF
import scala.collection.mutable
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

case class InfoData(name: String, email: String)
val infoDataSchema =
  ArrayType(StructType(Array(StructField("name", StringType), StructField("email", StringType))))

// Zip the two field arrays back into an array<struct>, reversing each email
val reverseEmailUDF = udf((arr1: mutable.WrappedArray[String], arr2: mutable.WrappedArray[String]) => {
  if (arr1.length != arr2.length) null
  else arr1.zipWithIndex.map(t => InfoData(t._1, arr2(t._2).reverse))
}, infoDataSchema)
val spark2_3Processed = df
.withColumn("person_processed",
reverseEmailUDF(
col("person.name").cast("array<string>"),
col("person.email").cast("array<string>")
)
)
spark2_3Processed.show(false)
spark2_3Processed.printSchema()
Output:
+----------------------------------------------------+----------------------------------------------------+
|person |person_processed |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
+----------------------------------------------------+----------------------------------------------------+
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
|-- person_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- email: string (nullable = true)
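Since the original requirement was to keep the schema unchanged, the same UDF could also overwrite the source column instead of adding a new one; a minimal sketch (note the element field order then follows infoDataSchema, i.e. name before email):

// Overwrite "person" in place rather than adding person_processed
val replacedDF = df.withColumn("person",
  reverseEmailUDF(
    col("person.name").cast("array<string>"),
    col("person.email").cast("array<string>")
  ))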
If the user is on Spark 2.3, where the higher-order functions for map and array are not available, and creating a case class is difficult as well, the solution below reuses the input column's own schema instead:
// Borrow the element schema from the input column itself: array<struct<email,name>>
val subSchema = df.schema("person").dataType
val reverseEmailUDF_withoutCaseClass =
  udf((nameArray: mutable.WrappedArray[String], emailArray: mutable.WrappedArray[String]) => {
    if (nameArray.length != emailArray.length) null
    // Tuple order must match subSchema's field order (email first, then name)
    else nameArray.zipWithIndex.map(t => (emailArray(t._2).reverse, t._1))
  }, subSchema)
val withoutCaseClasDF = df
.withColumn("person_processed",
reverseEmailUDF_withoutCaseClass(
col("person.name").cast("array<string>"),
col("person.email").cast("array<string>")
)
)
withoutCaseClasDF.show(false)
withoutCaseClasDF.printSchema()
withoutCaseClasDF.select("person_processed.email").show(false)
Output:
+----------------------------------------------------+----------------------------------------------------+
|person |person_processed |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+----------------------------------------------------+
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
|-- person_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
+----------------------------------+
|email                             |
+----------------------------------+
|[moc.liamg@divad, moc.liamg@evets]|
+----------------------------------+
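A note on this variant: because the UDF borrows df's own schema, the tuple elements must line up with that schema's field order, which is why the reversed email goes first in the pair. A quick check on the borrowed schema:

// Field order comes straight from the input column:
println(subSchema.simpleString) // array<struct<email:string,name:string>>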
Upvotes: 2
Reputation: 10382
Try the code below.
scala> df.show(false)
+----------------------------------------------------+
|person |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+
scala> df.printSchema
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
scala> val finalDF = df
.select(explode($"person").as("person"))
.groupBy(lit(1).as("id"))
.agg(
collect_list(
struct(
reverse($"person.email").as("email"),
$"person.name").as("person")
).as("person")
)
.drop("id")
finalDF: org.apache.spark.sql.DataFrame = [person: array<struct<email:string,name:string>>]
scala> finalDF.show(false)
+----------------------------------------------------+
|person |
+----------------------------------------------------+
|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+
scala> finalDF.printSchema
root
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- email: string (nullable = true)
| | |-- name: string (nullable = true)
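One caveat: grouping by a literal collapses all input rows into a single row, which works here only because the example DataFrame has one row. For multi-row inputs, a hedged sketch (assuming monotonically_increasing_id to tag rows first; untested):

import org.apache.spark.sql.functions._

// Tag each row before exploding so the regrouped arrays stay per-row
val multiRowDF = df
  .withColumn("id", monotonically_increasing_id())
  .select($"id", explode($"person").as("person"))
  .groupBy($"id")
  .agg(
    collect_list(
      struct(reverse($"person.email").as("email"), $"person.name")
    ).as("person")
  )
  .drop("id")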
Upvotes: 0