Reputation: 193
I'm new to PySpark; I usually work with pandas. I want to iterate row by row using a column in PySpark. My dataset looks like this:
+-------------------+--------------------+--------+-----+
| DateTime| user_name|keyboard|mouse|
+-------------------+--------------------+--------+-----+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0|
|2019-10-21 08:35:01| [email protected]| 0.5| 1.0|
|2019-10-21 08:40:01| [email protected]| 0.0| 0.0|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0|
|2019-10-21 08:45:01| [email protected]| 0.0| 0.0|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0|
|2019-10-21 08:50:01| [email protected]| 0.0| 0.0|
+-------------------+--------------------+--------+-----+
A pandas DataFrame also has an index, but a Spark DataFrame does not. In pandas:
## pandas
import datetime
import pandas as pd

usr_log = pd.read_csv("data.csv")
unique_users = usr_log.user_name.unique()
usr_log.sort_values(by='DateTime', inplace=True)
users_new_data = dict()

for user in unique_users:
    users_new_data[user] = {'start_time': None,
                            'idle_time': datetime.timedelta(0)}
    count_idle = 0
    ## first part of the question
    for index in usr_log.index:
        if user == usr_log['user_name'][index]:
            if users_new_data[user]['start_time'] is None:
                users_new_data[user]['start_time'] = usr_log['DateTime'][index]
            ## Second part of the question
            if usr_log['keyboard'][index] == 0 and usr_log['mouse'][index] == 0:
                count_idle += 1
            else:
                count_idle = 0
            if count_idle >= 5:
                if count_idle == 5:
                    users_new_data[user]['idle_time'] += datetime.timedelta(0, 1500)
                else:
                    users_new_data[user]['idle_time'] += datetime.timedelta(0, 300)
How can I do the same thing in Spark?
For each user, a log is generated every 5 minutes (e.g. if the user starts at 8:30:01, the next log is generated at 8:35:01). In the second part of the question, I want to find the idle hours for each user: if the user does not move the mouse or use the keyboard for the next 30 mins (1500), I add that to the user's idle hours.
After converting the dictionary values into a data frame, my expected output looks like:
+--------------------+-------------------+-------------------+
| user_name| start_time| idle_time|
+--------------------+-------------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|2019-10-21 05:05:00|
|vaishusawant143@g...|2019-10-21 08:35:01|2019-10-21 02:15:00|
| [email protected]|2019-10-21 08:35:01|2019-10-21 01:30:00|
+--------------------+-------------------+-------------------+
Upvotes: 1
Views: 1392
Reputation: 1525
Here is a solution for the same:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, min, sum
df = (spark.read.format("csv").option("sep", ",").option("header", "true").load("data.csv"))
df.show()
+-------------------+--------------------+--------+-----+
| DateTime| user_name|keyboard|mouse|
+-------------------+--------------------+--------+-----+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0|
|2019-10-21 08:35:01| [email protected]| 0.5| 1.0|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0|
|2019-10-21 08:45:01| [email protected]| 0.0| 0.0|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0|
|2019-10-21 08:50:01| [email protected]| 0.0| 0.0|
+-------------------+--------------------+--------+-----+
df1 = df.groupBy("user_name").agg(min("DateTime"))
df1.show()
+--------------------+-------------------+
| user_name| min(DateTime)|
+--------------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|
|vaishusawant143@g...|2019-10-21 08:35:01|
| [email protected]|2019-10-21 08:35:01|
+--------------------+-------------------+
Other Part -
df1 = df.withColumn("count", when((col("keyboard") == 0.0) & (col("mouse") == 0.0), 1).otherwise(0))
df2 = df1.withColumn("Idle_Sec", when(col("count") == 0, 300).otherwise(1500))
df2.show()
+-------------------+--------------------+--------+-----+-----+--------+
| DateTime| user_name|keyboard|mouse|count|Idle_Sec|
+-------------------+--------------------+--------+-----+-----+--------+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0| 0| 300|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0| 0| 300|
|2019-10-21 08:35:01| [email protected]| 0.5| 1.0| 0| 300|
|2019-10-21 08:40:01| [email protected]| 0.0| 0.0| 1| 1500|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0| 0| 300|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0| 0| 300|
|2019-10-21 08:45:01| [email protected]| 0.0| 0.0| 1| 1500|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0| 0| 300|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0| 0| 300|
|2019-10-21 08:50:01| [email protected]| 0.0| 0.0| 1| 1500|
+-------------------+--------------------+--------+-----+-----+--------+
df3 = df2.groupBy("user_name").agg(min("DateTime").alias("start_time"), sum("Idle_Sec").alias("Sum_Idle_Sec"))
df3.show()
+--------------------+-------------------+------------+
| user_name| start_time|Sum_Idle_Sec|
+--------------------+-------------------+------------+
|prathameshsalap@g...|2019-10-21 08:35:01| 900|
|vaishusawant143@g...|2019-10-21 08:35:01| 900|
| [email protected]|2019-10-21 08:35:01| 4800|
+--------------------+-------------------+------------+
df3.withColumn("Idle_time",(F.unix_timestamp("start_time") + col("Sum_Idle_Sec")).cast('timestamp')).show()
+--------------------+-------------------+------------+-------------------+
|           user_name|         start_time|Sum_Idle_Sec|          Idle_time|
+--------------------+-------------------+------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|         900|2019-10-21 08:50:01|
|vaishusawant143@g...|2019-10-21 08:35:01|         900|2019-10-21 08:50:01|
|    [email protected]|2019-10-21 08:35:01|        4800|2019-10-21 09:55:01|
+--------------------+-------------------+------------+-------------------+
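Note that the count/Idle_Sec flag above is computed per row, so it does not track runs of consecutive idle intervals the way the pandas loop in the question does. If that behaviour is needed, one possible variant uses a window function. This is only a sketch, assuming 5-minute log intervals and numeric keyboard/mouse columns; the names is_idle, streak_id, streak_len and idle_per_user are made up for illustration:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 1 when both keyboard and mouse are zero in that 5-minute interval, else 0
idle = df.withColumn("is_idle", F.when((F.col("keyboard") == 0) & (F.col("mouse") == 0), 1).otherwise(0))
# running count of non-idle rows per user; it stays constant inside an idle streak,
# so it serves as a streak identifier
w = Window.partitionBy("user_name").orderBy("DateTime")
idle = idle.withColumn("streak_id", F.sum(1 - F.col("is_idle")).over(w))
# length of each consecutive idle streak per user
streaks = (idle.where(F.col("is_idle") == 1)
               .groupBy("user_name", "streak_id")
               .agg(F.count("*").alias("streak_len")))
# keep only streaks of at least 5 intervals and convert them to seconds
idle_per_user = (streaks.where(F.col("streak_len") >= 5)
                        .groupBy("user_name")
                        .agg((F.sum("streak_len") * 300).alias("idle_seconds")))
idle_per_user.show()
Because the pandas rule adds 1500 s at the fifth consecutive idle interval and 300 s for every interval after that, a streak of n >= 5 intervals contributes exactly 300*n seconds, which is what the last aggregation computes.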
Upvotes: 1
Reputation: 1118
If you want to find, for each user, the first timestamp that they have, you can simplify it. First, in pandas, do this:
usr_log[['user_name','DateTime']].groupby(['user_name']).min()
And for Spark it will be very similar:
from pyspark.sql.functions import min
urs_log = sparkSession.read.csv(...)
urs_log.groupBy("user_name").agg(min("DateTime"))
You will only have to rename the DateTime column to the one you want, and try not to use for loops in pandas.
In Spark you have a distributed collection, and it's impossible to do a for loop; you have to apply transformations to the columns and never apply logic to a single row of data.
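For example, the rename can be done with alias when aggregating. This is just a sketch; sparkSession, the file path and the first_seen name are placeholders:
from pyspark.sql.functions import min
# compute each user's first timestamp and rename the aggregate to start_time
urs_log = sparkSession.read.csv("data.csv", header=True)
first_seen = urs_log.groupBy("user_name").agg(min("DateTime").alias("start_time"))
first_seen.show()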
Upvotes: 3
Reputation: 31
You should do it as in the following example, where "do_something" can be any function that you define.
Upvotes: 0