Compare commits
7 Commits
c208bcbb1b
...
9807318e3d
Author | SHA1 | Date |
---|---|---|
|
9807318e3d | |
|
b700d90e09 | |
|
6ff3b6c757 | |
|
8bda59c23d | |
|
1628fd1d7b | |
|
5f74ce9a95 | |
|
477343af53 |
|
@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
|
||||||
yield proto_keys
|
yield proto_keys
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(
|
||||||
|
scope='session',
|
||||||
|
autouse=True,
|
||||||
|
)
|
||||||
def tpt_proto(
|
def tpt_proto(
|
||||||
tpt_protos: list[str],
|
tpt_protos: list[str],
|
||||||
) -> str:
|
) -> str:
|
||||||
yield tpt_protos[0]
|
proto_key: str = tpt_protos[0]
|
||||||
|
from tractor import _state
|
||||||
|
if _state._def_tpt_proto != proto_key:
|
||||||
|
_state._def_tpt_proto = proto_key
|
||||||
|
# breakpoint()
|
||||||
|
yield proto_key
|
||||||
|
|
||||||
|
|
||||||
_ci_env: bool = os.environ.get('CI', False)
|
_ci_env: bool = os.environ.get('CI', False)
|
||||||
|
|
|
@ -948,7 +948,8 @@ class TransportClosed(Exception):
|
||||||
self._loglevel: str = loglevel
|
self._loglevel: str = loglevel
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
self.src_exc = src_exc
|
self._src_exc = src_exc
|
||||||
|
# set the cause manually if not already set by python
|
||||||
if (
|
if (
|
||||||
src_exc is not None
|
src_exc is not None
|
||||||
and
|
and
|
||||||
|
@ -960,9 +961,18 @@ class TransportClosed(Exception):
|
||||||
# the exc in its `TransportClosed` handler block.
|
# the exc in its `TransportClosed` handler block.
|
||||||
self._raise_on_report = raise_on_report
|
self._raise_on_report = raise_on_report
|
||||||
|
|
||||||
|
@property
|
||||||
|
def src_exc(self) -> Exception:
|
||||||
|
return (
|
||||||
|
self.__cause__
|
||||||
|
or
|
||||||
|
self._src_exc
|
||||||
|
)
|
||||||
|
|
||||||
def report_n_maybe_raise(
|
def report_n_maybe_raise(
|
||||||
self,
|
self,
|
||||||
message: str|None = None,
|
message: str|None = None,
|
||||||
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -970,9 +980,10 @@ class TransportClosed(Exception):
|
||||||
for this error.
|
for this error.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
message: str = message or self.message
|
message: str = message or self.message
|
||||||
# when a cause is set, slap it onto the log emission.
|
# when a cause is set, slap it onto the log emission.
|
||||||
if cause := self.__cause__:
|
if cause := self.src_exc:
|
||||||
cause_tb_str: str = ''.join(
|
cause_tb_str: str = ''.join(
|
||||||
traceback.format_tb(cause.__traceback__)
|
traceback.format_tb(cause.__traceback__)
|
||||||
)
|
)
|
||||||
|
@ -991,26 +1002,76 @@ class TransportClosed(Exception):
|
||||||
if self._raise_on_report:
|
if self._raise_on_report:
|
||||||
raise self from cause
|
raise self from cause
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def repr_src_exc(
|
||||||
|
self,
|
||||||
|
src_exc: Exception|None = None,
|
||||||
|
) -> str:
|
||||||
|
|
||||||
|
if src_exc is None:
|
||||||
|
return '<unknown>'
|
||||||
|
|
||||||
|
src_msg: tuple[str] = src_exc.args
|
||||||
|
src_exc_repr: str = (
|
||||||
|
f'{type(src_exc).__name__}[ {src_msg} ]'
|
||||||
|
)
|
||||||
|
return src_exc_repr
|
||||||
|
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
from tractor.devx.pformat import (
|
from tractor.devx.pformat import (
|
||||||
pformat_exc,
|
pformat_exc,
|
||||||
)
|
)
|
||||||
src_err: Exception|None = self.src_exc or '<unknown>'
|
|
||||||
src_msg: tuple[str] = src_err.args
|
|
||||||
src_exc_repr: str = (
|
|
||||||
f'{type(src_err).__name__}[ {src_msg} ]'
|
|
||||||
)
|
|
||||||
return pformat_exc(
|
return pformat_exc(
|
||||||
exc=self,
|
exc=self,
|
||||||
# message=self.message, # implicit!
|
|
||||||
body=(
|
|
||||||
f' |_src_exc: {src_exc_repr}\n'
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# delegate to `str`-ified pformat
|
# delegate to `str`-ified pformat
|
||||||
__repr__ = pformat
|
__repr__ = pformat
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_src_exc(
|
||||||
|
cls,
|
||||||
|
src_exc: (
|
||||||
|
Exception|
|
||||||
|
trio.ClosedResource|
|
||||||
|
trio.BrokenResourceError
|
||||||
|
),
|
||||||
|
message: str,
|
||||||
|
body: str = '',
|
||||||
|
**init_kws,
|
||||||
|
) -> TransportClosed:
|
||||||
|
'''
|
||||||
|
Convenience constructor for creation from an underlying
|
||||||
|
`trio`-sourced async-resource/chan/stream error.
|
||||||
|
|
||||||
|
Embeds the original `src_exc`'s repr within the
|
||||||
|
`Exception.args` via a first-line-in-`.message`-put-in-header
|
||||||
|
pre-processing and allows inserting additional content beyond
|
||||||
|
the main message via a `body: str`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
repr_src_exc: str = cls.repr_src_exc(
|
||||||
|
src_exc,
|
||||||
|
)
|
||||||
|
next_line: str = f' src_exc: {repr_src_exc}\n'
|
||||||
|
if body:
|
||||||
|
body: str = textwrap.indent(
|
||||||
|
body,
|
||||||
|
prefix=' '*2,
|
||||||
|
)
|
||||||
|
|
||||||
|
return TransportClosed(
|
||||||
|
message=(
|
||||||
|
message
|
||||||
|
+
|
||||||
|
next_line
|
||||||
|
+
|
||||||
|
body
|
||||||
|
),
|
||||||
|
src_exc=src_exc,
|
||||||
|
**init_kws,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
"No final result is expected for this actor"
|
"No final result is expected for this actor"
|
||||||
|
|
|
@ -52,8 +52,8 @@ from .msg import (
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
# unpack_error,
|
|
||||||
NoResult,
|
NoResult,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from ._context import (
|
from ._context import (
|
||||||
Context,
|
Context,
|
||||||
|
@ -305,14 +305,34 @@ class Portal:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
# XXX, should never really get raised unless we aren't
|
||||||
|
# wrapping them in the below type by mistake?
|
||||||
|
#
|
||||||
|
# Leaving the catch here for now until we're very sure
|
||||||
|
# all the cases (for various tpt protos) have indeed been
|
||||||
|
# re-wrapped ;p
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
|
||||||
log.debug(
|
TransportClosed,
|
||||||
'IPC chan for actor already closed or broken?\n\n'
|
) as tpt_err:
|
||||||
|
report: str = (
|
||||||
|
f'IPC chan for actor already closed or broken?\n\n'
|
||||||
f'{self.channel.aid}\n'
|
f'{self.channel.aid}\n'
|
||||||
f' |_{self.channel}\n'
|
f' |_{self.channel}\n'
|
||||||
)
|
)
|
||||||
|
match tpt_err:
|
||||||
|
case TransportClosed():
|
||||||
|
log.debug(report)
|
||||||
|
case _:
|
||||||
|
report += (
|
||||||
|
f'\n'
|
||||||
|
f'Unhandled low-level transport-closed/error during\n'
|
||||||
|
f'Portal.cancel_actor()` request?\n'
|
||||||
|
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
|
||||||
|
)
|
||||||
|
log.warning(report)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# TODO: do we still need this for low level `Actor`-runtime
|
# TODO: do we still need this for low level `Actor`-runtime
|
||||||
|
|
|
@ -149,8 +149,10 @@ async def open_root_actor(
|
||||||
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
enable_transports: list[
|
enable_transports: list[
|
||||||
|
# TODO, this should eventually be the pairs as
|
||||||
|
# defined by (codec, proto) as on `MsgTransport.
|
||||||
_state.TransportProtocolKey,
|
_state.TransportProtocolKey,
|
||||||
] = [_state._def_tpt_proto],
|
]|None = None,
|
||||||
|
|
||||||
name: str|None = 'root',
|
name: str|None = 'root',
|
||||||
|
|
||||||
|
@ -213,6 +215,14 @@ async def open_root_actor(
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
maybe_enable_greenback=maybe_enable_greenback,
|
maybe_enable_greenback=maybe_enable_greenback,
|
||||||
):
|
):
|
||||||
|
if enable_transports is None:
|
||||||
|
enable_transports: list[str] = _state.current_ipc_protos()
|
||||||
|
|
||||||
|
# TODO! support multi-tpts per actor! Bo
|
||||||
|
assert (
|
||||||
|
len(enable_transports) == 1
|
||||||
|
), 'No multi-tpt support yet!'
|
||||||
|
|
||||||
_debug.hide_runtime_frames()
|
_debug.hide_runtime_frames()
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
|
|
@ -102,7 +102,7 @@ def current_actor(
|
||||||
return _current_actor
|
return _current_actor
|
||||||
|
|
||||||
|
|
||||||
def is_main_process() -> bool:
|
def is_root_process() -> bool:
|
||||||
'''
|
'''
|
||||||
Bool determining if this actor is running in the top-most process.
|
Bool determining if this actor is running in the top-most process.
|
||||||
|
|
||||||
|
@ -111,8 +111,10 @@ def is_main_process() -> bool:
|
||||||
return mp.current_process().name == 'MainProcess'
|
return mp.current_process().name == 'MainProcess'
|
||||||
|
|
||||||
|
|
||||||
# TODO, more verby name?
|
is_main_process = is_root_process
|
||||||
def debug_mode() -> bool:
|
|
||||||
|
|
||||||
|
def is_debug_mode() -> bool:
|
||||||
'''
|
'''
|
||||||
Bool determining if "debug mode" is on which enables
|
Bool determining if "debug mode" is on which enables
|
||||||
remote subactor pdb entry on crashes.
|
remote subactor pdb entry on crashes.
|
||||||
|
@ -121,6 +123,9 @@ def debug_mode() -> bool:
|
||||||
return bool(_runtime_vars['_debug_mode'])
|
return bool(_runtime_vars['_debug_mode'])
|
||||||
|
|
||||||
|
|
||||||
|
debug_mode = is_debug_mode
|
||||||
|
|
||||||
|
|
||||||
def is_root_process() -> bool:
|
def is_root_process() -> bool:
|
||||||
return _runtime_vars['_is_root']
|
return _runtime_vars['_is_root']
|
||||||
|
|
||||||
|
@ -173,3 +178,15 @@ TransportProtocolKey = Literal[
|
||||||
'uds',
|
'uds',
|
||||||
]
|
]
|
||||||
_def_tpt_proto: TransportProtocolKey = 'tcp'
|
_def_tpt_proto: TransportProtocolKey = 'tcp'
|
||||||
|
|
||||||
|
|
||||||
|
def current_ipc_protos() -> list[str]:
|
||||||
|
'''
|
||||||
|
Return the list of IPC transport protocol keys currently
|
||||||
|
in use by this actor.
|
||||||
|
|
||||||
|
The keys are as declared by `MsgTransport` and `Address`
|
||||||
|
concrete-backend sub-types defined throughout `tractor.ipc`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return [_def_tpt_proto]
|
||||||
|
|
|
@ -316,7 +316,9 @@ class ActorNursery:
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery(
|
||||||
|
strict_exception_groups=False,
|
||||||
|
) as tn:
|
||||||
|
|
||||||
subactor: Actor
|
subactor: Actor
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
|
|
|
@ -49,6 +49,7 @@ from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Aid,
|
Aid,
|
||||||
|
@ -256,7 +257,7 @@ class Channel:
|
||||||
self,
|
self,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
|
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -274,18 +275,27 @@ class Channel:
|
||||||
payload,
|
payload,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
except BaseException as _err:
|
except (
|
||||||
|
BaseException,
|
||||||
|
MsgTypeError,
|
||||||
|
TransportClosed,
|
||||||
|
)as _err:
|
||||||
err = _err # bind for introspection
|
err = _err # bind for introspection
|
||||||
if not isinstance(_err, MsgTypeError):
|
match err:
|
||||||
# assert err
|
case MsgTypeError():
|
||||||
__tracebackhide__: bool = False
|
try:
|
||||||
else:
|
assert err.cid
|
||||||
try:
|
except KeyError:
|
||||||
assert err.cid
|
raise err
|
||||||
|
case TransportClosed():
|
||||||
except KeyError:
|
log.transport(
|
||||||
raise err
|
f'Transport stream closed due to\n'
|
||||||
|
f'{err.repr_src_exc()}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
case _:
|
||||||
|
# never suppress non-tpt sources
|
||||||
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
|
|
|
@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
|
||||||
msg: msgtypes.MsgType,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -430,8 +430,9 @@ class MsgpackTransport(MsgTransport):
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
) as trans_err:
|
) as bre:
|
||||||
loglevel = 'transport'
|
trans_err = bre
|
||||||
|
tpt_name: str = f'{type(self).__name__!r}'
|
||||||
match trans_err:
|
match trans_err:
|
||||||
case trio.BrokenResourceError() if (
|
case trio.BrokenResourceError() if (
|
||||||
'[Errno 32] Broken pipe' in trans_err.args[0]
|
'[Errno 32] Broken pipe' in trans_err.args[0]
|
||||||
|
@ -442,21 +443,22 @@ class MsgpackTransport(MsgTransport):
|
||||||
# as it pertains to rando pings from the
|
# as it pertains to rando pings from the
|
||||||
# `.discovery` subsys and protos.
|
# `.discovery` subsys and protos.
|
||||||
):
|
):
|
||||||
raise TransportClosed(
|
raise TransportClosed.from_src_exc(
|
||||||
message=(
|
message=(
|
||||||
f'IPC transport already closed by peer\n'
|
f'{tpt_name} already closed by peer\n'
|
||||||
f'x)> {type(trans_err)}\n'
|
|
||||||
f' |_{self}\n'
|
|
||||||
),
|
),
|
||||||
loglevel=loglevel,
|
body=f'{self}\n',
|
||||||
) from trans_err
|
src_exc=trans_err,
|
||||||
|
raise_on_report=True,
|
||||||
|
loglevel='transport',
|
||||||
|
) from bre
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
# normal operation breakage" we usualy console warn
|
# normal operation breakage" we usualy console warn
|
||||||
# about it.
|
# about it.
|
||||||
case _:
|
case _:
|
||||||
log.exception(
|
log.exception(
|
||||||
'Transport layer failed for {self.transport!r} ?\n'
|
'{tpt_name} layer failed pre-send ??\n'
|
||||||
)
|
)
|
||||||
raise trans_err
|
raise trans_err
|
||||||
|
|
||||||
|
@ -501,11 +503,11 @@ class MsgpackTransport(MsgTransport):
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'<{type(self).__name__}(\n'
|
f'<{type(self).__name__}(\n'
|
||||||
f' |_task: {self._task}\n'
|
|
||||||
f'\n'
|
|
||||||
f' |_peers: 2\n'
|
f' |_peers: 2\n'
|
||||||
f' laddr: {self._laddr}\n'
|
f' laddr: {self._laddr}\n'
|
||||||
f' raddr: {self._raddr}\n'
|
f' raddr: {self._raddr}\n'
|
||||||
|
# f'\n'
|
||||||
|
f' |_task: {self._task}\n'
|
||||||
f')>\n'
|
f')>\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue