diff --git a/piker/_daemon.py b/piker/_daemon.py index 4e13e1ec..9bfefca8 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,7 +19,10 @@ Structured, daemon tree service management. """ from typing import Optional, Union, Callable, Any -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from collections import defaultdict import tractor @@ -57,12 +60,21 @@ _root_modules = [ ] +# TODO: factor this into a ``tractor.highlevel`` extension +# pack for the library. class Services: actor_n: tractor._supervise.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + tractor.Portal, + trio.Event, + ] + ] = {} locks = defaultdict(trio.Lock) @classmethod @@ -84,7 +96,12 @@ class Services: ''' async def open_context_in_task( task_status: TaskStatus[ - trio.CancelScope] = trio.TASK_STATUS_IGNORED, + tuple[ + trio.CancelScope, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, ) -> Any: @@ -96,28 +113,33 @@ class Services: ) as (ctx, first): # unblock once the remote context has started - task_status.started((cs, first)) + complete = trio.Event() + task_status.started((cs, complete, first)) log.info( f'`pikerd` service {name} started with value {first}' ) try: # wait on any context's return value + # and any final portal result from the + # sub-actor. ctx_res = await ctx.result() - except tractor.ContextCancelled: - return await self.cancel_service(name) - else: - # wait on any error from the sub-actor - # NOTE: this will block indefinitely until - # cancelled either by error from the target - # context function or by being cancelled here by - # the surrounding cancel scope + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. return (await portal.result(), ctx_res) - cs, first = await self.service_n.start(open_context_in_task) + finally: + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, complete, first = await self.service_n.start(open_context_in_task) # store the cancel scope and portal for later cancellation or # retstart if needed. - self.service_tasks[name] = (cs, portal) + self.service_tasks[name] = (cs, portal, complete) return cs, first @@ -127,13 +149,38 @@ class Services: name: str, ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' log.info(f'Cancelling `pikerd` service {name}') - cs, portal = self.service_tasks[name] - # XXX: not entirely sure why this is required, - # and should probably be better fine tuned in - # ``tractor``? + cs, portal, complete = self.service_tasks[name] cs.cancel() - return await portal.cancel_actor() + await complete.wait() + assert name not in self.service_tasks, \ + f'Serice task for {name} not terminated?' + + +@cm +def maybe_set_global_registry_sockaddr( + registry_addr: None | tuple[str, int] = None, +) -> None: + + global _registry_addr + was_set: bool = False + if ( + _registry_addr is None + or registry_addr + ): + _registry_addr = registry_addr or _default_reg_addr + try: + yield _registry_addr + finally: + # XXX: always clear the global addr if we set it so that the + # next (set of) calls will apply whatever new one is passed + # in. + if was_set: + _registry_addr = None @acm @@ -155,40 +202,38 @@ async def open_pikerd( alive underling services (see below). ''' - global _registry_addr - if ( - _registry_addr is None - or registry_addr - ): - _registry_addr = registry_addr or _default_reg_addr + with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr: + async with ( + tractor.open_root_actor( - # XXX: this may open a root actor as well - async with ( - tractor.open_root_actor( + # passed through to ``open_root_actor`` + arbiter_addr=reg_addr, + name=_root_dname, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=_root_dname, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + ) as _, - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - ) as _, + tractor.open_nursery() as actor_nursery, + ): + async with trio.open_nursery() as service_nursery: - tractor.open_nursery() as actor_nursery, - ): - async with trio.open_nursery() as service_nursery: - - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - yield + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + try: + yield + finally: + # if 'samplerd' in Services.service_tasks: + # await Services.cancel_service('samplerd') + service_nursery.cancel_scope.cancel() @acm @@ -209,32 +254,24 @@ async def open_piker_runtime( existing piker actors on the local link based on configuration. ''' - global _registry_addr + with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr: + async with ( + tractor.open_root_actor( - if ( - _registry_addr is None - or registry_addr - ): - _registry_addr = registry_addr or _default_reg_addr + # passed through to ``open_root_actor`` + arbiter_addr=reg_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, - # XXX: this may open a root actor as well - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules + enable_modules, - ) as _, - ): - yield tractor.current_actor() + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules + enable_modules, + ) as _, + ): + yield tractor.current_actor() @acm @@ -325,6 +362,11 @@ async def find_service( service_name: str, ) -> Optional[tractor.Portal]: + global _registry_addr + if not _registry_addr: + yield None + return + log.info(f'Scanning for service `{service_name}`') # attach to existing daemon by name if possible async with tractor.find_actor( @@ -342,6 +384,10 @@ async def check_for_service( Service daemon "liveness" predicate. ''' + global _registry_addr + if not _registry_addr: + return None + async with tractor.query_actor( service_name, arbiter_sockaddr=_registry_addr,