diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 2e75cff..5f4da96 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -553,12 +553,6 @@ class Actor: ) registry_addrs: list[tuple[str, int]] = [arbiter_addr] - self._reg_addrs: list[tuple[str, int]] = ( - registry_addrs - or - None - ) - # marked by the process spawning backend at startup # will be None for the parent most process started manually # by the user (currently called the "arbiter") @@ -591,6 +585,44 @@ class Actor: ActorNursery | None, ] = {} # type: ignore # noqa + # when provided, init the registry addresses property from + # input via the validator. + self._reg_addrs: list[tuple[str, int]] = [] + if registry_addrs: + self.reg_addrs: list[tuple[str, int]] = registry_addrs + + @property + def reg_addrs(self) -> list[tuple[str, int]]: + ''' + List of (socket) addresses for all known (and contactable) + registry actors. + + ''' + return self._reg_addrs + + @reg_addrs.setter + def reg_addrs( + self, + addrs: list[tuple[str, int]], + ) -> None: + if not addrs: + log.warning( + 'Empty registry address list is invalid:\n' + f'{addrs}' + ) + return + + # always sanity check the input list since it's critical + # that addrs are correct for discovery sys operation. + for addr in addrs: + if not isinstance(addr, tuple): + raise ValueError( + 'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n' + f'Got {addrs}' + ) + + self._reg_addrs = addrs + async def wait_for_peer( self, uid: tuple[str, str] ) -> tuple[trio.Event, Channel]: @@ -670,9 +702,10 @@ class Actor: stream: trio.SocketStream, ) -> None: - """Entry point for new inbound connections to the channel server. + ''' + Entry point for new inbound connections to the channel server. - """ + ''' self._no_more_peers = trio.Event() # unset chan = Channel.from_stream(stream) @@ -792,17 +825,21 @@ class Actor: if disconnected: # if the transport died and this actor is still - # registered within a local nursery, we report that the - # IPC layer may have failed unexpectedly since it may be - # the cause of other downstream errors. + # registered within a local nursery, we report + # that the IPC layer may have failed + # unexpectedly since it may be the cause of + # other downstream errors. entry = local_nursery._children.get(uid) if entry: _, proc, _ = entry - poll = getattr(proc, 'poll', None) - if poll and poll() is None: + if ( + (poll := getattr(proc, 'poll', None)) + and poll() is None + ): log.cancel( - f'Actor {uid} IPC broke but proc is alive?' + f'Actor {uid} IPC broke but proc is alive?\n' + 'Attempting to self cancel..' ) # ``Channel`` teardown and closure sequence @@ -1016,14 +1053,18 @@ class Actor: _state._runtime_vars.update(rvs) for attr, value in parent_data.items(): - - if attr == '_reg_addrs': + if ( + attr == 'reg_addrs' + and value + ): # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - self._reg_addrs = [ - tuple(val) for val in value - ] if value else None + # TODO: we don't really NEED these as + # tuples so we can probably drop this + # casting since apparently in python lists + # are "more efficient"? + self.reg_addrs = [tuple(val) for val in value] else: setattr(self, attr, value) @@ -1099,7 +1140,10 @@ class Actor: ''' assert self._service_n - self._service_n.start_soon(self.cancel) + self._service_n.start_soon( + self.cancel, + self.uid, + ) async def cancel( self, @@ -1445,9 +1489,12 @@ async def async_main( # if addresses point to the same actor.. # So we need a way to detect that? maybe iterate # only on unique actor uids? - for addr in actor._reg_addrs: - assert isinstance(addr, tuple) - assert addr[1] # non-zero after bind + for addr in actor.reg_addrs: + try: + assert isinstance(addr, tuple) + assert addr[1] # non-zero after bind + except AssertionError: + await _debug.pause() async with get_registry(*addr) as reg_portal: for accept_addr in accept_addrs: @@ -1500,12 +1547,14 @@ async def async_main( # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor._reg_addrs[0]}?") + f"@ {actor.reg_addrs[0]}?") log.error( - "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" - "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tIf this is a sub-actor likely its parent will keep running " - "\tcorrectly if this error is caught and ignored.." + "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" + "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" + "\tIf this is a sub-actor hopefully its parent will keep running " + "correctly presuming this error was safely ignored..\n\n" + "\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: " + "https://github.com/goodboy/tractor/issues\n" ) if actor._parent_chan: @@ -1546,7 +1595,7 @@ async def async_main( and not actor.is_registrar ): failed: bool = False - for addr in actor._reg_addrs: + for addr in actor.reg_addrs: assert isinstance(addr, tuple) with trio.move_on_after(0.5) as cs: cs.shield = True