Compare commits

...

7 Commits

Author SHA1 Message Date
Tyler Goodlet 9807318e3d Never hide non-[msgtype/tpt-closed] error tbs in `Channel.send()` 2025-04-11 00:00:12 -04:00
Tyler Goodlet b700d90e09 Set `_state._def_tpt_proto` in `tpt_proto` fixture
Such that the global test-session always (and only) runs against the CLI
specified `--tpt-proto=` transport protocol.
2025-04-10 23:56:47 -04:00
Tyler Goodlet 6ff3b6c757 Use `current_ipc_protos()` as the `enable_transports`-default-when-`None`
Also ensure we assertion-error whenever the list is > 1 entry for now!
2025-04-10 23:55:47 -04:00
Tyler Goodlet 8bda59c23d Add `_state.current_ipc_protos()`
For now just wrapping wtv the `._def_tpt_proto` per-actor setting is.
2025-04-10 23:53:44 -04:00
Tyler Goodlet 1628fd1d7b Another `tn` eg-loosify inside `ActorNursery.cancel()`.. 2025-04-10 23:53:35 -04:00
Tyler Goodlet 5f74ce9a95 Absorb `TransportClosed` in `Portal.cancel_actor()`
Just like we *were* for the `trio`-resource-errors it normally wraps
since we now also do the same wrapping in `MsgpackTransport.send()`
and we don't normally care to raise tpt-closure-errors on graceful actor
cancel requests.

