forked from goodboy/tractor
				
			Add `maybe_open_context()` an actor wide task-resource cache
							parent
							
								
									2d6fbd5437
								
							
						
					
					
						commit
						5f41dbf34f
					
				| 
						 | 
				
			
			@ -18,8 +18,15 @@
 | 
			
		|||
Sugary patterns for trio + tractor designs.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from ._mngrs import gather_contexts
 | 
			
		||||
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
 | 
			
		||||
from ._mngrs import (
 | 
			
		||||
    gather_contexts,
 | 
			
		||||
    maybe_open_context,
 | 
			
		||||
)
 | 
			
		||||
from ._broadcast import (
 | 
			
		||||
    broadcast_receiver,
 | 
			
		||||
    BroadcastReceiver,
 | 
			
		||||
    Lagged,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
__all__ = [
 | 
			
		||||
| 
						 | 
				
			
			@ -27,4 +34,5 @@ __all__ = [
 | 
			
		|||
    'broadcast_receiver',
 | 
			
		||||
    'BroadcastReceiver',
 | 
			
		||||
    'Lagged',
 | 
			
		||||
    'maybe_open_context',
 | 
			
		||||
]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,12 +18,25 @@
 | 
			
		|||
Async context manager primitives with hard ``trio``-aware semantics
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from typing import AsyncContextManager, AsyncGenerator
 | 
			
		||||
from typing import TypeVar, Sequence
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    AsyncContextManager,
 | 
			
		||||
    AsyncGenerator,
 | 
			
		||||
    Hashable,
 | 
			
		||||
    Optional,
 | 
			
		||||
    Sequence,
 | 
			
		||||
    TypeVar,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
 | 
			
		||||
from ..log import get_logger
 | 
			
		||||
from .._state import current_actor
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
# A regular invariant generic type
 | 
			
		||||
T = TypeVar("T")
 | 
			
		||||
| 
						 | 
				
			
			@ -92,3 +105,111 @@ async def gather_contexts(
 | 
			
		|||
        # we don't need a try/finally since cancellation will be triggered
 | 
			
		||||
        # by the surrounding nursery on error.
 | 
			
		||||
        parent_exit.set()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Per actor task caching helpers.
 | 
			
		||||
# Further potential examples of interest:
 | 
			
		||||
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
 | 
			
		||||
 | 
			
		||||
class cache:
 | 
			
		||||
    '''
 | 
			
		||||
    Globally (processs wide) cached, task access to a
 | 
			
		||||
    kept-alive-while-in-use async resource.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    lock = trio.Lock()
 | 
			
		||||
    users: int = 0
 | 
			
		||||
    values: dict[Any,  Any] = {}
 | 
			
		||||
    resources: dict[
 | 
			
		||||
        int,
 | 
			
		||||
        Optional[tuple[trio.Nursery, trio.Event]]
 | 
			
		||||
    ] = {}
 | 
			
		||||
    no_more_users: Optional[trio.Event] = None
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    async def run_ctx(
 | 
			
		||||
        cls,
 | 
			
		||||
        mng,
 | 
			
		||||
        key,
 | 
			
		||||
        task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        async with mng as value:
 | 
			
		||||
 | 
			
		||||
            _, no_more_users = cls.resources[id(mng)]
 | 
			
		||||
            cls.values[key] = value
 | 
			
		||||
            task_status.started(value)
 | 
			
		||||
            try:
 | 
			
		||||
                await no_more_users.wait()
 | 
			
		||||
            finally:
 | 
			
		||||
                value = cls.values.pop(key)
 | 
			
		||||
                # discard nursery ref so it won't be re-used (an error)
 | 
			
		||||
                cls.resources.pop(id(mng))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def maybe_open_context(
 | 
			
		||||
 | 
			
		||||
    key: Hashable,
 | 
			
		||||
    mngr: AsyncContextManager[T],
 | 
			
		||||
 | 
			
		||||
) -> (bool, T):
 | 
			
		||||
    '''
 | 
			
		||||
    Maybe open a context manager if there is not already a cached
 | 
			
		||||
    version for the provided ``key``. Return the cached instance on
 | 
			
		||||
    a cache hit.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    await cache.lock.acquire()
 | 
			
		||||
 | 
			
		||||
    ctx_key = id(mngr)
 | 
			
		||||
 | 
			
		||||
    value = None
 | 
			
		||||
    try:
 | 
			
		||||
        # lock feed acquisition around task racing  / ``trio``'s
 | 
			
		||||
        # scheduler protocol
 | 
			
		||||
        value = cache.values[key]
 | 
			
		||||
        log.info(f'Reusing cached resource for {key}')
 | 
			
		||||
        cache.users += 1
 | 
			
		||||
        cache.lock.release()
 | 
			
		||||
        yield True, value
 | 
			
		||||
 | 
			
		||||
    except KeyError:
 | 
			
		||||
        log.info(f'Allocating new resource for {key}')
 | 
			
		||||
 | 
			
		||||
        # **critical section** that should prevent other tasks from
 | 
			
		||||
        # checking the cache until complete otherwise the scheduler
 | 
			
		||||
        # may switch and by accident we create more then one feed.
 | 
			
		||||
 | 
			
		||||
        # TODO: avoid pulling from ``tractor`` internals and
 | 
			
		||||
        # instead offer a "root nursery" in piker actors?
 | 
			
		||||
        service_n = current_actor()._service_n
 | 
			
		||||
 | 
			
		||||
        # TODO: does this need to be a tractor "root nursery"?
 | 
			
		||||
        ln = cache.resources.get(ctx_key)
 | 
			
		||||
        assert not ln
 | 
			
		||||
 | 
			
		||||
        ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
 | 
			
		||||
 | 
			
		||||
        value = await ln.start(cache.run_ctx, mngr, key)
 | 
			
		||||
        cache.users += 1
 | 
			
		||||
        cache.lock.release()
 | 
			
		||||
 | 
			
		||||
        yield False, value
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        cache.users -= 1
 | 
			
		||||
 | 
			
		||||
        if cache.lock.locked():
 | 
			
		||||
            cache.lock.release()
 | 
			
		||||
 | 
			
		||||
        if value is not None:
 | 
			
		||||
            # if no more consumers, teardown the client
 | 
			
		||||
            if cache.users <= 0:
 | 
			
		||||
                log.info(f'De-allocating resource for {key}')
 | 
			
		||||
 | 
			
		||||
                # terminate mngr nursery
 | 
			
		||||
                entry = cache.resources.get(ctx_key)
 | 
			
		||||
                if entry:
 | 
			
		||||
                    _, no_more_users = entry
 | 
			
		||||
                    no_more_users.set()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue