Compare commits

..

No commits in common. "9807318e3dd178892c28292cbadd4197a0f37ed2" and "c208bcbb1b550c94f2f827925a29f69895b5778c" have entirely different histories.

8 changed files with 45 additions and 175 deletions

View File

@ -138,19 +138,11 @@ def tpt_protos(request) -> list[str]:
yield proto_keys yield proto_keys
@pytest.fixture( @pytest.fixture(scope='session')
scope='session',
autouse=True,
)
def tpt_proto( def tpt_proto(
tpt_protos: list[str], tpt_protos: list[str],
) -> str: ) -> str:
proto_key: str = tpt_protos[0] yield tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
# breakpoint()
yield proto_key
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)

View File

@ -948,8 +948,7 @@ class TransportClosed(Exception):
self._loglevel: str = loglevel self._loglevel: str = loglevel
super().__init__(message) super().__init__(message)
self._src_exc = src_exc self.src_exc = src_exc
# set the cause manually if not already set by python
if ( if (
src_exc is not None src_exc is not None
and and
@ -961,18 +960,9 @@ class TransportClosed(Exception):
# the exc in its `TransportClosed` handler block. # the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report self._raise_on_report = raise_on_report
@property
def src_exc(self) -> Exception:
return (
self.__cause__
or
self._src_exc
)
def report_n_maybe_raise( def report_n_maybe_raise(
self, self,
message: str|None = None, message: str|None = None,
hide_tb: bool = True,
) -> None: ) -> None:
''' '''
@ -980,10 +970,9 @@ class TransportClosed(Exception):
for this error. for this error.
''' '''
__tracebackhide__: bool = hide_tb
message: str = message or self.message message: str = message or self.message
# when a cause is set, slap it onto the log emission. # when a cause is set, slap it onto the log emission.
if cause := self.src_exc: if cause := self.__cause__:
cause_tb_str: str = ''.join( cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__) traceback.format_tb(cause.__traceback__)
) )
@ -1002,76 +991,26 @@ class TransportClosed(Exception):
if self._raise_on_report: if self._raise_on_report:
raise self from cause raise self from cause
@classmethod
def repr_src_exc(
self,
src_exc: Exception|None = None,
) -> str:
if src_exc is None:
return '<unknown>'
src_msg: tuple[str] = src_exc.args
src_exc_repr: str = (
f'{type(src_exc).__name__}[ {src_msg} ]'
)
return src_exc_repr
def pformat(self) -> str: def pformat(self) -> str:
from tractor.devx.pformat import ( from tractor.devx.pformat import (
pformat_exc, pformat_exc,
) )
src_err: Exception|None = self.src_exc or '<unknown>'
src_msg: tuple[str] = src_err.args
src_exc_repr: str = (
f'{type(src_err).__name__}[ {src_msg} ]'
)
return pformat_exc( return pformat_exc(
exc=self, exc=self,
# message=self.message, # implicit!
body=(
f' |_src_exc: {src_exc_repr}\n'
),
) )
# delegate to `str`-ified pformat # delegate to `str`-ified pformat
__repr__ = pformat __repr__ = pformat
@classmethod
def from_src_exc(
cls,
src_exc: (
Exception|
trio.ClosedResource|
trio.BrokenResourceError
),
message: str,
body: str = '',
**init_kws,
) -> TransportClosed:
'''
Convenience constructor for creation from an underlying
`trio`-sourced async-resource/chan/stream error.
Embeds the original `src_exc`'s repr within the
`Exception.args` via a first-line-in-`.message`-put-in-header
pre-processing and allows inserting additional content beyond
the main message via a `body: str`.
'''
repr_src_exc: str = cls.repr_src_exc(
src_exc,
)
next_line: str = f' src_exc: {repr_src_exc}\n'
if body:
body: str = textwrap.indent(
body,
prefix=' '*2,
)
return TransportClosed(
message=(
message
+
next_line
+
body
),
src_exc=src_exc,
**init_kws,
)
class NoResult(RuntimeError): class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"

View File

