user8131063

Reputation:

How to add groupBy().count() to the source DataFrame?

I have the following dataframe:

+---------------+--------------+--------------+-----+
|        column0|       column1|       column2|label|
+---------------+--------------+--------------+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
+---------------+--------------+--------------+-----+

I want to apply groupBy and count on it to produce the following result:

+--------------+--------------+-----+
|       column1|       column2|count|
+--------------+--------------+-----+
|10.0.0.2.54880| 10.0.0.3.5001|   19|
| 10.0.0.3.5001|10.0.0.2.54880|   10|
+--------------+--------------+-----+

I know that I have to use this:

dataFrame_Train.groupBy("column1", "column2").count().show()

However, the problem is that I need the "count" column attached permanently to my dataframe. In the case above, if I call dataFrame_Train.show() after the groupBy, I still see the original dataframe without a "count" column. This code, for example, prints the aggregated counts and then the unchanged original dataframe:

dataFrame_Train.groupBy("column1", "column2").count().show()
dataFrame_Train.show()

Can you help me add the result of groupBy("column1", "column2").count() to the dataframe? (I need to use the "count" column for training in the future.) Thanks.

Upvotes: 3

Views: 3312

Answers (2)

Jacek Laskowski

Reputation: 74669

@eliasah's answer is fine, but it might not be the most efficient, code- and performance-wise.

Window Aggregate Functions (aka Window Aggregations)

Whenever you see a need for a groupBy followed by a join, especially for a use case as simple as this one, think of window aggregate functions.

The main difference between groupBy and window aggregations is that the former gives you at most as many rows as the source dataset (one per group), while the latter (window aggregates) gives you exactly as many rows as the source dataset. That seems to match your requirement exactly, doesn't it?

With that, let's see the code.

import org.apache.spark.sql.expressions.Window
// window specification partitioned by the same keys as the groupBy
val columns1and2 = Window.partitionBy("column1", "column2")

import org.apache.spark.sql.functions._
// ips is the source DataFrame (dataFrame_Train in the question);
// the count aggregate over the entire partition frame attaches the per-group count to every row
val counts = ips.withColumn("count", count($"label") over columns1and2)
scala> counts.show
+---------------+--------------+--------------+-----+-----+
|        column0|       column1|       column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
+---------------+--------------+--------------+-----+-----+

Done! Clean and easy. That's my beloved window aggregate functions at work!
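A side note on the aggregate itself: count($"label") skips rows whose label is null. If you want the raw number of rows in every partition frame regardless of nulls, a minimal variant (just a sketch, reusing the ips and columns1and2 names from above) would be:

import org.apache.spark.sql.functions.{count, lit}
// count(lit(1)) counts every row in the frame, null labels included
val countsAllRows = ips.withColumn("count", count(lit(1)) over columns1and2)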

Performance Comparison

Now comes the fun part. Is the difference between this and @eliasah's solution purely syntactic? I don't think so (though I'm still learning how to draw the right conclusions). Look at the execution plans and judge for yourself.

The following is the execution plan for window aggregation.

[screenshot: execution plan of the window aggregation query]

The following, however, is the execution plan for groupBy and join (I had to take two screenshots since the plan was too big to fit in one).

[screenshots: execution plan of the groupBy-and-join query, split across two images]

Job-wise, the groupBy-and-join query beats the window aggregation easily: 2 Spark jobs for the former versus 5 for the latter.

Operator-wise, looking at their number and, most importantly, at the Exchanges (which are Spark SQL's shuffles), the window aggregation may have beaten groupBy with join.
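If you want to inspect the plans yourself rather than rely on screenshots, the physical plan (including the Exchange operators) can be printed directly; a quick sketch, assuming the counts DataFrame from this answer and the df3 DataFrame from @eliasah's:

// prints the physical plan; look for Exchange operators, i.e. shuffles
counts.explain()
df3.explain()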

Upvotes: 7

eliasah

Reputation: 40370

We will use the same data that you have presented, in CSV format.

Let's read that data:

scala> val df = spark.read.format("csv").load("data.txt").toDF("column0","column1","column2","label")
// df: org.apache.spark.sql.DataFrame = [column0: string, column1: string ... 2 more fields]
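A hedged aside: the line above assumes data.txt is a plain comma-separated file with no header line. If your copy of the data has a header row or a different delimiter (both hypothetical here), the standard CSV reader options cover that:

val dfAlt = spark.read.format("csv")
  .option("header", "true")  // hypothetical: only if the file has a header row
  .option("sep", ";")        // hypothetical: only if fields are not comma-separated
  .load("data.txt")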

We can now perform our groupBy aggregation:

scala> val df2 = df.groupBy("column1","column2").count
df2: org.apache.spark.sql.DataFrame = [column1: string, column2: string ... 1 more field]
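For reference, df2 alone contains just one row per (column1, column2) group, which is why we still need to join it back to df. Based on the sample data, its contents would look roughly like this (row order may differ):

scala> df2.show
+--------------+--------------+-----+
|       column1|       column2|count|
+--------------+--------------+-----+
|10.0.0.2.54880| 10.0.0.3.5001|   13|
| 10.0.0.3.5001|10.0.0.2.54880|    7|
+--------------+--------------+-----+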

All we need to do now is an equi-join back to df on the same columns we grouped by:

scala> val df3 = df.join(df2, Seq("column1","column2"))
df3: org.apache.spark.sql.DataFrame = [column1: string, column2: string ... 3 more fields]

Et voilà!

scala> df3.show
+--------------+--------------+---------------+-----+-----+                     
|       column1|       column2|        column0|label|count|
+--------------+--------------+---------------+-----+-----+
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900|    2|   13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899|    2|   13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908|    2|    7|
+--------------+--------------+---------------+-----+-----+
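A closing note: since df2 holds only one row per group, it is usually small enough to broadcast, which lets Spark avoid shuffling the large side of the join; a sketch using the broadcast hint from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.broadcast
// same equi-join, but hint Spark to broadcast the small per-group counts
val df3b = df.join(broadcast(df2), Seq("column1", "column2"))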

Upvotes: 4
