diff --git a/piker/_daemon.py b/piker/_daemon.py index 9e68ebdd..6f77cc48 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -18,10 +18,16 @@ Structured, daemon tree service management. """ -from typing import Optional, Union, Callable, Any +from __future__ import annotations +import os +from typing import ( + Optional, + Union, + Callable, + Any, +) from contextlib import ( asynccontextmanager as acm, - contextmanager as cm, ) from collections import defaultdict @@ -29,7 +35,10 @@ import tractor import trio from trio_typing import TaskStatus -from .log import get_logger, get_console_log +from .log import ( + get_logger, + get_console_log, +) from .brokers import get_brokermod @@ -44,14 +53,86 @@ _default_reg_addr: tuple[str, int] = ( _default_registry_port, ) + # NOTE: this value is set as an actor-global once the first endpoint # who is capable, spawns a `pikerd` service tree. -_registry_addr: tuple[str, int] | None = None +_registry: Registry | None = None + + +class Registry: + addr: None | tuple[str, int] = None + + # TODO: table of uids to sockaddrs + peers: dict[ + tuple[str, str], + tuple[str, int], + ] = {} + + +# _registry_addr: None | tuple[str, int] = None +_tractor_kwargs: dict[str, Any] = {} + + +@acm +async def open_registry( + addr: None | tuple[str, int] = None, + ensure_exists: bool = True, + +) -> tuple[str, int]: + + global _tractor_kwargs + actor = tractor.current_actor() + uid = actor.uid + if ( + Registry.addr is not None + and addr + ): + raise RuntimeError( + f'`{uid}` registry addr already bound @ {_registry.sockaddr}' + ) + + was_set: bool = False + + if ( + not tractor.is_root_process() + and Registry.addr is None + ): + Registry.addr = actor._arb_addr + + if ( + ensure_exists + and Registry.addr is None + ): + raise RuntimeError( + f"`{uid}` registry should already exist bug doesn't?" + ) + + if ( + Registry.addr is None + ): + was_set = True + Registry.addr = addr or _default_reg_addr + + _tractor_kwargs['arbiter_addr'] = Registry.addr + + try: + yield Registry.addr + finally: + # XXX: always clear the global addr if we set it so that the + # next (set of) calls will apply whatever new one is passed + # in. + if was_set: + Registry.addr = None + + +def get_tractor_runtime_kwargs() -> dict[str, Any]: + ''' + Deliver ``tractor`` related runtime variables in a `dict`. + + ''' + return _tractor_kwargs + -_tractor_kwargs: dict[str, Any] = { - # use a different registry addr then tractor's default - 'arbiter_addr': _registry_addr -} _root_modules = [ __name__, 'piker.clearing._ems', @@ -161,31 +242,78 @@ class Services: f'Serice task for {name} not terminated?' -@cm -def maybe_set_global_registry_sockaddr( - registry_addr: None | tuple[str, int] = None, -) -> None: +@acm +async def open_piker_runtime( + name: str, + enable_modules: list[str] = [], + loglevel: Optional[str] = None, - global _registry_addr - was_set: bool = False - if ( - _registry_addr is None - or registry_addr - ): - _registry_addr = registry_addr or _default_reg_addr - try: - yield _registry_addr - finally: - # XXX: always clear the global addr if we set it so that the - # next (set of) calls will apply whatever new one is passed - # in. - if was_set: - _registry_addr = None + # XXX NOTE XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + + registry_addr: None | tuple[str, int] = None, + + # TODO: once we have `rsyscall` support we will read a config + # and spawn the service tree distributed per that. + start_method: str = 'trio', + + tractor_kwargs: dict = {}, + +) -> tuple[ + tractor.Actor, + tuple[str, int], +]: + ''' + Start a piker actor who's runtime will automatically sync with + existing piker actors on the local link based on configuration. + + Can be called from a subactor or any program that needs to start + a root actor. + + ''' + try: + # check for existing runtime + actor = tractor.current_actor().uid + + except tractor._exceptions.NoRuntime: + + registry_addr = registry_addr or _default_reg_addr + + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=enable_modules, + + **tractor_kwargs, + ) as _, + + open_registry(registry_addr, ensure_exists=False) as addr, + ): + yield ( + tractor.current_actor(), + addr, + ) + else: + async with open_registry(registry_addr) as addr: + yield ( + actor, + addr, + ) @acm async def open_pikerd( - start_method: str = 'trio', loglevel: str | None = None, # XXX: you should pretty much never want debug mode @@ -203,75 +331,35 @@ async def open_pikerd( ''' - with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr: - async with ( - tractor.open_root_actor( + async with ( + open_piker_runtime( - # passed through to ``open_root_actor`` - arbiter_addr=reg_addr, - name=_root_dname, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, + name=_root_dname, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - ) as _, + loglevel=loglevel, + debug_mode=debug_mode, + registry_addr=registry_addr, - tractor.open_nursery() as actor_nursery, - ): - async with trio.open_nursery() as service_nursery: + ) as (root_actor, reg_addr), + tractor.open_nursery() as actor_nursery, + trio.open_nursery() as service_nursery, + ): + assert root_actor.accept_addr == reg_addr - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - try: - yield Services - finally: - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() - - -@acm -async def open_piker_runtime( - name: str, - enable_modules: list[str] = [], - start_method: str = 'trio', - loglevel: Optional[str] = None, - - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, - -) -> tractor.Actor: - ''' - Start a piker actor who's runtime will automatically sync with - existing piker actors on the local link based on configuration. - - ''' - with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr: - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=reg_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules + enable_modules, - ) as _, - ): - yield tractor.current_actor() + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + try: + yield Services + finally: + # if 'samplerd' in Services.service_tasks: + # await Services.cancel_service('samplerd') + service_nursery.cancel_scope.cancel() @acm @@ -284,23 +372,30 @@ async def maybe_open_runtime( Start the ``tractor`` runtime (a root actor) if none exists. """ - settings = _tractor_kwargs - settings.update(kwargs) + # settings = get_tractor_runtime_kwargs() + # settings.update(kwargs) + + name = kwargs.pop('name') if not tractor.current_actor(err_on_no_runtime=False): - async with tractor.open_root_actor( + async with open_piker_runtime( + name, loglevel=loglevel, - **settings, - ): - yield + **kwargs, + # registry + # tractor_kwargs=kwargs, + ) as (_, addr): + yield addr, else: - yield + async with open_registry() as addr: + yield addr @acm async def maybe_open_pikerd( loglevel: Optional[str] = None, registry_addr: None | tuple = None, + **kwargs, ) -> Union[tractor._portal.Portal, Services]: @@ -314,18 +409,31 @@ async def maybe_open_pikerd( get_console_log(loglevel) # subtle, we must have the runtime up here or portal lookup will fail + query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') + + # TODO: if we need to make the query part faster we could not init + # an actor runtime and instead just hit the socket? + # from tractor._ipc import _connect_chan, Channel + # async with _connect_chan(host, port) as chan: + # async with open_portal(chan) as arb_portal: + # yield arb_portal + async with ( - maybe_open_runtime(loglevel, **kwargs), - tractor.find_actor(_root_dname) as portal + open_piker_runtime( + name=query_name, + registry_addr=registry_addr, + loglevel=loglevel, + **kwargs, + ) as _, + tractor.find_actor( + _root_dname, + arbiter_sockaddr=registry_addr, + ) as portal ): # connect to any existing daemon presuming # its registry socket was selected. if ( portal is not None - and ( - registry_addr is None - or portal.channel.raddr == registry_addr - ) ): yield portal return @@ -333,7 +441,6 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( - loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, @@ -361,30 +468,32 @@ _data_mods = [ @acm async def find_service( service_name: str, -) -> Optional[tractor.Portal]: +) -> tractor.Portal | None: - log.info(f'Scanning for service `{service_name}`') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=_registry_addr or _default_reg_addr, - ) as maybe_portal: - yield maybe_portal + async with open_registry() as reg_addr: + log.info(f'Scanning for service `{service_name}`') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as maybe_portal: + yield maybe_portal async def check_for_service( service_name: str, -) -> bool: +) -> None | tuple[str, int]: ''' Service daemon "liveness" predicate. ''' - async with tractor.query_actor( - service_name, - arbiter_sockaddr=_registry_addr or _default_reg_addr, - ) as sockaddr: - return sockaddr + async with open_registry(ensure_exists=False) as reg_addr: + async with tractor.query_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as sockaddr: + return sockaddr @acm