Go back to a `global` singleton nursery per actor

Turns out the lifetime management of separate nurseries per delegate
manager is tricky; a new nursery can't be naively allocated on cache
misses since it may get closed by some early-terminating task instead
of by the "last using" consumer task. In theory, if we allocate using
the same logic as that used for the last-task-triggers-exit case then
this should work?
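
As a rough illustration of that "last-task-triggers-exit" idea, here is a
minimal hypothetical sketch (not part of this commit) of a use-counted
nursery that only closes once its final consumer releases it; the `_Shared`
container and `acquire_shared_nursery()` names are invented, and the
`users`/`no_more_users` bookkeeping deliberately mirrors what `_Cache`
already tracks:

    # hypothetical sketch, *not* from this commit
    from contextlib import asynccontextmanager as acm
    from typing import Optional

    import trio


    class _Shared:
        n: Optional[trio.Nursery] = None
        users: int = 0
        no_more_users: Optional[trio.Event] = None


    @acm
    async def acquire_shared_nursery():
        '''
        Yield a nursery which is only torn down once the *last* consumer
        exits, instead of whenever the allocating task happens to finish.

        '''
        # NB: a real version needs a lock around this check-then-set
        # (as `maybe_open_context()` does with `_Cache.locks`).
        if _Shared.n is None:
            # first consumer: allocate the nursery and host its teardown.
            async with trio.open_nursery() as n:
                _Shared.n = n
                _Shared.users = 1
                _Shared.no_more_users = trio.Event()
                try:
                    yield n
                finally:
                    _Shared.users -= 1
                    if _Shared.users == 0:
                        _Shared.no_more_users.set()
                    # block our exit until every later consumer is done.
                    await _Shared.no_more_users.wait()
                    _Shared.n = None
        else:
            # later consumers just bump the use count.
            _Shared.users += 1
            try:
                yield _Shared.n
            finally:
                _Shared.users -= 1
                if _Shared.users == 0:
                    _Shared.no_more_users.set()

The catch is visible in the first-consumer branch: the task that allocated
the nursery must block in teardown until everyone else is done, which is
roughly why the change below just falls back to the actor's pre-allocated
service nursery instead.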

For now just go back to a single global nursery per `_Cache`; the only
obvious way to get one that outlives every consumer task is to (re)use
the actor's internal service nursery, which we'd ideally like to avoid.
branch: callable_key_maybe_open_context
Tyler Goodlet 2022-10-07 12:45:15 -04:00
parent 7a719ac2a7
commit 44b59f3338
1 changed file with 27 additions and 16 deletions

@@ -35,6 +35,7 @@ from typing import (
 import trio
 from trio_typing import TaskStatus
+from .._state import current_actor
 from ..log import get_logger
@@ -140,6 +141,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.
     '''
+    service_n: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -147,7 +149,7 @@
         Hashable,
         tuple[trio.Nursery, trio.Event]
     ] = {}
-    nurseries: dict[int, trio.Nursery] = {}
+    # nurseries: dict[int, trio.Nursery] = {}
     no_more_users: Optional[trio.Event] = None

     @classmethod
@@ -205,6 +207,18 @@ async def maybe_open_context(
     lock = _Cache.locks.setdefault(fid, trio.Lock())
     await lock.acquire()
 
+    # XXX: one singleton nursery per actor and we want to
+    # have it not be closed until all consumers have exited (which is
+    # currently difficult to implement any other way besides using our
+    # pre-allocated runtime instance..)
+    service_n: trio.Nursery = current_actor()._service_n
+
+    # TODO: is there any way to allocate
+    # a 'stays-open-till-last-task-finshed nursery?
+    # service_n: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_n) as service_n:
+    #     _Cache.service_n = service_n
+
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
@@ -214,22 +228,19 @@
     except KeyError:
         log.info(f'Allocating new {acm_func} for {ctx_key}')
         mngr = acm_func(**kwargs)
-        service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid)
-        async with maybe_open_nursery(service_n) as service_n:
-            _Cache.nurseries[fid] = service_n
-            resources = _Cache.resources
-            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_n, trio.Event())
+        resources = _Cache.resources
+        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+        resources[ctx_key] = (service_n, trio.Event())
 
-            # sync up to the mngr's yielded value
-            yielded = await service_n.start(
-                _Cache.run_ctx,
-                mngr,
-                ctx_key,
-            )
-            _Cache.users += 1
-            lock.release()
-            yield False, yielded
+        # sync up to the mngr's yielded value
+        yielded = await service_n.start(
+            _Cache.run_ctx,
+            mngr,
+            ctx_key,
+        )
+        _Cache.users += 1
+        lock.release()
+        yield False, yielded
 
     else:
         log.info(f'Reusing _Cached resource for {ctx_key}')
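
For reference, a hedged usage sketch of how `maybe_open_context()` gets
consumed: the `acm_func`/`kwargs` keyword names and the `(cache_hit, value)`
yield shape are taken from the diff above, while `open_fake_resource()`, the
consumer tasks, the `tractor.trionics` import path, and running under
`tractor.open_root_actor()` (needed since the cache uses the actor's service
nursery) are illustrative assumptions:

    from contextlib import asynccontextmanager as acm

    import trio
    import tractor
    from tractor.trionics import maybe_open_context


    @acm
    async def open_fake_resource(name: str):
        # stand-in for an expensive-to-allocate async resource
        print(f'allocating {name}')
        yield {'name': name}
        print(f'tearing down {name}')


    async def consumer(task_name: str):
        async with maybe_open_context(
            acm_func=open_fake_resource,
            kwargs={'name': 'shared-thing'},
        ) as (cache_hit, resource):
            # only the first task should see `cache_hit == False`;
            # the rest reuse the same `resource` instance.
            print(f'{task_name}: cache_hit={cache_hit} -> {resource}')
            await trio.sleep(0.1)


    async def main():
        async with tractor.open_root_actor():
            async with trio.open_nursery() as tn:
                for i in range(3):
                    tn.start_soon(consumer, f'task{i}')


    if __name__ == '__main__':
        trio.run(main)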