Reputation: 623
I'm playing around with Spark in Scala. I have this structure:
case class MovieRatings(movieName: String, rating: Double)
case class MovieCritics(name: String, movieRatings: List[MovieRatings])
The first class represents a movie and the rating given to it by some critic. It could be something like this:
MovieRatings("Logan", 1.5)
The second class takes the name of the critic and the list of movies he has rated. With this I've come up with a List of MovieCritics, where each element of the list has a name and a list of MovieRatings. So far so good. Now I want to transform this List into a Spark DataFrame to display the data in a more user-friendly way. Something like this:
Critic | Logan | Zoolander | John Wick | ...
Manuel | 1.5   | 3         | 2.5       |
John   | 2     | 3.5       | 3         |
...
The first column reflects the movie critic, and the following columns represent the movies and the respective ratings given by the critic. My question is how to transform a
List(MovieCritics(name: String, movieRatings: List[MovieRatings]))
in that view.
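For concreteness, such a list could be built like this (using the same critics and ratings as in the table above):
val critics = List(
  MovieCritics("Manuel", List(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
  MovieCritics("John", List(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3)))
)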
Upvotes: 1
Views: 1571
Reputation: 1
import org.apache.spark.sql.functions.first

ratings
  .selectExpr("name", "explode(movieRatings) as mr") // one row per (critic, movie) pair
  .select("name", "mr.movieName", "mr.rating")
  .groupBy("name")
  .pivot("movieName")                                // one column per movie
  .agg(first("rating"))
  .show(10, false)
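This snippet assumes a ratings Dataset already exists; a minimal sketch of that setup, built from the question's case classes and sample values, could be:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Sample data matching the structure described in the question
val ratings = List(
  MovieCritics("Manuel", List(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
  MovieCritics("John", List(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3)))
).toDS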
Upvotes: 0
Reputation: 74769
how to transform
List(MovieCritics(name: String, movieRatings: List[MovieRatings]))
That's as simple as using toDS on the List. That is only available when you have SparkSession's implicits in scope, which is (again) as simple as the following:
val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._
If you work with scala.collection.immutable.Iterable[MovieCritics] or a similar collection data structure, you have to "map" it using toSeq or toArray before toDS to "escape" from Iterable. The implicits are not available for Iterables.
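For example (a sketch; the Iterable here is just hypothetical sample data, e.g. the values of a Map):
// With sparkSession.implicits._ in scope (see above)
val criticsIterable: Iterable[MovieCritics] =
  Map("m" -> MovieCritics("Manuel", List(MovieRatings("Logan", 1.5)))).values

// Convert to a Seq first so the toDS implicit applies
val criticsDS = criticsIterable.toSeq.toDS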
Given the list is critics, you'd have to do the following:
critics.toDS
Now I want to transform this List into a Spark DataFrame to display the data in a more user-friendly way.
That's the most entertaining part of your question (took me a couple of hours to finally understand and write a solution). I'd appreciate comments to make it prettier.
case class MovieRatings(movieName: String, rating: Double)
case class MovieCritics(name: String, movieRatings: Seq[MovieRatings])
val movies_critics = Seq(
MovieCritics("Manuel", Seq(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
MovieCritics("John", Seq(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3))))
With the input dataset in place, here comes the solution.
val ratings = movies_critics.toDF
scala> ratings.show(false)
+------+-----------------------------------------------+
|name |movieRatings |
+------+-----------------------------------------------+
|Manuel|[[Logan,1.5], [Zoolander,3.0], [John Wick,2.5]]|
|John |[[Logan,2.0], [Zoolander,3.5], [John Wick,3.0]]|
+------+-----------------------------------------------+
import org.apache.spark.sql.functions._

// The maximum number of movies rated by any single critic
val ratingsCount = ratings.
  withColumn("size", size($"movieRatings")).
  select(max("size")).
  as[Int].
  head
val names_ratings = (0 until ratingsCount).
foldLeft(ratings) { case (ds, counter) => ds.
withColumn(s"name_$counter", $"movieRatings"(counter)("movieName")).
withColumn(s"rating_$counter", $"movieRatings"(counter)("rating")) }
// Pick the name_N columns (drop the leading "name" column first so it doesn't match)
val movieColumns = names_ratings.
  columns.
  drop(1).
  filter(name => name.startsWith("name")).
  map(col)
// Movie names come from the first row; pair each with its rating_N column
val movieNames = names_ratings.select(movieColumns: _*).head.toSeq.map(_.toString)
val ratingNames = movieNames.indices.map(idx => s"rating_$idx")
// Rename every rating_N column to the corresponding movie name
val cols = movieNames.zip(ratingNames).map { case (movie, rn) =>
  col(rn) as movie
}
val solution = names_ratings.select(($"name" +: cols): _*)
scala> solution.show
+------+-----+---------+---------+
| name|Logan|Zoolander|John Wick|
+------+-----+---------+---------+
|Manuel| 1.5| 3.0| 2.5|
| John| 2.0| 3.5| 3.0|
+------+-----+---------+---------+
Upvotes: 1
Reputation: 41987
If you have the movie data as
val movieCritics = List(
MovieCritics("Manual", List(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
MovieCritics("John", List(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3)))
)
You can create a dataframe just by calling toDF on it, as
import sqlContext.implicits._
val df = movieCritics.toDF
Which should be
+------+-----------------------------------------------+
|name |movieRatings |
+------+-----------------------------------------------+
|Manual|[[Logan,1.5], [Zoolander,3.0], [John Wick,2.5]]|
|John |[[Logan,2.0], [Zoolander,3.5], [John Wick,3.0]]|
+------+-----------------------------------------------+
Now a simple select on the above dataframe should get you the output you need:
import org.apache.spark.sql.functions._

df.select(col("name"),
    col("movieRatings")(0)("rating").as("Logan"),
    col("movieRatings")(1)("rating").as("Zoolander"),
    col("movieRatings")(2)("rating").as("John Wick"))
  .show(false)
This should result in the final dataframe:
+------+-----+---------+---------+
|name |Logan|Zoolander|John Wick|
+------+-----+---------+---------+
|Manual|1.5 |3.0 |2.5 |
|John |2.0 |3.5 |3.0 |
+------+-----+---------+---------+
Upvotes: 2