Ignacio Alorre

Reputation: 7605

Spark - How to combine/merge elements in Dataframe which are in Seq[Row] to generate a Row

I want to start by saying that I am forced to use Spark 1.6.

I am generating a DataFrame from a JSON file like this:

{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}

The DataFrame looks like:

+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203|  amith|
| 23|1204|  javed|
| 25|1205|  mendy|
| 24|1206|    rob|
| 23|1207| prudvi|
+---+----+-------+

What I do with this DataFrame is group the students by age, order them by id and keep only the age groups with more than one student. I use the following script:

import sqlContext.implicits._

val df = sqlContext.read.json("students.json")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val arrLen = udf { a: Seq[Row] => a.length > 1 }

val mergedDF = df
  .withColumn("newCol", collect_set(struct("age", "id", "name"))
    .over(Window.partitionBy("age").orderBy("id")))
  .select("newCol", "age")

val filterd = mergedDF.filter(arrLen(col("newCol")))

And now the current result is:

[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]

What I want now is to merge those two student rows inside the WrappedArray into one, taking, for example, the id of the first student and the name of the second.

To achieve that I wrote the following function:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

def PrintOne(students: Seq[Row], age: String): Row = {
  // take the age, the id of the first student and the name of the second
  val studentDetails = Array[Any](age, students(0).getAs[String]("id"), students(1).getAs[String]("name"))
  new GenericRowWithSchema(studentDetails, students(0).schema)
}

I know this function does the trick, because when I test it with a foreach it prints out the expected values:

filterd.foreach { x =>
  val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
  println("merged student: " + student)
}

Output:

merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]

But the problems begin when I try to do the same inside a map in order to collect the returned values.

If I run it without an encoder:

val merged = filterd.map { row =>
  (row.getAs[String]("age"), PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}

I get the following exception:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row - field (class: "org.apache.spark.sql.Row", name: "_2") - root class: "scala.Tuple2"

And when I try to provide an Encoder on my own, I fail as well:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

implicit val encoder = RowEncoder(filterd.schema)

val merged = filterd.map { row =>
  (row.getAs[String]("age"), PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}(encoder)

type mismatch; found : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] required: org.apache.spark.sql.Encoder[(String, org.apache.spark.sql.Row)]

How can I provide the correct encoder or even better, avoid it?

I have been told to avoid using map plus a custom function, but the logic I need to apply is more complex than just picking one field from each row: it has to combine fields from several rows, check their order and whether the values are null. As far as I know, only a custom function lets me do that, roughly along the lines of the sketch below.
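Just to illustrate the shape of that logic, here is a made-up example (the field choices and the mergeStudents name are only for illustration, not my real requirements):

import org.apache.spark.sql.Row

// Illustrative only: order the rows by id, then prefer the first non-null name.
def mergeStudents(students: Seq[Row], age: String): Row = {
  val ordered = students.sortBy(_.getAs[String]("id"))
  val name = ordered.map(_.getAs[String]("name")).find(_ != null).orNull
  Row(age, ordered.head.getAs[String]("id"), name)
}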

Upvotes: 1

Views: 1509

Answers (1)

zero323

Reputation: 330093

The output of the map is of type (String, Row), so it cannot be encoded using RowEncoder alone. You have to provide a matching tuple encoder:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = Encoders.tuple(
  Encoders.STRING,
  RowEncoder(
    // The same as df.schema in your case
    StructType(Seq(
      StructField("age", StringType), 
      StructField("id", StringType),
      StructField("name", StringType)))))

filterd.map{row => (
  row.getAs[String]("age"),
  PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}(encoder)

Overall this approach looks like an anti-pattern. If you want a more functional style you should avoid Dataset[Row]:

case class Person(age: String, id: String, name: String)

filterd.as[(Seq[Person], String)].map {
  case (people, age) => (age, (age, people(0).id, people(1).name))
}

or udf.
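A minimal sketch of that udf alternative, assuming the same struct("age", "id", "name") layout inside newCol as above (the mergeUdf name and the merged column are just for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Take the id from the first entry and the name from the second;
// returning a tuple yields a struct column.
val mergeUdf = udf { people: Seq[Row] =>
  (people(0).getAs[String]("id"), people(1).getAs[String]("name"))
}

filterd.withColumn("merged", mergeUdf(col("newCol")))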

Also please note that the o.a.s.sql.catalyst package, including GenericRowWithSchema, is intended mostly for internal use. Unless strictly necessary, prefer o.a.s.sql.Row.
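For instance, a variant of PrintOne built with the public Row factory instead (no schema attached, so downstream access is positional; the mergeOne name is just for illustration):

import org.apache.spark.sql.Row

// Same merge as PrintOne, but using Row(...) from the public API
// instead of the internal GenericRowWithSchema.
def mergeOne(students: Seq[Row], age: String): Row =
  Row(age, students(0).getAs[String]("id"), students(1).getAs[String]("name"))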

Upvotes: 3
