Reputation: 35
I have a PySpark DataFrame that looks as follows:
+------+----------+
|src_ip|timestamp |
+------+----------+
|A     |2020-06-19|
|B     |2020-06-19|
|B     |2020-06-20|
|C     |2020-06-20|
|D     |2020-06-21|
+------+----------+
I would like to count how many distinct IP addresses are seen per day, where each IP address is only counted on the first day it appears.
I have tried:
from pyspark.sql.functions import window, countDistinct

df.groupBy(window(df['timestamp'], "1 day")) \
    .agg(countDistinct('src_ip')) \
    .orderBy("window").show()
However, this does not give me the result I want: it splits the DataFrame into daily time windows and computes the distinct count within each window independently, as shown:
+----------+----------------------+
|window    |count(DISTINCT src_ip)|
+----------+----------------------+
|2020-06-19|2                     |
|2020-06-20|2                     |
|2020-06-21|1                     |
+----------+----------------------+
This is not what I want, because B already appeared on 2020-06-19 and should not be counted again on 2020-06-20.
The result I would like to see is:
+----------+----------------------+
|window    |count(DISTINCT src_ip)|
+----------+----------------------+
|2020-06-19|2                     |
|2020-06-20|1                     |
|2020-06-21|1                     |
+----------+----------------------+
Is this even possible with PySpark? Any help is greatly appreciated.
Upvotes: 0
Views: 297
Reputation: 13581
Is this what you want? If not, please add more explanation.
df.show(10, False)
+------+----------+
|src_ip|timestamp |
+------+----------+
|A |2020-06-19|
|B |2020-06-19|
|B |2020-06-20|
|C |2020-06-20|
|D |2020-06-21|
+------+----------+
from pyspark.sql.functions import min, count

# Keep each IP's earliest timestamp (its first-seen day),
# then count how many IPs were first seen on each day.
df.groupBy('src_ip').agg(min('timestamp').alias('timestamp')) \
  .groupBy('timestamp').agg(count('src_ip').alias('count')) \
  .orderBy('timestamp').show(10, False)
+----------+-----+
|timestamp |count|
+----------+-----+
|2020-06-19|2 |
|2020-06-20|1 |
|2020-06-21|1 |
+----------+-----+
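The first groupBy collapses each src_ip down to the earliest timestamp on which it appears, and the second groupBy counts how many IPs were first seen on each day. If you prefer window functions, a minimal sketch of the same idea using the standard pyspark.sql Window and row_number on the same df (not something the original answer showed) would be:

from pyspark.sql import Window
from pyspark.sql.functions import col, count, row_number

# Rank each src_ip's rows by timestamp and keep only the earliest one.
w = Window.partitionBy('src_ip').orderBy('timestamp')
first_seen = df.withColumn('rn', row_number().over(w)).filter(col('rn') == 1)

# Count how many IPs appear for the first time on each day.
first_seen.groupBy('timestamp') \
    .agg(count('src_ip').alias('count')) \
    .orderBy('timestamp').show(10, False)

Both versions should produce the same per-day counts shown above.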
Upvotes: 1