diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py
index d833a535..4fefaae6 100644
--- a/piker/cli/__init__.py
+++ b/piker/cli/__init__.py
@@ -134,65 +134,65 @@ def pikerd(
     Spawn the piker broker-daemon.

     '''
-    from tractor.devx import maybe_open_crash_handler
-    with maybe_open_crash_handler(pdb=pdb):
-        log = get_console_log(loglevel, name='cli')
+    # from tractor.devx import maybe_open_crash_handler
+    # with maybe_open_crash_handler(pdb=False):
+    log = get_console_log(loglevel, name='cli')

-        if pdb:
-            log.warning((
-                "\n"
-                "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
-                "When a `piker` daemon crashes it will block the "
-                "task-thread until resumed from console!\n"
-                "\n"
+    if pdb:
+        log.warning((
+            "\n"
+            "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
+            "When a `piker` daemon crashes it will block the "
+            "task-thread until resumed from console!\n"
+            "\n"
+        ))
+
+    # service-actor registry endpoint socket-address set
+    regaddrs: list[tuple[str, int]] = []
+
+    conf, _ = config.load(
+        conf_name='conf',
+    )
+    network: dict = conf.get('network')
+    if (
+        network is None
+        and not maddr
+    ):
+        regaddrs = [(
+            _default_registry_host,
+            _default_registry_port,
+        )]
+
+    else:
+        eps: dict = load_trans_eps(
+            network,
+            maddr,
+        )
+        for layers in eps['pikerd']:
+            regaddrs.append((
+                layers['ipv4']['addr'],
+                layers['tcp']['port'],
             ))

-        # service-actor registry endpoint socket-address set
-        regaddrs: list[tuple[str, int]] = []
+    from .. import service

-        conf, _ = config.load(
-            conf_name='conf',
-        )
-        network: dict = conf.get('network')
-        if (
-            network is None
-            and not maddr
+    async def main():
+        service_mngr: service.Services
+        async with (
+            service.open_pikerd(
+                registry_addrs=regaddrs,
+                loglevel=loglevel,
+                debug_mode=pdb,
+                enable_transports=['uds'],
+                # enable_transports=['tcp'],
+            ) as service_mngr,
         ):
-            regaddrs = [(
-                _default_registry_host,
-                _default_registry_port,
-            )]
+            assert service_mngr
+            # ?TODO? spawn all other sub-actor daemons according to
+            # multiaddress endpoint spec defined by user config
+            await trio.sleep_forever()

-        else:
-            eps: dict = load_trans_eps(
-                network,
-                maddr,
-            )
-            for layers in eps['pikerd']:
-                regaddrs.append((
-                    layers['ipv4']['addr'],
-                    layers['tcp']['port'],
-                ))
-
-        from .. import service
-
-        async def main():
-            service_mngr: service.Services
-
-            async with (
-                service.open_pikerd(
-                    registry_addrs=regaddrs,
-                    loglevel=loglevel,
-                    debug_mode=pdb,
-                    enable_transports=['uds'],
-                ) as service_mngr,
-            ):
-                assert service_mngr
-                # ?TODO? spawn all other sub-actor daemons according to
-                # multiaddress endpoint spec defined by user config
-                await trio.sleep_forever()
-
-        trio.run(main)
+    trio.run(main)


 @click.group(context_settings=config._context_defaults)
@@ -307,6 +307,10 @@ def services(config, tl, ports):
     if not ports:
         ports = [_default_registry_port]

+    addr = tractor._addr.wrap_address(
+        addr=(host, ports[0])
+    )
+
     async def list_services():
         nonlocal host
         async with (
@@ -315,15 +319,17 @@
                 loglevel=config['loglevel'] if tl else None,
             ),
             tractor.get_registry(
-                host=host,
-                port=ports[0]
+                addr=addr,
             ) as portal
         ):
-            registry = await portal.run_from_ns('self', 'get_registry')
+            registry = await portal.run_from_ns(
+                'self',
+                'get_registry',
+            )
             json_d = {}
             for key, socket in registry.items():
-                host, port = socket
-                json_d[key] = f'{host}:{port}'
+                json_d[key] = f'{socket}'
+
            click.echo(f"{colorize_json(json_d)}")

     trio.run(list_services)
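
NB: the `services` cmd above now feeds `tractor.get_registry()` a wrapped addr; below is a minimal usage sketch of that same flow using only calls visible in this patch (`tractor._addr.wrap_address()`, `tractor.get_registry()`, `portal.run_from_ns()`). It would need to run inside an already-open piker/tractor runtime (as the cmd does via `open_piker_runtime()`); the fn name and printing are made up:

```python
import tractor

async def show_registry(
    host: str,
    port: int,
) -> None:
    # wrap a raw (host, port) pair into tractor's address
    # type, exactly as the patched `services` cmd does.
    addr = tractor._addr.wrap_address(
        addr=(host, port),
    )
    async with tractor.get_registry(addr=addr) as portal:
        registry = await portal.run_from_ns(
            'self',
            'get_registry',
        )
        for key, socket in registry.items():
            # addrs are rendered whole now, no (host, port)
            # unpacking, presumably since UDS addrs aren't
            # 2-tuples.
            print(f'{key} -> {socket}')
```
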
diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py
index 91157451..33f23453 100644
--- a/piker/service/_actor_runtime.py
+++ b/piker/service/_actor_runtime.py
@@ -107,17 +107,22 @@ async def open_piker_runtime(

     async with (
         tractor.open_root_actor(
-            # passed through to ``open_root_actor``
+            # passed through to `open_root_actor`
             registry_addrs=registry_addrs,
             name=name,
+            start_method=start_method,
             loglevel=loglevel,
             debug_mode=debug_mode,
-            start_method=start_method,
+
+            # XXX NOTE MEMBER DAT der's a perf hit yo!!
+            # https://greenback.readthedocs.io/en/latest/principle.html#performance
+            maybe_enable_greenback=True,

             # TODO: eventually we should be able to avoid
             # having the root have more then permissions to
             # spawn other specialized daemons I think?
             enable_modules=enable_modules,
+            hide_tb=False,

             **tractor_kwargs,
         ) as actor,
@@ -257,7 +262,10 @@ async def maybe_open_pikerd(
     loglevel: str | None = None,
     **kwargs,

-) -> tractor._portal.Portal | ClassVar[Services]:
+) -> (
+    tractor._portal.Portal
+    |ClassVar[Services]
+):
     '''
     If no ``pikerd`` daemon-root-actor can be found start it and
     yield up (we should probably figure out returning a portal to self
@@ -282,10 +290,11 @@

     registry_addrs: list[tuple[str, int]] = (
         registry_addrs
-        or [_default_reg_addr]
+        or
+        [_default_reg_addr]
     )

-    pikerd_portal: tractor.Portal | None
+    pikerd_portal: tractor.Portal|None
     async with (
         open_piker_runtime(
             name=query_name,
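
NB: on the `maybe_enable_greenback=True` perf note above; a tiny standalone sketch of the sync-to-async re-entry capability `greenback` provides (this uses greenback's public `ensure_portal()`/`await_()` API, nothing piker-specific, and the helper names are made up):

```python
import trio
import greenback

def sync_helper() -> None:
    # normally impossible: block on an async call from
    # plain sync code; this re-entry trick is what the
    # linked perf hit pays for.
    greenback.await_(trio.sleep(0.1))

async def main() -> None:
    # install the per-task portal greenback needs..
    await greenback.ensure_portal()
    sync_helper()

trio.run(main)
```
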
diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py
index 1e7ff096..89d7f28d 100644
--- a/piker/service/_daemon.py
+++ b/piker/service/_daemon.py
@@ -28,6 +28,7 @@ from contextlib import (
 )

 import tractor
+from trio.lowlevel import current_task

 from ._util import (
     log,  # sub-sys logger
@@ -70,69 +71,84 @@ async def maybe_spawn_daemon(
     lock = Services.locks[service_name]
     await lock.acquire()

-    async with find_service(
-        service_name,
-        registry_addrs=[('127.0.0.1', 6116)],
-    ) as portal:
-        if portal is not None:
-            lock.release()
-            yield portal
-            return
+    try:
+        async with find_service(
+            service_name,
+            registry_addrs=[('127.0.0.1', 6116)],
+        ) as portal:
+            if portal is not None:
+                lock.release()
+                yield portal
+                return

-    log.warning(
-        f"Couldn't find any existing {service_name}\n"
-        'Attempting to spawn new daemon-service..'
-    )
+        log.warning(
+            f"Couldn't find any existing {service_name}\n"
+            'Attempting to spawn new daemon-service..'
+        )

-    # ask root ``pikerd`` daemon to spawn the daemon we need if
-    # pikerd is not live we now become the root of the
-    # process tree
-    async with maybe_open_pikerd(
-        loglevel=loglevel,
-        **pikerd_kwargs,
+        # ask root ``pikerd`` daemon to spawn the daemon we need; if
+        # pikerd is not live we now become the root of the
+        # process tree
+        async with maybe_open_pikerd(
+            loglevel=loglevel,
+            **pikerd_kwargs,

-    ) as pikerd_portal:
+        ) as pikerd_portal:

-        # we are the root and thus are `pikerd`
-        # so spawn the target service directly by calling
-        # the provided target routine.
-        # XXX: this assumes that the target is well formed and will
-        # do the right things to setup both a sub-actor **and** call
-        # the ``_Services`` api from above to start the top level
-        # service task for that actor.
-        started: bool
-        if pikerd_portal is None:
-            started = await service_task_target(
-                loglevel=loglevel,
-                **spawn_args,
+            # we are the root and thus are `pikerd`
+            # so spawn the target service directly by calling
+            # the provided target routine.
+            # XXX: this assumes that the target is well formed and will
+            # do the right things to setup both a sub-actor **and** call
+            # the ``_Services`` api from above to start the top level
+            # service task for that actor.
+            started: bool
+            if pikerd_portal is None:
+                started = await service_task_target(
+                    loglevel=loglevel,
+                    **spawn_args,
+                )
+
+            else:
+                # request a remote `pikerd` (service manager) to start the
+                # target daemon-task, the target can't return
+                # a non-serializable value since it is expected that service
+                # starting is non-blocking and the target task will persist
+                # running "under" or "within" the `pikerd` actor tree after
+                # the requesting client disconnects. in other words this
+                # spawns a persistent daemon actor that continues to live
+                # for the lifespan of whatever the service manager inside
+                # `pikerd` says it should.
+                started = await pikerd_portal.run(
+                    service_task_target,
+                    loglevel=loglevel,
+                    **spawn_args,
+                )
+
+            if started:
+                log.info(f'Service {service_name} started!')
+
+            # block until we can discover (by IPC connection) the newly
+            # spawned daemon-actor and then deliver the portal to the
+            # caller.
+            async with tractor.wait_for_actor(service_name) as portal:
+                lock.release()
+                yield portal
+                await portal.cancel_actor()
+
+    except BaseException as _err:
+        err = _err
+        if (
+            lock.locked()
+            and
+            lock.statistics().owner is current_task()
+        ):
+            log.exception(
+                'Releasing stale lock after crash..?\n'
+                f'{err!r}\n'
             )
-
-        else:
-            # request a remote `pikerd` (service manager) to start the
-            # target daemon-task, the target can't return
-            # a non-serializable value since it is expected that service
-            # starting is non-blocking and the target task will persist
-            # running "under" or "within" the `pikerd` actor tree after
-            # the questing client disconnects. in other words this
-            # spawns a persistent daemon actor that continues to live
-            # for the lifespan of whatever the service manager inside
-            # `pikerd` says it should.
-            started = await pikerd_portal.run(
-                service_task_target,
-                loglevel=loglevel,
-                **spawn_args,
-            )
-
-        if started:
-            log.info(f'Service {service_name} started!')
-
-        # block until we can discover (by IPC connection) to the newly
-        # spawned daemon-actor and then deliver the portal to the
-        # caller.
-        async with tractor.wait_for_actor(service_name) as portal:
             lock.release()
-            yield portal
-            await portal.cancel_actor()
+        raise err


 async def spawn_emsd(
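
NB: the new `try`/`except` above only releases the service lock when the crashing task actually owns it; below is a self-contained sketch of that ownership-check pattern using the same `trio` calls as the patch (`Lock.statistics().owner`, `trio.lowlevel.current_task()`); the demo fns are made up:

```python
import trio
from trio.lowlevel import current_task

async def crash_safe(lock: trio.Lock) -> None:
    await lock.acquire()
    try:
        ...  # body that may raise before releasing
        raise RuntimeError('boom')
    except BaseException:
        # only release if *this* task still holds the lock;
        # `.statistics().owner` is the owning task (or None).
        if (
            lock.locked()
            and lock.statistics().owner is current_task()
        ):
            lock.release()
        raise

async def main() -> None:
    lock = trio.Lock()
    try:
        await crash_safe(lock)
    except RuntimeError:
        # lock was released despite the crash
        assert not lock.locked()

trio.run(main)
```
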
diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py
index 89e98411..726a34c8 100644
--- a/piker/service/_mngr.py
+++ b/piker/service/_mngr.py
@@ -109,7 +109,7 @@ class Services:
                 # wait on any context's return value
                 # and any final portal result from the
                 # sub-actor.
-                ctx_res: Any = await ctx.result()
+                ctx_res: Any = await ctx.wait_for_result()

                 # NOTE: blocks indefinitely until cancelled
                 # either by error from the target context
diff --git a/piker/service/_registry.py b/piker/service/_registry.py
index ed4569f7..94ccbc68 100644
--- a/piker/service/_registry.py
+++ b/piker/service/_registry.py
@@ -101,13 +101,15 @@ async def open_registry(

     if (
         not tractor.is_root_process()
-        and not Registry.addrs
+        and
+        not Registry.addrs
     ):
         Registry.addrs.extend(actor.reg_addrs)

     if (
         ensure_exists
-        and not Registry.addrs
+        and
+        not Registry.addrs
     ):
         raise RuntimeError(
             f"`{uid}` registry should already exist but doesn't?"
@@ -146,7 +148,7 @@ async def find_service(
     | list[Portal]
     | None
 ):
-
+    # try:
     reg_addrs: list[tuple[str, int]]
     async with open_registry(
         addrs=(
@@ -157,22 +159,39 @@
             or Registry.addrs
         ),
     ) as reg_addrs:
-        log.info(f'Scanning for service `{service_name}`')
-        maybe_portals: list[Portal] | Portal | None
+        log.info(
+            f'Scanning for service {service_name!r}'
+        )
         # attach to existing daemon by name if possible
+        maybe_portals: list[Portal]|Portal|None
         async with tractor.find_actor(
             service_name,
             registry_addrs=reg_addrs,
             only_first=first_only,  # if set only returns single ref
         ) as maybe_portals:
             if not maybe_portals:
+                log.info(
+                    f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
+                )
                 yield None
                 return

+            log.info(
+                f'Found service {service_name!r} -> {maybe_portals}'
+            )
             yield maybe_portals

+    # except BaseException as _berr:
+    #     berr = _berr
+    #     log.exception(
+    #         'tractor.find_actor() failed with,\n'
+    #     )
+    #     raise berr
+

 async def check_for_service(
     service_name: str,
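
NB: for reference, a hedged usage sketch of driving the `find_service()` acm patched above from some client task; it assumes an already-open piker runtime, and the `'emsd'` name, the explicit `first_only=True`, and the caller fn are illustrative only:

```python
# assumed import path per this patch's tree
from piker.service._registry import find_service

async def query_for_service() -> None:
    async with find_service(
        'emsd',  # hypothetical service-actor name
        first_only=True,
    ) as maybe_portal:
        if maybe_portal is None:
            # nothing registered under that name (yet)
            return
        # a single portal is delivered since `first_only=True`
        print(f'found daemon -> {maybe_portal}')
```
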