Also, warn-report any non-tpt-closed low-level `trio` errors we haven't
yet re-wrapped (likely bc they haven't shown up).
2025-04-10 23:49:36 -04:00
Tyler Goodlet 477343af53 Add `TransportClosed.from_src_exc()`
Such that re-wrapping/raising from a low-level `trio` resource error is
simpler and includes the `.src_exc` in the `__repr__()` and
`.message/.args` rendered at higher layers (like from `Channel` and
`._rpc` machinery).

Impl deats,
- mainly leverages packing in a new cls-method `.repr_src_exc() -> str:`
  repr of the underlying error before an optional `body: str` all as
  handled by the previously augmented `.pformat()`'s delegation to
  `pformat_exc()`.
- change `.src_exc` to be a property around a renamed `._src_exc`.

But wait, why?
- use it inside `MsgpackTransport.send()` to rewrap any
  `trio.BrokenResourceError`s so we always see the underlying
  `trio`-src-exc just like in the `.recv()._iter_packets()` handlers.
2025-04-10 23:37:16 -04:00
8 changed files with 175 additions and 45 deletions

View File

@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
yield proto_keys
@pytest.fixture(scope='session')
@pytest.fixture(
scope='session',
autouse=True,
)
def tpt_proto(
tpt_protos: list[str],
) -> str:
yield tpt_protos[0]
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
# breakpoint()
yield proto_key
_ci_env: bool = os.environ.get('CI', False)

View File

@ -948,7 +948,8 @@ class TransportClosed(Exception):
self._loglevel: str = loglevel
super().__init__(message)
self.src_exc = src_exc
self._src_exc = src_exc
# set the cause manually if not already set by python
if (
src_exc is not None
and
@ -960,9 +961,18 @@ class TransportClosed(Exception):
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
@property
def src_exc(self) -> Exception:
return (
self.__cause__
or
self._src_exc
)
def report_n_maybe_raise(
self,
message: str|None = None,
hide_tb: bool = True,
) -> None:
'''
@ -970,9 +980,10 @@ class TransportClosed(Exception):
for this error.
'''
__tracebackhide__: bool = hide_tb
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
if cause := self.src_exc:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
@ -991,26 +1002,76 @@ class TransportClosed(Exception):
if self._raise_on_report:
raise self from cause
@classmethod
def repr_src_exc(
self,
src_exc: Exception|None = None,
) -> str:
if src_exc is None:
return '<unknown>'
src_msg: tuple[str] = src_exc.args
src_exc_repr: str = (
f'{type(src_exc).__name__}[ {src_msg} ]'
)
return src_exc_repr
def pformat(self) -> str:
from tractor.devx.pformat import (
pformat_exc,
)
src_err: Exception|None = self.src_exc or '<unknown>'
src_msg: tuple[str] = src_err.args
src_exc_repr: str = (
f'{type(src_err).__name__}[ {src_msg} ]'
)
return pformat_exc(
exc=self,
# message=self.message, # implicit!
body=(
f' |_src_exc: {src_exc_repr}\n'
),
)
# delegate to `str`-ified pformat
__repr__ = pformat
@classmethod
def from_src_exc(
cls,
src_exc: (
Exception|
trio.ClosedResource|
trio.BrokenResourceError
),
message: str,
body: str = '',
**init_kws,
) -> TransportClosed:
'''
Convenience constructor for creation from an underlying
`trio`-sourced async-resource/chan/stream error.
Embeds the original `src_exc`'s repr within the
`Exception.args` via a first-line-in-`.message`-put-in-header
pre-processing and allows inserting additional content beyond
the main message via a `body: str`.
'''
repr_src_exc: str = cls.repr_src_exc(
src_exc,
)
next_line: str = f' src_exc: {repr_src_exc}\n'
if body:
body: str = textwrap.indent(
body,
prefix=' '*2,
)
return TransportClosed(
message=(
message
+
next_line
+
body
),
src_exc=src_exc,
**init_kws,
)
class NoResult(RuntimeError):
"No final result is expected for this actor"

View File

@ -52,8 +52,8 @@ from .msg import (
Return,
)
from ._exceptions import (
# unpack_error,
NoResult,
TransportClosed,
)
from ._context import (
Context,
@ -305,14 +305,34 @@ class Portal:
return False
except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
TransportClosed,
) as tpt_err:
report: str = (
f'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f' |_{self.channel}\n'
)
match tpt_err:
case TransportClosed():
log.debug(report)
case _:
report += (
f'\n'
f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
)
log.warning(report)
return False
# TODO: do we still need this for low level `Actor`-runtime

View File

@ -149,8 +149,10 @@ async def open_root_actor(
arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[
# TODO, this should eventually be the pairs as
# defined by (codec, proto) as on `MsgTransport.
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
]|None = None,
name: str|None = 'root',
@ -213,6 +215,14 @@ async def open_root_actor(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
):
if enable_transports is None:
enable_transports: list[str] = _state.current_ipc_protos()
# TODO! support multi-tpts per actor! Bo
assert (
len(enable_transports) == 1
), 'No multi-tpt support yet!'
_debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb

View File

@ -102,7 +102,7 @@ def current_actor(
return _current_actor
def is_main_process() -> bool:
def is_root_process() -> bool:
'''
Bool determining if this actor is running in the top-most process.
@ -111,8 +111,10 @@ def is_main_process() -> bool:
return mp.current_process().name == 'MainProcess'
# TODO, more verby name?
def debug_mode() -> bool:
is_main_process = is_root_process
def is_debug_mode() -> bool:
'''
Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
@ -121,6 +123,9 @@ def debug_mode() -> bool:
return bool(_runtime_vars['_debug_mode'])
debug_mode = is_debug_mode
def is_root_process() -> bool:
return _runtime_vars['_is_root']
@ -173,3 +178,15 @@ TransportProtocolKey = Literal[
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'
def current_ipc_protos() -> list[str]:
'''
Return the list of IPC transport protocol keys currently
in use by this actor.
The keys are as declared by `MsgTransport` and `Address`
concrete-backend sub-types defined throughout `tractor.ipc`.
'''
return [_def_tpt_proto]

View File

@ -316,7 +316,9 @@ class ActorNursery:
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as tn:
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
subactor: Actor
proc: trio.Process

View File

@ -49,6 +49,7 @@ from tractor.log import get_logger
from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
TransportClosed,
)
from tractor.msg import (
Aid,
@ -256,7 +257,7 @@ class Channel:
self,
payload: Any,
hide_tb: bool = False,
hide_tb: bool = True,
) -> None:
'''
@ -274,18 +275,27 @@ class Channel:
payload,
hide_tb=hide_tb,
)
except BaseException as _err:
except (
BaseException,
MsgTypeError,
TransportClosed,
)as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
match err:
case MsgTypeError():
try:
assert err.cid
except KeyError:
raise err
case TransportClosed():
log.transport(
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
)
case _:
# never suppress non-tpt sources
__tracebackhide__: bool = False
raise
async def recv(self) -> Any:

View File

@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
msg: msgtypes.MsgType,
strict_types: bool = True,
hide_tb: bool = False,
hide_tb: bool = True,
) -> None:
'''
@ -430,8 +430,9 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
@ -442,21 +443,22 @@ class MsgpackTransport(MsgTransport):
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
raise TransportClosed(
raise TransportClosed.from_src_exc(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
f'{tpt_name} already closed by peer\n'
),
loglevel=loglevel,
) from trans_err
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
) from bre
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
'Transport layer failed for {self.transport!r} ?\n'
'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -501,11 +503,11 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_task: {self._task}\n'
f'\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'
f' |_task: {self._task}\n'
f')>\n'
)