Reputation: 43
This is an example of my Spark DataFrame:
+------------+-------------------+-------------------+
| OT| Fecha_Fst| Fecha_Lst|
+------------+-------------------+-------------------+
|712268242652|2021-01-30 14:43:00|2021-02-03 13:03:00|
|712268243525|2021-01-30 14:27:00|2021-02-03 14:50:00|
|712268243831|2021-02-02 21:23:00|2021-02-08 17:39:00|
|712268244225|2021-02-01 07:26:00|2021-02-09 11:22:00|
|712268244951|2021-02-01 07:25:00|2021-02-05 16:07:00|
|712268245076|2021-02-01 07:26:00|2021-02-06 13:22:00|
|712268245651|2021-01-28 16:49:00|2021-02-04 13:31:00|
|712268246782|2021-02-01 07:26:00|2021-02-05 12:24:00|
|712268247644|2021-02-02 18:20:00|2021-02-05 16:12:00|
|712268247681|2021-02-09 05:03:00|2021-02-15 14:16:00|
|712268247751|2021-02-02 15:42:00|2021-02-05 13:27:00|
|712268247854|2021-01-30 14:34:00|2021-01-30 14:34:00|
|712268248775|2021-02-02 15:42:00|2021-02-05 12:42:00|
|712268249173|2021-02-02 15:42:00|2021-02-05 15:51:00|
|712268249873|2021-02-02 09:05:00|2021-02-05 19:36:00|
|712268249884|2021-02-02 08:53:00|2021-02-05 19:36:00|
|712268249895|2021-02-02 08:14:00|2021-02-05 19:36:00|
|712268249906|2021-02-02 09:06:00|2021-02-05 19:36:00|
|712268249910|2021-02-02 08:53:00|2021-02-05 19:36:00|
|712268250186|2021-02-02 15:42:00|2021-02-05 18:59:00|
+------------+-------------------+-------------------+
I found this code on the internet:
a = "2021-02-10T23:59:00.000+0000"
b = "2021-03-20T23:59:00.000+0000"
week = {}
def weekday_count(start, end):
start_date = datetime.datetime.strptime(start, "%Y-%m-%dT%H:%M:%S.%f%z")
end_date = datetime.datetime.strptime(end, "%Y-%m-%dT%H:%M:%S.%f%z")
for i in range((end_date - start_date).days):
day = calendar.day_name[(start_date + datetime.timedelta(days=i + 1)).weekday()]
week[day] = week[day] + 1 if day in week else 1
return week["Sunday"] + week["Saturday"]
print(weekday_count(a, b))
11
It works fine and gives me what I want, but I can't use it on my Spark DataFrame. I tried several approaches and always get errors like:
df = df.withColumn("Number", weekday_count(f.col("Fecha_Fst"),f.col("Fecha_Lst")))
TypeError: strptime() argument 1 must be str, not Column
If I use a lambda:
def weekday_count(start, end):
    start_date = lambda start: datetime.datetime.strptime(start, "%Y-%m-%dT%H:%M:%S.%f%z")
    end_date = lambda end: datetime.datetime.strptime(end, "%Y-%m-%dT%H:%M:%S.%f%z")
    for i in range((end_date - start_date).days):
        day = calendar.day_name[(start_date + datetime.timedelta(days=i + 1)).weekday()]
        week[day] = week[day] + 1 if day in week else 1
    return week["Sunday"] + week["Saturday"]
df = df.withColumn("Number", weekday_count(f.col("Fecha_Fst"),f.col("Fecha_Lst")))
TypeError: unsupported operand type(s) for -: 'function' and 'function'
And so on... I tried many variations today and still don't get the desired result:
+------------+-------------------+-------------------+-----------+
| OT| Fecha_Fst| Fecha_Lst| Days|
+------------+-------------------+-------------------+-----------+
|712268242652|2021-01-30 14:43:00|2021-02-03 13:03:00| 2|
|712268243831|2021-02-02 21:23:00|2021-02-08 17:39:00| 2|
|712268244225|2021-02-01 07:26:00|2021-02-09 11:22:00| 2|
|712268244951|2021-02-01 07:25:00|2021-02-05 16:07:00| 0|
|712268247681|2021-02-09 05:03:00|2021-02-15 14:16:00| 2|
|712268247854|2021-01-30 14:34:00|2021-01-30 14:34:00| 1|
|712268248775|2021-02-02 15:42:00|2021-02-05 12:42:00| 0|
|712268249173|2021-02-02 15:42:00|2021-02-05 15:51:00| 0|
|712268249873|2021-02-02 09:05:00|2021-02-05 19:36:00| 0|
+------------+-------------------+-------------------+-----------+
I'm confused about how to work with new columns in PySpark. I used pandas before, but I'm currently working in an Azure Databricks environment and pandas is very slow.
Upvotes: 2
Views: 1467
Reputation: 32650
You can't use a plain Python function on Spark columns directly; you would need to wrap it in a PySpark UDF, along the lines of the sketch below.
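For reference, a minimal UDF sketch could look like this (the function name weekend_days is just illustrative; it counts calendar days start, start + 1 day, ... up to end, the same convention as the sequence expression further down):

import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

@F.udf(IntegerType())
def weekend_days(start, end):
    # Timestamp columns arrive in the UDF as Python datetime objects.
    # Walk one day at a time from start to end (inclusive) and count Sat/Sun.
    count, current = 0, start
    while current <= end:
        if current.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
            count += 1
        current += datetime.timedelta(days=1)
    return count

df = df.withColumn("Days", weekend_days(F.col("Fecha_Fst"), F.col("Fecha_Lst")))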
However, you can actually get the desired result using Spark built-in functions: create a sequence of timestamps from Fecha_Fst to Fecha_Lst, then filter those that fall on Sat or Sun. The size of the resulting array is the number of weekend days.
from pyspark.sql import functions as F

df1 = df.withColumn(
    "Days",
    F.expr(
        """
        size(
            filter(
                sequence(Fecha_Fst, Fecha_Lst, interval 1 day),
                x -> date_format(x, 'E') in ('Sat', 'Sun')
            )
        )
        """
    )
)
df1.show(truncate=False)
#+------------+-------------------+-------------------+----+
#|OT |Fecha_Fst |Fecha_Lst |Days|
#+------------+-------------------+-------------------+----+
#|712268242652|2021-01-30 14:43:00|2021-02-03 13:03:00|2 |
#|712268243831|2021-02-02 21:23:00|2021-02-08 17:39:00|2 |
#|712268244225|2021-02-01 07:26:00|2021-02-09 11:22:00|2 |
#|712268244951|2021-02-01 07:25:00|2021-02-05 16:07:00|0 |
#|712268247681|2021-02-09 05:03:00|2021-02-15 14:16:00|2 |
#|712268247854|2021-01-30 14:34:00|2021-01-30 14:34:00|1 |
#|712268248775|2021-02-02 15:42:00|2021-02-05 12:42:00|0 |
#|712268249173|2021-02-02 15:42:00|2021-02-05 15:51:00|0 |
#|712268249873|2021-02-02 09:05:00|2021-02-05 19:36:00|0 |
#+------------+-------------------+-------------------+----+
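If you'd rather avoid the SQL string, the same logic can be written with column-level functions; this is a sketch of an equivalent version, assuming Spark 3.1+ (F.filter with a Python lambda is only available from that release):

from pyspark.sql import functions as F

# Same logic as the expr above: build the daily sequence, keep Sat/Sun, count them.
df1 = df.withColumn(
    "Days",
    F.size(
        F.filter(
            F.sequence(F.col("Fecha_Fst"), F.col("Fecha_Lst"), F.expr("interval 1 day")),
            lambda x: F.date_format(x, "E").isin("Sat", "Sun"),
        )
    ),
)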
Upvotes: 2