Reputation: 129
I've been wondering recently if there's a way to detect whether a context manager is nested.
I've created Timer and TimerGroup classes:
from datetime import timedelta
from time import perf_counter as clock  # time.clock() was removed in Python 3.8

line_width = 60  # assumed; matches the width of the sample output

class Timer:
    def __init__(self, name="Timer"):
        self.name = name
        self.start_time = clock()

    @staticmethod
    def seconds_to_str(t):
        return str(timedelta(seconds=t))

    def end(self):
        # Seconds elapsed since the timer was created.
        return clock() - self.start_time

    def print(self, t):
        print(("{0:<" + str(line_width - 18) + "} >> {1}").format(self.name, self.seconds_to_str(t)))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        self.print(self.end())

class TimerGroup(Timer):
    def __enter__(self):
        print(('= ' + self.name + ' ').ljust(line_width, '='))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        total_time = self.seconds_to_str(self.end())
        print(" Total: {0}".format(total_time).rjust(line_width, '='))
        print()
This code prints timings in a readable format:
with TimerGroup("Collecting child documents for %s context" % context_name):
    with Timer("Collecting context features"):
        # some code...
    with Timer("Collecting child documents"):
        # some code...
= Collecting child documents for Global context ============
Collecting context features                >> 0:00:00.001063
Collecting child documents                 >> 0:00:10.611130
====================================== Total: 0:00:10.612292
However, when I nest TimerGroups, the output gets messed up:
with TimerGroup("Choosing the best classifier for %s context" % context_name):
    with Timer("Splitting datasets"):
        # some code...
    for cname, cparams in classifiers.items():
        with TimerGroup("%s classifier" % cname):
            with Timer("Training"):
                # some code...
            with Timer("Calculating accuracy on testing set"):
                # some code
= Choosing the best classifier for Global context ==========
Splitting datasets                         >> 0:00:00.002054
= Naive Bayes classifier ===================================
Training                                   >> 0:00:34.184903
Calculating accuracy on testing set        >> 0:05:08.481904
====================================== Total: 0:05:42.666949
====================================== Total: 0:05:42.669078
All I need to do is indent the nested Timers and TimerGroups somehow. Should I pass extra parameters to their constructors? Or can I detect the nesting from inside the class?
Upvotes: 4
Views: 1966
Reputation: 5609
If all you need to do is adjust an indentation level based on how many context managers you're currently nested in, then keep a class attribute called indent_level and adjust it each time you enter and exit a context manager. Something like the following:
class Context:
    indent_level = 0

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(' '*4*self.indent_level + 'Entering ' + self.name)
        self.adjust_indent_level(1)
        return self

    def __exit__(self, *a, **k):
        self.adjust_indent_level(-1)
        print(' '*4*self.indent_level + 'Exiting ' + self.name)

    @classmethod
    def adjust_indent_level(cls, val):
        cls.indent_level += val
And use it as:
>>> with Context('Outer') as outer_context:
...     with Context('Inner') as inner_context:
...         print(' '*inner_context.indent_level*4 + 'In the inner context')
...
Entering Outer
    Entering Inner
        In the inner context
    Exiting Inner
Exiting Outer
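Applied to the classes from the question, the same idea might look like the sketch below. It is only an illustration: the 60-column line_width, the two-space INDENT, the _depth attribute, and the indent() and elapsed() helpers are assumptions, and time.perf_counter stands in for clock().

```python
from datetime import timedelta
from time import perf_counter

line_width = 60   # assumed; the question does not show its value
INDENT = "  "     # assumed indentation unit per nesting level


class Timer:
    _depth = 0    # shared by Timer and all subclasses (not thread-safe)

    def __init__(self, name="Timer"):
        self.name = name
        self.start_time = perf_counter()

    def indent(self):
        return INDENT * Timer._depth

    def elapsed(self):
        return str(timedelta(seconds=perf_counter() - self.start_time))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pad = self.indent()
        width = line_width - 18 - len(pad)   # keep the ">>" column aligned
        print(pad + "{0:<{1}} >> {2}".format(self.name, width, self.elapsed()))


class TimerGroup(Timer):
    def __enter__(self):
        pad = self.indent()
        print(pad + ("= " + self.name + " ").ljust(line_width - len(pad), "="))
        Timer._depth += 1          # everything inside is one level deeper
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        Timer._depth -= 1          # back to this group's own level
        pad = self.indent()
        total = " Total: {0}".format(self.elapsed())
        print(pad + total.rjust(line_width - len(pad), "="))
```

Only TimerGroup changes the depth, so plain Timers nested directly inside each other stay at the same level. Because the counter lives on the class, it is shared between threads.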
Upvotes: 3
Reputation: 1121714
There are no special facilities to detect nested context managers, no. You'd have to handle this on your own. You could do this within your own context manager:
import threading

class TimerGroup(Timer):
    _active_group = threading.local()

    def __enter__(self):
        if getattr(TimerGroup._active_group, 'current', False):
            raise RuntimeError("Can't nest TimerGroup context managers")
        TimerGroup._active_group.current = self
        print(('= ' + self.name + ' ').ljust(line_width, '='))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        TimerGroup._active_group.current = None
        total_time = self.seconds_to_str(self.end())
        print(" Total: {0}".format(total_time).rjust(line_width, '='))
        print()
You can then read TimerGroup._active_group.current elsewhere to grab the currently active group. I used a thread-local object so that each thread of execution tracks its own active group.
Alternatively, you could make that a counter, incrementing it in __enter__ and decrementing it in __exit__, or a stack (a list): push self onto the stack in __enter__ and pop it again in __exit__:
import threading

class TimerGroup(Timer):
    _active_group = threading.local()

    def __enter__(self):
        if not hasattr(TimerGroup._active_group, 'current'):
            TimerGroup._active_group.current = []
        stack = TimerGroup._active_group.current
        if stack:
            # nested context manager.
            # do something with stack[-1] or stack[0]
            pass
        stack.append(self)
        print(('= ' + self.name + ' ').ljust(line_width, '='))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        last = TimerGroup._active_group.current.pop()
        assert last is self, "Context managers being exited out of order"
        total_time = self.seconds_to_str(self.end())
        print(" Total: {0}".format(total_time).rjust(line_width, '='))
        print()
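The counter variant mentioned above is not spelled out in this answer. A minimal sketch, assuming all you need is the current nesting depth, could look like this; the NestingDepth class and its _state and depth names are hypothetical:

```python
import threading


class NestingDepth:
    """Context manager that tracks how deeply it is nested, per thread."""

    _state = threading.local()   # each thread sees its own 'depth' attribute

    def __enter__(self):
        # Depth seen by this manager: 0 for the outermost, 1 for the next, ...
        self.depth = getattr(NestingDepth._state, "depth", 0)
        NestingDepth._state.depth = self.depth + 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore the depth that was current before this manager was entered.
        NestingDepth._state.depth = self.depth
        return False             # never swallow exceptions


with NestingDepth() as outer:
    with NestingDepth() as inner:
        print(outer.depth, inner.depth)   # prints "0 1"
```

A counter is enough for indentation. The stack version is only needed when the inner manager has to talk to the enclosing one.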
Upvotes: 7
Reputation: 40884
Explicit is better than implicit
A cleaner design would let you specify the group explicitly:
with TimerGroup('Doing big task') as big_task_tg:
    with Timer('Foo', big_task_tg):
        foo_result = foo()
    with Timer('Bar', big_task_tg):
        bar(baz(foo_result))
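How Timer would use the group it is handed is left open here. One possible shape, purely as a sketch: each timer takes an optional parent and derives its indentation from it. The parent, depth, and report names are assumptions, not part of the original classes, and the output format is simplified.

```python
from time import perf_counter


class Timer:
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        # Nesting depth is derived from the explicit parent chain.
        self.depth = 0 if parent is None else parent.depth + 1

    def report(self, text):
        print("    " * self.depth + text)

    def __enter__(self):
        self.start_time = perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = perf_counter() - self.start_time
        self.report("{0} >> {1:.6f}s".format(self.name, elapsed))


class TimerGroup(Timer):
    def __enter__(self):
        self.report("= " + self.name)
        return super().__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = perf_counter() - self.start_time
        self.report("= Total: {0:.6f}s".format(elapsed))


with TimerGroup("Doing big task") as big_task_tg:
    with Timer("Foo", big_task_tg):
        pass
    with TimerGroup("Sub-task", big_task_tg) as sub_tg:
        with Timer("Bar", sub_tg):
            pass
```

No shared state is involved, so this works unchanged across threads, at the cost of threading the group object through every call site.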
On the other hand, you can always use traceback.extract_stack and look for invocations of a particular function upstream. It is very useful for logging and error reporting, and can be moderately useful to ensure that particular functions are only invoked in a certain context. But it tends to create dependencies that are very hard to track.
I would avoid it for grouping timers, though you can try. If you badly need automatic grouping, @Martijn-Pieters's approach is far superior.
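For reference, the stack inspection mentioned above can be sketched roughly as follows. The called_from helper and the run_pipeline and helper functions are made up for illustration; matching frames by function name alone is an assumption and is exactly the kind of hidden dependency warned about above.

```python
import traceback


def called_from(function_name):
    """Return True if a function with this name is anywhere up the call stack."""
    # extract_stack() returns FrameSummary objects, oldest call first;
    # the last entry is this function itself, so skip it.
    frames = traceback.extract_stack()[:-1]
    return any(frame.name == function_name for frame in frames)


def helper():
    return called_from("run_pipeline")


def run_pipeline():
    return helper()


print(run_pipeline())   # True: run_pipeline is upstream of helper
print(helper())         # False when called directly from module level
```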
Upvotes: 1