From 23aa97692ec44e90fbdb79429738e9a31e16b757 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Feb 2024 18:20:41 -0500 Subject: [PATCH] Fix `Channel.__repr__()` safety, renames to `._transport` Hit a reallly weird bug in the `._runtime` IPC msg handling loop where it seems that by `str.format()`-ing a `Channel` before initializing it would put the `._MsgTransport._agen()` in an already started state causing an irrecoverable core startup failure.. I presume it's something to do with delegating to the `MsgpackTCPStream.__repr__()` and, something something.. the `.set_msg_transport(stream)` getting called to too early such that `.msgstream.__init__()` is called thus init-ing the `._agen()` before necessary? I'm sure there's a design lesson to be learned in here somewhere XD This was discovered while trying to add more "fancy" logging throughout said core for the purposes of cobbling together an init attempt at libp2p style multi-address representations for our IPC primitives. Thus I also tinker here with adding some new fields to `MsgpackTCPStream`: - `layer_key`: int = 4 - `name_key`: str = 'tcp' - `codec_key`: str = 'msgpack' Anyway, just changed it so that if `.msgstream` ain't set then we just return a little "null repr" `str` value thinger. Also renames `Channel.msgstream` internally to `._transport` with appropriate pub `@property`s added such that everything else won't break ;p Also drops `Optional` typing vis-a-vi modern union syntax B) --- tractor/_ipc.py | 93 +++++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 37 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5e286c1..b108c90 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -30,7 +30,6 @@ import typing from typing import ( Any, runtime_checkable, - Optional, Protocol, Type, TypeVar, @@ -113,6 +112,13 @@ class MsgpackTCPStream(MsgTransport): using the ``msgspec`` codec lib. ''' + layer_key: int = 4 + name_key: str = 'tcp' + + # TODO: better naming for this? + # -[ ] check how libp2p does naming for such things? + codec_key: str = 'msgpack' + def __init__( self, stream: trio.SocketStream, @@ -268,7 +274,7 @@ class Channel: def __init__( self, - destaddr: Optional[tuple[str, int]], + destaddr: tuple[str, int]|None, msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), @@ -286,14 +292,14 @@ class Channel: # Either created in ``.connect()`` or passed in by # user in ``.from_stream()``. - self._stream: Optional[trio.SocketStream] = None - self.msgstream: Optional[MsgTransport] = None + self._stream: trio.SocketStream|None = None + self._transport: MsgTransport|None = None # set after handshake - always uid of far end - self.uid: Optional[tuple[str, str]] = None + self.uid: tuple[str, str]|None = None self._agen = self._aiter_recv() - self._exc: Optional[Exception] = None # set if far end actor errors + self._exc: Exception|None = None # set if far end actor errors self._closed: bool = False # flag set by ``Portal.cancel_actor()`` indicating remote @@ -301,6 +307,15 @@ class Channel: # runtime. self._cancel_called: bool = False + @property + def msgstream(self) -> MsgTransport: + log.info('`Channel.msgstream` is an old name, use `._transport`') + return self._transport + + @property + def transport(self) -> MsgTransport: + return self._transport + @classmethod def from_stream( cls, @@ -310,40 +325,44 @@ class Channel: ) -> Channel: src, dst = get_stream_addrs(stream) - chan = Channel(destaddr=dst, **kwargs) + chan = Channel( + destaddr=dst, + **kwargs, + ) # set immediately here from provided instance - chan._stream = stream + chan._stream: trio.SocketStream = stream chan.set_msg_transport(stream) return chan def set_msg_transport( self, stream: trio.SocketStream, - type_key: Optional[tuple[str, str]] = None, + type_key: tuple[str, str]|None = None, ) -> MsgTransport: type_key = type_key or self._transport_key - self.msgstream = get_msg_transport(type_key)(stream) - return self.msgstream + self._transport = get_msg_transport(type_key)(stream) + return self._transport def __repr__(self) -> str: - if self.msgstream: - return repr( - self.msgstream.stream.socket._sock - ).replace( # type: ignore - "socket.socket", - "Channel", - ) - return object.__repr__(self) + if not self._transport: + return '' + + return repr( + self._transport.stream.socket._sock + ).replace( # type: ignore + "socket.socket", + "Channel", + ) @property - def laddr(self) -> Optional[tuple[str, int]]: - return self.msgstream.laddr if self.msgstream else None + def laddr(self) -> tuple[str, int]|None: + return self._transport.laddr if self._transport else None @property - def raddr(self) -> Optional[tuple[str, int]]: - return self.msgstream.raddr if self.msgstream else None + def raddr(self) -> tuple[str, int]|None: + return self._transport.raddr if self._transport else None async def connect( self, @@ -362,12 +381,12 @@ class Channel: *destaddr, **kwargs ) - msgstream = self.set_msg_transport(stream) + transport = self.set_msg_transport(stream) log.transport( - f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}' + f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' ) - return msgstream + return transport async def send(self, item: Any) -> None: @@ -375,16 +394,16 @@ class Channel: '=> send IPC msg:\n\n' f'{pformat(item)}\n' ) # type: ignore - assert self.msgstream + assert self._transport - await self.msgstream.send(item) + await self._transport.send(item) async def recv(self) -> Any: - assert self.msgstream - return await self.msgstream.recv() + assert self._transport + return await self._transport.recv() # try: - # return await self.msgstream.recv() + # return await self._transport.recv() # except trio.BrokenResourceError: # if self._autorecon: # await self._reconnect() @@ -397,8 +416,8 @@ class Channel: f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) - assert self.msgstream - await self.msgstream.stream.aclose() + assert self._transport + await self._transport.stream.aclose() self._closed = True async def __aenter__(self): @@ -449,16 +468,16 @@ class Channel: Async iterate items from underlying stream. ''' - assert self.msgstream + assert self._transport while True: try: - async for item in self.msgstream: + async for item in self._transport: yield item # sent = yield item # if sent is not None: # # optimization, passing None through all the # # time is pointless - # await self.msgstream.send(sent) + # await self._transport.send(sent) except trio.BrokenResourceError: # if not self._autorecon: @@ -471,7 +490,7 @@ class Channel: # continue def connected(self) -> bool: - return self.msgstream.connected() if self.msgstream else False + return self._transport.connected() if self._transport else False @asynccontextmanager