Reputation:
I have the following dataframe:
+---------------+--------------+--------------+-----+
| column0| column1| column2|label|
+---------------+--------------+--------------+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2|
+---------------+--------------+--------------+-----+
I want to apply groupBy and count on it to produce the following result:
+--------------+--------------+-----+
| column1| column2|count|
+--------------+--------------+-----+
|10.0.0.2.54880| 10.0.0.3.5001| 19|
| 10.0.0.3.5001|10.0.0.2.54880| 10|
+--------------+--------------+-----+
I know that I have to use this:
dataFrame_Train.groupBy("column1", "column2").count().show()
However, the problem is that I need to add the "count" column as a permanent column to my dataframe. With the code above, if I call dataFrame_Train.show() after the groupBy, I still see the original dataframe without the "count" column.
Can you help me add the result of groupBy("column1", "column2").count() to the dataframe? (I need to use the "count" column for training the data in the future.) Thanks.
Upvotes: 3
Views: 3312
Reputation: 74669
@eliasah's answer is fine, but might not be the most effective, code- and performance-wise.
Whenever you see a need for groupBy and join, especially for such a simple use case as this, think of window aggregate functions.
The main difference between groupBy and window aggregations is that the former gives you at most as many rows as in the source dataset, while the latter gives you exactly as many rows as in the source dataset. With your 20-row sample, groupBy("column1", "column2").count() produces just 2 rows, while the window version keeps all 20. That seems to match your requirements exactly, doesn't it?
With that, let's see the code.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// ips is the input dataframe from the question
// the window partitions match the groupBy("column1", "column2") keys
val columns1and2 = Window.partitionBy("column1", "column2")

// count aggregate function over the entire partition frame
val counts = ips.withColumn("count", count($"label") over columns1and2)
scala> counts.show
+---------------+--------------+--------------+-----+-----+
| column0| column1| column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001| 2| 13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880| 2| 7|
+---------------+--------------+--------------+-----+-----+
Done! Clean and easy. Those are my beloved window aggregate functions!
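One design note: count($"label") counts only the non-null label values within each partition. If label could ever be null and you want every row counted, a plain row count over the same window works just as well; a minimal sketch:
// counts all rows per partition, regardless of nulls in label
val countsAllRows = ips.withColumn("count", count(lit(1)) over columns1and2)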
Now comes the fun part. Is the difference between this and @eliasah's solution just pure syntax? I don't think so (yet I'm still learning how to draw the right conclusions). See the execution plans and judge for yourself.
The following is the execution plan for the window aggregation.
The following, however, is the execution plan for groupBy and join (I had to take two screenshots as the plan was too big to fit in one).
Job-wise, the groupBy-and-join query beats window aggregation easily: 2 Spark jobs for the former versus 5 for the latter.
Operator-wise, counting the operators and, most importantly, the Exchanges (which are Spark SQL's shuffles), window aggregation may well have beaten groupBy with join.
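If the screenshots are hard to read in your medium, you can reproduce the comparison yourself with Dataset.explain; a quick sketch, assuming the df and df2 names from @eliasah's answer below:
// physical plan for the window aggregation
counts.explain
// physical plan for the groupBy-and-join variant
df.join(df2, Seq("column1", "column2")).explain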
Upvotes: 7
Reputation: 40370
We will use the same data that you have presented, in CSV format.
Let's read that data:
scala> val df = spark.read.format("csv").load("data.txt").toDF("column0","column1","column2","label")
df: org.apache.spark.sql.DataFrame = [column0: string, column1: string ... 2 more fields]
We can now perform our groupBy aggregation:
scala> val df2 = df.groupBy("column1","column2").count
df2: org.apache.spark.sql.DataFrame = [column1: string, column2: string ... 1 more field]
All we need to do now is an equi-join on the same columns you grouped by:
scala> val df3 = df.join(df2, Seq("column1","column2"))
df3: org.apache.spark.sql.DataFrame = [column1: string, column2: string ... 3 more fields]
Et voilà!
scala> df3.show
+--------------+--------------+---------------+-----+-----+
| column1| column2| column0|label|count|
+--------------+--------------+---------------+-----+-----+
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604900| 2| 13|
|10.0.0.2.54880| 10.0.0.3.5001|05:49:56.604899| 2| 13|
| 10.0.0.3.5001|10.0.0.2.54880|05:49:56.604908| 2| 7|
+--------------+--------------+---------------+-----+-----+
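A side note on performance: df2 is tiny (one row per distinct key pair), so if Spark does not already pick a broadcast join on its own, you can hint one to avoid shuffling the larger side; a sketch:
import org.apache.spark.sql.functions.broadcast

// hint Spark to broadcast the small aggregated side of the join
val df3b = df.join(broadcast(df2), Seq("column1", "column2"))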
Upvotes: 4