Factor out context cacher to `tractor.trionics`
parent 65ad18b5c3
commit 6f2c2b46d5
```diff
@@ -18,30 +18,18 @@
 Cacheing apis and toolz.
 
 """
 # further examples of interest:
 # https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
 
 from collections import OrderedDict
-from typing import (
-    Any,
-    Hashable,
-    Optional,
-    TypeVar,
-    AsyncContextManager,
-)
 from contextlib import (
     asynccontextmanager,
 )
 
-import trio
-from trio_typing import TaskStatus
-import tractor
+from tractor.trionics import maybe_open_context
 
 from .brokers import get_brokermod
 from .log import get_logger
 
-
-T = TypeVar('T')
 log = get_logger(__name__)
 
-
@@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
     return decorator
 
 
-_cache: dict[str, 'Client'] = {}  # noqa
-
-
-class cache:
-    '''Globally (processs wide) cached, task access to a
-    kept-alive-while-in-use async resource.
-
-    '''
-    lock = trio.Lock()
-    users: int = 0
-    values: dict[Any,  Any] = {}
-    resources: dict[
-        int,
-        Optional[tuple[trio.Nursery, trio.Event]]
-    ] = {}
-    no_more_users: Optional[trio.Event] = None
-
-    @classmethod
-    async def run_ctx(
-        cls,
-        mng,
-        key,
-        task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
-
-    ) -> None:
-        async with mng as value:
-
-            _, no_more_users = cls.resources[id(mng)]
-            cls.values[key] = value
-            task_status.started(value)
-            try:
-                await no_more_users.wait()
-            finally:
-                value = cls.values.pop(key)
-                # discard nursery ref so it won't be re-used (an error)
-                cls.resources.pop(id(mng))
-
-
-@asynccontextmanager
-async def maybe_open_ctx(
-
-    key: Hashable,
-    mngr: AsyncContextManager[T],
-
-) -> (bool, T):
-    '''Maybe open a context manager if there is not already a cached
-    version for the provided ``key``. Return the cached instance on
-    a cache hit.
-
-    '''
-
-    await cache.lock.acquire()
-
-    ctx_key = id(mngr)
-
-    value = None
-    try:
-        # lock feed acquisition around task racing  / ``trio``'s
-        # scheduler protocol
-        value = cache.values[key]
-        log.info(f'Reusing cached resource for {key}')
-        cache.users += 1
-        cache.lock.release()
-        yield True, value
-
-    except KeyError:
-        log.info(f'Allocating new resource for {key}')
-
-        # **critical section** that should prevent other tasks from
-        # checking the cache until complete otherwise the scheduler
-        # may switch and by accident we create more then one feed.
-
-        # TODO: avoid pulling from ``tractor`` internals and
-        # instead offer a "root nursery" in piker actors?
-        service_n = tractor.current_actor()._service_n
-
-        # TODO: does this need to be a tractor "root nursery"?
-        ln = cache.resources.get(ctx_key)
-        assert not ln
-
-        ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
-
-        value = await ln.start(cache.run_ctx, mngr, key)
-        cache.users += 1
-        cache.lock.release()
-
-        yield False, value
-
-    finally:
-        cache.users -= 1
-
-        if cache.lock.locked():
-            cache.lock.release()
-
-        if value is not None:
-            # if no more consumers, teardown the client
-            if cache.users <= 0:
-                log.warning(f'De-allocating resource for {key}')
-
-                # terminate mngr nursery
-                entry = cache.resources.get(ctx_key)
-                if entry:
-                    _, no_more_users = entry
-                    no_more_users.set()
-
-
 @asynccontextmanager
 async def open_cached_client(
     brokername: str,
@@ -190,7 +72,7 @@ async def open_cached_client(
 
     '''
     brokermod = get_brokermod(brokername)
-    async with maybe_open_ctx(
+    async with maybe_open_context(
         key=brokername,
         mngr=brokermod.get_client(),
     ) as (cache_hit, client):
```
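For context, a minimal usage sketch (not part of this commit) of the pattern these call sites converge on: tasks that pass the same `key` share a single entered `mngr` instance and teardown only runs after the last user exits, which is what the removed `maybe_open_ctx()` implemented locally. It assumes `tractor.trionics.maybe_open_context` keeps the `key=`/`mngr=` signature and `(cache_hit, value)` yield shown above and that it needs a running tractor actor; `open_expensive_resource()`, `user_task()` and the other names are hypothetical.

```python
from contextlib import asynccontextmanager

import trio
import tractor
from tractor.trionics import maybe_open_context


@asynccontextmanager
async def open_expensive_resource(name: str):
    # stand-in for e.g. ``brokermod.get_client()`` or ``open_feed(...)``
    print(f'allocating {name}')
    try:
        yield {'name': name}
    finally:
        print(f'tearing down {name}')


async def user_task(name: str):
    async with maybe_open_context(
        key=name,
        mngr=open_expensive_resource(name),
    ) as (cache_hit, resource):
        # only the first task to enter actually allocates; later tasks
        # should see ``cache_hit=True`` and the already-entered instance.
        print(f'cache_hit={cache_hit} -> {resource}')
        await trio.sleep(0.1)
    # teardown is expected to run once, after the last user exits.


async def main():
    # assumption: a running (root) actor is required since the cached
    # ``mngr`` is kept alive on the actor's service nursery.
    async with tractor.open_root_actor():
        async with trio.open_nursery() as n:
            for _ in range(3):
                n.start_soon(user_task, 'demo')


if __name__ == '__main__':
    trio.run(main)
```

The point of the factoring is that this bookkeeping (lock, user count, keyed teardown event) now lives in `tractor.trionics` instead of being re-implemented here.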
```diff
@@ -37,7 +37,7 @@ import tractor
 from pydantic import BaseModel
 
 from ..brokers import get_brokermod
-from .._cacheables import maybe_open_ctx
+from .._cacheables import maybe_open_context
 from ..log import get_logger, get_console_log
 from .._daemon import (
     maybe_spawn_brokerd,
@@ -363,7 +363,7 @@ async def open_sample_step_stream(
     # XXX: this should be singleton on a host,
     # a lone broker-daemon per provider should be
     # created for all practical purposes
-    async with maybe_open_ctx(
+    async with maybe_open_context(
         key=delay_s,
         mngr=portal.open_stream_from(
             iter_ohlc_periods,
@@ -585,7 +585,7 @@ async def maybe_open_feed(
     '''
     sym = symbols[0].lower()
 
-    async with maybe_open_ctx(
+    async with maybe_open_context(
         key=(brokername, sym),
         mngr=open_feed(
             brokername,
```
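Likewise, a hedged sketch of the wrapper style visible in the `maybe_open_feed()` hunk above: compose a hashable `key` from the call arguments so duplicate requests share one allocation while a different `(broker, symbol)` pair gets its own. `open_feed_like()`/`maybe_open_feed_like()` are hypothetical stand-ins, not piker APIs.

```python
from contextlib import asynccontextmanager

from tractor.trionics import maybe_open_context


@asynccontextmanager
async def open_feed_like(brokername: str, sym: str):
    # placeholder for a real (expensive) feed allocation
    yield (brokername, sym)


@asynccontextmanager
async def maybe_open_feed_like(brokername: str, sym: str):
    # key on the (provider, symbol) pair: same pair -> shared instance,
    # different symbol -> separate allocation.
    async with maybe_open_context(
        key=(brokername, sym.lower()),
        mngr=open_feed_like(brokername, sym.lower()),
    ) as (cache_hit, feed):
        yield cache_hit, feed
```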
```diff
@@ -35,7 +35,7 @@ import tractor
 import trio
 
 from .. import brokers
-from .._cacheables import maybe_open_ctx
+from .._cacheables import maybe_open_context
 from ..trionics import async_enter_all
 from ..data.feed import open_feed, Feed
 from ._chart import (
@@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
 ) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
 
     uid = tractor.current_actor().uid
-    async with maybe_open_ctx(
+    async with maybe_open_context(
         key=uid,  # for now make a cluster per client?
         mngr=open_fsp_cluster(
             workers,
```
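And a final sketch (again with hypothetical names, `open_worker_pool()` in particular) of the per-actor keying used by the `maybe_open_fsp_cluster()` hunk above: keying on `tractor.current_actor().uid` scopes the cached instance to one cluster per client actor, and the yielded `cache_hit` flag lets callers skip setup work on reuse.

```python
from contextlib import asynccontextmanager

import tractor
from tractor.trionics import maybe_open_context


@asynccontextmanager
async def open_worker_pool(workers: int):
    # placeholder for e.g. spawning a cluster of sub-actors
    yield {'workers': workers}


@asynccontextmanager
async def maybe_open_worker_pool(workers: int = 2):
    uid = tractor.current_actor().uid
    async with maybe_open_context(
        key=uid,  # one pool per (client) actor
        mngr=open_worker_pool(workers),
    ) as (cache_hit, pool):
        if cache_hit:
            # reuse path: another task already spun up this actor's pool
            print('reusing existing pool')
        yield pool
```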