diff --git a/tractor/_root.py b/tractor/_root.py index 51dfc660..364e4563 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -202,7 +202,9 @@ async def open_root_actor( ''' # XXX NEVER allow nested actor-trees! - if already_actor := _state.current_actor(err_on_no_runtime=False): + if already_actor := _state.current_actor( + err_on_no_runtime=False, + ): rtvs: dict[str, Any] = _state._runtime_vars root_mailbox: list[str, int] = rtvs['_root_mailbox'] registry_addrs: list[list[str, int]] = rtvs['_registry_addrs'] @@ -272,14 +274,20 @@ async def open_root_actor( DeprecationWarning, stacklevel=2, ) - registry_addrs = [arbiter_addr] + uw_reg_addrs = [arbiter_addr] - if not registry_addrs: - registry_addrs: list[UnwrappedAddress] = default_lo_addrs( + uw_reg_addrs = registry_addrs + if not uw_reg_addrs: + uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs( enable_transports ) - assert registry_addrs + # must exist by now since all below code is dependent + assert uw_reg_addrs + registry_addrs: list[Address] = [ + wrap_address(uw_addr) + for uw_addr in uw_reg_addrs + ] loglevel = ( loglevel @@ -328,10 +336,10 @@ async def open_root_actor( enable_stack_on_sig() # closed into below ping task-func - ponged_addrs: list[UnwrappedAddress] = [] + ponged_addrs: list[Address] = [] async def ping_tpt_socket( - addr: UnwrappedAddress, + addr: Address, timeout: float = 1, ) -> None: ''' @@ -351,17 +359,22 @@ async def open_root_actor( # be better to eventually have a "discovery" protocol # with basic handshake instead? with trio.move_on_after(timeout): - async with _connect_chan(addr): + async with _connect_chan(addr.unwrap()): ponged_addrs.append(addr) except OSError: - # TODO: make this a "discovery" log level? + # ?TODO, make this a "discovery" log level? logger.info( - f'No actor registry found @ {addr}\n' + f'No root-actor registry found @ {addr!r}\n' ) + # !TODO, this is basically just another (abstract) + # happy-eyeballs, so we should try for formalize it somewhere + # in a `.[_]discovery` ya? + # async with trio.open_nursery() as tn: - for addr in registry_addrs: + for uw_addr in uw_reg_addrs: + addr: Address = wrap_address(uw_addr) tn.start_soon( ping_tpt_socket, addr, @@ -390,24 +403,28 @@ async def open_root_actor( loglevel=loglevel, enable_modules=enable_modules, ) - # DO NOT use the registry_addrs as the transport server - # addrs for this new non-registar, root-actor. + # **DO NOT** use the registry_addrs as the + # ipc-transport-server's bind-addrs as this is + # a new NON-registrar, ROOT-actor. + # + # XXX INSTEAD, bind random addrs using the same tpt + # proto. for addr in ponged_addrs: - waddr: Address = wrap_address(addr) trans_bind_addrs.append( - waddr.get_random(bindspace=waddr.bindspace) + addr.get_random( + bindspace=addr.bindspace, + ) ) # Start this local actor as the "registrar", aka a regular # actor who manages the local registry of "mailboxes" of # other process-tree-local sub-actors. else: - # NOTE that if the current actor IS THE REGISTAR, the # following init steps are taken: # - the tranport layer server is bound to each addr # pair defined in provided registry_addrs, or the default. - trans_bind_addrs = registry_addrs + trans_bind_addrs = uw_reg_addrs # - it is normally desirable for any registrar to stay up # indefinitely until either all registered (child/sub) @@ -431,6 +448,16 @@ async def open_root_actor( # `.trio.run()`. actor._infected_aio = _state._runtime_vars['_is_infected_aio'] + # NOTE, only set the loopback addr for the + # process-tree-global "root" mailbox since all sub-actors + # should be able to speak to their root actor over that + # channel. + raddrs: list[Address] = _state._runtime_vars['_root_addrs'] + raddrs.extend(trans_bind_addrs) + # TODO, remove once we have also removed all usage; + # eventually all (root-)registry apis should expect > 1 addr. + _state._runtime_vars['_root_mailbox'] = raddrs[0] + # Start up main task set via core actor-runtime nurseries. try: # assign process-local actor @@ -438,20 +465,26 @@ async def open_root_actor( # start local channel-server and fake the portal API # NOTE: this won't block since we provide the nursery - ml_addrs_str: str = '\n'.join( - f'@{addr}' for addr in trans_bind_addrs - ) - logger.info( - f'Starting local {actor.uid} on the following transport addrs:\n' - f'{ml_addrs_str}' - ) + report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n' + if reg_addrs := actor.registry_addrs: + report += ( + '-> Opening new registry @ ' + + + '\n'.join( + f'@{addr}' for addr in reg_addrs + ) + ) + logger.info(f'{report}\n') # start runtime in a bg sub-task, yield to caller. async with ( collapse_eg(), trio.open_nursery() as root_tn, - ): + # XXX, finally-footgun below? + # -> see note on why shielding. + # maybe_raise_from_masking_exc(), + ): # `_runtime.async_main()` creates an internal nursery # and blocks here until any underlying actor(-process) # tree has terminated thereby conducting so called @@ -526,9 +559,14 @@ async def open_root_actor( ) logger.info( f'Closing down root actor\n' - f'{op_nested_actor_repr}\n' + f'{op_nested_actor_repr}' ) - await actor.cancel(None) # self cancel + # XXX, THIS IS A *finally-footgun*! + # -> though already shields iternally it can + # taskc here and mask underlying errors raised in + # the try-block above? + with trio.CancelScope(shield=True): + await actor.cancel(None) # self cancel finally: # revert all process-global runtime state if ( @@ -541,10 +579,16 @@ async def open_root_actor( _state._current_actor = None _state._last_actor_terminated = actor - logger.runtime( + sclang_repr: str = _pformat.nest_from_op( + input_op=')>', + text=actor.pformat(), + nest_prefix='|_', + nest_indent=1, + ) + + logger.info( f'Root actor terminated\n' - f')>\n' - f' |_{actor}\n' + f'{sclang_repr}' ) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index fda3a5c5..bbb85799 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -213,7 +213,7 @@ class Actor: *, enable_modules: list[str] = [], loglevel: str|None = None, - registry_addrs: list[UnwrappedAddress]|None = None, + registry_addrs: list[Address]|None = None, spawn_method: str|None = None, # TODO: remove! @@ -256,11 +256,12 @@ class Actor: if arbiter_addr is not None: warnings.warn( '`Actor(arbiter_addr=)` is now deprecated.\n' - 'Use `registry_addrs: list[tuple]` instead.', + 'Use `registry_addrs: list[Address]` instead.', DeprecationWarning, stacklevel=2, ) - registry_addrs: list[UnwrappedAddress] = [arbiter_addr] + + registry_addrs: list[Address] = [wrap_address(arbiter_addr)] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -299,8 +300,10 @@ class Actor: # input via the validator. self._reg_addrs: list[UnwrappedAddress] = [] if registry_addrs: - self.reg_addrs: list[UnwrappedAddress] = registry_addrs - _state._runtime_vars['_registry_addrs'] = registry_addrs + _state._runtime_vars['_registry_addrs'] = self.reg_addrs = [ + addr.unwrap() + for addr in registry_addrs + ] @property def aid(self) -> msgtypes.Aid: @@ -472,7 +475,11 @@ class Actor: def reg_addrs(self) -> list[UnwrappedAddress]: ''' List of (socket) addresses for all known (and contactable) - registry actors. + registry-service actors in "unwrapped" (i.e. IPC interchange + wire-compat) form. + + If you are looking for the "wrapped" address form, use + `.registry_addrs` instead. ''' return self._reg_addrs @@ -491,8 +498,14 @@ class Actor: self._reg_addrs = addrs + @property + def registry_addrs(self) -> list[Address]: + return [wrap_address(uw_addr) + for uw_addr in self.reg_addrs] + def load_modules( self, + ) -> None: ''' Load explicitly enabled python modules from local fs after @@ -1365,7 +1378,7 @@ class Actor: Return all IPC channels to the actor with provided `uid`. ''' - return self._peers[uid] + return self._ipc_server._peers[uid] def is_infected_aio(self) -> bool: ''' @@ -1420,6 +1433,8 @@ async def async_main( # establish primary connection with immediate parent actor._parent_chan: Channel|None = None + # is this a sub-actor? + # get runtime info from parent. if parent_addr is not None: ( actor._parent_chan, @@ -1464,10 +1479,8 @@ async def async_main( ipc_server: _server.IPCServer async with ( - trio.open_nursery( - strict_exception_groups=False, - ) as service_nursery, - + collapse_eg(), + trio.open_nursery() as service_nursery, _server.open_ipc_server( parent_tn=service_nursery, stream_handler_tn=service_nursery, @@ -1518,9 +1531,6 @@ async def async_main( # TODO: why is this not with the root nursery? try: - log.runtime( - 'Booting IPC server' - ) eps: list = await ipc_server.listen_on( accept_addrs=accept_addrs, stream_handler_nursery=service_nursery, @@ -1552,18 +1562,6 @@ async def async_main( # TODO, just read direct from ipc_server? accept_addrs: list[UnwrappedAddress] = actor.accept_addrs - # NOTE: only set the loopback addr for the - # process-tree-global "root" mailbox since - # all sub-actors should be able to speak to - # their root actor over that channel. - if _state._runtime_vars['_is_root']: - raddrs: list[Address] = _state._runtime_vars['_root_addrs'] - for addr in accept_addrs: - waddr: Address = wrap_address(addr) - raddrs.append(addr) - else: - _state._runtime_vars['_root_mailbox'] = raddrs[0] - # Register with the arbiter if we're told its addr log.runtime( f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' @@ -1581,6 +1579,7 @@ async def async_main( except AssertionError: await debug.pause() + # !TODO, get rid of the local-portal crap XD async with get_registry(addr) as reg_portal: for accept_addr in accept_addrs: accept_addr = wrap_address(accept_addr) @@ -1619,7 +1618,7 @@ async def async_main( log.runtime( 'Service nursery complete\n' '\n' - '-> Waiting on root nursery to complete' + '->} waiting on root nursery to complete..\n' ) # Blocks here as expected until the root nursery is @@ -1674,6 +1673,7 @@ async def async_main( finally: teardown_report: str = ( 'Main actor-runtime task completed\n' + '\n' ) # ?TODO? should this be in `._entry`/`._root` mods instead? @@ -1715,7 +1715,8 @@ async def async_main( # Unregister actor from the registry-sys / registrar. if ( is_registered - and not actor.is_registrar + and + not actor.is_registrar ): failed: bool = False for addr in actor.reg_addrs: @@ -1750,7 +1751,8 @@ async def async_main( ipc_server.has_peers(check_chans=True) ): teardown_report += ( - f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n' + f'-> Waiting for remaining peers to clear..\n' + f' {pformat(ipc_server._peers)}' ) log.runtime(teardown_report) await ipc_server.wait_for_no_more_peers( @@ -1758,20 +1760,23 @@ async def async_main( ) teardown_report += ( - '-> All peer channels are complete\n' + '-]> all peer channels are complete.\n' ) - op_nested_actor_repr: str = _pformat.nest_from_op( - input_op=')>', - text=actor.pformat(), - nest_prefix='|_', - nest_indent=1, # under > - ) + # op_nested_actor_repr: str = _pformat.nest_from_op( + # input_op=')>', + # text=actor.pformat(), + # nest_prefix='|_', + # nest_indent=1, # under > + # ) teardown_report += ( - 'Actor runtime exited\n' - f'{op_nested_actor_repr}' + '-)> actor runtime main task exit.\n' + # f'{op_nested_actor_repr}' ) - log.info(teardown_report) + # if _state._runtime_vars['_is_root']: + # log.info(teardown_report) + # else: + log.runtime(teardown_report) # TODO: rename to `Registry` and move to `.discovery._registry`!