Reputation: 7605
I want to start by saying I am forced to use Spark 1.6.
I am generating a DataFrame from a JSON file like this:
{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}
The DataFrame looks like:
+---+----+-------+
|age| id| name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203| amith|
| 23|1204| javed|
| 25|1205| mendy|
| 24|1206| rob|
| 23|1207| prudvi|
+---+----+-------+
What I do with this DataFrame is group by age, order by id, and filter all age groups with more than one student. I use the following script:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = sqlContext.read.json("students.json")

// keep only age groups that contain more than one student
val arrLen = udf { a: Seq[Row] => a.length > 1 }

// collect the (age, id, name) structs of every age group into one array column
val mergedDF = df
  .withColumn("newCol", collect_set(struct("age", "id", "name")).over(Window.partitionBy("age").orderBy("id")))
  .select("newCol", "age")

val filterd = mergedDF.filter(arrLen(col("newCol")))
And now the current result is:
[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]
What I want now is to merge the two student rows inside each WrappedArray into one, taking, for example, the id of the first student and the name of the second student.
To achieve that I wrote the following function:
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

def PrintOne(students: Seq[Row], age: String): Row = {
  // take the age, the first student's id and the second student's name
  val studentDetails: Array[Any] = Array(age, students(0).getAs[String]("id"), students(1).getAs[String]("name"))
  // reuse the schema of the original rows (age, id, name)
  new GenericRowWithSchema(studentDetails, students(0).schema)
}
I know this function does the trick, because when I test it using a foreach it prints out the expected values:
filterd.foreach { x =>
  val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
  println("merged student: " + student)
}
Output:
merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]
But when I try to do the same inside a map to collect the returned values, the problems begin.
If I run it without an encoder:
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}
I get the following exception:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row - field (class: "org.apache.spark.sql.Row", name: "_2") - root class: "scala.Tuple2"
And when I try to generate an Encoder on my own, I fail as well:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(filterd.schema)
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder)
type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: org.apache.spark.sql.Encoder[(String, org.apache.spark.sql.Row)]
How can I provide the correct encoder or, even better, avoid it?
I have been told to avoid using map + a custom function, but the logic I need to apply is more complex than just picking one field from each row. It involves combining fields from several rows, checking the order of the rows and whether the values are null or not. And as far as I know, I can only solve that with a custom function.
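To give an idea, here is a rough sketch of the kind of merge I mean (the exact ordering and null-handling rules below are only illustrative, not my real requirements; it reuses the GenericRowWithSchema import from above):
// illustrative only: sort by id so "first"/"second" are well defined, and fall back when a value is null
def mergeGroup(students: Seq[Row], age: String): Row = {
  val ordered = students.sortBy(_.getAs[String]("id"))
  // prefer the first student's id, falling back to the last one's if it is null
  val id = Option(ordered.head.getAs[String]("id")).getOrElse(ordered.last.getAs[String]("id"))
  // prefer the last student's name, falling back to the first one's if it is null
  val name = Option(ordered.last.getAs[String]("name")).getOrElse(ordered.head.getAs[String]("name"))
  new GenericRowWithSchema(Array[Any](age, id, name), ordered.head.schema)
}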
Upvotes: 1
Views: 1509
Reputation: 330093
The output from the map is of type (String, Row), so it cannot be encoded using RowEncoder alone. You have to provide a matching tuple encoder:
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = Encoders.tuple(
  Encoders.STRING,
  RowEncoder(
    // The same as df.schema in your case
    StructType(Seq(
      StructField("age", StringType),
      StructField("id", StringType),
      StructField("name", StringType)))))

filterd.map { row => (
  row.getAs[String]("age"),
  PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}(encoder)
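For what it's worth, Encoders.tuple just composes the two component encoders here, so the result should come back with the age as a plain string column (_1) and the merged row as a struct column (_2) matching the StructType above.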
Overall this approach looks like an anti-pattern. If you want to use a more functional style you should avoid Dataset[Row]:
case class Person(age: String, id: String, name: String)

filterd.as[(Seq[Person], String)].map {
  case (people, age) => (age, (age, people(0).id, people(1).name))
}
or use a udf.
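For example, a rough sketch of the udf variant (the Merged case class and the mergeStudents name are just placeholders; it assumes the newCol / age columns of your filterd):
// placeholder names, not from the original code
case class Merged(age: String, id: String, name: String)

val mergeStudents = udf { (people: Seq[Row], age: String) =>
  Merged(age, people(0).getAs[String]("id"), people(1).getAs[String]("name"))
}

filterd.select(col("age"), mergeStudents(col("newCol"), col("age")).alias("student"))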
Also please note that the o.a.s.sql.catalyst package, including GenericRowWithSchema, is intended mostly for internal usage. Unless necessary otherwise, prefer o.a.s.sql.Row.
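If you go that way, a possible rewrite of PrintOne against the public API only could look like this (the helper name is made up; note the result is a plain Row without an attached schema):
import org.apache.spark.sql.Row

// illustrative only: same field picks as PrintOne, but built with the public Row factory
def mergeWithPlainRow(students: Seq[Row], age: String): Row =
  Row(age, students(0).getAs[String]("id"), students(1).getAs[String]("name"))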
Upvotes: 3