forked from goodboy/tractor
Merge pull request #257 from goodboy/context_caching
Add `maybe_open_context()` an actor wide task-resource cachewin_ci_timeout
commit
4001d2c3fc
|
@ -0,0 +1,9 @@
|
|||
Add ``trionics.maybe_open_context()`` an actor-scoped async multi-task
|
||||
context manager resource caching API.
|
||||
|
||||
Adds an SC-safe cacheing async context manager api that only enters on
|
||||
the *first* task entry and only exits on the *last* task exit while in
|
||||
between delivering the same cached value per input key. Keys can be
|
||||
either an explicit ``key`` named arg provided by the user or a
|
||||
hashable ``kwargs`` dict (will be converted to a ``list[tuple]``) which
|
||||
is passed to the underlying manager function as input.
|
|
@ -0,0 +1,182 @@
|
|||
'''
|
||||
Async context manager cache api testing: ``trionics.maybe_open_context():``
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
import platform
|
||||
from typing import Awaitable
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
_resource: int = 0
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_increment_counter(task_name: str):
|
||||
global _resource
|
||||
|
||||
_resource += 1
|
||||
await trio.lowlevel.checkpoint()
|
||||
yield _resource
|
||||
await trio.lowlevel.checkpoint()
|
||||
_resource -= 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'key_on',
|
||||
['key_value', 'kwargs'],
|
||||
ids="key_on={}".format,
|
||||
)
|
||||
def test_resource_only_entered_once(key_on):
|
||||
global _resource
|
||||
_resource = 0
|
||||
|
||||
kwargs = {}
|
||||
key = None
|
||||
if key_on == 'key_value':
|
||||
key = 'some_common_key'
|
||||
|
||||
async def main():
|
||||
cache_active: bool = False
|
||||
|
||||
async def enter_cached_mngr(name: str):
|
||||
nonlocal cache_active
|
||||
|
||||
if key_on == 'kwargs':
|
||||
# make a common kwargs input to key on it
|
||||
kwargs = {'task_name': 'same_task_name'}
|
||||
assert key is None
|
||||
else:
|
||||
# different task names per task will be used
|
||||
kwargs = {'task_name': name}
|
||||
|
||||
async with tractor.trionics.maybe_open_context(
|
||||
maybe_increment_counter,
|
||||
kwargs=kwargs,
|
||||
key=key,
|
||||
|
||||
) as (cache_hit, resource):
|
||||
if cache_hit:
|
||||
try:
|
||||
cache_active = True
|
||||
assert resource == 1
|
||||
await trio.sleep_forever()
|
||||
finally:
|
||||
cache_active = False
|
||||
else:
|
||||
assert resource == 1
|
||||
await trio.sleep_forever()
|
||||
|
||||
with trio.move_on_after(0.5):
|
||||
async with (
|
||||
tractor.open_root_actor(),
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
for i in range(10):
|
||||
n.start_soon(enter_cached_mngr, f'task_{i}')
|
||||
await trio.sleep(0.001)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def streamer(
|
||||
ctx: tractor.Context,
|
||||
seq: list[int] = list(range(1000)),
|
||||
) -> None:
|
||||
|
||||
await ctx.started()
|
||||
async with ctx.open_stream() as stream:
|
||||
for val in seq:
|
||||
await stream.send(val)
|
||||
await trio.sleep(0.001)
|
||||
|
||||
print('producer finished')
|
||||
|
||||
|
||||
@acm
|
||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
|
||||
|
||||
async with tractor.open_nursery() as tn:
|
||||
portal = await tn.start_actor('streamer', enable_modules=[__name__])
|
||||
async with (
|
||||
portal.open_context(streamer) as (ctx, first),
|
||||
ctx.open_stream() as stream,
|
||||
):
|
||||
yield stream
|
||||
|
||||
await portal.cancel_actor()
|
||||
print('CANCELLED STREAMER')
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_stream(taskname: str):
|
||||
async with tractor.trionics.maybe_open_context(
|
||||
# NOTE: all secondary tasks should cache hit on the same key
|
||||
acm_func=open_stream,
|
||||
) as (cache_hit, stream):
|
||||
|
||||
if cache_hit:
|
||||
print(f'{taskname} loaded from cache')
|
||||
|
||||
# add a new broadcast subscription for the quote stream
|
||||
# if this feed is already allocated by the first
|
||||
# task that entereed
|
||||
async with stream.subscribe() as bstream:
|
||||
yield bstream
|
||||
else:
|
||||
# yield the actual stream
|
||||
yield stream
|
||||
|
||||
|
||||
def test_open_local_sub_to_stream():
|
||||
'''
|
||||
Verify a single inter-actor stream can can be fanned-out shared to
|
||||
N local tasks using ``trionics.maybe_open_context():``.
|
||||
|
||||
'''
|
||||
timeout = 3 if platform.system() != "Windows" else 10
|
||||
|
||||
async def main():
|
||||
|
||||
full = list(range(1000))
|
||||
|
||||
async def get_sub_and_pull(taskname: str):
|
||||
async with (
|
||||
maybe_open_stream(taskname) as stream,
|
||||
):
|
||||
if '0' in taskname:
|
||||
assert isinstance(stream, tractor.MsgStream)
|
||||
else:
|
||||
assert isinstance(
|
||||
stream,
|
||||
tractor.trionics.BroadcastReceiver
|
||||
)
|
||||
|
||||
first = await stream.receive()
|
||||
print(f'{taskname} started with value {first}')
|
||||
seq = []
|
||||
async for msg in stream:
|
||||
seq.append(msg)
|
||||
|
||||
assert set(seq).issubset(set(full))
|
||||
print(f'{taskname} finished')
|
||||
|
||||
with trio.fail_after(timeout):
|
||||
# TODO: turns out this isn't multi-task entrant XD
|
||||
# We probably need an indepotent entry semantic?
|
||||
async with tractor.open_root_actor():
|
||||
async with (
|
||||
trio.open_nursery() as nurse,
|
||||
):
|
||||
for i in range(10):
|
||||
nurse.start_soon(get_sub_and_pull, f'task_{i}')
|
||||
await trio.sleep(0.001)
|
||||
|
||||
print('all consumer tasks finished')
|
||||
|
||||
trio.run(main)
|
|
@ -215,6 +215,14 @@ async def open_root_actor(
|
|||
raise
|
||||
|
||||
finally:
|
||||
# NOTE: not sure if we'll ever need this but it's
|
||||
# possibly better for even more determinism?
|
||||
# logger.cancel(f'Waiting on {len(nurseries)} nurseries in root..')
|
||||
# nurseries = actor._actoruid2nursery.values()
|
||||
# async with trio.open_nursery() as tempn:
|
||||
# for an in nurseries:
|
||||
# tempn.start_soon(an.exited.wait)
|
||||
|
||||
logger.cancel("Shutting down root actor")
|
||||
await actor.cancel()
|
||||
finally:
|
||||
|
|
|
@ -18,8 +18,15 @@
|
|||
Sugary patterns for trio + tractor designs.
|
||||
|
||||
'''
|
||||
from ._mngrs import gather_contexts
|
||||
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
|
||||
from ._mngrs import (
|
||||
gather_contexts,
|
||||
maybe_open_context,
|
||||
)
|
||||
from ._broadcast import (
|
||||
broadcast_receiver,
|
||||
BroadcastReceiver,
|
||||
Lagged,
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
@ -27,4 +34,5 @@ __all__ = [
|
|||
'broadcast_receiver',
|
||||
'BroadcastReceiver',
|
||||
'Lagged',
|
||||
'maybe_open_context',
|
||||
]
|
||||
|
|
|
@ -331,10 +331,16 @@ class BroadcastReceiver(ReceiveChannel):
|
|||
if self._closed:
|
||||
return
|
||||
|
||||
# if there are sleeping consumers wake
|
||||
# them on closure.
|
||||
rr = self._state.recv_ready
|
||||
if rr:
|
||||
_, event = rr
|
||||
event.set()
|
||||
|
||||
# XXX: leaving it like this consumers can still get values
|
||||
# up to the last received that still reside in the queue.
|
||||
self._state.subs.pop(self.key)
|
||||
|
||||
self._closed = True
|
||||
|
||||
|
||||
|
|
|
@ -18,12 +18,27 @@
|
|||
Async context manager primitives with hard ``trio``-aware semantics
|
||||
|
||||
'''
|
||||
from typing import AsyncContextManager, AsyncGenerator
|
||||
from typing import TypeVar, Sequence
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncContextManager,
|
||||
AsyncGenerator,
|
||||
AsyncIterator,
|
||||
Callable,
|
||||
Hashable,
|
||||
Optional,
|
||||
Sequence,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
|
||||
from ..log import get_logger
|
||||
from .._state import current_actor
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
# A regular invariant generic type
|
||||
T = TypeVar("T")
|
||||
|
@ -92,3 +107,118 @@ async def gather_contexts(
|
|||
# we don't need a try/finally since cancellation will be triggered
|
||||
# by the surrounding nursery on error.
|
||||
parent_exit.set()
|
||||
|
||||
|
||||
# Per actor task caching helpers.
|
||||
# Further potential examples of interest:
|
||||
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
|
||||
|
||||
class _Cache:
|
||||
'''
|
||||
Globally (actor-processs scoped) cached, task access to
|
||||
a kept-alive-while-in-use async resource.
|
||||
|
||||
'''
|
||||
lock = trio.Lock()
|
||||
users: int = 0
|
||||
values: dict[Any, Any] = {}
|
||||
resources: dict[
|
||||
Hashable,
|
||||
tuple[trio.Nursery, trio.Event]
|
||||
] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
|
||||
@classmethod
|
||||
async def run_ctx(
|
||||
cls,
|
||||
mng,
|
||||
ctx_key: tuple,
|
||||
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
try:
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
value = cls.values.pop(ctx_key)
|
||||
cls.resources.pop(ctx_key)
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_context(
|
||||
|
||||
acm_func: Callable[..., AsyncContextManager[T]],
|
||||
|
||||
# XXX: used as cache key after conversion to tuple
|
||||
# and all embedded values must also be hashable
|
||||
kwargs: dict = {},
|
||||
key: Hashable = None,
|
||||
|
||||
) -> AsyncIterator[tuple[bool, T]]:
|
||||
'''
|
||||
Maybe open a context manager if there is not already a _Cached
|
||||
version for the provided ``key`` for *this* actor. Return the
|
||||
_Cached instance on a _Cache hit.
|
||||
|
||||
'''
|
||||
# lock resource acquisition around task racing / ``trio``'s
|
||||
# scheduler protocol
|
||||
await _Cache.lock.acquire()
|
||||
|
||||
ctx_key = (id(acm_func), key or tuple(kwargs.items()))
|
||||
print(ctx_key)
|
||||
value = None
|
||||
|
||||
try:
|
||||
# **critical section** that should prevent other tasks from
|
||||
# checking the _Cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one resource.
|
||||
value = _Cache.values[ctx_key]
|
||||
|
||||
except KeyError:
|
||||
log.info(f'Allocating new resource for {ctx_key}')
|
||||
|
||||
mngr = acm_func(**kwargs)
|
||||
# TODO: avoid pulling from ``tractor`` internals and
|
||||
# instead offer a "root nursery" in piker actors?
|
||||
service_n = current_actor()._service_n
|
||||
|
||||
# TODO: does this need to be a tractor "root nursery"?
|
||||
resources = _Cache.resources
|
||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
||||
ln, _ = resources[ctx_key] = (service_n, trio.Event())
|
||||
|
||||
value = await ln.start(
|
||||
_Cache.run_ctx,
|
||||
mngr,
|
||||
ctx_key,
|
||||
)
|
||||
_Cache.users += 1
|
||||
_Cache.lock.release()
|
||||
yield False, value
|
||||
|
||||
else:
|
||||
log.info(f'Reusing _Cached resource for {ctx_key}')
|
||||
_Cache.users += 1
|
||||
_Cache.lock.release()
|
||||
yield True, value
|
||||
|
||||
finally:
|
||||
_Cache.users -= 1
|
||||
|
||||
if value is not None:
|
||||
# if no more consumers, teardown the client
|
||||
if _Cache.users <= 0:
|
||||
log.info(f'De-allocating resource for {ctx_key}')
|
||||
|
||||
# XXX: if we're cancelled we the entry may have never
|
||||
# been entered since the nursery task was killed.
|
||||
# _, no_more_users = _Cache.resources[ctx_key]
|
||||
entry = _Cache.resources.get(ctx_key)
|
||||
if entry:
|
||||
_, no_more_users = entry
|
||||
no_more_users.set()
|
||||
|
|
Loading…
Reference in New Issue