Allocate an event per context

pause_feeds_on_sym_switch
Tyler Goodlet 2021-09-01 09:45:14 -04:00
parent 26cb7aa660
commit 4527d4a677
1 changed files with 17 additions and 15 deletions

View File

@ -31,13 +31,11 @@ from typing import (
) )
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
AsyncExitStack,
) )
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor._portal import maybe_open_nursery
from .brokers import get_brokermod from .brokers import get_brokermod
from .log import get_logger from .log import get_logger
@ -86,8 +84,11 @@ class cache:
''' '''
lock = trio.Lock() lock = trio.Lock()
users: int = 0 users: int = 0
values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} values: dict[Any, Any] = {}
nurseries: dict[int, Optional[trio.Nursery]] = {} resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None no_more_users: Optional[trio.Event] = None
@classmethod @classmethod
@ -100,15 +101,15 @@ class cache:
) -> None: ) -> None:
async with mng as value: async with mng as value:
cls.no_more_users = trio.Event() _, no_more_users = cls.resources[id(mng)]
cls.values[key] = value cls.values[key] = value
task_status.started(value) task_status.started(value)
try: try:
await cls.no_more_users.wait() await no_more_users.wait()
finally: finally:
value = cls.values.pop(key) value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error) # discard nursery ref so it won't be re-used (an error)
cls.nurseries.pop(id(mng)) cls.resources.pop(id(mng))
@asynccontextmanager @asynccontextmanager
@ -128,13 +129,12 @@ async def maybe_open_ctx(
ctx_key = id(mngr) ctx_key = id(mngr)
value = None value = None
try: try:
# lock feed acquisition around task racing / ``trio``'s # lock feed acquisition around task racing / ``trio``'s
# scheduler protocol # scheduler protocol
value = cache.values[key] value = cache.values[key]
log.info(f'Reusing cached feed for {key}') log.info(f'Reusing cached resource for {key}')
cache.users += 1 cache.users += 1
cache.lock.release() cache.lock.release()
yield True, value yield True, value
@ -146,14 +146,15 @@ async def maybe_open_ctx(
# checking the cache until complete otherwise the scheduler # checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed. # may switch and by accident we create more then one feed.
# TODO: vaoid pulling from ``tractor`` internals and # TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in # instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"? # TODO: does this need to be a tractor "root nursery"?
ln = cache.nurseries.get(ctx_key) ln = cache.resources.get(ctx_key)
assert not ln assert not ln
ln = cache.nurseries[ctx_key] = service_n
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key) value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1 cache.users += 1
@ -170,10 +171,11 @@ async def maybe_open_ctx(
if value is not None: if value is not None:
# if no more consumers, teardown the client # if no more consumers, teardown the client
if cache.users <= 0: if cache.users <= 0:
log.warning(f'De-allocating feed for {key}') log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery # terminate mngr nursery
cache.no_more_users.set() _, no_more_users = cache.resources[ctx_key]
no_more_users.set()
@asynccontextmanager @asynccontextmanager