Compare commits

..

No commits in common. "9807318e3dd178892c28292cbadd4197a0f37ed2" and "c208bcbb1b550c94f2f827925a29f69895b5778c" have entirely different histories.

8 changed files with 45 additions and 175 deletions

View File

@ -138,19 +138,11 @@ def tpt_protos(request) -> list[str]:
yield proto_keys
@pytest.fixture(
scope='session',
autouse=True,
)
@pytest.fixture(scope='session')
def tpt_proto(
tpt_protos: list[str],
) -> str:
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
# breakpoint()
yield proto_key
yield tpt_protos[0]
_ci_env: bool = os.environ.get('CI', False)

View File

@ -948,8 +948,7 @@ class TransportClosed(Exception):
self._loglevel: str = loglevel
super().__init__(message)
self._src_exc = src_exc
# set the cause manually if not already set by python
self.src_exc = src_exc
if (
src_exc is not None
and
@ -961,18 +960,9 @@ class TransportClosed(Exception):
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
@property
def src_exc(self) -> Exception:
return (
self.__cause__
or
self._src_exc
)
def report_n_maybe_raise(
self,
message: str|None = None,
hide_tb: bool = True,
) -> None:
'''
@ -980,10 +970,9 @@ class TransportClosed(Exception):
for this error.
'''
__tracebackhide__: bool = hide_tb
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.src_exc:
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
@ -1002,76 +991,26 @@ class TransportClosed(Exception):
if self._raise_on_report:
raise self from cause
@classmethod
def repr_src_exc(
self,
src_exc: Exception|None = None,
) -> str:
if src_exc is None:
return '<unknown>'
src_msg: tuple[str] = src_exc.args
src_exc_repr: str = (
f'{type(src_exc).__name__}[ {src_msg} ]'
)
return src_exc_repr
def pformat(self) -> str:
from tractor.devx.pformat import (
pformat_exc,
)
src_err: Exception|None = self.src_exc or '<unknown>'
src_msg: tuple[str] = src_err.args
src_exc_repr: str = (
f'{type(src_err).__name__}[ {src_msg} ]'
)
return pformat_exc(
exc=self,
# message=self.message, # implicit!
body=(
f' |_src_exc: {src_exc_repr}\n'
),
)
# delegate to `str`-ified pformat
__repr__ = pformat
@classmethod
def from_src_exc(
cls,
src_exc: (
Exception|
trio.ClosedResource|
trio.BrokenResourceError
),
message: str,
body: str = '',
**init_kws,
) -> TransportClosed:
'''
Convenience constructor for creation from an underlying
`trio`-sourced async-resource/chan/stream error.
Embeds the original `src_exc`'s repr within the
`Exception.args` via a first-line-in-`.message`-put-in-header
pre-processing and allows inserting additional content beyond
the main message via a `body: str`.
'''
repr_src_exc: str = cls.repr_src_exc(
src_exc,
)
next_line: str = f' src_exc: {repr_src_exc}\n'
if body:
body: str = textwrap.indent(
body,
prefix=' '*2,
)
return TransportClosed(
message=(
message
+
next_line
+
body
),
src_exc=src_exc,
**init_kws,
)
class NoResult(RuntimeError):
"No final result is expected for this actor"

View File

@ -52,8 +52,8 @@ from .msg import (
Return,
)
from ._exceptions import (
# unpack_error,
NoResult,
TransportClosed,
)
from ._context import (
Context,
@ -305,34 +305,14 @@ class Portal:
return False
except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
report: str = (
f'IPC chan for actor already closed or broken?\n\n'
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f' |_{self.channel}\n'
)
match tpt_err:
case TransportClosed():
log.debug(report)
case _:
report += (
f'\n'
f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
)
log.warning(report)
return False
# TODO: do we still need this for low level `Actor`-runtime

View File

@ -149,10 +149,8 @@ async def open_root_actor(
arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[
# TODO, this should eventually be the pairs as
# defined by (codec, proto) as on `MsgTransport.
_state.TransportProtocolKey,
]|None = None,
] = [_state._def_tpt_proto],
name: str|None = 'root',
@ -215,14 +213,6 @@ async def open_root_actor(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
):
if enable_transports is None:
enable_transports: list[str] = _state.current_ipc_protos()
# TODO! support multi-tpts per actor! Bo
assert (
len(enable_transports) == 1
), 'No multi-tpt support yet!'
_debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb

View File

@ -102,7 +102,7 @@ def current_actor(
return _current_actor
def is_root_process() -> bool:
def is_main_process() -> bool:
'''
Bool determining if this actor is running in the top-most process.
@ -111,10 +111,8 @@ def is_root_process() -> bool:
return mp.current_process().name == 'MainProcess'
is_main_process = is_root_process
def is_debug_mode() -> bool:
# TODO, more verby name?
def debug_mode() -> bool:
'''
Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
@ -123,9 +121,6 @@ def is_debug_mode() -> bool:
return bool(_runtime_vars['_debug_mode'])
debug_mode = is_debug_mode
def is_root_process() -> bool:
return _runtime_vars['_is_root']
@ -178,15 +173,3 @@ TransportProtocolKey = Literal[
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'
def current_ipc_protos() -> list[str]:
'''
Return the list of IPC transport protocol keys currently
in use by this actor.
The keys are as declared by `MsgTransport` and `Address`
concrete-backend sub-types defined throughout `tractor.ipc`.
'''
return [_def_tpt_proto]

View File

@ -316,9 +316,7 @@ class ActorNursery:
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
with trio.move_on_after(3) as cs:
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
async with trio.open_nursery() as tn:
subactor: Actor
proc: trio.Process

View File

@ -49,7 +49,6 @@ from tractor.log import get_logger
from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
TransportClosed,
)
from tractor.msg import (
Aid,
@ -257,7 +256,7 @@ class Channel:
self,
payload: Any,
hide_tb: bool = True,
hide_tb: bool = False,
) -> None:
'''
@ -275,27 +274,18 @@ class Channel:
payload,
hide_tb=hide_tb,
)
except (
BaseException,
MsgTypeError,
TransportClosed,
)as _err:
except BaseException as _err:
err = _err # bind for introspection
match err:
case MsgTypeError():
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
try:
assert err.cid
except KeyError:
raise err
case TransportClosed():
log.transport(
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
)
case _:
# never suppress non-tpt sources
__tracebackhide__: bool = False
raise
async def recv(self) -> Any:

View File

@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
msg: msgtypes.MsgType,
strict_types: bool = True,
hide_tb: bool = True,
hide_tb: bool = False,
) -> None:
'''
@ -430,9 +430,8 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
) as trans_err:
loglevel = 'transport'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
@ -443,22 +442,21 @@ class MsgpackTransport(MsgTransport):
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
raise TransportClosed.from_src_exc(
raise TransportClosed(
message=(
f'{tpt_name} already closed by peer\n'
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
) from bre
loglevel=loglevel,
) from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
'{tpt_name} layer failed pre-send ??\n'
'Transport layer failed for {self.transport!r} ?\n'
)
raise trans_err
@ -503,11 +501,11 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_task: {self._task}\n'
f'\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'
f' |_task: {self._task}\n'
f')>\n'
)