forked from goodboy/tractor
1
0
Fork 0

Fix `Channel.__repr__()` safety, renames to `._transport`

Hit a reallly weird bug in the `._runtime` IPC msg handling loop where
it seems that by `str.format()`-ing a `Channel` before initializing it
would put the `._MsgTransport._agen()` in an already started state
causing an irrecoverable core startup failure..

I presume it's something to do with delegating to the
`MsgpackTCPStream.__repr__()` and, something something.. the
`.set_msg_transport(stream)` getting called to too early such that
`.msgstream.__init__()` is called thus init-ing the `._agen()` before
necessary? I'm sure there's a design lesson to be learned in here
somewhere XD

This was discovered while trying to add more "fancy" logging throughout
said core for the purposes of cobbling together an init attempt at
libp2p style multi-address representations for our IPC primitives. Thus
I also tinker here with adding some new fields to `MsgpackTCPStream`:
- `layer_key`: int = 4
- `name_key`: str = 'tcp'
- `codec_key`: str = 'msgpack'

Anyway, just changed it so that if `.msgstream` ain't set then we just
return a little "null repr" `str` value thinger.

Also renames `Channel.msgstream` internally to `._transport` with
appropriate pub `@property`s added such that everything else won't break
;p

Also drops `Optional` typing vis-a-vi modern union syntax B)
modden_spawn_from_client_req
Tyler Goodlet 2024-02-29 18:20:41 -05:00
parent 1e5810e56c
commit 23aa97692e
1 changed files with 56 additions and 37 deletions

View File

@ -30,7 +30,6 @@ import typing
from typing import ( from typing import (
Any, Any,
runtime_checkable, runtime_checkable,
Optional,
Protocol, Protocol,
Type, Type,
TypeVar, TypeVar,
@ -113,6 +112,13 @@ class MsgpackTCPStream(MsgTransport):
using the ``msgspec`` codec lib. using the ``msgspec`` codec lib.
''' '''
layer_key: int = 4
name_key: str = 'tcp'
# TODO: better naming for this?
# -[ ] check how libp2p does naming for such things?
codec_key: str = 'msgpack'
def __init__( def __init__(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
@ -268,7 +274,7 @@ class Channel:
def __init__( def __init__(
self, self,
destaddr: Optional[tuple[str, int]], destaddr: tuple[str, int]|None,
msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'),
@ -286,14 +292,14 @@ class Channel:
# Either created in ``.connect()`` or passed in by # Either created in ``.connect()`` or passed in by
# user in ``.from_stream()``. # user in ``.from_stream()``.
self._stream: Optional[trio.SocketStream] = None self._stream: trio.SocketStream|None = None
self.msgstream: Optional[MsgTransport] = None self._transport: MsgTransport|None = None
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: Optional[tuple[str, str]] = None self.uid: tuple[str, str]|None = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote # flag set by ``Portal.cancel_actor()`` indicating remote
@ -301,6 +307,15 @@ class Channel:
# runtime. # runtime.
self._cancel_called: bool = False self._cancel_called: bool = False
@property
def msgstream(self) -> MsgTransport:
log.info('`Channel.msgstream` is an old name, use `._transport`')
return self._transport
@property
def transport(self) -> MsgTransport:
return self._transport
@classmethod @classmethod
def from_stream( def from_stream(
cls, cls,
@ -310,40 +325,44 @@ class Channel:
) -> Channel: ) -> Channel:
src, dst = get_stream_addrs(stream) src, dst = get_stream_addrs(stream)
chan = Channel(destaddr=dst, **kwargs) chan = Channel(
destaddr=dst,
**kwargs,
)
# set immediately here from provided instance # set immediately here from provided instance
chan._stream = stream chan._stream: trio.SocketStream = stream
chan.set_msg_transport(stream) chan.set_msg_transport(stream)
return chan return chan
def set_msg_transport( def set_msg_transport(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
type_key: Optional[tuple[str, str]] = None, type_key: tuple[str, str]|None = None,
) -> MsgTransport: ) -> MsgTransport:
type_key = type_key or self._transport_key type_key = type_key or self._transport_key
self.msgstream = get_msg_transport(type_key)(stream) self._transport = get_msg_transport(type_key)(stream)
return self.msgstream return self._transport
def __repr__(self) -> str: def __repr__(self) -> str:
if self.msgstream: if not self._transport:
return '<Channel with inactive transport?>'
return repr( return repr(
self.msgstream.stream.socket._sock self._transport.stream.socket._sock
).replace( # type: ignore ).replace( # type: ignore
"socket.socket", "socket.socket",
"Channel", "Channel",
) )
return object.__repr__(self)
@property @property
def laddr(self) -> Optional[tuple[str, int]]: def laddr(self) -> tuple[str, int]|None:
return self.msgstream.laddr if self.msgstream else None return self._transport.laddr if self._transport else None
@property @property
def raddr(self) -> Optional[tuple[str, int]]: def raddr(self) -> tuple[str, int]|None:
return self.msgstream.raddr if self.msgstream else None return self._transport.raddr if self._transport else None
async def connect( async def connect(
self, self,
@ -362,12 +381,12 @@ class Channel:
*destaddr, *destaddr,
**kwargs **kwargs
) )
msgstream = self.set_msg_transport(stream) transport = self.set_msg_transport(stream)
log.transport( log.transport(
f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}' f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}'
) )
return msgstream return transport
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
@ -375,16 +394,16 @@ class Channel:
'=> send IPC msg:\n\n' '=> send IPC msg:\n\n'
f'{pformat(item)}\n' f'{pformat(item)}\n'
) # type: ignore ) # type: ignore
assert self.msgstream assert self._transport
await self.msgstream.send(item) await self._transport.send(item)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self.msgstream assert self._transport
return await self.msgstream.recv() return await self._transport.recv()
# try: # try:
# return await self.msgstream.recv() # return await self._transport.recv()
# except trio.BrokenResourceError: # except trio.BrokenResourceError:
# if self._autorecon: # if self._autorecon:
# await self._reconnect() # await self._reconnect()
@ -397,8 +416,8 @@ class Channel:
f'Closing channel to {self.uid} ' f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}' f'{self.laddr} -> {self.raddr}'
) )
assert self.msgstream assert self._transport
await self.msgstream.stream.aclose() await self._transport.stream.aclose()
self._closed = True self._closed = True
async def __aenter__(self): async def __aenter__(self):
@ -449,16 +468,16 @@ class Channel:
Async iterate items from underlying stream. Async iterate items from underlying stream.
''' '''
assert self.msgstream assert self._transport
while True: while True:
try: try:
async for item in self.msgstream: async for item in self._transport:
yield item yield item
# sent = yield item # sent = yield item
# if sent is not None: # if sent is not None:
# # optimization, passing None through all the # # optimization, passing None through all the
# # time is pointless # # time is pointless
# await self.msgstream.send(sent) # await self._transport.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:
@ -471,7 +490,7 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
return self.msgstream.connected() if self.msgstream else False return self._transport.connected() if self._transport else False
@asynccontextmanager @asynccontextmanager