forked from goodboy/tractor
1
0
Fork 0

More formal `TransportClosed` reporting/raising

Since it was all ad-hoc defined inside
`._ipc.MsgpackTCPStream._iter_pkts()` more or less, this starts
formalizing a way for particular transport backends to indicate whether
a disconnect condition should be re-raised in the RPC msg loop and if
not what log level to report it at (if any).

Based on our lone transport currently we try to suppress any logging
noise from ephemeral connections expected during normal actor
interaction and discovery subsys ops:
- any short lived discovery related TCP connects are only logged as
  `.transport()` level.
- both `.error()` and raise on any underlying `trio.ClosedResource`
  cause since that normally means some task touched transport layer
  internals that it shouldn't have.
- do a `.warning()` on anything else unexpected.

Impl deats:
- extend the `._exceptions.TransportClosed` to accept an input log
  level, raise-on-report toggle and custom reporting & raising via a new
  `.report_n_maybe_raise()` method.
- construct the TCs with inputs per case in (the newly named) `._iter_pkts().
- call ^ this method from the `TransportClosed` handler block inside the
  RPC msg loop thus delegating reporting levels and/or raising to the
  backend's per-case TC instantiating.

Related `._ipc` changes:
- mask out all the `MsgpackTCPStream._codec` debug helper stuff and drop
  any lingering cruft from the initial proto-ing of msg-codecs.
- rename some attrs/methods:
  |_`MsgpackTCPStream._iter_packets()` -> `._iter_pkts()` and
    `._agen` -> `_aiter_pkts`.
  |_`Channel._aiter_recv()` -> `._aiter_msgs()` and
    `._agen` -> `_aiter_msgs`.
- add `hide_tb: bool` support to `Channel.send()` and only show the
  frame on non-MTEs.
aio_abandons
Tyler Goodlet 2024-07-02 12:21:26 -04:00
parent 3907cba68e
commit af3745684c
4 changed files with 259 additions and 101 deletions

View File

@ -49,6 +49,7 @@ from ._exceptions import (
ModuleNotExposed as ModuleNotExposed, ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError, MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError, RemoteActorError as RemoteActorError,
TransportClosed as TransportClosed,
) )
from .devx import ( from .devx import (
breakpoint as breakpoint, breakpoint as breakpoint,

View File

@ -906,8 +906,59 @@ class StreamOverrun(
''' '''
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.BrokenResourceError):
"Underlying channel transport was closed prior to use" '''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
be used to send/receive msgs from the remote peer.
'''
def __init__(
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
def report_n_maybe_raise(
self,
message: str|None = None,
) -> None:
'''
Using the init-specified log level emit a logging report
for this error.
'''
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
message += (
f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
class NoResult(RuntimeError): class NoResult(RuntimeError):

View File

@ -54,7 +54,7 @@ from tractor._exceptions import (
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
_codec, # _codec, XXX see `self._codec` sanity/debug checks
MsgCodec, MsgCodec,
types as msgtypes, types as msgtypes,
pretty_struct, pretty_struct,
@ -65,8 +65,18 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
def get_stream_addrs(stream: trio.SocketStream) -> tuple: def get_stream_addrs(
# should both be IP sockets stream: trio.SocketStream
) -> tuple[
tuple[str, int], # local
tuple[str, int], # remote
]:
'''
Return the `trio` streaming transport prot's socket-addrs for
both the local and remote sides as a pair.
'''
# rn, should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
return ( return (
@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
) )
# TODO: this should be our `Union[*msgtypes.__spec__]` now right? # from tractor.msg.types import MsgType
MsgType = TypeVar("MsgType") # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# TODO: consider using a generic def and indexing with our eventual # vars..
# msg definition/types? MsgType = TypeVar('MsgType')
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs
# TODO: break up this mod into a subpkg so we can start adding new
# backends and move this type stuff into a dedicated file.. Bo
#
@runtime_checkable @runtime_checkable
class MsgTransport(Protocol[MsgType]): class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream stream: trio.SocketStream
drained: list[MsgType] drained: list[MsgType]
@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]):
... ...
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: typing oddity.. not sure why we have to inherit here, but it
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # seems to be an issue with `get_msg_transport()` returning
# probably should make a `mypy` issue? # a `Type[Protocol]`; probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport): class MsgpackTCPStream(MsgTransport):
''' '''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data A ``trio.SocketStream`` delivering ``msgpack`` formatted data
@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# #
# TODO: define this as a `Codec` struct which can be # TODO: define this as a `Codec` struct which can be
# overriden dynamically by the application/runtime. # overriden dynamically by the application/runtime?
codec: tuple[ codec: tuple[
Callable[[Any], Any]|None, # coder Callable[[Any], Any]|None, # coder
Callable[[type, Any], Any]|None, # decoder Callable[[type, Any], Any]|None, # decoder
@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport):
self._laddr, self._raddr = get_stream_addrs(stream) self._laddr, self._raddr = get_stream_addrs(stream)
# create read loop instance # create read loop instance
self._agen = self._iter_packets() self._aiter_pkts = self._iter_packets()
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
# public i guess? # public i guess?
@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format # allow for custom IPC msg interchange format
# dynamic override Bo # dynamic override Bo
self._task = trio.lowlevel.current_task() self._task = trio.lowlevel.current_task()
self._codec: MsgCodec = (
codec # XXX for ctxvar debug only!
or # self._codec: MsgCodec = (
_codec._ctxvar_MsgCodec.get() # codec
) # or
# TODO: mask out before release? # _codec._ctxvar_MsgCodec.get()
# log.runtime(
# f'New {self} created with codec\n'
# f'codec: {self._codec}\n'
# ) # )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport):
Yield `bytes`-blob decoded packets from the underlying TCP Yield `bytes`-blob decoded packets from the underlying TCP
stream using the current task's `MsgCodec`. stream using the current task's `MsgCodec`.
This is a streaming routine implemented as an async generator
func (which was the original design, but could be changed?)
and is allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_pkts` attr.
''' '''
decodes_failed: int = 0 decodes_failed: int = 0
@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport):
# seem to be getting racy failures here on # seem to be getting racy failures here on
# arbiter/registry name subs.. # arbiter/registry name subs..
trio.BrokenResourceError, trio.BrokenResourceError,
):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
) as trans_err:
loglevel = 'transport'
match trans_err:
# case (
# ConnectionResetError()
# ):
# loglevel = 'transport'
# peer actor (graceful??) TCP EOF but `tricycle`
# seems to raise a 0-bytes-read?
case ValueError() if (
'unclean EOF' in trans_err.args[0]
):
pass
# peer actor (task) prolly shutdown quickly due
# to cancellation
case trio.BrokenResourceError() if (
'Connection reset by peer' in trans_err.args[0]
):
pass
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
loglevel: str = 'warning'
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# XXX definitely can happen if transport is closed
# manually by another `trio.lowlevel.Task` in the
# same actor; we use this in some simulated fault
# testing for ex, but generally should never happen
# under normal operation!
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
loglevel='error',
raise_on_report=(
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
# graceful TCP EOF disconnect
if header == b'': if header == b'':
raise TransportClosed( raise TransportClosed(
f'transport {self} was already closed prior ro read' message=(
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
) )
size: int
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore log.transport(f'received header {size}') # type: ignore
@ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport):
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release? # XXX for ctxvar debug only!
if self._codec.pld_spec != codec.pld_spec: # if self._codec.pld_spec != codec.pld_spec:
# assert ( # assert (
# task := trio.lowlevel.current_task() # task := trio.lowlevel.current_task()
# ) is not self._task # ) is not self._task
# self._task = task # self._task = task
self._codec = codec # self._codec = codec
log.runtime( # log.runtime(
f'Using new codec in {self}.recv()\n' # f'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n' # f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n' # f'msg_bytes: {msg_bytes}\n'
) # )
yield codec.decode(msg_bytes) yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
# XXX NOTE: since the below error derives from # XXX NOTE: since the below error derives from
# `DecodeError` we need to catch is specially # `DecodeError` we need to catch is specially
# and always raise such that spec violations # and always raise such that spec violations
@ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport):
msg: msgtypes.MsgType, msg: msgtypes.MsgType,
strict_types: bool = True, strict_types: bool = True,
# hide_tb: bool = False, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a msgpack encoded py-object-blob-as-msg over TCP. Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type invalid msg type
''' '''
# __tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
async with self._send_lock: async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release? # XXX for ctxvar debug only!
if self._codec.pld_spec != codec.pld_spec: # if self._codec.pld_spec != codec.pld_spec:
self._codec = codec # self._codec = codec
log.runtime( # log.runtime(
f'Using new codec in {self}.send()\n' # f'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n' # f'codec: {self._codec}\n\n'
f'msg: {msg}\n' # f'msg: {msg}\n'
) # )
if type(msg) not in msgtypes.__msg_types__: if type(msg) not in msgtypes.__msg_types__:
if strict_types: if strict_types:
@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
# try:
# <the-above_code>
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property @property
def laddr(self) -> tuple[str, int]: def laddr(self) -> tuple[str, int]:
return self._laddr return self._laddr
@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport):
return self._raddr return self._raddr
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._agen.asend(None) return await self._aiter_pkts.asend(None)
async def drain(self) -> AsyncIterator[dict]: async def drain(self) -> AsyncIterator[dict]:
''' '''
@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport):
yield msg yield msg
def __aiter__(self): def __aiter__(self):
return self._agen return self._aiter_pkts
def connected(self) -> bool: def connected(self) -> bool:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
@ -433,7 +517,7 @@ class Channel:
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None self.uid: tuple[str, str]|None = None
self._agen = self._aiter_recv() self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None # set if far end actor errors self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
@ -497,8 +581,6 @@ class Channel:
) )
return self._transport return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm @cm
def apply_codec( def apply_codec(
self, self,
@ -517,6 +599,7 @@ class Channel:
finally: finally:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def __repr__(self) -> str: def __repr__(self) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel with inactive transport?>'
@ -560,27 +643,43 @@ class Channel:
) )
return transport return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
# seems like a pretty useful thing to have in general
# along with being able to filter certain stack frame(s / sets)
# possibly based on the current log-level?
async def send( async def send(
self, self,
payload: Any, payload: Any,
# hide_tb: bool = False, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the transport. Send a coded msg-blob over the transport.
''' '''
# __tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
log.transport( try:
'=> send IPC msg:\n\n' log.transport(
f'{pformat(payload)}\n' '=> send IPC msg:\n\n'
) # type: ignore f'{pformat(payload)}\n'
assert self._transport )
await self._transport.send( # assert self._transport # but why typing?
payload, await self._transport.send(
# hide_tb=hide_tb, payload,
) hide_tb=hide_tb,
)
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self._transport
@ -617,8 +716,11 @@ class Channel:
await self.aclose(*args) await self.aclose(*args)
def __aiter__(self): def __aiter__(self):
return self._agen return self._aiter_msgs
# ?TODO? run any reconnection sequence?
# -[ ] prolly should be impl-ed as deco-API?
#
# async def _reconnect(self) -> None: # async def _reconnect(self) -> None:
# """Handle connection failures by polling until a reconnect can be # """Handle connection failures by polling until a reconnect can be
# established. # established.
@ -636,7 +738,6 @@ class Channel:
# else: # else:
# log.transport("Stream connection re-established!") # log.transport("Stream connection re-established!")
# # TODO: run any reconnection sequence
# # on_recon = self._recon_seq # # on_recon = self._recon_seq
# # if on_recon: # # if on_recon:
# # await on_recon(self) # # await on_recon(self)
@ -650,11 +751,17 @@ class Channel:
# " for re-establishment") # " for re-establishment")
# await trio.sleep(1) # await trio.sleep(1)
async def _aiter_recv( async def _iter_msgs(
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Async iterate items from underlying stream. Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
''' '''
assert self._transport assert self._transport
@ -680,15 +787,6 @@ class Channel:
case _: case _:
yield msg yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self._transport.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:

View File

@ -1197,7 +1197,7 @@ async def process_messages(
parent_chan=chan, parent_chan=chan,
) )
except TransportClosed: except TransportClosed as tc:
# channels "breaking" (for TCP streams by EOF or 104 # channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown # connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
@ -1205,12 +1205,20 @@ async def process_messages(
# up.. # up..
# #
# TODO: maybe add a teardown handshake? and, # TODO: maybe add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call? # -[x] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] figure out how this will break with other transports? # -[ ] figure out how this will break with other transports?
log.runtime( tc.report_n_maybe_raise(
f'IPC channel closed abruptly\n' message=(
f'<=x peer: {chan.uid}\n' f'peer IPC channel closed abruptly?\n\n'
f' |_{chan.raddr}\n' f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
) )
# transport **WAS** disconnected # transport **WAS** disconnected