Compare commits
No commits in common. "9807318e3dd178892c28292cbadd4197a0f37ed2" and "c208bcbb1b550c94f2f827925a29f69895b5778c" have entirely different histories.
9807318e3d
...
c208bcbb1b
|
@ -138,19 +138,11 @@ def tpt_protos(request) -> list[str]:
|
||||||
yield proto_keys
|
yield proto_keys
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(
|
@pytest.fixture(scope='session')
|
||||||
scope='session',
|
|
||||||
autouse=True,
|
|
||||||
)
|
|
||||||
def tpt_proto(
|
def tpt_proto(
|
||||||
tpt_protos: list[str],
|
tpt_protos: list[str],
|
||||||
) -> str:
|
) -> str:
|
||||||
proto_key: str = tpt_protos[0]
|
yield tpt_protos[0]
|
||||||
from tractor import _state
|
|
||||||
if _state._def_tpt_proto != proto_key:
|
|
||||||
_state._def_tpt_proto = proto_key
|
|
||||||
# breakpoint()
|
|
||||||
yield proto_key
|
|
||||||
|
|
||||||
|
|
||||||
_ci_env: bool = os.environ.get('CI', False)
|
_ci_env: bool = os.environ.get('CI', False)
|
||||||
|
|
|
@ -948,8 +948,7 @@ class TransportClosed(Exception):
|
||||||
self._loglevel: str = loglevel
|
self._loglevel: str = loglevel
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
|
||||||
self._src_exc = src_exc
|
self.src_exc = src_exc
|
||||||
# set the cause manually if not already set by python
|
|
||||||
if (
|
if (
|
||||||
src_exc is not None
|
src_exc is not None
|
||||||
and
|
and
|
||||||
|
@ -961,18 +960,9 @@ class TransportClosed(Exception):
|
||||||
# the exc in its `TransportClosed` handler block.
|
# the exc in its `TransportClosed` handler block.
|
||||||
self._raise_on_report = raise_on_report
|
self._raise_on_report = raise_on_report
|
||||||
|
|
||||||
@property
|
|
||||||
def src_exc(self) -> Exception:
|
|
||||||
return (
|
|
||||||
self.__cause__
|
|
||||||
or
|
|
||||||
self._src_exc
|
|
||||||
)
|
|
||||||
|
|
||||||
def report_n_maybe_raise(
|
def report_n_maybe_raise(
|
||||||
self,
|
self,
|
||||||
message: str|None = None,
|
message: str|None = None,
|
||||||
hide_tb: bool = True,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -980,10 +970,9 @@ class TransportClosed(Exception):
|
||||||
for this error.
|
for this error.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
|
||||||
message: str = message or self.message
|
message: str = message or self.message
|
||||||
# when a cause is set, slap it onto the log emission.
|
# when a cause is set, slap it onto the log emission.
|
||||||
if cause := self.src_exc:
|
if cause := self.__cause__:
|
||||||
cause_tb_str: str = ''.join(
|
cause_tb_str: str = ''.join(
|
||||||
traceback.format_tb(cause.__traceback__)
|
traceback.format_tb(cause.__traceback__)
|
||||||
)
|
)
|
||||||
|
@ -1002,76 +991,26 @@ class TransportClosed(Exception):
|
||||||
if self._raise_on_report:
|
if self._raise_on_report:
|
||||||
raise self from cause
|
raise self from cause
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def repr_src_exc(
|
|
||||||
self,
|
|
||||||
src_exc: Exception|None = None,
|
|
||||||
) -> str:
|
|
||||||
|
|
||||||
if src_exc is None:
|
|
||||||
return '<unknown>'
|
|
||||||
|
|
||||||
src_msg: tuple[str] = src_exc.args
|
|
||||||
src_exc_repr: str = (
|
|
||||||
f'{type(src_exc).__name__}[ {src_msg} ]'
|
|
||||||
)
|
|
||||||
return src_exc_repr
|
|
||||||
|
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
from tractor.devx.pformat import (
|
from tractor.devx.pformat import (
|
||||||
pformat_exc,
|
pformat_exc,
|
||||||
)
|
)
|
||||||
|
src_err: Exception|None = self.src_exc or '<unknown>'
|
||||||
|
src_msg: tuple[str] = src_err.args
|
||||||
|
src_exc_repr: str = (
|
||||||
|
f'{type(src_err).__name__}[ {src_msg} ]'
|
||||||
|
)
|
||||||
return pformat_exc(
|
return pformat_exc(
|
||||||
exc=self,
|
exc=self,
|
||||||
|
# message=self.message, # implicit!
|
||||||
|
body=(
|
||||||
|
f' |_src_exc: {src_exc_repr}\n'
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# delegate to `str`-ified pformat
|
# delegate to `str`-ified pformat
|
||||||
__repr__ = pformat
|
__repr__ = pformat
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_src_exc(
|
|
||||||
cls,
|
|
||||||
src_exc: (
|
|
||||||
Exception|
|
|
||||||
trio.ClosedResource|
|
|
||||||
trio.BrokenResourceError
|
|
||||||
),
|
|
||||||
message: str,
|
|
||||||
body: str = '',
|
|
||||||
**init_kws,
|
|
||||||
) -> TransportClosed:
|
|
||||||
'''
|
|
||||||
Convenience constructor for creation from an underlying
|
|
||||||
`trio`-sourced async-resource/chan/stream error.
|
|
||||||
|
|
||||||
Embeds the original `src_exc`'s repr within the
|
|
||||||
`Exception.args` via a first-line-in-`.message`-put-in-header
|
|
||||||
pre-processing and allows inserting additional content beyond
|
|
||||||
the main message via a `body: str`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
repr_src_exc: str = cls.repr_src_exc(
|
|
||||||
src_exc,
|
|
||||||
)
|
|
||||||
next_line: str = f' src_exc: {repr_src_exc}\n'
|
|
||||||
if body:
|
|
||||||
body: str = textwrap.indent(
|
|
||||||
body,
|
|
||||||
prefix=' '*2,
|
|
||||||
)
|
|
||||||
|
|
||||||
return TransportClosed(
|
|
||||||
message=(
|
|
||||||
message
|
|
||||||
+
|
|
||||||
next_line
|
|
||||||
+
|
|
||||||
body
|
|
||||||
),
|
|
||||||
src_exc=src_exc,
|
|
||||||
**init_kws,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
"No final result is expected for this actor"
|
"No final result is expected for this actor"
|
||||||
|
|
|
@ -52,8 +52,8 @@ from .msg import (
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
|
# unpack_error,
|
||||||
NoResult,
|
NoResult,
|
||||||
TransportClosed,
|
|
||||||
)
|
)
|
||||||
from ._context import (
|
from ._context import (
|
||||||
Context,
|
Context,
|
||||||
|
@ -305,34 +305,14 @@ class Portal:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except (
|
except (
|
||||||
# XXX, should never really get raised unless we aren't
|
|
||||||
# wrapping them in the below type by mistake?
|
|
||||||
#
|
|
||||||
# Leaving the catch here for now until we're very sure
|
|
||||||
# all the cases (for various tpt protos) have indeed been
|
|
||||||
# re-wrapped ;p
|
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
):
|
||||||
TransportClosed,
|
log.debug(
|
||||||
) as tpt_err:
|
'IPC chan for actor already closed or broken?\n\n'
|
||||||
report: str = (
|
|
||||||
f'IPC chan for actor already closed or broken?\n\n'
|
|
||||||
f'{self.channel.aid}\n'
|
f'{self.channel.aid}\n'
|
||||||
f' |_{self.channel}\n'
|
f' |_{self.channel}\n'
|
||||||
)
|
)
|
||||||
match tpt_err:
|
|
||||||
case TransportClosed():
|
|
||||||
log.debug(report)
|
|
||||||
case _:
|
|
||||||
report += (
|
|
||||||
f'\n'
|
|
||||||
f'Unhandled low-level transport-closed/error during\n'
|
|
||||||
f'Portal.cancel_actor()` request?\n'
|
|
||||||
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
|
|
||||||
)
|
|
||||||
log.warning(report)
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# TODO: do we still need this for low level `Actor`-runtime
|
# TODO: do we still need this for low level `Actor`-runtime
|
||||||
|
|
|
@ -149,10 +149,8 @@ async def open_root_actor(
|
||||||
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
enable_transports: list[
|
enable_transports: list[
|
||||||
# TODO, this should eventually be the pairs as
|
|
||||||
# defined by (codec, proto) as on `MsgTransport.
|
|
||||||
_state.TransportProtocolKey,
|
_state.TransportProtocolKey,
|
||||||
]|None = None,
|
] = [_state._def_tpt_proto],
|
||||||
|
|
||||||
name: str|None = 'root',
|
name: str|None = 'root',
|
||||||
|
|
||||||
|
@ -215,14 +213,6 @@ async def open_root_actor(
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
maybe_enable_greenback=maybe_enable_greenback,
|
maybe_enable_greenback=maybe_enable_greenback,
|
||||||
):
|
):
|
||||||
if enable_transports is None:
|
|
||||||
enable_transports: list[str] = _state.current_ipc_protos()
|
|
||||||
|
|
||||||
# TODO! support multi-tpts per actor! Bo
|
|
||||||
assert (
|
|
||||||
len(enable_transports) == 1
|
|
||||||
), 'No multi-tpt support yet!'
|
|
||||||
|
|
||||||
_debug.hide_runtime_frames()
|
_debug.hide_runtime_frames()
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
|
|
@ -102,7 +102,7 @@ def current_actor(
|
||||||
return _current_actor
|
return _current_actor
|
||||||
|
|
||||||
|
|
||||||
def is_root_process() -> bool:
|
def is_main_process() -> bool:
|
||||||
'''
|
'''
|
||||||
Bool determining if this actor is running in the top-most process.
|
Bool determining if this actor is running in the top-most process.
|
||||||
|
|
||||||
|
@ -111,10 +111,8 @@ def is_root_process() -> bool:
|
||||||
return mp.current_process().name == 'MainProcess'
|
return mp.current_process().name == 'MainProcess'
|
||||||
|
|
||||||
|
|
||||||
is_main_process = is_root_process
|
# TODO, more verby name?
|
||||||
|
def debug_mode() -> bool:
|
||||||
|
|
||||||
def is_debug_mode() -> bool:
|
|
||||||
'''
|
'''
|
||||||
Bool determining if "debug mode" is on which enables
|
Bool determining if "debug mode" is on which enables
|
||||||
remote subactor pdb entry on crashes.
|
remote subactor pdb entry on crashes.
|
||||||
|
@ -123,9 +121,6 @@ def is_debug_mode() -> bool:
|
||||||
return bool(_runtime_vars['_debug_mode'])
|
return bool(_runtime_vars['_debug_mode'])
|
||||||
|
|
||||||
|
|
||||||
debug_mode = is_debug_mode
|
|
||||||
|
|
||||||
|
|
||||||
def is_root_process() -> bool:
|
def is_root_process() -> bool:
|
||||||
return _runtime_vars['_is_root']
|
return _runtime_vars['_is_root']
|
||||||
|
|
||||||
|
@ -178,15 +173,3 @@ TransportProtocolKey = Literal[
|
||||||
'uds',
|
'uds',
|
||||||
]
|
]
|
||||||
_def_tpt_proto: TransportProtocolKey = 'tcp'
|
_def_tpt_proto: TransportProtocolKey = 'tcp'
|
||||||
|
|
||||||
|
|
||||||
def current_ipc_protos() -> list[str]:
|
|
||||||
'''
|
|
||||||
Return the list of IPC transport protocol keys currently
|
|
||||||
in use by this actor.
|
|
||||||
|
|
||||||
The keys are as declared by `MsgTransport` and `Address`
|
|
||||||
concrete-backend sub-types defined throughout `tractor.ipc`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return [_def_tpt_proto]
|
|
||||||
|
|
|
@ -316,9 +316,7 @@ class ActorNursery:
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery(
|
async with trio.open_nursery() as tn:
|
||||||
strict_exception_groups=False,
|
|
||||||
) as tn:
|
|
||||||
|
|
||||||
subactor: Actor
|
subactor: Actor
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
|
|
|
@ -49,7 +49,6 @@ from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
TransportClosed,
|
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Aid,
|
Aid,
|
||||||
|
@ -257,7 +256,7 @@ class Channel:
|
||||||
self,
|
self,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -275,27 +274,18 @@ class Channel:
|
||||||
payload,
|
payload,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
except (
|
except BaseException as _err:
|
||||||
BaseException,
|
|
||||||
MsgTypeError,
|
|
||||||
TransportClosed,
|
|
||||||
)as _err:
|
|
||||||
err = _err # bind for introspection
|
err = _err # bind for introspection
|
||||||
match err:
|
if not isinstance(_err, MsgTypeError):
|
||||||
case MsgTypeError():
|
# assert err
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
assert err.cid
|
assert err.cid
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise err
|
raise err
|
||||||
case TransportClosed():
|
|
||||||
log.transport(
|
|
||||||
f'Transport stream closed due to\n'
|
|
||||||
f'{err.repr_src_exc()}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
case _:
|
|
||||||
# never suppress non-tpt sources
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
|
|
|
@ -367,7 +367,7 @@ class MsgpackTransport(MsgTransport):
|
||||||
msg: msgtypes.MsgType,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -430,9 +430,8 @@ class MsgpackTransport(MsgTransport):
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
) as bre:
|
) as trans_err:
|
||||||
trans_err = bre
|
loglevel = 'transport'
|
||||||
tpt_name: str = f'{type(self).__name__!r}'
|
|
||||||
match trans_err:
|
match trans_err:
|
||||||
case trio.BrokenResourceError() if (
|
case trio.BrokenResourceError() if (
|
||||||
'[Errno 32] Broken pipe' in trans_err.args[0]
|
'[Errno 32] Broken pipe' in trans_err.args[0]
|
||||||
|
@ -443,22 +442,21 @@ class MsgpackTransport(MsgTransport):
|
||||||
# as it pertains to rando pings from the
|
# as it pertains to rando pings from the
|
||||||
# `.discovery` subsys and protos.
|
# `.discovery` subsys and protos.
|
||||||
):
|
):
|
||||||
raise TransportClosed.from_src_exc(
|
raise TransportClosed(
|
||||||
message=(
|
message=(
|
||||||
f'{tpt_name} already closed by peer\n'
|
f'IPC transport already closed by peer\n'
|
||||||
|
f'x)> {type(trans_err)}\n'
|
||||||
|
f' |_{self}\n'
|
||||||
),
|
),
|
||||||
body=f'{self}\n',
|
loglevel=loglevel,
|
||||||
src_exc=trans_err,
|
) from trans_err
|
||||||
raise_on_report=True,
|
|
||||||
loglevel='transport',
|
|
||||||
) from bre
|
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
# normal operation breakage" we usualy console warn
|
# normal operation breakage" we usualy console warn
|
||||||
# about it.
|
# about it.
|
||||||
case _:
|
case _:
|
||||||
log.exception(
|
log.exception(
|
||||||
'{tpt_name} layer failed pre-send ??\n'
|
'Transport layer failed for {self.transport!r} ?\n'
|
||||||
)
|
)
|
||||||
raise trans_err
|
raise trans_err
|
||||||
|
|
||||||
|
@ -503,11 +501,11 @@ class MsgpackTransport(MsgTransport):
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'<{type(self).__name__}(\n'
|
f'<{type(self).__name__}(\n'
|
||||||
|
f' |_task: {self._task}\n'
|
||||||
|
f'\n'
|
||||||
f' |_peers: 2\n'
|
f' |_peers: 2\n'
|
||||||
f' laddr: {self._laddr}\n'
|
f' laddr: {self._laddr}\n'
|
||||||
f' raddr: {self._raddr}\n'
|
f' raddr: {self._raddr}\n'
|
||||||
# f'\n'
|
|
||||||
f' |_task: {self._task}\n'
|
|
||||||
f')>\n'
|
f')>\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue