Reputation: 413
I am quite new to Spark and I have a problem with a dataframe. I need to group by the unique categorical values of two columns (estado, producto), then count and sort (ascending) the unique values of the second column (producto). I can do this in Pandas but I can't reproduce it in Spark.
My original dataframe is
+--------------------+--------------------+
| estado| producto|
+--------------------+--------------------+
| MÉXICO|TINTE PARA EL CAB...|
| MÉXICO| TELEVISORES|
| MÉXICO| ACELGA|
| MÉXICO| QUESO. COTIJA|
| DISTRITO FEDERAL| AZUCAR|
| MÉXICO| DESENFRIOL-ITO|
| JALISCO| ARROZ|
| OAXACA|PEDIALYTE. ELECTR...|
| TLAXCALA| AGUA SIN GAS|
|VERACRUZ DE IGNAC...| TOMATE|
| MICHOACÁN DE OCAMPO| PAN DE CAJA|
| YUCATÁN| FLAGENASE 400|
| MICHOACÁN DE OCAMPO| ECTIVA|
| YUCATÁN| SALSA CATSUP|
| YUCATÁN| CLAVULIN|
| YUCATÁN| CAPOTENA|
| JALISCO| FLAGENASE 400|
| HIDALGO| VERMOX|
| OAXACA| MAIZ POZOLERO|
| OAXACA| AJO|
+--------------------+--------------------+
only showing top 20 rows
and this is my naive attempt:
df.groupBy('estado','producto').agg({'producto':'count'}).show()
+--------------------+--------------------+---------------+
| estado| producto|count(producto)|
+--------------------+--------------------+---------------+
| MÉXICO|TINTE PARA EL CAB...| 44007|
| MÉXICO| TELEVISORES| 29702|
| MÉXICO| ACELGA| 7691|
| MÉXICO| QUESO. COTIJA| 4414|
| DISTRITO FEDERAL| AZUCAR| 18078|
| MÉXICO| DESENFRIOL-ITO| 642|
| JALISCO| ARROZ| 11735|
| OAXACA|PEDIALYTE. ELECTR...| 302|
| TLAXCALA| AGUA SIN GAS| 14505|
|VERACRUZ DE IGNAC...| TOMATE| 652|
| MICHOACÁN DE OCAMPO| PAN DE CAJA| 13003|
| YUCATÁN| FLAGENASE 400| 313|
| MICHOACÁN DE OCAMPO| ECTIVA| 39|
| YUCATÁN| SALSA CATSUP| 6549|
| YUCATÁN| CLAVULIN| 183|
| YUCATÁN| CAPOTENA| 271|
| JALISCO| FLAGENASE 400| 699|
| HIDALGO| VERMOX| 121|
| OAXACA| MAIZ POZOLERO| 1387|
| OAXACA| AJO| 783|
+--------------------+--------------------+---------------+
but I need an output similar to this:
+--------------------+--------------------+---------------+
| estado| producto|count(producto)|
+--------------------+--------------------+---------------+
| DISTRITO FEDERAL| AZUCAR| 18078|
| HIDALGO| VERMOX| 121|
| JALISCO| ARROZ| 11735|
| JALISCO| FLAGENASE 400| 699|
| MÉXICO|TINTE PARA EL CAB...| 44007|
| MÉXICO| TELEVISORES| 29702|
| MÉXICO| ACELGA| 7691|
| MÉXICO| QUESO. COTIJA| 4414|
| MÉXICO| DESENFRIOL-ITO| 642|
| MICHOACÁN DE OCAMPO| PAN DE CAJA| 13003|
| MICHOACÁN DE OCAMPO| ECTIVA| 39|
| OAXACA| MAIZ POZOLERO| 1387|
| OAXACA| AJO| 783|
| OAXACA|PEDIALYTE. ELECTR...| 302|
| TLAXCALA| AGUA SIN GAS| 14505|
|VERACRUZ DE IGNAC...| TOMATE| 652|
| YUCATÁN| SALSA CATSUP| 6549|
| YUCATÁN| FLAGENASE 400| 313|
| YUCATÁN| CLAVULIN| 183|
| YUCATÁN| CAPOTENA| 271|
+--------------------+--------------------+---------------+
or even better, showing the unique values of the second column grouped under each unique value of the first column:
+--------------------+--------------------+---------------+
| estado| producto|count(producto)|
+--------------------+--------------------+---------------+
| DISTRITO FEDERAL| AZUCAR| 18078|
| HIDALGO| VERMOX| 121|
| JALISCO| ARROZ| 11735|
| | FLAGENASE 400| 699|
| MÉXICO|TINTE PARA EL CAB...| 44007|
| | TELEVISORES| 29702|
| | ACELGA| 7691|
| | QUESO. COTIJA| 4414|
| | DESENFRIOL-ITO| 642|
| MICHOACÁN DE OCAMPO| PAN DE CAJA| 13003|
| | ECTIVA| 39|
| OAXACA| MAIZ POZOLERO| 1387|
| | AJO| 783|
| |PEDIALYTE. ELECTR...| 302|
| TLAXCALA| AGUA SIN GAS| 14505|
|VERACRUZ DE IGNAC...| TOMATE| 652|
| YUCATÁN| SALSA CATSUP| 6549|
| | FLAGENASE 400| 313|
| | CLAVULIN| 183|
| | CAPOTENA| 271|
+--------------------+--------------------+---------------+
I hope my question is clear; sorry for my bad English, and thanks.
UPDATE: I forgot to say that the third column (count(producto)) should only show the first n largest values, say the three largest, similar to the nlargest() function in Pandas.
I think I could use something like this:
df.groupBy('estado', 'producto').count().filter("count >= 3").sort(asc("count"))
but I haven't tried it.
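For reference, the Pandas version of what I am after looks roughly like this (toy data, since the real counts don't matter; column names match my dataframe):

```python
import pandas as pd

# Toy data (assumed): repeated (estado, producto) pairs so the counts differ.
data = (
    [("MÉXICO", "TELEVISORES")] * 5
    + [("MÉXICO", "ACELGA")] * 3
    + [("MÉXICO", "QUESO. COTIJA")] * 2
    + [("MÉXICO", "DESENFRIOL-ITO")] * 1
    + [("JALISCO", "ARROZ")] * 4
    + [("JALISCO", "FLAGENASE 400")] * 2
)
df = pd.DataFrame(data, columns=["estado", "producto"])

# Count each (estado, producto) pair, then keep the 3 largest counts per estado.
counts = df.groupby(["estado", "producto"]).size().rename("count").reset_index()
top3 = (counts.sort_values("count", ascending=False)
              .groupby("estado", sort=False)
              .head(3)
              .sort_values(["estado", "count"], ascending=[True, False]))
print(top3)
```

Here `groupby(...).head(3)` after sorting by count plays the role of `nlargest(3)` per group.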
UPDATE 2
I have tried a suggestion from the comments with this code:
df.groupBy("estado","producto").count()\
.withColumn("row_num",F.row_number()\
.over(Window.partitionBy("estado","producto")\
.orderBy(F.col("count").desc())))\
.filter(F.col("row_num") < 3)\
.drop("row_num")\
.orderBy(F.col("estado"), F.col("producto").desc(),F.col("count")\
.desc()).show()
but the output is not what I want:
+------+-------------+-----+
|estado| producto|count|
+------+-------------+-----+
| null|ZWAN. PREMIUM| 55|
| null| ZWAN| 55|
| null| ZUCARITAS| 20|
| null| ZOFILIP| 9|
| null| ZINTREPID| 9|
| null| ZINNAT| 9|
| null| ZANAHORIA| 14|
| null| ZACTOS| 9|
| null| YOGHURT| 203|
| null| YASMIN 24/4| 9|
| null| YASMIN| 9|
| null| XATRAL-OD| 8|
| null|VYTORIN 10/20| 8|
| null| VINO DE MESA| 7|
| null| VINAGRE| 66|
| null| VIDEOJUEGOS| 7|
| null| VIDEOCAMARAS| 1|
| null| VICK VAPORUB| 16|
| null| VIAGRA| 17|
| null| VERMOX PLUS| 9|
+------+-------------+-----+
only showing top 20 rows
I tried to modify the code a bit, but I get this:
+----------------+--------------------+------+
| estado| producto| count|
+----------------+--------------------+------+
|DISTRITO FEDERAL| REFRESCO|287463|
|DISTRITO FEDERAL| FUD|207569|
| MÉXICO| REFRESCO|194939|
|DISTRITO FEDERAL|LECHE ULTRAPASTEU...|175640|
|DISTRITO FEDERAL| DETERGENTE P/ROPA|173452|
| MÉXICO| FUD|149141|
|DISTRITO FEDERAL| YOGHURT|136720|
|DISTRITO FEDERAL| CERVEZA|136686|
| MÉXICO| DETERGENTE P/ROPA|132862|
|DISTRITO FEDERAL| MAYONESA|131103|
|DISTRITO FEDERAL| CHILES EN LATA|130598|
|DISTRITO FEDERAL| JABON DE TOCADOR|129889|
|DISTRITO FEDERAL| SHAMPOO|125603|
|DISTRITO FEDERAL| LECHE EN POLVO|116827|
| MÉXICO|LECHE ULTRAPASTEU...|116522|
|DISTRITO FEDERAL| DESODORANTE|113779|
|DISTRITO FEDERAL| HUEVO|111412|
|DISTRITO FEDERAL| TOALLA FEMENINA|102356|
|DISTRITO FEDERAL| MARGARINA| 98235|
| MÉXICO| JABON DE TOCADOR| 97330|
+----------------+--------------------+------+
only showing top 20 rows
Upvotes: 2
Views: 2672
Reputation: 1076
You are counting properly. You just need a row_number window to select the top 3 per estado and then order the results properly. Note that the window must be partitioned by estado only (partitioning by both estado and producto puts every row in its own partition, so row_number is always 1 and the filter keeps everything). Have a look at the code below.
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

(df.groupBy("estado", "producto").count()
   .withColumn("row_num",
               row_number().over(Window.partitionBy("estado")
                                       .orderBy(col("count").desc())))
   .filter(col("row_num") <= 3)
   .drop("row_num")
   .orderBy(col("estado"), col("count").desc()))
Check this link for how to use row_number properly in PySpark: Spark SQL Row_number() PartitionBy Sort Desc
Upvotes: 1