Re-impl ctx-mng caching using `trio.Nursery.start()`

Maybe i've finally learned my lesson that exit stacks and per task ctx
manager caching is just not trionic.. Use the approach we've taken for
the daemon service manager as well: create a process global nursery for
each unique ctx manager we wish to cache and simply tear it down when
the number of consumers goes to zero.

This seems to resolve all prior issues and gets us error-free cached
feeds!
pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-30 17:39:53 -04:00
parent 2f1455d423
commit ff322ae7be
1 changed files with 62 additions and 43 deletions

View File

@ -23,19 +23,20 @@ Cacheing apis and toolz.
from collections import OrderedDict from collections import OrderedDict
from typing import ( from typing import (
Optional, Any,
Hashable, Hashable,
Optional,
TypeVar, TypeVar,
AsyncContextManager, AsyncContextManager,
AsyncIterable,
) )
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
AsyncExitStack, AsyncExitStack,
contextmanager,
) )
import trio import trio
from trio_typing import TaskStatus
from tractor._portal import maybe_open_nursery
from .brokers import get_brokermod from .brokers import get_brokermod
from .log import get_logger from .log import get_logger
@ -132,14 +133,35 @@ async def open_cached_client(
class cache: class cache:
'''Globally (processs wide) cached, task access to a '''Globally (processs wide) cached, task access to a
kept-alive-while-in-use data feed. kept-alive-while-in-use async resource.
''' '''
lock = trio.Lock() lock = trio.Lock()
users: int = 0 users: int = 0
ctxs: dict[tuple[str, str], AsyncIterable] = {} values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {}
nurseries: dict[int, Optional[trio.Nursery]] = {}
no_more_users: Optional[trio.Event] = None no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
cls.no_more_users = trio.Event()
cls.values[key] = value
task_status.started(value)
try:
await cls.no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.nurseries.pop(id(mng))
@asynccontextmanager @asynccontextmanager
async def maybe_open_ctx( async def maybe_open_ctx(
@ -153,51 +175,48 @@ async def maybe_open_ctx(
a cache hit. a cache hit.
''' '''
@contextmanager
def get_and_use() -> AsyncIterable[T]:
# key error must bubble here
value = cache.ctxs[key]
log.info(f'Reusing cached feed for {key}')
try:
cache.users += 1
yield value
finally:
cache.users -= 1
if cache.users == 0:
# signal to original allocator task feed use is complete
cache.no_more_users.set()
await cache.lock.acquire()
ctx_key = id(mngr)
# TODO: does this need to be a tractor "root nursery"?
async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n:
cache.nurseries[ctx_key] = n
value = None
try: try:
with get_and_use() as value:
yield True, value
except KeyError:
# lock feed acquisition around task racing / ``trio``'s # lock feed acquisition around task racing / ``trio``'s
# scheduler protocol # scheduler protocol
await cache.lock.acquire() value = cache.values[key]
try: log.info(f'Reusing cached feed for {key}')
with get_and_use() as value: cache.users += 1
cache.lock.release() cache.lock.release()
yield True, value yield True, value
return
except KeyError: except KeyError:
log.info(f'Allocating new feed for {key}')
# **critical section** that should prevent other tasks from # **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler # checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed. # may switch and by accident we create more then one feed.
cache.no_more_users = trio.Event()
log.info(f'Allocating new feed for {key}') value = await n.start(cache.run_ctx, mngr, key)
# TODO: eventually support N-brokers cache.users += 1
async with mngr as value:
cache.ctxs[key] = value
cache.lock.release() cache.lock.release()
try:
yield True, value
finally:
# don't tear down the feed until there are zero
# users of it left.
if cache.users > 0:
await cache.no_more_users.wait()
log.warning('De-allocating feed for {key}') yield False, value
cache.ctxs.pop(key)
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating feed for {key}')
# terminate mngr nursery
cache.no_more_users.set()