Merge pull request #336 from goodboy/callable_key_maybe_open_context
Callable key input to maybe open contextdun_unset_current_actor
commit
dfdad4d1fa
|
@ -0,0 +1,4 @@
|
||||||
|
Add ``key: Callable[..., Hashable]`` support to ``.trionics.maybe_open_context()``
|
||||||
|
|
||||||
|
Gives users finer grained control over cache hit behaviour using
|
||||||
|
a callable which receives the input ``kwargs: dict``.
|
|
@ -35,6 +35,7 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
|
from .trionics import maybe_open_nursery
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
@ -50,25 +51,6 @@ from ._streaming import Context, ReceiveMsgStream
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def maybe_open_nursery(
|
|
||||||
nursery: trio.Nursery = None,
|
|
||||||
shield: bool = False,
|
|
||||||
) -> AsyncGenerator[trio.Nursery, Any]:
|
|
||||||
'''
|
|
||||||
Create a new nursery if None provided.
|
|
||||||
|
|
||||||
Blocks on exit as expected if no input nursery is provided.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if nursery is not None:
|
|
||||||
yield nursery
|
|
||||||
else:
|
|
||||||
async with trio.open_nursery() as nursery:
|
|
||||||
nursery.cancel_scope.shield = shield
|
|
||||||
yield nursery
|
|
||||||
|
|
||||||
|
|
||||||
def _unwrap_msg(
|
def _unwrap_msg(
|
||||||
|
|
||||||
msg: dict[str, Any],
|
msg: dict[str, Any],
|
||||||
|
|
|
@ -21,6 +21,7 @@ Sugary patterns for trio + tractor designs.
|
||||||
from ._mngrs import (
|
from ._mngrs import (
|
||||||
gather_contexts,
|
gather_contexts,
|
||||||
maybe_open_context,
|
maybe_open_context,
|
||||||
|
maybe_open_nursery,
|
||||||
)
|
)
|
||||||
from ._broadcast import (
|
from ._broadcast import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
|
@ -35,4 +36,5 @@ __all__ = [
|
||||||
'BroadcastReceiver',
|
'BroadcastReceiver',
|
||||||
'Lagged',
|
'Lagged',
|
||||||
'maybe_open_context',
|
'maybe_open_context',
|
||||||
|
'maybe_open_nursery',
|
||||||
]
|
]
|
||||||
|
|
|
@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
import inspect
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
|
@ -34,8 +35,8 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
|
||||||
from ..log import get_logger
|
|
||||||
from .._state import current_actor
|
from .._state import current_actor
|
||||||
|
from ..log import get_logger
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -44,6 +45,25 @@ log = get_logger(__name__)
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def maybe_open_nursery(
|
||||||
|
nursery: trio.Nursery = None,
|
||||||
|
shield: bool = False,
|
||||||
|
) -> AsyncGenerator[trio.Nursery, Any]:
|
||||||
|
'''
|
||||||
|
Create a new nursery if None provided.
|
||||||
|
|
||||||
|
Blocks on exit as expected if no input nursery is provided.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if nursery is not None:
|
||||||
|
yield nursery
|
||||||
|
else:
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
|
nursery.cancel_scope.shield = shield
|
||||||
|
yield nursery
|
||||||
|
|
||||||
|
|
||||||
async def _enter_and_wait(
|
async def _enter_and_wait(
|
||||||
|
|
||||||
mngr: AsyncContextManager[T],
|
mngr: AsyncContextManager[T],
|
||||||
|
@ -121,6 +141,7 @@ class _Cache:
|
||||||
a kept-alive-while-in-use async resource.
|
a kept-alive-while-in-use async resource.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
service_n: Optional[trio.Nursery] = None
|
||||||
locks: dict[Hashable, trio.Lock] = {}
|
locks: dict[Hashable, trio.Lock] = {}
|
||||||
users: int = 0
|
users: int = 0
|
||||||
values: dict[Any, Any] = {}
|
values: dict[Any, Any] = {}
|
||||||
|
@ -128,6 +149,7 @@ class _Cache:
|
||||||
Hashable,
|
Hashable,
|
||||||
tuple[trio.Nursery, trio.Event]
|
tuple[trio.Nursery, trio.Event]
|
||||||
] = {}
|
] = {}
|
||||||
|
# nurseries: dict[int, trio.Nursery] = {}
|
||||||
no_more_users: Optional[trio.Event] = None
|
no_more_users: Optional[trio.Event] = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -158,7 +180,7 @@ async def maybe_open_context(
|
||||||
# XXX: used as cache key after conversion to tuple
|
# XXX: used as cache key after conversion to tuple
|
||||||
# and all embedded values must also be hashable
|
# and all embedded values must also be hashable
|
||||||
kwargs: dict = {},
|
kwargs: dict = {},
|
||||||
key: Hashable = None,
|
key: Hashable | Callable[..., Hashable] = None,
|
||||||
|
|
||||||
) -> AsyncIterator[tuple[bool, T]]:
|
) -> AsyncIterator[tuple[bool, T]]:
|
||||||
'''
|
'''
|
||||||
|
@ -168,8 +190,14 @@ async def maybe_open_context(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fid = id(acm_func)
|
fid = id(acm_func)
|
||||||
ctx_key = (fid, key or tuple(kwargs.items()))
|
|
||||||
value = None
|
if inspect.isfunction(key):
|
||||||
|
ctx_key = (fid, key(**kwargs))
|
||||||
|
else:
|
||||||
|
ctx_key = (fid, key or tuple(kwargs.items()))
|
||||||
|
|
||||||
|
# yielded output
|
||||||
|
yielded: Any = None
|
||||||
|
|
||||||
# Lock resource acquisition around task racing / ``trio``'s
|
# Lock resource acquisition around task racing / ``trio``'s
|
||||||
# scheduler protocol.
|
# scheduler protocol.
|
||||||
|
@ -179,44 +207,51 @@ async def maybe_open_context(
|
||||||
lock = _Cache.locks.setdefault(fid, trio.Lock())
|
lock = _Cache.locks.setdefault(fid, trio.Lock())
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
|
# XXX: one singleton nursery per actor and we want to
|
||||||
|
# have it not be closed until all consumers have exited (which is
|
||||||
|
# currently difficult to implement any other way besides using our
|
||||||
|
# pre-allocated runtime instance..)
|
||||||
|
service_n: trio.Nursery = current_actor()._service_n
|
||||||
|
|
||||||
|
# TODO: is there any way to allocate
|
||||||
|
# a 'stays-open-till-last-task-finshed nursery?
|
||||||
|
# service_n: trio.Nursery
|
||||||
|
# async with maybe_open_nursery(_Cache.service_n) as service_n:
|
||||||
|
# _Cache.service_n = service_n
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# **critical section** that should prevent other tasks from
|
# **critical section** that should prevent other tasks from
|
||||||
# checking the _Cache until complete otherwise the scheduler
|
# checking the _Cache until complete otherwise the scheduler
|
||||||
# may switch and by accident we create more then one resource.
|
# may switch and by accident we create more then one resource.
|
||||||
value = _Cache.values[ctx_key]
|
yielded = _Cache.values[ctx_key]
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.info(f'Allocating new {acm_func} for {ctx_key}')
|
log.info(f'Allocating new {acm_func} for {ctx_key}')
|
||||||
|
|
||||||
mngr = acm_func(**kwargs)
|
mngr = acm_func(**kwargs)
|
||||||
# TODO: avoid pulling from ``tractor`` internals and
|
|
||||||
# instead offer a "root nursery" in piker actors?
|
|
||||||
service_n = current_actor()._service_n
|
|
||||||
|
|
||||||
# TODO: does this need to be a tractor "root nursery"?
|
|
||||||
resources = _Cache.resources
|
resources = _Cache.resources
|
||||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
||||||
resources[ctx_key] = (service_n, trio.Event())
|
resources[ctx_key] = (service_n, trio.Event())
|
||||||
|
|
||||||
value = await service_n.start(
|
# sync up to the mngr's yielded value
|
||||||
|
yielded = await service_n.start(
|
||||||
_Cache.run_ctx,
|
_Cache.run_ctx,
|
||||||
mngr,
|
mngr,
|
||||||
ctx_key,
|
ctx_key,
|
||||||
)
|
)
|
||||||
_Cache.users += 1
|
_Cache.users += 1
|
||||||
lock.release()
|
lock.release()
|
||||||
yield False, value
|
yield False, yielded
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.info(f'Reusing _Cached resource for {ctx_key}')
|
log.info(f'Reusing _Cached resource for {ctx_key}')
|
||||||
_Cache.users += 1
|
_Cache.users += 1
|
||||||
lock.release()
|
lock.release()
|
||||||
yield True, value
|
yield True, yielded
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
_Cache.users -= 1
|
_Cache.users -= 1
|
||||||
|
|
||||||
if value is not None:
|
if yielded is not None:
|
||||||
# if no more consumers, teardown the client
|
# if no more consumers, teardown the client
|
||||||
if _Cache.users <= 0:
|
if _Cache.users <= 0:
|
||||||
log.info(f'De-allocating resource for {ctx_key}')
|
log.info(f'De-allocating resource for {ctx_key}')
|
||||||
|
|
Loading…
Reference in New Issue