diff --git a/tractor/__init__.py b/tractor/__init__.py index ad3144d..bda4ac2 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -49,6 +49,7 @@ from ._exceptions import ( ModuleNotExposed as ModuleNotExposed, MsgTypeError as MsgTypeError, RemoteActorError as RemoteActorError, + TransportClosed as TransportClosed, ) from .devx import ( breakpoint as breakpoint, diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 265f506..a0b6ff3 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -906,8 +906,59 @@ class StreamOverrun( ''' -class TransportClosed(trio.ClosedResourceError): - "Underlying channel transport was closed prior to use" +class TransportClosed(trio.BrokenResourceError): + ''' + IPC transport (protocol) connection was closed or broke and + indicates that the wrapping communication `Channel` can no longer + be used to send/receive msgs from the remote peer. + + ''' + def __init__( + self, + message: str, + loglevel: str = 'transport', + cause: BaseException|None = None, + raise_on_report: bool = False, + + ) -> None: + self.message: str = message + self._loglevel = loglevel + super().__init__(message) + + if cause is not None: + self.__cause__ = cause + + # flag to toggle whether the msg loop should raise + # the exc in its `TransportClosed` handler block. + self._raise_on_report = raise_on_report + + def report_n_maybe_raise( + self, + message: str|None = None, + + ) -> None: + ''' + Using the init-specified log level emit a logging report + for this error. + + ''' + message: str = message or self.message + # when a cause is set, slap it onto the log emission. 
+ if cause := self.__cause__: + cause_tb_str: str = ''.join( + traceback.format_tb(cause.__traceback__) + ) + message += ( + f'{cause_tb_str}\n' # tb + f' {cause}\n' # exc repr + ) + + getattr(log, self._loglevel)(message) + + # some errors we want to blow up from + # inside the RPC msg loop + if self._raise_on_report: + raise self from cause class NoResult(RuntimeError): diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e5e3d10..a1cb035 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -54,7 +54,7 @@ from tractor._exceptions import ( ) from tractor.msg import ( _ctxvar_MsgCodec, - _codec, + # _codec, XXX see `self._codec` sanity/debug checks MsgCodec, types as msgtypes, pretty_struct, @@ -65,8 +65,18 @@ log = get_logger(__name__) _is_windows = platform.system() == 'Windows' -def get_stream_addrs(stream: trio.SocketStream) -> tuple: - # should both be IP sockets +def get_stream_addrs( + stream: trio.SocketStream +) -> tuple[ + tuple[str, int], # local + tuple[str, int], # remote +]: + ''' + Return the `trio` streaming transport prot's socket-addrs for + both the local and remote sides as a pair. + + ''' + # rn, should both be IP sockets lsockname = stream.socket.getsockname() rsockname = stream.socket.getpeername() return ( @@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple: ) -# TODO: this should be our `Union[*msgtypes.__spec__]` now right? -MsgType = TypeVar("MsgType") - -# TODO: consider using a generic def and indexing with our eventual -# msg definition/types? -# - https://docs.python.org/3/library/typing.html#typing.Protocol -# - https://jcristharif.com/msgspec/usage.html#structs +# from tractor.msg.types import MsgType +# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? +# => BLEH, except can't bc prots must inherit typevar or param-spec +# vars.. 
+MsgType = TypeVar('MsgType') +# TODO: break up this mod into a subpkg so we can start adding new +# backends and move this type stuff into a dedicated file.. Bo +# @runtime_checkable class MsgTransport(Protocol[MsgType]): +# +# ^-TODO-^ consider using a generic def and indexing with our +# eventual msg definition/types? +# - https://docs.python.org/3/library/typing.html#typing.Protocol stream: trio.SocketStream drained: list[MsgType] @@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]): ... -# TODO: not sure why we have to inherit here, but it seems to be an -# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; -# probably should make a `mypy` issue? +# TODO: typing oddity.. not sure why we have to inherit here, but it +# seems to be an issue with `get_msg_transport()` returning +# a `Type[Protocol]`; probably should make a `mypy` issue? class MsgpackTCPStream(MsgTransport): ''' A ``trio.SocketStream`` delivering ``msgpack`` formatted data @@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport): # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # # TODO: define this as a `Codec` struct which can be - # overriden dynamically by the application/runtime. + # overridden dynamically by the application/runtime? codec: tuple[ Callable[[Any], Any]|None, # coder Callable[[type, Any], Any]|None, # decoder @@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport): self._laddr, self._raddr = get_stream_addrs(stream) # create read loop instance - self._agen = self._iter_packets() + self._aiter_pkts = self._iter_packets() self._send_lock = trio.StrictFIFOLock() # public i guess? @@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport): # allow for custom IPC msg interchange format # dynamic override Bo self._task = trio.lowlevel.current_task() - self._codec: MsgCodec = ( - codec - or - _codec._ctxvar_MsgCodec.get() - ) - # TODO: mask out before release? 
+ # log.runtime( - # f'New {self} created with codec\n' - # f'codec: {self._codec}\n' + + # XXX for ctxvar debug only! + # self._codec: MsgCodec = ( + # codec + # or + # _codec._ctxvar_MsgCodec.get() # ) async def _iter_packets(self) -> AsyncGenerator[dict, None]: @@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport): Yield `bytes`-blob decoded packets from the underlying TCP stream using the current task's `MsgCodec`. + This is a streaming routine implemented as an async generator + func (which was the original design, but could be changed?) + and is allocated by a `.__call__()` inside `.__init__()` where + it is assigned to the `._aiter_pkts` attr. + ''' decodes_failed: int = 0 @@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport): # seem to be getting racy failures here on # arbiter/registry name subs.. trio.BrokenResourceError, - ): - raise TransportClosed( - f'transport {self} was already closed prior ro read' - ) + ) as trans_err: + + loglevel = 'transport' + match trans_err: + # case ( + # ConnectionResetError() + # ): + # loglevel = 'transport' + + # peer actor (graceful??) TCP EOF but `tricycle` + # seems to raise a 0-bytes-read? + case ValueError() if ( + 'unclean EOF' in str(trans_err) + ): + pass + + # peer actor (task) prolly shutdown quickly due + # to cancellation + case trio.BrokenResourceError() if ( + 'Connection reset by peer' in str(trans_err) + ): + pass + + # unless the disconnect condition falls under "a + # normal operation breakage" we usually console warn + # about it. + case _: + loglevel: str = 'warning' + + + raise TransportClosed( + message=( + f'IPC transport already closed by peer\n' + f'x)> {type(trans_err)}\n' + f' |_{self}\n' + ), + loglevel=loglevel, + ) from trans_err + + # XXX definitely can happen if transport is closed + # manually by another `trio.lowlevel.Task` in the + # same actor; we use this in some simulated fault + # testing for ex, but generally should never happen + # under normal operation! 
+ # + # NOTE: as such we always re-raise this error from the + # RPC msg loop! + except trio.ClosedResourceError as closure_err: + raise TransportClosed( + message=( + f'IPC transport already manually closed locally?\n' + f'x)> {type(closure_err)} \n' + f' |_{self}\n' + ), + loglevel='error', + raise_on_report=( + bool(closure_err.args) + and + closure_err.args[0] == 'another task closed this fd' + ), + ) from closure_err + + # graceful TCP EOF disconnect if header == b'': raise TransportClosed( - f'transport {self} was already closed prior ro read' + message=( + f'IPC transport already gracefully closed\n' + f')>\n' + f'|_{self}\n' + ), + loglevel='transport', + # cause=??? # handy or no? ) + size: int size, = struct.unpack(" None: ''' Send a msgpack encoded py-object-blob-as-msg over TCP. @@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport): invalid msg type ''' - # __tracebackhide__: bool = hide_tb + __tracebackhide__: bool = hide_tb + + # XXX see `trio._sync.AsyncContextManagerMixin` for details + # on the `.acquire()`/`.release()` sequencing.. async with self._send_lock: # NOTE: lookup the `trio.Task.context`'s var for # the current `MsgCodec`. codec: MsgCodec = _ctxvar_MsgCodec.get() - # TODO: mask out before release? - if self._codec.pld_spec != codec.pld_spec: - self._codec = codec - log.runtime( - f'Using new codec in {self}.send()\n' - f'codec: {self._codec}\n\n' - f'msg: {msg}\n' - ) + # XXX for ctxvar debug only! 
+ # if self._codec.pld_spec != codec.pld_spec: + # self._codec = codec + # log.runtime( + # f'Using new codec in {self}.send()\n' + # f'codec: {self._codec}\n\n' + # f'msg: {msg}\n' + # ) if type(msg) not in msgtypes.__msg_types__: if strict_types: @@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport): size: bytes = struct.pack(" + # except BaseException as _err: + # err = _err + # if not isinstance(err, MsgTypeError): + # __tracebackhide__: bool = False + # raise + @property def laddr(self) -> tuple[str, int]: return self._laddr @@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport): return self._raddr async def recv(self) -> Any: - return await self._agen.asend(None) + return await self._aiter_pkts.asend(None) async def drain(self) -> AsyncIterator[dict]: ''' @@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport): yield msg def __aiter__(self): - return self._agen + return self._aiter_pkts def connected(self) -> bool: return self.stream.socket.fileno() != -1 @@ -433,7 +517,7 @@ class Channel: # set after handshake - always uid of far end self.uid: tuple[str, str]|None = None - self._agen = self._aiter_recv() + self._aiter_msgs = self._iter_msgs() self._exc: Exception|None = None # set if far end actor errors self._closed: bool = False @@ -497,8 +581,6 @@ class Channel: ) return self._transport - # TODO: something simliar at the IPC-`Context` - # level so as to support @cm def apply_codec( self, @@ -517,6 +599,7 @@ class Channel: finally: self._transport.codec = orig + # TODO: do a .src/.dst: str for maddrs? def __repr__(self) -> str: if not self._transport: return '' @@ -560,27 +643,43 @@ class Channel: ) return transport + # TODO: something like, + # `pdbp.hideframe_on(errors=[MsgTypeError])` + # instead of the `try/except` hack we have rn.. + # seems like a pretty useful thing to have in general + # along with being able to filter certain stack frame(s / sets) + # possibly based on the current log-level? 
async def send( self, payload: Any, - # hide_tb: bool = False, + hide_tb: bool = False, ) -> None: ''' Send a coded msg-blob over the transport. ''' - # __tracebackhide__: bool = hide_tb - log.transport( - '=> send IPC msg:\n\n' - f'{pformat(payload)}\n' - ) # type: ignore - assert self._transport - await self._transport.send( - payload, - # hide_tb=hide_tb, - ) + __tracebackhide__: bool = hide_tb + try: + log.transport( + '=> send IPC msg:\n\n' + f'{pformat(payload)}\n' + ) + # assert self._transport # but why typing? + await self._transport.send( + payload, + hide_tb=hide_tb, + ) + except BaseException as _err: + err = _err # bind for introspection + if not isinstance(_err, MsgTypeError): + # assert err + __tracebackhide__: bool = False + else: + assert err.cid + + raise async def recv(self) -> Any: assert self._transport @@ -617,8 +716,11 @@ class Channel: await self.aclose(*args) def __aiter__(self): - return self._agen + return self._aiter_msgs + # ?TODO? run any reconnection sequence? + # -[ ] prolly should be impl-ed as deco-API? + # # async def _reconnect(self) -> None: # """Handle connection failures by polling until a reconnect can be # established. @@ -636,7 +738,6 @@ class Channel: # else: # log.transport("Stream connection re-established!") - # # TODO: run any reconnection sequence # # on_recon = self._recon_seq # # if on_recon: # # await on_recon(self) @@ -650,11 +751,17 @@ class Channel: # " for re-establishment") # await trio.sleep(1) - async def _aiter_recv( + async def _iter_msgs( self ) -> AsyncGenerator[Any, None]: ''' - Async iterate items from underlying stream. + Yield `MsgType` IPC msgs decoded and delivered from + an underlying `MsgTransport` protocol. + + This is a streaming routine also implemented as an async-gen + func (same as `MsgpackTCPStream._iter_packets()`) which gets allocated by + a `.__call__()` inside `.__init__()` where it is assigned to + the `._aiter_msgs` attr. 
''' assert self._transport @@ -680,15 +787,6 @@ class Channel: case _: yield msg - # TODO: if we were gonna do this it should be - # done up at the `MsgStream` layer! - # - # sent = yield item - # if sent is not None: - # # optimization, passing None through all the - # # time is pointless - # await self._transport.send(sent) - except trio.BrokenResourceError: # if not self._autorecon: diff --git a/tractor/_rpc.py b/tractor/_rpc.py index fc8687c..1849cf6 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -1197,7 +1197,7 @@ async def process_messages( parent_chan=chan, ) - except TransportClosed: + except TransportClosed as tc: # channels "breaking" (for TCP streams by EOF or 104 # connection-reset) is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out of @@ -1205,12 +1205,20 @@ async def process_messages( # up.. # # TODO: maybe add a teardown handshake? and, - # -[ ] don't show this msg if it's an ephemeral discovery ep call? + # -[x] don't show this msg if it's an ephemeral discovery ep call? + # |_ see the below `.report_n_maybe_raise()` impl as well as + # tc-exc input details in `MsgpackTCPStream._iter_packets()` + # for different read-failure cases. # -[ ] figure out how this will break with other transports? - log.runtime( - f'IPC channel closed abruptly\n' - f'<=x peer: {chan.uid}\n' - f' |_{chan.raddr}\n' + tc.report_n_maybe_raise( + message=( + f'peer IPC channel closed abruptly?\n\n' + f'<=x {chan}\n' + f' |_{chan.raddr}\n\n' + ) + + + tc.message + ) # transport **WAS** disconnected