diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 0559cbfb..8b3cc416 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -26,14 +26,22 @@ from contextlib import asynccontextmanager from importlib import import_module from types import ModuleType from typing import ( - Dict, Any, Sequence, AsyncIterator, Optional + Dict, Any, Sequence, + AsyncIterator, Optional, + Callable, Awaitable ) +import trio import tractor +from pydantic import BaseModel from ..brokers import get_brokermod from ..log import get_logger, get_console_log -from .._daemon import spawn_brokerd, maybe_open_pikerd +from .._daemon import ( + spawn_brokerd, + maybe_open_pikerd, + maybe_spawn_brokerd, +) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -74,57 +82,97 @@ def get_ingestormod(name: str) -> ModuleType: return module -@asynccontextmanager -async def maybe_spawn_brokerd( - brokername: str, - loglevel: Optional[str] = None, +# @dataclass +class _FeedsCache(BaseModel): + """Data feeds manager. - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = True, -) -> tractor._portal.Portal: - """If no ``brokerd.{brokername}`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. + This is a brokerd side api used to manager persistent real-time + streams that can be allocated and left alive indefinitely. """ - if loglevel: - get_console_log(loglevel) + brokername: str + nursery: trio.Nursery + tasks: Dict[str, trio.CancelScope] = {} - dname = f'brokerd.{brokername}' - async with tractor.find_actor(dname) as portal: + class Config: + arbitrary_types_allowed = True - # WTF: why doesn't this work? - if portal is not None: - yield portal + # tasks: Dict[str, trio.CancelScope] = field(default_factory=dict) - else: - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - loglevel=loglevel - ) as pikerd_portal: + async def start_feed( + symbol: str, + func: Callable[[int], Awaitable[None]], + ) -> None: + """Start a bg feed task and register a surrouding cancel scope + for it. - if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_brokerd(brokername, loglevel=loglevel) + """ + with trio.CancelCscope() as cs: + pass - else: - await pikerd_portal.run( - spawn_brokerd, - brokername=brokername, - loglevel=loglevel, - debug_mode=debug_mode, - ) + async def cancel_all(self) -> None: + for name, cs in self.tasks.item(): + log.debug(f'Cancelling cached feed for {name}') + cs.cancel() + + +_feeds: _FeedsCache = None + + +def get_feeds_manager( + brokername: str, + nursery: Optional[trio.Nursery] = None, +) -> _FeedsCache: + """ + Retreive data feeds manager from process global scope. + + """ + + global _feeds + + if nursery is not None: + assert _feeds is None, "Feeds manager is already setup?" + + # this is initial setup by parent actor + _feeds = _FeedsCache( + brokername=brokername, + nursery=nursery, + ) + assert not _feeds.tasks + + assert _feeds.brokername == brokername, "Uhhh wtf" + return _feeds + + +async def _setup_persistent_feeds(brokername: str) -> None: + """Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + """ + async with trio.open_nursery() as service_nursery: + _feeds = get_feeds_manager(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + +@tractor.stream +async def allocate_cached_feed( + ctx: tractor.Context, + symbol: str +): + _feeds = get_feeds_manager(brokername, service_nursery) + + # setup shared mem buffer + pass - async with tractor.wait_for_actor(dname) as portal: - yield portal @dataclass class Feed: - """A data feed for client-side interaction with far-process + """A data feed for client-side interaction with far-process# }}} real-time data sources. This is an thin abstraction on top of ``tractor``'s portals for @@ -279,6 +327,8 @@ async def open_feed( if opened: assert data['is_shm_writer'] log.info("Started shared mem bar writer") + else: + s = attach_shm_array(shm_token) shm_token['dtype_descr'] = list(shm_token['dtype_descr']) assert shm_token == shm.token # sanity