@ -52,8 +52,8 @@ from .msg import (
Return, Return,
) )
from ._exceptions import ( from ._exceptions import (
# unpack_error,
NoResult, NoResult,
TransportClosed,
) )
from ._context import ( from ._context import (
Context, Context,
@ -305,34 +305,14 @@ class Portal:
return False return False
except ( except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
):
TransportClosed, log.debug(
) as tpt_err: 'IPC chan for actor already closed or broken?\n\n'
report: str = (
f'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n' f'{self.channel.aid}\n'
f' |_{self.channel}\n' f' |_{self.channel}\n'
) )
match tpt_err:
case TransportClosed():
log.debug(report)
case _:
report += (
f'\n'
f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
)
log.warning(report)
return False return False
# TODO: do we still need this for low level `Actor`-runtime # TODO: do we still need this for low level `Actor`-runtime

View File

@ -149,10 +149,8 @@ async def open_root_actor(
arbiter_addr: tuple[UnwrappedAddress]|None = None, arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[ enable_transports: list[
# TODO, this should eventually be the pairs as
# defined by (codec, proto) as on `MsgTransport.
_state.TransportProtocolKey, _state.TransportProtocolKey,
]|None = None, ] = [_state._def_tpt_proto],
name: str|None = 'root', name: str|None = 'root',
@ -215,14 +213,6 @@ async def open_root_actor(
debug_mode=debug_mode, debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback, maybe_enable_greenback=maybe_enable_greenback,
): ):
if enable_transports is None:
enable_transports: list[str] = _state.current_ipc_protos()
# TODO! support multi-tpts per actor! Bo
assert (
len(enable_transports) == 1
), 'No multi-tpt support yet!'
_debug.hide_runtime_frames() _debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb

View File

@ -102,7 +102,7 @@ def current_actor(
return _current_actor return _current_actor
def is_root_process() -> bool: def is_main_process() -> bool:
''' '''
Bool determining if this actor is running in the top-most process. Bool determining if this actor is running in the top-most process.
@ -111,10 +111,8 @@ def is_root_process() -> bool:
return mp.current_process().name == 'MainProcess' return mp.current_process().name == 'MainProcess'
is_main_process = is_root_process # TODO, more verby name?
def debug_mode() -> bool:
def is_debug_mode() -> bool:
''' '''
Bool determining if "debug mode" is on which enables Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes. remote subactor pdb entry on crashes.
@ -123,9 +121,6 @@ def is_debug_mode() -> bool:
return bool(_runtime_vars['_debug_mode']) return bool(_runtime_vars['_debug_mode'])
debug_mode = is_debug_mode
def is_root_process() -> bool: def is_root_process() -> bool:
return _runtime_vars['_is_root'] return _runtime_vars['_is_root']
@ -178,15 +173,3 @@ TransportProtocolKey = Literal[
'uds', 'uds',
] ]
_def_tpt_proto: TransportProtocolKey = 'tcp' _def_tpt_proto: TransportProtocolKey = 'tcp'
def current_ipc_protos() -> list[str]:
'''
Return the list of IPC transport protocol keys currently
in use by this actor.
The keys are as declared by `MsgTransport` and `Address`
concrete-backend sub-types defined throughout `tractor.ipc`.
'''
return [_def_tpt_proto]

View File

@ -316,9 +316,7 @@ class ActorNursery:
child_count: int = len(children) child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n' msg: str = f'Cancelling actor nursery with {child_count} children\n'
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery( async with trio.open_nursery() as tn:
strict_exception_groups=False,
) as tn:
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process

View File

@ -49,7 +49,6 @@ from tractor.log import get_logger
from tractor._exceptions import ( from tractor._exceptions import (
MsgTypeError, MsgTypeError,
pack_from_raise, pack_from_raise,
TransportClosed,
) )
from tractor.msg import ( from tractor.msg import (
Aid, Aid,
@ -257,7 +256,7 @@ class Channel:
self, self,
payload: Any, payload: Any,
hide_tb: bool = True, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
@ -275,27 +274,18 @@ class Channel:
payload, payload,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
except ( except BaseException as _err:
BaseException,
MsgTypeError,
TransportClosed,
)as _err:
err = _err # bind for introspection err = _err # bind for introspection
match err: if not isinstance(_err, MsgTypeError):
case MsgTypeError(): # assert err
try: __tracebackhide__: bool = False
assert err.cid else:
except KeyError: try:
raise err assert err.cid
case TransportClosed():
log.transport( except KeyError:
f'Transport stream closed due to\n' raise err
f'{err.repr_src_exc()}\n'
)
case _:
# never suppress non-tpt sources
__tracebackhide__: bool = False
raise raise
async def recv(self) -> Any: async def recv(self) -> Any:

View File

@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
msg: msgtypes.MsgType, msg: msgtypes.MsgType,
strict_types: bool = True, strict_types: bool = True,
hide_tb: bool = True, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
@ -430,9 +430,8 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
) as bre: ) as trans_err:
trans_err = bre loglevel = 'transport'
tpt_name: str = f'{type(self).__name__!r}'
match trans_err: match trans_err:
case trio.BrokenResourceError() if ( case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0] '[Errno 32] Broken pipe' in trans_err.args[0]
@ -443,22 +442,21 @@ class MsgpackTransport(MsgTransport):
# as it pertains to rando pings from the # as it pertains to rando pings from the
# `.discovery` subsys and protos. # `.discovery` subsys and protos.
): ):
raise TransportClosed.from_src_exc( raise TransportClosed(
message=( message=(
f'{tpt_name} already closed by peer\n' f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
), ),
body=f'{self}\n', loglevel=loglevel,
src_exc=trans_err, ) from trans_err
raise_on_report=True,
loglevel='transport',
) from bre
# unless the disconnect condition falls under "a # unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn # normal operation breakage" we usualy console warn
# about it. # about it.
case _: case _:
log.exception( log.exception(
'{tpt_name} layer failed pre-send ??\n' 'Transport layer failed for {self.transport!r} ?\n'
) )
raise trans_err raise trans_err
@ -503,11 +501,11 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str: def pformat(self) -> str:
return ( return (
f'<{type(self).__name__}(\n' f'<{type(self).__name__}(\n'
f' |_task: {self._task}\n'
f'\n'
f' |_peers: 2\n' f' |_peers: 2\n'
f' laddr: {self._laddr}\n' f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n' f' raddr: {self._raddr}\n'
# f'\n'
f' |_task: {self._task}\n'
f')>\n' f')>\n'
) )