Reputation: 631
I've been slowly learning my way through cp-sat by reading the nurse scheduling examples and made some modifications to the first nurse example to test what I've learned. First I took personal "off" days into account. Then I took into account days that each nurse might "prefer" to work on (weighted so that the least demanding nurses get priority).
Now I'm trying to wrap my head around applying consecutive work day preferences such that we can specify a soft minimum and maximum number of consecutive work days for each nurse, and a hard-cap maximum consecutive work days across the board. From what I understand, I need to utilize a "negated bounded span" function (as per line 29 of shift_scheduling_sat.py) but I could use some guidance from you experts out there on how this works, and what exactly should I be passing to this function?
Many thanks in advance!
Anthony
from ortools.sat.python import cp_model
max_cwork = 4 # hard-cap max consecutive work days
num_nurses = 4
num_shifts = 2
num_days = 9
all_nurses = range(num_nurses)
all_shifts = range(num_shifts)
all_days = range(num_days)
# "off" : hard days off (ie. *cannot* work those days, period)
# "prefer": soft working day preference (ie. prefer to work those days)
# "cwork": soft consecutive working day min,max (eg. 1,2 means at least 1 consecutive days, at most 2 consecutive days)
nurse_preferences = {
0: { "off": { 8 }, "prefer": { 0, 1, 2 }, "cwork": { 3, 3 } },
1: { "off": { 5 }, "prefer": { 0, 1, 2, 6, 7, 8 } },
2: { "off": { 2 }, "prefer": { 3, 4, 5, 6, 7, 8 } },
3: { "off": { 0 }, "prefer": { 3, 4, 5 }, "cwork": { 1, 2 } },
}
nurse_prefer_requests = { kvp[0]: len(kvp[1]["prefer"]) if "prefer" in kvp[1] else 0 for kvp in nurse_preferences.items() }
max_nurse_prefer_requests = sum(nurse_prefer_requests.values())
nurse_prefer_factor = { kvp[0]: 1 if kvp[1] == 0 else max_nurse_prefer_requests / kvp[1] for kvp in nurse_prefer_requests.items() }
print(nurse_prefer_requests)
print(max_nurse_prefer_requests)
print(nurse_prefer_factor)
def is_nurse_available(n, d):
return (n not in nurse_preferences or "off" not in nurse_preferences[n] or d not in nurse_preferences[n]["off"])
def get_nurse_day_preference(n, d):
if (n not in nurse_preferences or "prefer" not in nurse_preferences[n] or d not in nurse_preferences[n]["prefer"]):
return 1
return nurse_prefer_factor[n];
print([get_nurse_day_preference(1, day) for day in range(0, 20)])
print([is_nurse_available(1, day) for day in range(0, 20)])
model = cp_model.CpModel()
shifts = {}
for n in all_nurses:
for d in all_days:
for s in all_shifts:
#if (n not in nurse_preferences or "off" not in nurse_preferences[n] or d not in nurse_preferences[n]["off"]):
if (is_nurse_available(n,d)):
shifts[(n, d, s)] = model.new_bool_var(f"shift_n{n}_d{d}_s{s}")
for d in all_days:
for s in all_shifts:
model.add_exactly_one(shifts[(n, d, s)] for n in all_nurses if (n, d, s) in shifts)
for n in all_nurses:
for d in all_days:
model.add_at_most_one(shifts[(n, d, s)] for s in all_shifts if (n, d, s) in shifts)
# Try to distribute the shifts evenly, so that each nurse works
# min_shifts_per_nurse shifts. If this is not possible, because the total
# number of shifts is not divisible by the number of nurses, some nurses will
# be assigned one more shift.
min_shifts_per_nurse = (num_shifts * num_days) // num_nurses
if num_shifts * num_days % num_nurses == 0:
max_shifts_per_nurse = min_shifts_per_nurse
else:
max_shifts_per_nurse = min_shifts_per_nurse + 1
for n in all_nurses:
shifts_worked = []
for d in all_days:
for s in all_shifts:
if (n, d, s) in shifts:
shifts_worked.append(shifts[(n, d, s)])
model.add(min_shifts_per_nurse <= sum(shifts_worked))
model.add(sum(shifts_worked) <= max_shifts_per_nurse)
model.maximize(
sum(
get_nurse_day_preference(n,d) * shifts[(n, d, s)]
for n in all_nurses
for d in all_days
for s in all_shifts
if (n,d,s) in shifts
)
)
solver = cp_model.CpSolver()
# solver.parameters.linearization_level = 0
# Enumerate all solutions.
solver.parameters.enumerate_all_solutions = True
status = solver.solve(model) #, solution_printer)
if status == cp_model.OPTIMAL:
print("Solution:")
for d in all_days:
print("Day", d)
for n in all_nurses:
for s in all_shifts:
if (n,d,s) in shifts and solver.value(shifts[(n, d, s)]) == 1:
if get_nurse_day_preference(n,d) > 1:
print("Nurse", n, "works shift", s, "(requested).")
else:
print("Nurse", n, "works shift", s, "(not requested).")
print()
print(
f"Number of shift requests met = {solver.objective_value}",
f"(out of {num_nurses * min_shifts_per_nurse})",
)
else:
print("No optimal solution found !")
Upvotes: -1
Views: 50
Reputation: 11064
please have a look at the add_soft_sequence_constraint.
It does exactly what you need. It translates the requirements (soft and hard bounds) into constraints.
def add_soft_sequence_constraint(
model: cp_model.CpModel,
works: list[cp_model.BoolVarT],
hard_min: int,
soft_min: int,
min_cost: int,
soft_max: int,
hard_max: int,
max_cost: int,
prefix: str,
) -> tuple[list[cp_model.BoolVarT], list[int]]:
"""Sequence constraint on true variables with soft and hard bounds.
This constraint look at every maximal contiguous sequence of variables
assigned to true. If forbids sequence of length < hard_min or > hard_max.
Then it creates penalty terms if the length is < soft_min or > soft_max.
Args:
model: the sequence constraint is built on this model.
works: a list of Boolean variables.
hard_min: any sequence of true variables must have a length of at least
hard_min.
soft_min: any sequence should have a length of at least soft_min, or a
linear penalty on the delta will be added to the objective.
min_cost: the coefficient of the linear penalty if the length is less than
soft_min.
soft_max: any sequence should have a length of at most soft_max, or a linear
penalty on the delta will be added to the objective.
hard_max: any sequence of true variables must have a length of at most
hard_max.
max_cost: the coefficient of the linear penalty if the length is more than
soft_max.
prefix: a base name for penalty literals.
Returns:
a tuple (variables_list, coefficient_list) containing the different
penalties created by the sequence constraint.
"""
cost_literals = []
cost_coefficients = []
# Forbid sequences that are too short.
for length in range(1, hard_min):
for start in range(len(works) - length + 1):
model.add_bool_or(negated_bounded_span(works, start, length))
# Penalize sequences that are below the soft limit.
if min_cost > 0:
for length in range(hard_min, soft_min):
for start in range(len(works) - length + 1):
span = negated_bounded_span(works, start, length)
name = f": under_span(start={start}, length={length})"
lit = model.new_bool_var(prefix + name)
span.append(lit)
model.add_bool_or(span)
cost_literals.append(lit)
# We filter exactly the sequence with a short length.
# The penalty is proportional to the delta with soft_min.
cost_coefficients.append(min_cost * (soft_min - length))
# Penalize sequences that are above the soft limit.
if max_cost > 0:
for length in range(soft_max + 1, hard_max + 1):
for start in range(len(works) - length + 1):
span = negated_bounded_span(works, start, length)
name = f": over_span(start={start}, length={length})"
lit = model.new_bool_var(prefix + name)
span.append(lit)
model.add_bool_or(span)
cost_literals.append(lit)
# Cost paid is max_cost * excess length.
cost_coefficients.append(max_cost * (length - soft_max))
# Just forbid any sequence of true variables with length hard_max + 1
for start in range(len(works) - hard_max):
model.add_bool_or([~works[i] for i in range(start, start + hard_max + 1)])
return cost_literals, cost_coefficients
Upvotes: 1