Reputation: 6121
I'm trying to create a function that adds the following functionality to an existing coroutine.
I have a problem testing that last condition.
def cached(cache, locks, f):
    @wraps(f)
    async def wrapper(*args, ttl=float('inf')):
        value, updated_at = cache.get(args, (None, None))
        if value and updated_at >= time() - ttl:
            return value
        else:
            loading_sync = locks.setdefault(args, Sync())
            if loading_sync.flag:
                await loading_sync.condition.wait()
                return cache[args]
            else:
                with await loading_sync.condition:
                    loading_sync.flag = True
                    result = await f(*args)
                    cache[args] = result, time()
                    loading_sync.flag = False
                    loading_sync.condition.notify_all()
                    return result
    return wrapper
Upvotes: 1
Views: 580
Reputation: 522005
To unit test such a scenario, you can use futures, which you can resolve at will. Using a very simplified @cached decorator and function here:
@cached
async def test_mock(future):
    await asyncio.wait_for(future, None)

func1_future = asyncio.Future()
func1_coro = test_mock(func1_future)
func2_coro = test_mock(...)

func1_future.set_result(True)
await func1_coro
await func2_coro
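For a version you can actually run, here is a minimal sketch of the same idea. The `cached` decorator below is a hypothetical stand-in (not the decorator from the question) that simply shares one in-flight call per argument tuple; `slow_load`, `gate` and `started` are names made up for the illustration. The future acts as a gate: both callers are parked on it, and the test then checks that the wrapped coroutine ran only once.

```python
import asyncio
from functools import wraps


def cached(f):
    """Hypothetical stand-in: share one in-flight call per argument tuple."""
    in_flight = {}

    @wraps(f)
    async def wrapper(*args):
        if args not in in_flight:
            # First caller starts the real work; later callers await the same task.
            in_flight[args] = asyncio.ensure_future(f(*args))
        return await in_flight[args]

    return wrapper


async def run_scenario():
    call_count = 0
    started = asyncio.Event()
    # The test controls when the "slow" coroutine is allowed to finish.
    gate = asyncio.get_running_loop().create_future()

    @cached
    async def slow_load(key):
        nonlocal call_count
        call_count += 1
        started.set()
        return await gate

    first = asyncio.ensure_future(slow_load("k"))
    second = asyncio.ensure_future(slow_load("k"))

    # Wait until the underlying coroutine is really running, then confirm
    # that both callers are still blocked on it.
    await started.wait()
    pending_before = not first.done() and not second.done()

    gate.set_result("value")  # release both callers at once
    results = await asyncio.gather(first, second)
    return pending_before, call_count, results


if __name__ == "__main__":
    print(asyncio.run(run_scenario()))
```

Waiting on the `started` event rather than sleeping for a fixed time keeps the test deterministic: the gate is only released once the second caller has had a chance to arrive.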
Original answer, based on misunderstanding:
The logic is pretty simple: you have your cache somewhere, let's use a simple dictionary. When you first encounter particular arguments, you create a Future at the cache location. Whenever you access the cache, check if your value is a Future and if so, await it. Very simple illustration:
cache = dict()

async def memoizer(args):
    if args in cache:
        cached_value = cache[args]
        if isinstance(cached_value, asyncio.Future):
            cached_value = await asyncio.wait_for(cached_value, None)
        return cached_value
    else:
        future = asyncio.Future()
        cache[args] = future
        value = await compute_value(args)
        future.set_result(value)
        cache[args] = value
        return value
Upvotes: 2