From dd3e918cfefe7cc7126843b15447d3d4682d372e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 3 Apr 2025 12:22:21 -0400 Subject: [PATCH] Mv `Actor._do_handshake()` to `Channel`, add `.aid` Finally.. i've been meaning todo this for ages since the actor-id-swap-as-handshake is better layered as part of the IPC msg-ing machinery and then let's us encapsulate the connection-time-assignment of a remote peer's `Aid` as a new `Channel.aid: Aid`. For now we continue to offer the `.uid: tuple[str, str]` attr (by delegating to the `.uid` field) since there's still a few things relying on it in the runtime and ctx layers Nice bonuses from this, - it's very easy to get the peer's `Aid.pid: int` from anywhere in an IPC ctx by just reading it from the chan. - we aren't saving more then the wire struct-msg received. Also add deprecation warnings around usage to get us moving on porting the rest of consuming runtime code to the new attr! --- tractor/_runtime.py | 87 +++++++++++++++++++++----------------------- tractor/ipc/_chan.py | 63 ++++++++++++++++++++++++++++---- 2 files changed, 97 insertions(+), 53 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index bb2ac579..3db7ad11 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -289,7 +289,9 @@ class Actor: @property def aid(self) -> msgtypes.Aid: ''' - This process-singleton-actor's "unique ID" in struct form. + This process-singleton-actor's "unique actor ID" in struct form. + + See the `tractor.msg.Aid` struct for details. ''' return self._aid @@ -308,6 +310,17 @@ class Actor: process plane. ''' + msg: str = ( + f'`{type(self).__name__}.uid` is now deprecated.\n' + 'Use the new `.aid: tractor.msg.Aid` (struct) instead ' + 'which also provides additional named (optional) fields ' + 'beyond just the `.name` and `.uuid`.' + ) + warnings.warn( + msg, + DeprecationWarning, + stacklevel=2, + ) return ( self._aid.name, self._aid.uuid, @@ -495,7 +508,9 @@ class Actor: # send/receive initial handshake response try: - uid: tuple|None = await self._do_handshake(chan) + peer_aid: msgtypes.Aid = await chan._do_handshake( + aid=self.aid, + ) except ( TransportClosed, # ^XXX NOTE, the above wraps `trio` exc types raised @@ -524,6 +539,12 @@ class Actor: ) return + uid: tuple[str, str] = ( + peer_aid.name, + peer_aid.uuid, + ) + # TODO, can we make this downstream peer tracking use the + # `peer_aid` instead? familiar: str = 'new-peer' if _pre_chan := self._peers.get(uid): familiar: str = 'pre-existing-peer' @@ -1127,9 +1148,8 @@ class Actor: ) assert isinstance(chan, Channel) - # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. - await self._do_handshake(chan) + await chan._do_handshake(aid=self.aid) accept_addrs: list[UnwrappedAddress]|None = None @@ -1270,11 +1290,16 @@ class Actor: # -[ ] need to extend the `SpawnSpec` tho! ) - except OSError: # failed to connect + # failed to connect back? + except ( + OSError, + ConnectionError, + ): log.warning( f'Failed to connect to spawning parent actor!?\n' + f'\n' f'x=> {parent_addr}\n' - f'|_{self}\n\n' + f' |_{self}\n\n' ) await self.cancel(req_chan=None) # self cancel raise @@ -1316,13 +1341,13 @@ class Actor: if ( '[Errno 98] Address already in use' in - oserr.args[0] + oserr.args#[0] ): log.exception( f'Address already in use?\n' f'{addr}\n' ) - raise + raise listeners.append(listener) await server_n.start( @@ -1337,8 +1362,10 @@ class Actor: handler_nursery=handler_nursery ) ) - log.runtime( + # TODO, wow make this message better! XD + log.info( 'Started server(s)\n' + + '\n'.join([f'|_{addr}' for addr in listen_addrs]) ) self._listen_addrs.extend(listen_addrs) @@ -1457,8 +1484,13 @@ class Actor: if self._server_down is not None: await self._server_down.wait() else: + tpt_protos: list[str] = [] + addr: Address + for addr in self._listen_addrs: + tpt_protos.append(addr.proto_key) log.warning( - 'Transport[TCP] server was cancelled start?' + 'Transport server(s) may have been cancelled before started?\n' + f'protos: {tpt_protos!r}\n' ) # cancel all rpc tasks permanently @@ -1745,41 +1777,6 @@ class Actor: ''' return self._peers[uid] - # TODO: move to `Channel.handshake(uid)` - async def _do_handshake( - self, - chan: Channel - - ) -> msgtypes.Aid: - ''' - Exchange `(name, UUIDs)` identifiers as the first - communication step with any (peer) remote `Actor`. - - These are essentially the "mailbox addresses" found in - "actor model" parlance. - - ''' - name, uuid = self.uid - await chan.send( - msgtypes.Aid( - name=name, - uuid=uuid, - ) - ) - aid: msgtypes.Aid = await chan.recv() - chan.aid = aid - - uid: tuple[str, str] = ( - aid.name, - aid.uuid, - ) - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - return uid - def is_infected_aio(self) -> bool: ''' If `True`, this actor is running `trio` in guest mode on diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 01baf1e1..f6a50cc1 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -24,13 +24,13 @@ from contextlib import ( asynccontextmanager as acm, contextmanager as cm, ) -import os import platform from pprint import pformat import typing from typing import ( Any, ) +import warnings import trio @@ -50,7 +50,10 @@ from tractor._exceptions import ( MsgTypeError, pack_from_raise, ) -from tractor.msg import MsgCodec +from tractor.msg import ( + Aid, + MsgCodec, +) log = get_logger(__name__) @@ -86,8 +89,8 @@ class Channel: # user in ``.from_stream()``. self._transport: MsgTransport|None = transport - # set after handshake - always uid of far end - self.uid: tuple[str, str]|None = None + # set after handshake - always info from peer end + self.aid: Aid|None = None self._aiter_msgs = self._iter_msgs() self._exc: Exception|None = None @@ -99,6 +102,29 @@ class Channel: # runtime. self._cancel_called: bool = False + @property + def uid(self) -> tuple[str, str]: + ''' + Peer actor's unique id. + + ''' + msg: str = ( + f'`{type(self).__name__}.uid` is now deprecated.\n' + 'Use the new `.aid: tractor.msg.Aid` (struct) instead ' + 'which also provides additional named (optional) fields ' + 'beyond just the `.name` and `.uuid`.' + ) + warnings.warn( + msg, + DeprecationWarning, + stacklevel=2, + ) + peer_aid: Aid = self.aid + return ( + peer_aid.name, + peer_aid.uuid, + ) + @property def stream(self) -> trio.abc.Stream | None: return self._transport.stream if self._transport else None @@ -182,9 +208,7 @@ class Channel: f' _closed={self._closed}\n' f' _cancel_called={self._cancel_called}\n' f'\n' - f' |_runtime: Actor\n' - f' pid={os.getpid()}\n' - f' uid={self.uid}\n' + f' |_peer: {self.aid}\n' f'\n' f' |_msgstream: {tpt_name}\n' f' proto={tpt.laddr.proto_key!r}\n' @@ -281,7 +305,7 @@ class Channel: async def aclose(self) -> None: log.transport( - f'Closing channel to {self.uid} ' + f'Closing channel to {self.aid} ' f'{self.laddr} -> {self.raddr}' ) assert self._transport @@ -381,6 +405,29 @@ class Channel: def connected(self) -> bool: return self._transport.connected() if self._transport else False + async def _do_handshake( + self, + aid: Aid, + + ) -> Aid: + ''' + Exchange `(name, UUIDs)` identifiers as the first + communication step with any (peer) remote `Actor`. + + These are essentially the "mailbox addresses" found in + "actor model" parlance. + + ''' + await self.send(aid) + peer_aid: Aid = await self.recv() + log.runtime( + f'Received hanshake with peer actor,\n' + f'{peer_aid}\n' + ) + # NOTE, we always are referencing the remote peer! + self.aid = peer_aid + return peer_aid + @acm async def _connect_chan(