Reputation: 239810
I'm looking for a data type to help me model resource availability over fluid time.
I've come at this problem from many directions but always come back to the fundamental problem of not knowing a data type to model something as simple as an integer over time.
I could convert my appointments into time series events (eg appointment arrives means -1 availability, appointment leaves means +1) but I still don't know how to manipulate that data so that I can distil out periods where the availability is greater than zero.
Somebody's left a close-vote citing lack of focus, but my goal here seems pretty singular so I'll try to explain the problem graphically. I'm trying to infer the periods of time where the number of active jobs falls below a given capacity.
Turning a range of known parallel capacity (eg 3 between 9-6) and a list of jobs with variable start/ends, into a list of time ranges of available time.
Upvotes: 8
Views: 1123
Reputation: 36249
You can use a dedicated class representing a lane that can run jobs. These objects can keep track of jobs and correspondingly of their availability:
import bisect
from datetime import time
from functools import total_ordering
import math
@total_ordering
class TimeSlot:
def __init__(self, start, stop, lane):
self.start = start
self.stop = stop
self.lane = lane
def __contains__(self, other):
return self.start <= other.start and self.stop >= other.stop
def __lt__(self, other):
return (self.start, -self.stop.second) < (other.start, -other.stop.second)
def __eq__(self, other):
return (self.start, -self.stop.second) == (other.start, -other.stop.second)
def __str__(self):
return f'({self.lane}) {[self.start, self.stop]}'
__repr__ = __str__
class Lane:
@total_ordering
class TimeHorizon:
def __repr__(self):
return '...'
def __lt__(self, other):
return False
def __eq__(self, other):
return False
@property
def second(self):
return math.inf
@property
def timestamp(self):
return math.inf
time_horizon = TimeHorizon()
del TimeHorizon
def __init__(self, start, nr):
self.nr = nr
self.availability = [TimeSlot(start, self.time_horizon, self)]
def add_job(self, job):
if not isinstance(job, TimeSlot):
job = TimeSlot(*job, self)
# We want to bisect_right but only on the start time,
# so we need to do it manually if they are equal.
index = bisect.bisect_left(self.availability, job)
if index < len(self.availability):
index += (job.start == self.availability[index].start)
index -= 1 # select the corresponding free slot
slot = self.availability[index]
if slot.start > job.start or slot.stop is not self.time_horizon and job.stop > slot.stop:
raise ValueError('Requested time slot not available')
if job == slot:
del self.availability[index]
elif job.start == slot.start:
slot.start = job.stop
elif job.stop == slot.stop:
slot.stop = job.start
else:
slot_end = slot.stop
slot.stop = job.start
self.availability.insert(index+1, TimeSlot(job.stop, slot_end, self))
A Lane
object can be used as follows:
lane = Lane(start=time(9), nr=1)
print(lane.availability)
lane.add_job([time(11), time(14)])
print(lane.availability)
which outputs:
[(1) [datetime.time(9, 0), ...]]
[(1) [datetime.time(9, 0), datetime.time(11, 0)],
(1) [datetime.time(14, 0), ...]]
After adding the job, the availability gets updated as well.
Now we could use multiple of these lane objects together to represent a full schedule. Jobs can be added as required and the availability will be updated automatically:
class Schedule:
def __init__(self, n_lanes, start):
self.lanes = [Lane(start, nr=i) for i in range(n_lanes)]
def add_job(self, job):
for lane in self.lanes:
try:
lane.add_job(job)
except ValueError:
pass
else:
break
from pprint import pprint
# Example jobs from OP.
jobs = [(time(10), time(15)),
(time(9), time(11)),
(time(12, 30), time(16)),
(time(10), time(18))]
schedule = Schedule(3, start=time(9))
for job in jobs:
schedule.add_job(job)
for lane in schedule.lanes:
pprint(lane.availability)
which outputs:
[(0) [datetime.time(9, 0), datetime.time(10, 0)],
(0) [datetime.time(15, 0), ...]]
[(1) [datetime.time(11, 0), datetime.time(12, 30)],
(1) [datetime.time(16, 0), ...]]
[(2) [datetime.time(9, 0), datetime.time(10, 0)],
(2) [datetime.time(18, 0), ...]]
We can create a dedicated tree-like structure which keeps track of time slots of all lanes for selecting the best suited slot when registering a new job. A node in the tree represents a single time slot and its children are all time slots that are contained within that slot. Then, when registering a new job, we can search the tree to find an optimal slot. The tree and the lanes share the same time slots so we only need to adjust slots manually when they are either deleted or new ones are inserted. Here is the relevant code, it is a bit lengthy (just a quick draft):
import itertools as it
class OneStepBuffered:
"""Can back up elements that are consumed by `it.takewhile`.
From: https://stackoverflow.com/a/30615837/3767239
"""
_sentinel = object()
def __init__(self, it):
self._it = iter(it)
self._last = self._sentinel
self._next = self._sentinel
def __iter__(self):
return self
def __next__(self):
sentinel = self._sentinel
if self._next is not sentinel:
next_val, self._next = self._next, sentinel
return next_val
try:
self._last = next(self._it)
return self._last
except StopIteration:
self._last = self._next = sentinel
raise
def step_back(self):
if self._last is self._sentinel:
raise ValueError("Can't back up a step")
self._next, self._last = self._last, self._sentinel
class SlotTree:
def __init__(self, slot, subslots, parent=None):
self.parent = parent
self.slot = slot
self.subslots = []
slots = OneStepBuffered(subslots)
for slot in slots:
subslots = it.takewhile(lambda x: x.stop <= slot.stop, slots)
self.subslots.append(SlotTree(slot, subslots, self))
try:
slots.step_back()
except ValueError:
break
def __str__(self):
sub_repr = ['\n| '.join(str(slot).split('\n'))
for slot in self.subslots]
sub_repr = [f'| {x}' for x in sub_repr]
sub_repr = '\n'.join(sub_repr)
sep = '\n' if sub_repr else ''
return f'{self.slot}{sep}{sub_repr}'
def find_minimal_containing_slot(self, slot):
try:
return min(self.find_containing_slots(slot),
key=lambda x: x.slot.stop.second - x.slot.start.second)
except ValueError:
raise ValueError('Requested time slot not available') from None
def find_containing_slots(self, slot):
for candidate in self.subslots:
if slot in candidate.slot:
yield from candidate.find_containing_slots(slot)
yield candidate
@classmethod
def from_slots(cls, slots):
# Ascending in start time, descending in stop time (secondary).
return cls(cls.__name__, sorted(slots))
class Schedule:
def __init__(self, n_lanes, start):
self.lanes = [Lane(start, i+1) for i in range(n_lanes)]
self.slots = SlotTree.from_slots(
s for lane in self.lanes for s in lane.availability)
def add_job(self, job):
if not isinstance(job, TimeSlot):
job = TimeSlot(*job, lane=None)
# Minimal containing slot is one possible strategy,
# others can be implemented as well.
slot = self.slots.find_minimal_containing_slot(job)
lane = slot.slot.lane
if job == slot.slot:
slot.parent.subslots.remove(slot)
elif job.start > slot.slot.start and job.stop < slot.slot.stop:
slot.parent.subslots.insert(
slot.parent.subslots.index(slot) + 1,
SlotTree(TimeSlot(job.stop, slot.slot.stop, lane), [], slot.parent))
lane.add_job(job)
Now we can use the Schedule
class to automatically assign jobs to lanes and update their availability:
if __name__ == '__main__':
jobs = [(time(10), time(15)), # example from OP
(time(9), time(11)),
(time(12, 30), time(16)),
(time(10), time(18))]
schedule = Schedule(3, start=time(9))
print(schedule.slots, end='\n\n')
for job in jobs:
print(f'Adding {TimeSlot(*job, "new slot")}')
schedule.add_job(job)
print(schedule.slots, end='\n\n')
which outputs:
SlotTree
| (1) [datetime.time(9, 0), ...]
| (2) [datetime.time(9, 0), ...]
| (3) [datetime.time(9, 0), ...]
Adding (new slot) [datetime.time(10, 0), datetime.time(15, 0)]
SlotTree
| (1) [datetime.time(9, 0), datetime.time(10, 0)]
| (1) [datetime.time(15, 0), ...]
| (2) [datetime.time(9, 0), ...]
| (3) [datetime.time(9, 0), ...]
Adding (new slot) [datetime.time(9, 0), datetime.time(11, 0)]
SlotTree
| (1) [datetime.time(9, 0), datetime.time(10, 0)]
| (1) [datetime.time(15, 0), ...]
| (2) [datetime.time(11, 0), ...]
| (3) [datetime.time(9, 0), ...]
Adding (new slot) [datetime.time(12, 30), datetime.time(16, 0)]
SlotTree
| (1) [datetime.time(9, 0), datetime.time(10, 0)]
| (1) [datetime.time(15, 0), ...]
| (2) [datetime.time(11, 0), datetime.time(12, 30)]
| (2) [datetime.time(16, 0), ...]
| (3) [datetime.time(9, 0), ...]
Adding (new slot) [datetime.time(10, 0), datetime.time(18, 0)]
SlotTree
| (1) [datetime.time(9, 0), datetime.time(10, 0)]
| (1) [datetime.time(15, 0), ...]
| (2) [datetime.time(11, 0), datetime.time(12, 30)]
| (2) [datetime.time(16, 0), ...]
| (3) [datetime.time(9, 0), datetime.time(10, 0)]
| (3) [datetime.time(18, 0), ...]
The numbers (i)
indicate the lane number and the []
indicate the available time slots on that lane. A ...
indicates "open end" (time horizon). As we can see the tree doesn't restructure itself when time slots are adjusted; this would be a possible improvement. Ideally for each new job, the corresponding best fitting time slot would be popped from the tree and then, depending on how the job fits in the slot, an adjusted version and possibly new slots are pushed back to the tree (or none at all if the job fits the slot exactly).
The above examples consider just a single day and time
objects, but the code is easy to extend for usage with datetime
objects as well.
Upvotes: 0
Reputation: 36249
You can use (datetime, increment)
tuples to keep track of the changes in availability. A job-start event has increment = 1
and a job-end event has increment = -1
. Then itertools.accumulate
allows for computing the cumulative availability as jobs start and end over time. Here's an example implementation:
from datetime import time
import itertools as it
def compute_availability(jobs, opening_hours, capacity):
jobs = [((x, -1), (y, +1)) for x, y in jobs]
opens, closes = opening_hours
events = [[opens, capacity]] + sorted(t for job in jobs for t in job) + [(closes, 0)]
availability = list(it.accumulate(events,
lambda x, y: [y[0], x[1] + y[1]]))
for x, y in zip(availability, availability[1:]):
# If multiple events happen at the same time, only yield the last one.
if y[0] > x[0]:
yield x
This adds artificial (opens, capacity)
and (closes, 0)
events to initialize the computation. The above example considers a single day but it is easy to extend it to multiple days by creating opens
and closes
datetime
objects that share the day of the first and last job respectively.
Here is the output for the OP's example schedule:
from pprint import pprint
jobs = [(time(10), time(15)),
(time(9), time(11)),
(time(12, 30), time(16)),
(time(10), time(18))]
availability = list(compute_availability(
jobs, opening_hours=(time(9), time(18)), capacity=3
))
pprint(availability)
which prints:
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(15, 0), 1],
[datetime.time(16, 0), 2]]
The first element indicates when the availability changes and the second element denotes the availability that results from that change. For example at 9am one job is submitted causing the availability to drop from 3 to 2 and then at 10am two more jobs are submitted while the first one is still running (hence availability drops to 0).
Now that we have the initial availability computed an important aspect is to update it as new jobs are added. Here it is desirable not to recompute the availability from the full job list since that might be costly if many jobs are being tracked. Because the availability
is already sorted we can use the bisect
module to determine the relevant update range in O(log(N)). Then a number of steps need to be performed. Let's say the job is scheduled as [x, y]
where x
, y
are two datetime objects.
[x, y]
interval is greater than zero (including the event to the left of x
(i.e. the previous event)).[x, y]
by 1.x
is not in the list of events we need to add it, otherwise we need to check whether we can merge the x
event with the one left to it.y
is not in the list of events we need to add it.Here is the relevant code:
import bisect
def add_job(availability, job, *, weight=1):
"""weight: how many lanes the job requires"""
job = list(job)
start = bisect.bisect(availability, job[:1])
# Emulate a `bisect_right` which doens't work directly since
# we're comparing lists of different length.
if start < len(availability):
start += (job[0] == availability[start][0])
stop = bisect.bisect(availability, job[1:])
if any(slot[1] < weight for slot in availability[start-1:stop]):
raise ValueError('The requested time slot is not available')
for slot in availability[start:stop]:
slot[1] -= weight
if job[0] > availability[start-1][0]:
previous_availability = availability[start-1][1]
availability.insert(start, [job[0], previous_availability - weight])
stop += 1
else:
availability[start-1][1] -= weight
if start >= 2 and availability[start-1][1] == availability[start-2][1]:
del availability[start-1]
stop -= 1
if stop == len(availability) or job[1] < availability[stop][0]:
previous_availability = availability[stop-1][1]
availability.insert(stop, [job[1], previous_availability + weight])
We can test it by adding some jobs to the OP's example schedule:
for job in [[time(15), time(17)],
[time(11, 30), time(12)],
[time(13), time(14)]]: # this one should raise since availability is zero
print(f'\nAdding {job = }')
add_job(availability, job)
pprint(availability)
which outputs:
Adding job = [datetime.time(15, 0), datetime.time(17, 0)]
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(16, 0), 1],
[datetime.time(17, 0), 2]]
Adding job = [datetime.time(11, 30), datetime.time(12, 0)]
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(11, 30), 0],
[datetime.time(12, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(16, 0), 1],
[datetime.time(17, 0), 2]]
Adding job = [datetime.time(13, 0), datetime.time(14, 0)]
Traceback (most recent call last):
[...]
ValueError: The requested time slot is not available
We can also use this interface to block all lanes during hours when the service is unavailable (e.g. from 6pm to 9am on the next day). Just submit a job with weight=capacity
for that time span:
add_job(availability,
[datetime(2020, 3, 14, 18), datetime(2020, 3, 15, 9)]
weight=3)
We can also use add_job
to build the full schedule from scratch:
availability = availability = list(compute_availability(
[], opening_hours=(time(9), time(18)), capacity=3
))
print('Initial availability')
pprint(availability)
for job in jobs:
print(f'\nAdding {job = }')
add_job(availability, job)
pprint(availability)
which outputs:
Initial availability
[[datetime.time(9, 0), 3]]
Adding job = (datetime.time(10, 0), datetime.time(15, 0))
[[datetime.time(9, 0), 3],
[datetime.time(10, 0), 2],
[datetime.time(15, 0), 3]]
Adding job = (datetime.time(9, 0), datetime.time(11, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 1],
[datetime.time(11, 0), 2],
[datetime.time(15, 0), 3]]
Adding job = (datetime.time(12, 30), datetime.time(16, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 1],
[datetime.time(11, 0), 2],
[datetime.time(12, 30), 1],
[datetime.time(15, 0), 2],
[datetime.time(16, 0), 3]]
Adding job = (datetime.time(10, 0), datetime.time(18, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(15, 0), 1],
[datetime.time(16, 0), 2],
[datetime.time(18, 0), 3]]
Upvotes: 1
Reputation: 42143
Unless your time resolution is finer than a minute, I would suggest using a map of minutes in the day with a set of jobIds assigned over the time span of each job
For example:
# convert time to minute of the day (assumes24H time, but you can make this your own way)
def toMinute(time):
return sum(p*t for p,t in zip(map(int,time.split(":")),(60,1)))
def toTime(minute):
return f"{minute//60}:{minute%60:02d}"
# booking a job adds it to all minutes covered by its duration
def book(timeMap,jobId,start,duration):
startMin = toMinute(start)
for m in range(startMin,startMin+duration):
timeMap[m].add(jobId)
# unbooking a job removes it from all minutes where it was present
def unbook(timeMap,jobId):
for s in timeMap:
s.discard(jobId)
# return time ranges for minutes meeting a given condition
def minuteSpans(timeMap,condition,start="09:00",end="18:00"):
start,end = toMinute(start),toMinute(end)
timeRange = timeMap[start:end]
match = [condition(s) for s in timeRange]
breaks = [True] + [a!=b for a,b in zip(match,match[1:])]
starts = [i for (i,a),b in zip(enumerate(match),breaks) if b]
return [(start+s,start+e) for s,e in zip(starts,starts[1:]+[len(match)]) if match[s]]
def timeSpans(timeMap,condition,start="09:00",end="18:00"):
return [(toTime(s),toTime(e)) for s,e in minuteSpans(timeMap,condition,start,end)]
# availability is ranges of minutes where the number of jobs is less than your capacity
def available(timeMap,start="09:00",end="18:00",maxJobs=5):
return timeSpans(timeMap,lambda s:len(s)<maxJobs,start,end)
sample usage:
timeMap = [set() for _ in range(1440)]
book(timeMap,"job1","9:45",25)
book(timeMap,"job2","9:30",45)
book(timeMap,"job3","9:00",90)
print(available(timeMap,maxJobs=3))
[('9:00', '9:45'), ('10:10', '18:00')]
print(timeSpans(timeMap,lambda s:"job3" in s))
[('9:00', '10:30')]
With a few adjustments you could even have discontinuous jobs that skip over some periods (e.g. lunch time). You can also block out some periods by placing fake jobs in them.
If you need to manage job queues individually, you can have separate time maps (one per queue) and combine them into one when you need to have a global picture:
print(available(timeMap1,maxJobs=1))
print(available(timeMap2,maxJobs=1))
print(available(timeMap3,maxJobs=1))
globalMap = list(set.union(*qs) for qs in zip(timeMap1,timeMap2,timeMap3))
print(available(globalMap),maxJobs=3)
Put all this into a TimeMap class (instead of individual functions) and you should have a pretty good toolset to work with.
Upvotes: 0
Reputation: 996
To me, this problem would be well-represented by a list of boolean values. For ease of explanation, let's assume the length of every potential job is a multiple of 15 minutes. So, from 9 to 6, we have 135 "time slots" that we want to track availability for. We represent a queue's availability in a time slot with boolean variables: False
if the queue is processing a job, True
if the queue is available.
First, we create a list of time slots for every queue as well as the output. So, every queue and the output has time slots tk, 1 <= k <= 135.
Then, given five job queues, qj, 1 <= j <= 5, we say that tk is "open" at time k if there exists at least one qj where the time slot list at index k is True
.
We can implement this in standalone Python as follows:
slots = [ True ] * 135
queues = [ slots ] * 5
output = [ False ] * 135
def available (k):
for q in queues:
if q[k]:
return True
return False
We can then assume there exists some function dispatch (length)
that assigns a job to an available queue, setting the appropriate slots in queue[q]
to False
.
Finally, to update the output, we simply call:
def update():
for k in range(0, 135):
output[k] = available[k]
Or, for increased efficiency:
def update(i, j):
for k in range(i, j):
output[k] = available[k]
Then, you could simply call update(i, j)
whenever dispatch()
updates time slots i
thru j
for a new job. In this way, dispatching and updating is an O(n) operation, where n
is how many time slots are being changed, regardless of how many time slots there are.
This would allow you to make a simple function that maps human-readable time onto the range of time slot values, which would allow for making time slots larger or smaller as you wish.
You could also easily extend this idea to use a pandas data frame where each column is one queue, allowing you to use Series.any()
on every row at once to quickly update the output column.
Would love to hear suggestions regarding this approach! Perhaps there's a complexity of the problem I've missed, but I think this is a nice solution.
Upvotes: 2
Reputation: 96
My approach would be to build the time series, but include the availability object with a value set to the availability in that period.
availability:
[
{
"start": 09:00,
"end": 12:00,
"value": 4
},
{
"start": 12:00,
"end": 13:00,
"value": 3
}
]
data: [
{
"start": 10:00,
"end": 10:30,
}
]
Build the time series indexing on start/end times, with the value as the value. A start time for availability is +value, end time -value. While for an event, it'd be -1 or +1 as you said.
"09:00" 4
"10:00" -1
"10:30" 1
"12:00" -4
"12:00" 3
"13:00" -3
Then group by index, sum and cumulative sum.
getting:
"09:00" 4
"10:00" 3
"10:30" 4
"12:00" 3
"13:00" 0
Example code in pandas:
import numpy as np
import pandas as pd
data = [
{
"start": "10:00",
"end": "10:30",
}
]
breakpoints = [
{
"start": "00:00",
"end": "09:00",
"value": 0
},
{
"start": "09:00",
"end": "12:00",
"value": 4
},
{
"start": "12:00",
"end": "12:30",
"value": 4
},
{
"start": "12:30",
"end": "13:00",
"value": 3
},
{
"start": "13:00",
"end": "00:00",
"value": 0
}
]
df = pd.DataFrame(data, columns=['start', 'end'])
print(df.head(5))
starts = pd.DataFrame(data, columns=['start'])
starts["value"] = -1
starts = starts.set_index("start")
ends = pd.DataFrame(data, columns=['end'])
ends["value"] = 1
ends = ends.set_index("end")
breakpointsStarts = pd.DataFrame(breakpoints, columns=['start', 'value']).set_index("start")
breakpointsEnds = pd.DataFrame(breakpoints, columns=['end', 'value'])
breakpointsEnds["value"] = breakpointsEnds["value"].transform(lambda x: -x)
breakpointsEnds = breakpointsEnds.set_index("end")
countsDf = pd.concat([starts, ends, breakpointsEnds, breakpointsStarts]).sort_index()
countsDf = countsDf.groupby(countsDf.index).sum().cumsum()
print(countsDf)
# Periods that are available
df = countsDf
df["available"] = df["value"] > 0
# Indexes where the value of available changes
# Alternatively swap out available for the value.
time_changes = df["available"].diff()[df["available"].diff() != 0].index.values
newDf = pd.DataFrame(time_changes, columns= ["start"])
# Setting the end column to the value of the next start
newDf['end'] = newDf.transform(np.roll, shift=-1)
print(newDf)
# Join this back in to get the actual value of available
mergedDf = newDf.merge(df, left_on="start", right_index=True)
print(mergedDf)
returning at the end:
start end value available
0 00:00 09:00 0 False
1 09:00 13:00 4 True
2 13:00 00:00 0 False
Upvotes: 8
Reputation: 1042
I'd approach it the same way you did with the appointments. Model the free time as appointments on its own. For each ending appointment check if theres another on ongoing, if so, skip here. If not, find the next starting appointment (one with a start date greater than this ones enddate.)
After you iterated all off your appointments, you should have an inverted mask of it.
Upvotes: 2