Add `TransportClosed.from_src_exc()`
Such that re-wrapping/raising from a low-level `trio` resource error is simpler and includes the `.src_exc` in the `__repr__()` and `.message/.args` rendered at higher layers (like from `Channel` and `._rpc` machinery). Impl deats, - mainly leverages packing in a new cls-method `.repr_src_exc() -> str:` repr of the underlying error before an optional `body: str` all as handled by the previously augmented `.pformat()`'s delegation to `pformat_exc()`. - change `.src_exc` to be a property around a renamed `._src_exc`. But wait, why? - use it inside `MsgpackTransport.send()` to rewrap any `trio.BrokenResourceError`s so we always see the underlying `trio`-src-exc just like in the `.recv()._iter_packets()` handlers.structural_dynamics_of_flow
parent
c208bcbb1b
commit
477343af53
|
@ -948,7 +948,8 @@ class TransportClosed(Exception):
|
||||||
self._loglevel: str = loglevel
|
self._loglevel: str = loglevel
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
self.src_exc = src_exc
|
self._src_exc = src_exc
|
||||||
|
# set the cause manually if not already set by python
|
||||||
if (
|
if (
|
||||||
src_exc is not None
|
src_exc is not None
|
||||||
and
|
and
|
||||||
|
@ -960,9 +961,18 @@ class TransportClosed(Exception):
|
||||||
# the exc in its `TransportClosed` handler block.
|
# the exc in its `TransportClosed` handler block.
|
||||||
self._raise_on_report = raise_on_report
|
self._raise_on_report = raise_on_report
|
||||||
|
|
||||||
|
@property
|
||||||
|
def src_exc(self) -> Exception:
|
||||||
|
return (
|
||||||
|
self.__cause__
|
||||||
|
or
|
||||||
|
self._src_exc
|
||||||
|
)
|
||||||
|
|
||||||
def report_n_maybe_raise(
|
def report_n_maybe_raise(
|
||||||
self,
|
self,
|
||||||
message: str|None = None,
|
message: str|None = None,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -970,9 +980,10 @@ class TransportClosed(Exception):
|
||||||
for this error.
|
for this error.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
message: str = message or self.message
|
message: str = message or self.message
|
||||||
# when a cause is set, slap it onto the log emission.
|
# when a cause is set, slap it onto the log emission.
|
||||||
if cause := self.__cause__:
|
if cause := self.src_exc:
|
||||||
cause_tb_str: str = ''.join(
|
cause_tb_str: str = ''.join(
|
||||||
traceback.format_tb(cause.__traceback__)
|
traceback.format_tb(cause.__traceback__)
|
||||||
)
|
)
|
||||||
|
@ -991,26 +1002,76 @@ class TransportClosed(Exception):
|
||||||
if self._raise_on_report:
|
if self._raise_on_report:
|
||||||
raise self from cause
|
raise self from cause
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def repr_src_exc(
|
||||||
|
self,
|
||||||
|
src_exc: Exception|None = None,
|
||||||
|
) -> str:
|
||||||
|
|
||||||
|
if src_exc is None:
|
||||||
|
return '<unknown>'
|
||||||
|
|
||||||
|
src_msg: tuple[str] = src_exc.args
|
||||||
|
src_exc_repr: str = (
|
||||||
|
f'{type(src_exc).__name__}[ {src_msg} ]'
|
||||||
|
)
|
||||||
|
return src_exc_repr
|
||||||
|
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
from tractor.devx.pformat import (
|
from tractor.devx.pformat import (
|
||||||
pformat_exc,
|
pformat_exc,
|
||||||
)
|
)
|
||||||
src_err: Exception|None = self.src_exc or '<unknown>'
|
|
||||||
src_msg: tuple[str] = src_err.args
|
|
||||||
src_exc_repr: str = (
|
|
||||||
f'{type(src_err).__name__}[ {src_msg} ]'
|
|
||||||
)
|
|
||||||
return pformat_exc(
|
return pformat_exc(
|
||||||
exc=self,
|
exc=self,
|
||||||
# message=self.message, # implicit!
|
|
||||||
body=(
|
|
||||||
f' |_src_exc: {src_exc_repr}\n'
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# delegate to `str`-ified pformat
|
# delegate to `str`-ified pformat
|
||||||
__repr__ = pformat
|
__repr__ = pformat
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_src_exc(
|
||||||
|
cls,
|
||||||
|
src_exc: (
|
||||||
|
Exception|
|
||||||
|
trio.ClosedResource|
|
||||||
|
trio.BrokenResourceError
|
||||||
|
),
|
||||||
|
message: str,
|
||||||
|
body: str = '',
|
||||||
|
**init_kws,
|
||||||
|
) -> TransportClosed:
|
||||||
|
'''
|
||||||
|
Convenience constructor for creation from an underlying
|
||||||
|
`trio`-sourced async-resource/chan/stream error.
|
||||||
|
|
||||||
|
Embeds the original `src_exc`'s repr within the
|
||||||
|
`Exception.args` via a first-line-in-`.message`-put-in-header
|
||||||
|
pre-processing and allows inserting additional content beyond
|
||||||
|
the main message via a `body: str`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
repr_src_exc: str = cls.repr_src_exc(
|
||||||
|
src_exc,
|
||||||
|
)
|
||||||
|
next_line: str = f' src_exc: {repr_src_exc}\n'
|
||||||
|
if body:
|
||||||
|
body: str = textwrap.indent(
|
||||||
|
body,
|
||||||
|
prefix=' '*2,
|
||||||
|
)
|
||||||
|
|
||||||
|
return TransportClosed(
|
||||||
|
message=(
|
||||||
|
message
|
||||||
|
+
|
||||||
|
next_line
|
||||||
|
+
|
||||||
|
body
|
||||||
|
),
|
||||||
|
src_exc=src_exc,
|
||||||
|
**init_kws,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
"No final result is expected for this actor"
|
"No final result is expected for this actor"
|
||||||
|
|
|
@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
|
||||||
msg: msgtypes.MsgType,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -430,8 +430,9 @@ class MsgpackTransport(MsgTransport):
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
) as trans_err:
|
) as bre:
|
||||||
loglevel = 'transport'
|
trans_err = bre
|
||||||
|
tpt_name: str = f'{type(self).__name__!r}'
|
||||||
match trans_err:
|
match trans_err:
|
||||||
case trio.BrokenResourceError() if (
|
case trio.BrokenResourceError() if (
|
||||||
'[Errno 32] Broken pipe' in trans_err.args[0]
|
'[Errno 32] Broken pipe' in trans_err.args[0]
|
||||||
|
@ -442,21 +443,22 @@ class MsgpackTransport(MsgTransport):
|
||||||
# as it pertains to rando pings from the
|
# as it pertains to rando pings from the
|
||||||
# `.discovery` subsys and protos.
|
# `.discovery` subsys and protos.
|
||||||
):
|
):
|
||||||
raise TransportClosed(
|
raise TransportClosed.from_src_exc(
|
||||||
message=(
|
message=(
|
||||||
f'IPC transport already closed by peer\n'
|
f'{tpt_name} already closed by peer\n'
|
||||||
f'x)> {type(trans_err)}\n'
|
|
||||||
f' |_{self}\n'
|
|
||||||
),
|
),
|
||||||
loglevel=loglevel,
|
body=f'{self}\n',
|
||||||
) from trans_err
|
src_exc=trans_err,
|
||||||
|
raise_on_report=True,
|
||||||
|
loglevel='transport',
|
||||||
|
) from bre
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
# normal operation breakage" we usualy console warn
|
# normal operation breakage" we usualy console warn
|
||||||
# about it.
|
# about it.
|
||||||
case _:
|
case _:
|
||||||
log.exception(
|
log.exception(
|
||||||
'Transport layer failed for {self.transport!r} ?\n'
|
'{tpt_name} layer failed pre-send ??\n'
|
||||||
)
|
)
|
||||||
raise trans_err
|
raise trans_err
|
||||||
|
|
||||||
|
@ -501,11 +503,11 @@ class MsgpackTransport(MsgTransport):
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'<{type(self).__name__}(\n'
|
f'<{type(self).__name__}(\n'
|
||||||
f' |_task: {self._task}\n'
|
|
||||||
f'\n'
|
|
||||||
f' |_peers: 2\n'
|
f' |_peers: 2\n'
|
||||||
f' laddr: {self._laddr}\n'
|
f' laddr: {self._laddr}\n'
|
||||||
f' raddr: {self._raddr}\n'
|
f' raddr: {self._raddr}\n'
|
||||||
|
# f'\n'
|
||||||
|
f' |_task: {self._task}\n'
|
||||||
f')>\n'
|
f')>\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue