Reputation: 5480
I have a data frame like the one below in PySpark. I want to select serial_num, devicetype, device_model and the count of distinct timestamp values for each serial_num from this dataframe:
+-------------+-----------------+---------------+------------------------+
| serial_num | devicetype | device_model | timestamp |
+-------------+-----------------+---------------+------------------------+
| 58172A0396 | | | 2003-01-02 17:37:15.0 |
| 58172A0396 | | | 2003-01-02 17:37:15.0 |
| 46C5Y00693 | Mac Pro | Mac PC | 2018-01-03 17:17:23.0 |
| 1737K7008F | Windows PC | Windows PC | 2018-01-05 11:12:31.0 |
| 1737K7008F | Network Device | Unknown | 2018-01-05 11:12:31.0 |
| 1737K7008F | Network Device | Unknown | 2018-01-05 11:12:31.0 |
| 1737K7008F | Network Device | | 2018-01-06 03:12:52.0 |
| 1737K7008F | Windows PC | Windows PC | 2018-01-06 03:12:52.0 |
| 1737K7008F | Network Device | Unknown | 2018-01-06 03:12:52.0 |
| 1665NF01F3 | Network Device | Unknown | 2018-01-07 03:42:34.0 |
+-------------+-----------------+---------------+------------------------+
I have tried the following:
df1 = df.select('serial_num', 'devicetype', 'device_model', f.count('distinct timestamp').over(Window.partitionBy('serial_num')).alias('val'))
The result I want is:
+-------------+-----------------+---------------+-----+
| serial_num | devicetype | device_model |count|
+-------------+-----------------+---------------+-----+
| 58172A0396 | | | 1 |
| 58172A0396 | | | 1 |
| 46C5Y00693 | Mac Pro | Mac PC | 1 |
| 1737K7008F | Windows PC | Windows PC | 2 |
| 1737K7008F | Network Device | Unknown | 2 |
| 1737K7008F | Network Device | Unknown | 2 |
| 1737K7008F | Network Device | | 2 |
| 1737K7008F | Windows PC | Windows PC | 2 |
| 1737K7008F | Network Device | Unknown | 2 |
| 1665NF01F3 | Network Device | Unknown | 1 |
+-------------+-----------------+---------------+-----+
How can I achieve that?
Upvotes: 1
Views: 1559
Reputation: 112
A simple groupBy and count will work.
val data=Array(("58172A0396","","","2003-01-02 17:37:15.0"),
("58172A0396","","","2003-01-02 17:37:15.0"),
("46C5Y00693"," Mac Pro","Mac PC","2018-01-03 17:17:23.0"),
("1737K7008F"," Windows PC","Windows PC","2018-01-05 11:12:31.0"),
("1737K7008F"," Network Device","Unknown","2018-01-05 11:12:31.0"),
("1737K7008F"," Network Device","Unknown","2018-01-05 11:12:31.0"),
("1737K7008F"," Network Device","","2018-01-06 03:12:52.0"),
("1737K7008F"," Windows PC","Windows PC","2018-01-06 03:12:52.0"),
("1737K7008F"," Network Device","Unknown","2018-01-06 03:12:52.0"),
("1665NF01F3"," Network Device","Unknown","2018-01-07 03:42:34.0"))
val rdd = sc.parallelize(data)
val df = rdd.toDF("serial_num","devicetype","device_model","timestamp")
val df1 = df.groupBy("timestamp","serial_num","devicetype","device_model").count
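For the PySpark data frame in the question, the same grouped count would be a one-liner (a sketch, assuming df already has the columns shown above):
df1 = df.groupBy('timestamp', 'serial_num', 'devicetype', 'device_model').count()
df1.show()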
Upvotes: 1
Reputation: 28392
Unfortunately, countDistinct is not supported over windows. However, a combination of collect_set and size can be used to achieve the same end result. This is only supported in Spark 2.0+ and can be used as follows:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

w = Window.partitionBy('serial_num')
df1 = df.select(..., F.size(F.collect_set('timestamp').over(w)).alias('count'))
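With the columns spelled out (a sketch; the column list is taken from the question's data frame), the select could look like this:
df1 = df.select('serial_num', 'devicetype', 'device_model',
                F.size(F.collect_set('timestamp').over(w)).alias('count'))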
For older Spark versions, you can instead use groupby and countDistinct to create a new dataframe with all the counts, and then join this dataframe with the original one.
df2 = df.groupby('serial_num').agg(F.countDistinct('timestamp').alias('count'))
df1 = df.join(df2, 'serial_num')
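To compare against the expected output in the question, the joined result can be inspected with a quick check (a minimal usage sketch, assuming df is the question's data frame):
df1.select('serial_num', 'devicetype', 'device_model', 'count').show()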
Upvotes: 2