Compare commits
7 Commits
c208bcbb1b
...
9807318e3d
Author | SHA1 | Date |
---|---|---|
|
9807318e3d | |
|
b700d90e09 | |
|
6ff3b6c757 | |
|
8bda59c23d | |
|
1628fd1d7b | |
|
5f74ce9a95 | |
|
477343af53 |
|
@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
|
|||
yield proto_keys
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
@pytest.fixture(
|
||||
scope='session',
|
||||
autouse=True,
|
||||
)
|
||||
def tpt_proto(
|
||||
tpt_protos: list[str],
|
||||
) -> str:
|
||||
yield tpt_protos[0]
|
||||
proto_key: str = tpt_protos[0]
|
||||
from tractor import _state
|
||||
if _state._def_tpt_proto != proto_key:
|
||||
_state._def_tpt_proto = proto_key
|
||||
# breakpoint()
|
||||
yield proto_key
|
||||
|
||||
|
||||
_ci_env: bool = os.environ.get('CI', False)
|
||||
|
|
|
@ -948,7 +948,8 @@ class TransportClosed(Exception):
|
|||
self._loglevel: str = loglevel
|
||||
super().__init__(message)
|
||||
|
||||
self.src_exc = src_exc
|
||||
self._src_exc = src_exc
|
||||
# set the cause manually if not already set by python
|
||||
if (
|
||||
src_exc is not None
|
||||
and
|
||||
|
@ -960,9 +961,18 @@ class TransportClosed(Exception):
|
|||
# the exc in its `TransportClosed` handler block.
|
||||
self._raise_on_report = raise_on_report
|
||||
|
||||
@property
|
||||
def src_exc(self) -> Exception:
|
||||
return (
|
||||
self.__cause__
|
||||
or
|
||||
self._src_exc
|
||||
)
|
||||
|
||||
def report_n_maybe_raise(
|
||||
self,
|
||||
message: str|None = None,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -970,9 +980,10 @@ class TransportClosed(Exception):
|
|||
for this error.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
message: str = message or self.message
|
||||
# when a cause is set, slap it onto the log emission.
|
||||
if cause := self.__cause__:
|
||||
if cause := self.src_exc:
|
||||
cause_tb_str: str = ''.join(
|
||||
traceback.format_tb(cause.__traceback__)
|
||||
)
|
||||
|
@ -991,26 +1002,76 @@ class TransportClosed(Exception):
|
|||
if self._raise_on_report:
|
||||
raise self from cause
|
||||
|
||||
@classmethod
|
||||
def repr_src_exc(
|
||||
self,
|
||||
src_exc: Exception|None = None,
|
||||
) -> str:
|
||||
|
||||
if src_exc is None:
|
||||
return '<unknown>'
|
||||
|
||||
src_msg: tuple[str] = src_exc.args
|
||||
src_exc_repr: str = (
|
||||
f'{type(src_exc).__name__}[ {src_msg} ]'
|
||||
)
|
||||
return src_exc_repr
|
||||
|
||||
def pformat(self) -> str:
|
||||
from tractor.devx.pformat import (
|
||||
pformat_exc,
|
||||
)
|
||||
src_err: Exception|None = self.src_exc or '<unknown>'
|
||||
src_msg: tuple[str] = src_err.args
|
||||
src_exc_repr: str = (
|
||||
f'{type(src_err).__name__}[ {src_msg} ]'
|
||||
)
|
||||
return pformat_exc(
|
||||
exc=self,
|
||||
# message=self.message, # implicit!
|
||||
body=(
|
||||
f' |_src_exc: {src_exc_repr}\n'
|
||||
),
|
||||
)
|
||||
|
||||
# delegate to `str`-ified pformat
|
||||
__repr__ = pformat
|
||||
|
||||
@classmethod
|
||||
def from_src_exc(
|
||||
cls,
|
||||
src_exc: (
|
||||
Exception|
|
||||
trio.ClosedResource|
|
||||
trio.BrokenResourceError
|
||||
),
|
||||
message: str,
|
||||
body: str = '',
|
||||
**init_kws,
|
||||
) -> TransportClosed:
|
||||
'''
|
||||
Convenience constructor for creation from an underlying
|
||||
`trio`-sourced async-resource/chan/stream error.
|
||||
|
||||
Embeds the original `src_exc`'s repr within the
|
||||
`Exception.args` via a first-line-in-`.message`-put-in-header
|
||||
pre-processing and allows inserting additional content beyond
|
||||
the main message via a `body: str`.
|
||||
|
||||
'''
|
||||
repr_src_exc: str = cls.repr_src_exc(
|
||||
src_exc,
|
||||
)
|
||||
next_line: str = f' src_exc: {repr_src_exc}\n'
|
||||
if body:
|
||||
body: str = textwrap.indent(
|
||||
body,
|
||||
prefix=' '*2,
|
||||
)
|
||||
|
||||
return TransportClosed(
|
||||
message=(
|
||||
message
|
||||
+
|
||||
next_line
|
||||
+
|
||||
body
|
||||
),
|
||||
src_exc=src_exc,
|
||||
**init_kws,
|
||||
)
|
||||
|
||||
|
||||
class NoResult(RuntimeError):
|
||||
"No final result is expected for this actor"
|
||||
|
|
|
@ -52,8 +52,8 @@ from .msg import (
|
|||
Return,
|
||||
)
|
||||
from ._exceptions import (
|
||||
# unpack_error,
|
||||
NoResult,
|
||||
TransportClosed,
|
||||
)
|
||||
from ._context import (
|
||||
Context,
|
||||
|
@ -305,14 +305,34 @@ class Portal:
|
|||
return False
|
||||
|
||||
except (
|
||||
# XXX, should never really get raised unless we aren't
|
||||
# wrapping them in the below type by mistake?
|
||||
#
|
||||
# Leaving the catch here for now until we're very sure
|
||||
# all the cases (for various tpt protos) have indeed been
|
||||
# re-wrapped ;p
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
log.debug(
|
||||
'IPC chan for actor already closed or broken?\n\n'
|
||||
|
||||
TransportClosed,
|
||||
) as tpt_err:
|
||||
report: str = (
|
||||
f'IPC chan for actor already closed or broken?\n\n'
|
||||
f'{self.channel.aid}\n'
|
||||
f' |_{self.channel}\n'
|
||||
)
|
||||
match tpt_err:
|
||||
case TransportClosed():
|
||||
log.debug(report)
|
||||
case _:
|
||||
report += (
|
||||
f'\n'
|
||||
f'Unhandled low-level transport-closed/error during\n'
|
||||
f'Portal.cancel_actor()` request?\n'
|
||||
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
|
||||
)
|
||||
log.warning(report)
|
||||
|
||||
return False
|
||||
|
||||
# TODO: do we still need this for low level `Actor`-runtime
|
||||
|
|
|
@ -149,8 +149,10 @@ async def open_root_actor(
|
|||
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
||||
|
||||
enable_transports: list[
|
||||
# TODO, this should eventually be the pairs as
|
||||
# defined by (codec, proto) as on `MsgTransport.
|
||||
_state.TransportProtocolKey,
|
||||
] = [_state._def_tpt_proto],
|
||||
]|None = None,
|
||||
|
||||
name: str|None = 'root',
|
||||
|
||||
|
@ -213,6 +215,14 @@ async def open_root_actor(
|
|||
debug_mode=debug_mode,
|
||||
maybe_enable_greenback=maybe_enable_greenback,
|
||||
):
|
||||
if enable_transports is None:
|
||||
enable_transports: list[str] = _state.current_ipc_protos()
|
||||
|
||||
# TODO! support multi-tpts per actor! Bo
|
||||
assert (
|
||||
len(enable_transports) == 1
|
||||
), 'No multi-tpt support yet!'
|
||||
|
||||
_debug.hide_runtime_frames()
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ def current_actor(
|
|||
return _current_actor
|
||||
|
||||
|
||||
def is_main_process() -> bool:
|
||||
def is_root_process() -> bool:
|
||||
'''
|
||||
Bool determining if this actor is running in the top-most process.
|
||||
|
||||
|
@ -111,8 +111,10 @@ def is_main_process() -> bool:
|
|||
return mp.current_process().name == 'MainProcess'
|
||||
|
||||
|
||||
# TODO, more verby name?
|
||||
def debug_mode() -> bool:
|
||||
is_main_process = is_root_process
|
||||
|
||||
|
||||
def is_debug_mode() -> bool:
|
||||
'''
|
||||
Bool determining if "debug mode" is on which enables
|
||||
remote subactor pdb entry on crashes.
|
||||
|
@ -121,6 +123,9 @@ def debug_mode() -> bool:
|
|||
return bool(_runtime_vars['_debug_mode'])
|
||||
|
||||
|
||||
debug_mode = is_debug_mode
|
||||
|
||||
|
||||
def is_root_process() -> bool:
|
||||
return _runtime_vars['_is_root']
|
||||
|
||||
|
@ -173,3 +178,15 @@ TransportProtocolKey = Literal[
|
|||
'uds',
|
||||
]
|
||||
_def_tpt_proto: TransportProtocolKey = 'tcp'
|
||||
|
||||
|
||||
def current_ipc_protos() -> list[str]:
|
||||
'''
|
||||
Return the list of IPC transport protocol keys currently
|
||||
in use by this actor.
|
||||
|
||||
The keys are as declared by `MsgTransport` and `Address`
|
||||
concrete-backend sub-types defined throughout `tractor.ipc`.
|
||||
|
||||
'''
|
||||
return [_def_tpt_proto]
|
||||
|
|
|
@ -316,7 +316,9 @@ class ActorNursery:
|
|||
child_count: int = len(children)
|
||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||
with trio.move_on_after(3) as cs:
|
||||
async with trio.open_nursery() as tn:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn:
|
||||
|
||||
subactor: Actor
|
||||
proc: trio.Process
|
||||
|
|
|
@ -49,6 +49,7 @@ from tractor.log import get_logger
|
|||
from tractor._exceptions import (
|
||||
MsgTypeError,
|
||||
pack_from_raise,
|
||||
TransportClosed,
|
||||
)
|
||||
from tractor.msg import (
|
||||
Aid,
|
||||
|
@ -256,7 +257,7 @@ class Channel:
|
|||
self,
|
||||
payload: Any,
|
||||
|
||||
hide_tb: bool = False,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -274,18 +275,27 @@ class Channel:
|
|||
payload,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
except BaseException as _err:
|
||||
except (
|
||||
BaseException,
|
||||
MsgTypeError,
|
||||
TransportClosed,
|
||||
)as _err:
|
||||
err = _err # bind for introspection
|
||||
if not isinstance(_err, MsgTypeError):
|
||||
# assert err
|
||||
__tracebackhide__: bool = False
|
||||
else:
|
||||
try:
|
||||
assert err.cid
|
||||
|
||||
except KeyError:
|
||||
raise err
|
||||
match err:
|
||||
case MsgTypeError():
|
||||
try:
|
||||
assert err.cid
|
||||
except KeyError:
|
||||
raise err
|
||||
case TransportClosed():
|
||||
log.transport(
|
||||
f'Transport stream closed due to\n'
|
||||
f'{err.repr_src_exc()}\n'
|
||||
)
|
||||
|
||||
case _:
|
||||
# never suppress non-tpt sources
|
||||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
async def recv(self) -> Any:
|
||||
|
|
|
@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
|
|||
msg: msgtypes.MsgType,
|
||||
|
||||
strict_types: bool = True,
|
||||
hide_tb: bool = False,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -430,8 +430,9 @@ class MsgpackTransport(MsgTransport):
|
|||
return await self.stream.send_all(size + bytes_data)
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
) as trans_err:
|
||||
loglevel = 'transport'
|
||||
) as bre:
|
||||
trans_err = bre
|
||||
tpt_name: str = f'{type(self).__name__!r}'
|
||||
match trans_err:
|
||||
case trio.BrokenResourceError() if (
|
||||
'[Errno 32] Broken pipe' in trans_err.args[0]
|
||||
|
@ -442,21 +443,22 @@ class MsgpackTransport(MsgTransport):
|
|||
# as it pertains to rando pings from the
|
||||
# `.discovery` subsys and protos.
|
||||
):
|
||||
raise TransportClosed(
|
||||
raise TransportClosed.from_src_exc(
|
||||
message=(
|
||||
f'IPC transport already closed by peer\n'
|
||||
f'x)> {type(trans_err)}\n'
|
||||
f' |_{self}\n'
|
||||
f'{tpt_name} already closed by peer\n'
|
||||
),
|
||||
loglevel=loglevel,
|
||||
) from trans_err
|
||||
body=f'{self}\n',
|
||||
src_exc=trans_err,
|
||||
raise_on_report=True,
|
||||
loglevel='transport',
|
||||
) from bre
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
case _:
|
||||
log.exception(
|
||||
'Transport layer failed for {self.transport!r} ?\n'
|
||||
'{tpt_name} layer failed pre-send ??\n'
|
||||
)
|
||||
raise trans_err
|
||||
|
||||
|
@ -501,11 +503,11 @@ class MsgpackTransport(MsgTransport):
|
|||
def pformat(self) -> str:
|
||||
return (
|
||||
f'<{type(self).__name__}(\n'
|
||||
f' |_task: {self._task}\n'
|
||||
f'\n'
|
||||
f' |_peers: 2\n'
|
||||
f' laddr: {self._laddr}\n'
|
||||
f' raddr: {self._raddr}\n'
|
||||
# f'\n'
|
||||
f' |_task: {self._task}\n'
|
||||
f')>\n'
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue