MLeiria

Reputation: 623

How to use structs for column names and values?

I'm playing around with Spark in Scala. I have this structure:

case class MovieRatings(movieName: String, rating: Double)
case class MovieCritics(name: String, movieRatings: List[MovieRatings])

The first class holds a movie and the rating given by some critic. It could be something like this:

MovieRatings("Logan", 1.5)

and the second class holds the name of a critic and the list of movies rated by him. With this I've come up with a List of MovieCritics, where each element of the list has a name and a list of MovieRatings. So far so good. Now I want to transform this list into a Spark DataFrame to display the data in a more user-friendly way. Something like this:

Critic | Logan | Zoolander | John Wick | ...
Manuel | 1.5   | 3         | 2.5
John   | 2     | 3.5       | 3
...

The first column shows the movie critic and the following columns represent the movies and the respective rating given by that critic. My question is how to transform a

List(MovieCritics(name: String, movieRatings: List[MovieRatings]))

in that view.

Upvotes: 1

Views: 1571

Answers (3)

Pritam Agarwala

Reputation: 1

Explode the ratings, then pivot each movie name into its own column:

import org.apache.spark.sql.functions.first

ratings
  .selectExpr("name", "explode(movieRatings) as mr")
  .select("name", "mr.movieName", "mr.rating")
  .groupBy("name")
  .pivot("movieName")
  .agg(first("rating"))
  .show(10, false)

Upvotes: 0

Jacek Laskowski

Reputation: 74769

how to transform List(MovieCritics(name: String, movieRatings: List[MovieRatings]))

That's as simple as calling toDS on the List. toDS is only available when you have SparkSession's implicits in scope, which is (again) as simple as the following:

val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._

If you work with scala.collection.immutable.Iterable[MovieCritics] or similar collection data structure you have to "map" it using toSeq or toArray before toDS to "escape" from Iterable. Implicits are not available for Iterables.
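A minimal sketch of that conversion, assuming a local SparkSession and the two case classes from the question (the variable names here are only illustrative):

```scala
import org.apache.spark.sql.SparkSession

case class MovieRatings(movieName: String, rating: Double)
case class MovieCritics(name: String, movieRatings: Seq[MovieRatings])

val sparkSession = SparkSession.builder.master("local[*]").getOrCreate()
import sparkSession.implicits._

// An Iterable[MovieCritics] (e.g. the values of a Map) has no toDS,
// because the implicit encoders only target Seq-like collections.
val critics: Iterable[MovieCritics] =
  Map("Manuel" -> Seq(MovieRatings("Logan", 1.5))).map {
    case (name, ratings) => MovieCritics(name, ratings)
  }

// Convert to a Seq first to "escape" the Iterable, then call toDS.
val ds = critics.toSeq.toDS
```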

Given the list is called critics, you'd have to do the following:

critics.toDS

Now I want to transform this list into a Spark DataFrame to display the data in a more user-friendly way.

That's the most entertaining part of your question (took me a couple of hours to finally understand and write a solution). I'd appreciate comments to make it prettier.

case class MovieRatings(movieName: String, rating: Double)
case class MovieCritics(name: String, movieRatings: Seq[MovieRatings])
val movies_critics = Seq(
  MovieCritics("Manuel", Seq(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
  MovieCritics("John", Seq(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3))))

With the input dataset defined, here comes the solution.

val ratings = movies_critics.toDF
scala> ratings.show(false)
+------+-----------------------------------------------+
|name  |movieRatings                                   |
+------+-----------------------------------------------+
|Manuel|[[Logan,1.5], [Zoolander,3.0], [John Wick,2.5]]|
|John  |[[Logan,2.0], [Zoolander,3.5], [John Wick,3.0]]|
+------+-----------------------------------------------+

val ratingsCount = ratings.
  withColumn("size", size($"movieRatings")).
  select(max("size")).
  as[Int].
  head

val names_ratings = (0 until ratingsCount).
  foldLeft(ratings) { case (ds, counter) => ds.
    withColumn(s"name_$counter", $"movieRatings"(counter)("movieName")).
    withColumn(s"rating_$counter", $"movieRatings"(counter)("rating")) }

val movieColumns = names_ratings.
  columns.
  drop(1).
  filter(name => name.startsWith("name")).
  map(col)
val movieNames = names_ratings.select(movieColumns: _*).head.toSeq.map(_.toString)
val ratingNames = movieNames.indices.map(idx => s"rating_$idx")
val cols = movieNames.zip(ratingNames).map { case (movie, rn) =>
  col(rn) as movie
}
val solution = names_ratings.select(($"name" +: cols): _*)
scala> solution.show
+------+-----+---------+---------+
|  name|Logan|Zoolander|John Wick|
+------+-----+---------+---------+
|Manuel|  1.5|      3.0|      2.5|
|  John|  2.0|      3.5|      3.0|
+------+-----+---------+---------+

Upvotes: 1

Ramesh Maharjan

Reputation: 41987

If you have the movie data as

val movieCritics = List(
  MovieCritics("Manual", List(MovieRatings("Logan", 1.5), MovieRatings("Zoolander", 3), MovieRatings("John Wick", 2.5))),
  MovieCritics("John", List(MovieRatings("Logan", 2), MovieRatings("Zoolander", 3.5), MovieRatings("John Wick", 3)))
)

You can create a dataframe just by calling toDF:

import sqlContext.implicits._
val df = movieCritics.toDF

which should give

+------+-----------------------------------------------+
|name  |movieRatings                                   |
+------+-----------------------------------------------+
|Manual|[[Logan,1.5], [Zoolander,3.0], [John Wick,2.5]]|
|John  |[[Logan,2.0], [Zoolander,3.5], [John Wick,3.0]]|
+------+-----------------------------------------------+

Now a simple select on the above dataframe should get you the output you need:

import org.apache.spark.sql.functions._

df.select(
    col("name"),
    col("movieRatings")(0)("rating").as("Logan"),
    col("movieRatings")(1)("rating").as("Zoolander"),
    col("movieRatings")(2)("rating").as("John Wick")
  ).show(false)

This should result in the final dataframe:

+------+-----+---------+---------+
|name  |Logan|Zoolander|John Wick|
+------+-----+---------+---------+
|Manual|1.5  |3.0      |2.5      |
|John  |2.0  |3.5      |3.0      |
+------+-----+---------+---------+

Upvotes: 2
