diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index f0489814..bb2ac579 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -200,9 +200,14 @@ class Actor:
         phase (aka before a new process is executed).

         '''
-        self.name = name
-        self.uid = (name, uuid)
+        self._aid = msgtypes.Aid(
+            name=name,
+            uuid=uuid,
+            pid=os.getpid(),
+        )
+        self._task: trio.Task|None = None

+        # state
         self._cancel_complete = trio.Event()
         self._cancel_called_by_remote: tuple[str, tuple]|None = None
         self._cancel_called: bool = False
@@ -281,6 +286,77 @@ class Actor:
         self.reg_addrs: list[UnwrappedAddress] = registry_addrs
         _state._runtime_vars['_registry_addrs'] = registry_addrs

+    @property
+    def aid(self) -> msgtypes.Aid:
+        '''
+        This process-singleton-actor's "unique ID" in struct form.
+
+        '''
+        return self._aid
+
+    @property
+    def name(self) -> str:
+        return self._aid.name
+
+    @property
+    def uid(self) -> tuple[str, str]:
+        '''
+        This process-singleton's "unique (cross-host) ID".
+
+        Delivered from the `.Aid.name/.uuid` fields as a `tuple` pair
+        and should be multi-host unique even across a large
+        distributed process plane.
+
+        '''
+        return (
+            self._aid.name,
+            self._aid.uuid,
+        )
+
+    @property
+    def pid(self) -> int:
+        return self._aid.pid
+
+    def pformat(self) -> str:
+        ds: str = '='
+        parent_uid: tuple|None = None
+        if rent_chan := self._parent_chan:
+            parent_uid = rent_chan.uid
+        peers: list[tuple] = list(self._peer_connected)
+        listen_addrs: str = pformat(self._listen_addrs)
+        fmtstr: str = (
+            f' |_id: {self.aid!r}\n'
+            # f"   aid{ds}{self.aid!r}\n"
+            f"   parent{ds}{parent_uid}\n"
+            f'\n'
+            f' |_ipc: {len(peers)!r} connected peers\n'
+            f"   peers{ds}{peers!r}\n"
+            f"   _listen_addrs{ds}'{listen_addrs}'\n"
+            f"   _listeners{ds}'{self._listeners}'\n"
+            f'\n'
+            f' |_rpc: {len(self._rpc_tasks)} tasks\n'
+            f"   ctxs{ds}{len(self._contexts)}\n"
+            f'\n'
+            f' |_runtime: ._task{ds}{self._task!r}\n'
+            f'   _spawn_method{ds}{self._spawn_method}\n'
+            f'   _actoruid2nursery{ds}{self._actoruid2nursery}\n'
+            f'   _forkserver_info{ds}{self._forkserver_info}\n'
+            f'\n'
+            f' |_state: "TODO: .repr_state()"\n'
+            f'   _cancel_complete{ds}{self._cancel_complete}\n'
+            f'   _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
+            f'   _cancel_called{ds}{self._cancel_called}\n'
+        )
+        return (
+            '<Actor(\n'
+            +
+            fmtstr
+            +
+            ')>\n'
+        )
+
+    __repr__ = pformat
+
     @property
     def reg_addrs(self) -> list[UnwrappedAddress]:
         '''
@@ -421,12 +497,19 @@ class Actor:
         try:
             uid: tuple|None = await self._do_handshake(chan)
         except (
-            # we need this for ``msgspec`` for some reason?
-            # for now, it's been put in the stream backend.
+            TransportClosed,
+            # ^XXX NOTE, the above wraps `trio` exc types raised
+            # during various `SocketStream.send/receive_xx()` calls
+            # under different fault conditions such as,
+            #
             # trio.BrokenResourceError,
             # trio.ClosedResourceError,
-
-            TransportClosed,
+            #
+            # Inside our `.ipc._transport` layer we absorb and
+            # re-raise our own `TransportClosed` exc such that this
+            # higher level runtime code need only worry about one
+            # "kinda-error" that we expect to tolerate during
+            # discovery-sys related pings, queries, DoS etc.
         ):
             # XXX: This may propagate up from `Channel._aiter_recv()`
             # and `MsgpackStream._inter_packets()` on a read from the
@@ -1205,7 +1288,8 @@ class Actor:
         task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         '''
-        Start the IPC transport server, begin listening for new connections.
+        Start the IPC transport server, begin listening/accepting new
+        `trio.SocketStream` connections.

         This will cause an actor to continue living (and thus
         blocking at the process/OS-thread level) until
@@ -1223,10 +1307,24 @@
         self._server_down = trio.Event()
         try:
             async with trio.open_nursery() as server_n:
-                listeners: list[trio.abc.Listener] = [
-                    await addr.open_listener()
-                    for addr in listen_addrs
-                ]
+
+                listeners: list[trio.abc.Listener] = []
+                for addr in listen_addrs:
+                    try:
+                        listener: trio.abc.Listener = await addr.open_listener()
+                    except OSError as oserr:
+                        if (
+                            '[Errno 98] Address already in use'
+                            in
+                            oserr.args[0]
+                        ):
+                            log.exception(
+                                f'Address already in use?\n'
+                                f'{addr}\n'
+                            )
+                        raise
+                    listeners.append(listener)
+
                 await server_n.start(
                     partial(
                         trio.serve_listeners,
@@ -1249,8 +1347,10 @@
                 task_status.started(server_n)

         finally:
+            addr: Address
             for addr in listen_addrs:
-                await addr.close_listener()
+                addr.close_listener()
+
             # signal the server is down since nursery above terminated
             self._server_down.set()

@@ -1717,6 +1817,8 @@ async def async_main(
     the actor's "runtime" and all thus all ongoing RPC tasks.

     '''
+    actor._task: trio.Task = trio.lowlevel.current_task()
+
     # attempt to retreive ``trio``'s sigint handler and stash it
     # on our debugger state.
     _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@@ -1726,18 +1828,17 @@ async def async_main(

         # establish primary connection with immediate parent
         actor._parent_chan: Channel|None = None
-        if parent_addr is not None:
+        if parent_addr is not None:
             (
                 actor._parent_chan,
                 set_accept_addr_says_rent,
                 maybe_preferred_transports_says_rent,
             ) = await actor._from_parent(parent_addr)
-
+            accept_addrs: list[UnwrappedAddress] = []

             # either it's passed in because we're not a child or
             # because we're running in mp mode
-            accept_addrs: list[UnwrappedAddress] = []
             if (
                 set_accept_addr_says_rent
                 and
diff --git a/tractor/msg/types.py b/tractor/msg/types.py
index e082d950..d71fb7e0 100644
--- a/tractor/msg/types.py
+++ b/tractor/msg/types.py
@@ -143,6 +143,7 @@ class Aid(
     '''
     name: str
     uuid: str
+    pid: int|None = None

     # TODO? can/should we extend this field set?
     # -[ ] use built-in support for UUIDs? `uuid.UUID` which has
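
For review context, the net effect of the `_runtime.py` and `msg/types.py` hunks above is that an actor's identity now lives in a single `Aid` struct (name + uuid + optional OS pid) and the old bare `Actor.name`/`Actor.uid` attributes become read-only views onto it. Below is a minimal, self-contained sketch of that relationship; the `dataclass`-based `Aid` and the `MiniActor` class are hypothetical stand-ins for illustration only, not the actual tractor types (the real `Aid` lives in `tractor.msg.types`).

# Illustrative sketch only; `Aid` and `MiniActor` below are hypothetical
# stand-ins for the patched tractor types/classes.
import os
from dataclasses import dataclass


@dataclass
class Aid:
    # mirrors the field set touched in `tractor/msg/types.py`:
    # name + uuid as before, plus an optional OS process id
    name: str
    uuid: str
    pid: int | None = None


class MiniActor:
    # cut-down stand-in showing how `.name`/`.uid` become read-only
    # views onto a single `._aid` struct, as in the patched `Actor`
    def __init__(self, name: str, uuid: str):
        self._aid = Aid(
            name=name,
            uuid=uuid,
            pid=os.getpid(),
        )

    @property
    def name(self) -> str:
        return self._aid.name

    @property
    def uid(self) -> tuple[str, str]:
        # the same (name, uuid) tuple callers relied on pre-patch
        return (self._aid.name, self._aid.uuid)

    @property
    def pid(self) -> int | None:
        return self._aid.pid


if __name__ == '__main__':
    actor = MiniActor('registrar', 'deadbeef')
    assert actor.uid == ('registrar', 'deadbeef')
    assert actor.pid == os.getpid()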