Khaoula Arfaoui

Reputation: 75

while loop on a map spark dataframe

I have a problem with a MapType column in Spark using the Scala API. For each session, we receive a map whose keys are the categories the user visited and whose values are the number of events in each category:

[home & personal items > interior -> 1, vehicles > cars -> 1]

Not all users visit the same number of categories, so the size of the map varies by user_id.

I need to calculate the number of sessions grouped by category. To do that, I thought I had to loop over the map while it is not empty. Here is what I tried:

while (size(col("categoriesRaw")) !== 0) {
    df.select(
        explode(col("categoriesRaw"))
    )
    .select(
        col("key").alias("categ"),
        col("value").alias("number_of_events")
    )
}

but I'm facing errors like:

type mismatch;
 found   : org.apache.spark.sql.Column
 required: Boolean

Upvotes: 0

Views: 2489

Answers (1)

Oli

Reputation: 10406

I'm not sure what you are trying to do with the while loop. In any case, you can check in the REPL that the expression you used as a condition is a Column, not a Boolean, hence the error:

> size(col("categoriesRaw")) !== 0
res1: org.apache.spark.sql.Column = (NOT (size(categoriesRaw) = 0))

Basically, this is an expression that needs to be evaluated by SparkSQL within a where, select or any other function that uses Columns.
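For instance, the very same condition compiles fine once it is handed to Spark instead of to a Scala while. A small sketch, assuming df is your DataFrame (note that =!= is the current Column inequality operator; !== is deprecated):

```scala
import org.apache.spark.sql.functions.{col, size}

// Keep only the rows whose categoriesRaw map is non-empty.
// Spark evaluates the Column expression per row inside where().
val nonEmpty = df.where(size(col("categoriesRaw")) =!= 0)
```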

Nevertheless, your Spark code is almost there: you just need to add a groupBy to get what you want. Let's start by creating your data.

import spark.implicits._
val users = Seq( "user 1" -> Map("home & personal items > interior" -> 1,
                                 "vehicles > cars" -> 1), 
                 "user 2" -> Map("vehicles > cars" -> 3)) 
val df = users.toDF("user", "categoriesRaw")

Then, you don't need a while loop to iterate over all the values of the maps. explode does exactly that for you:

val explodedDf = df.select( explode('categoriesRaw) )
explodedDf.show(false)

+--------------------------------+-----+
|key                             |value|
+--------------------------------+-----+
|home & personal items > interior|1    |
|vehicles > cars                 |1    |
|vehicles > cars                 |3    |
+--------------------------------+-----+

Finally, you can use groupBy and aggregate to get what you want.

explodedDf
    .select('key as "categ", 'value as "number_of_events")
    .groupBy("categ")
    .agg(count('*), sum('number_of_events))
    .show(false)

+--------------------------------+--------+---------------------+
|categ                           |count(1)|sum(number_of_events)|
+--------------------------------+--------+---------------------+
|home & personal items > interior|1       |1                    |
|vehicles > cars                 |2       |4                    |
+--------------------------------+--------+---------------------+

NB: I was not sure if you wanted to count the sessions (1st column) or the events (2nd column) so I computed both.
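If you only need one of the two, you can alias the aggregations to get friendlier column names. A variant of the same query (the names "sessions" and "events" are my own choice):

```scala
explodedDf
    .select('key as "categ", 'value as "number_of_events")
    .groupBy("categ")
    .agg(count("*").as("sessions"), sum('number_of_events).as("events"))
    .show(false)
```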

Upvotes: 1
