Compare commits

4 Commits

Author SHA1 Message Date
Tyler Goodlet 9be821a5cf More failed REPL-lock-request refinements
In `lock_stdio_for_peer()`, better internal-error handling/reporting:
- only `Lock._blocked.remove(ctx.cid)` if that same cid was added on
  entry to avoid needless key-errors.
- drop all `Lock.release(force: bool)` usage remnants.
- if `req_ctx.cancel()` fails, mention it via `ctx_err.add_note()`.
- add more explicit internal-failed-request log messaging via a new
  `fail_reason: str`.
- use the new `x)<=\n|_` annots in any failure logging.

Other cleanups/niceties:
- drop `force: bool` flag entirely from the `Lock.release()`.
- use more supervisor-op-annots in `.pdb()` logging
  with both `_pause_msg`/`_crash_msg: str` instead of double '|' lines when
  `.pdb()`-reported from `._set_trace()`/`._post_mortem()`.
2024-07-02 17:06:50 -04:00
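A minimal sketch of the idempotent-removal guard described in the commit
above, assuming a local flag records whether this call actually added the
cid so the `finally` block only removes what it added (the `blocked` set
and `serve_lock_request()` names are illustrative stand-ins, not
tractor's actual API):

    def serve_lock_request(blocked: set[str], cid: str) -> None:
        # sketch of the `Lock._blocked` bookkeeping pattern from
        # `lock_stdio_for_peer()`; not the real implementation.
        lock_blocked: bool = False
        try:
            if cid in blocked:
                raise RuntimeError(
                    f'{cid!r} already has a pending lock request!'
                )
            blocked.add(cid)
            lock_blocked = True  # removal on exit is now safe
            ...  # run the actual TTY-locking dialog here
        finally:
            # avoid a needless `KeyError` when we raised before the add
            if lock_blocked:
                blocked.remove(cid)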
Tyler Goodlet b46400a86f Use `._entry` proto-ed "lifetime ops" in logging
As per a WIP TODO sketched out in `._entry.nest_from_op()`, change
a bunch of "supervisor/lifetime mgmt ops" related log messages to
contain supervisor-annotation "headers" in an effort to give a terser
visual indication of how some execution/scope/storage primitive entity
(like an actor/task/ctx/connection) is being operated on (e.g.
opening/started/closed/cancelled/erroring) from a "supervisor action"
POV.

Also tweak a bunch more emissions to lower levels to reduce noise around
normal inter-actor operations like process and IPC ctx supervision.
2024-07-02 16:31:58 -04:00
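A rough sketch of the "supervisor-op annotation header" idea, loosely in
the spirit of `._entry.nest_from_op()` (whose real signature and glyph
set likely differ): prefix a repr "tree" with a short op glyph such as
`c)=>` (cancel request sent) or `x)>` (closed by error):

    def nest_from_op_sketch(
        input_op: str,  # eg. 'c)=>', 'x)>', '>x)'
        text: str,
    ) -> str:
        # nest each line of `text` under the op glyph as a "tree"
        nested: str = '\n'.join(
            f' |_{line}' for line in text.splitlines()
        )
        return f'{input_op}\n{nested}\n'

    # renders as:
    # c)=>
    #  |_('sub', 'deadbeef')
    print(nest_from_op_sketch('c)=>', "('sub', 'deadbeef')"))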
Tyler Goodlet 02812b9f51 Reraise RAEs in `MsgStream.receive()`; truncate tbs
To avoid showing low-level details of exception handling around the
underlying call to `return await self._ctx._pld_rx.recv_pld(ipc=self)`,
any time a `RemoteActorError` is unpacked (and raised locally) we re-raise
it directly from the captured `src_err` so as to present to the
user/app caller-code an exception raised directly from the `.receive()`
frame. This simplifies traceback call-stacks for any `log.exception()`
or `pdb`-REPL output by filtering out the lower `PldRx` frames by default.
2024-07-02 16:31:15 -04:00
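The core of the re-raise trick in plain form: rebuild the exception from
its `.args` and chain it with `from`, so the new traceback terminates at
the `.receive()` frame while `__tracebackhide__` lets tb-filtering tools
(eg. `pdbp`) drop the helper frames. A simplified sync sketch, not the
actual `MsgStream` code:

    class RemoteActorError(Exception):
        # placeholder for `tractor._exceptions.RemoteActorError`
        pass

    def _recv_pld():
        # stand-in for the lower-level `PldRx.recv_pld()` machinery
        raise RemoteActorError('boxed error from the remote peer')

    def receive():
        __tracebackhide__: bool = True  # frame-hiding hint for tb filters
        try:
            return _recv_pld()
        except RemoteActorError as src_err:
            # re-raise a fresh instance so the visible tb "sources" from
            # this frame; `from` keeps the original as `__cause__`.
            raise type(src_err)(*src_err.args) from src_err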
Tyler Goodlet 3c5816c977 Add `Portal.chan` property, to wrap `._chan` attr 2024-07-02 15:53:33 -04:00
7 changed files with 170 additions and 136 deletions

View File

@@ -933,13 +933,14 @@ class Context:
self.cancel_called = True
header: str = (
f'Cancelling ctx with peer from {side.upper()} side\n\n'
f'Cancelling ctx from {side.upper()}-side\n'
)
reminfo: str = (
# ' =>\n'
f'Context.cancel() => {self.chan.uid}\n'
# f'Context.cancel() => {self.chan.uid}\n'
f'c)=> {self.chan.uid}\n'
# f'{self.chan.uid}\n'
f' |_ @{self.dst_maddr}\n'
f' |_ @{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320
@@ -1267,6 +1268,12 @@ class Context:
@property
def maybe_error(self) -> BaseException|None:
'''
Return the (remote) error as outcome or `None`.
Remote errors take precedence over local ones.
'''
le: BaseException|None = self._local_error
re: RemoteActorError|ContextCancelled|None = self._remote_error
@@ -2182,9 +2189,16 @@ async def open_context_from_portal(
# handled in the block above ^^^ !!
# await _debug.pause()
# log.cancel(
log.exception(
f'{ctx.side}-side of `Context` terminated with '
f'.outcome => {ctx.repr_outcome()}\n'
match scope_err:
case trio.Cancelled():
logmeth = log.cancel
# XXX explicitly report on any non-graceful-taskc cases
case _:
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
)
if debug_mode():

View File

@@ -265,7 +265,7 @@ def _trio_main(
except BaseException as err:
logmeth = log.error
exit_status: str = (
'Main actor task crashed during exit?\n'
'Main actor task exited due to crash?\n'
+
nest_from_op(
input_op='x)>', # closed by error

View File

@@ -97,7 +97,7 @@ class Portal:
channel: Channel,
) -> None:
self.chan = channel
self._chan: Channel = channel
# during the portal's lifetime
self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
@@ -109,6 +109,10 @@
self._streams: set[MsgStream] = set()
self.actor: Actor = current_actor()
@property
def chan(self) -> Channel:
return self._chan
@property
def channel(self) -> Channel:
'''

View File

@@ -66,10 +66,11 @@ from trio import (
)
from tractor.msg import (
pretty_struct,
NamespacePath,
types as msgtypes,
MsgType,
NamespacePath,
Stop,
pretty_struct,
types as msgtypes,
)
from ._ipc import Channel
from ._context import (
@@ -545,7 +546,8 @@ class Actor:
):
log.cancel(
'Waiting on cancel request to peer\n'
f'`Portal.cancel_actor()` => {chan.uid}\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
@@ -642,12 +644,14 @@
# and
an_exit_cs.cancelled_caught
):
log.warning(
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'{local_nursery}\n'
f' |_{pformat(local_nursery._children)}\n'
)
# await _debug.pause()
if children := local_nursery._children:
report += f' |_{pformat(children)}\n'
log.warning(report)
if disconnected:
# if the transport died and this actor is still
@@ -819,14 +823,17 @@ class Actor:
# side,
)]
except KeyError:
log.warning(
report: str = (
'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n\n'
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
f'{pretty_struct.pformat(msg)}\n'
f'<=? {uid}\n\n'
f' |_{pretty_struct.pformat(msg)}\n'
)
match msg:
case Stop():
log.runtime(report)
case _:
log.warning(report)
return
# if isinstance(msg, MsgTypeError):
@@ -1338,10 +1345,11 @@ class Actor:
return True
log.cancel(
'Cancel request for RPC task\n\n'
f'<= Actor._cancel_task(): {requesting_uid}\n\n'
f'=> {ctx._task}\n'
f' |_ >> {ctx.repr_rpc}\n'
'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n'
f' |_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n'

View File

@@ -246,8 +246,9 @@ async def hard_kill(
'''
log.cancel(
'Terminating sub-proc:\n'
f'|_{proc}\n'
'Terminating sub-proc\n'
f'>x)\n'
f' |_{proc}\n'
)
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
@@ -293,8 +294,8 @@ async def hard_kill(
log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
'#T-800 deployed to collect zombie B0\n'
f'|\n'
f'|_{proc}\n'
f'>x)\n'
f' |_{proc}\n'
)
proc.kill()
@@ -322,8 +323,9 @@ async def soft_kill(
uid: tuple[str, str] = portal.channel.uid
try:
log.cancel(
'Soft killing sub-actor via `Portal.cancel_actor()`\n'
f'|_{proc}\n'
'Soft killing sub-actor via portal request\n'
f'c)> {portal.chan.uid}\n'
f' |_{proc}\n'
)
# wait on sub-proc to signal termination
await wait_func(proc)
@@ -552,8 +554,9 @@ async def trio_proc(
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
'Cancelling existing result waiter task for '
f'{subactor.uid}'
'Cancelling portal result reaper task\n'
f'>c)\n'
f' |_{subactor.uid}\n'
)
nursery.cancel_scope.cancel()
@@ -562,7 +565,11 @@
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
log.cancel(
f'Hard reap sequence starting for subactor\n'
f'>x)\n'
f' |_{subactor}@{subactor.uid}\n'
)
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb

View File

@@ -36,8 +36,8 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
ContextCancelled,
RemoteActorError,
)
from .log import get_logger
from .trionics import (
@@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel):
@property
def ctx(self) -> Context:
'''
This stream's IPC `Context` ref.
A read-only ref to this stream's inter-actor-task `Context`.
'''
return self._ctx
@@ -145,9 +145,8 @@ class MsgStream(trio.abc.Channel):
'''
__tracebackhide__: bool = hide_tb
# NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it
# to gracefully exit async for loops):
# NOTE FYI: `trio.ReceiveChannel` implements EOC handling as
# follows (aka uses it to gracefully exit async for loops):
#
# async def __anext__(self) -> ReceiveType:
# try:
@@ -165,48 +164,29 @@
src_err: Exception|None = None # orig tb
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self)
# XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime OR,
# - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._deliver_ctx_payload()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`.
except (
trio.EndOfChannel,
) as eoc:
src_err = eoc
# - `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime,
# OR
# - via a `Stop`-msg received from remote peer task.
# NOTE
# |_ previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel
# inside `Actor._deliver_ctx_payload()`, but now the 'stop'
# message handling gets delegated to `PldRx.recv_pld()`
# internals.
except trio.EndOfChannel as eoc:
# a graceful stream finished signal
self._eoc = eoc
src_err = eoc
# TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
# Once we have broadcast support, we **don't** want to be
# closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
# try:
# maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
# if maybe_err_msg_or_res:
# log.warning(
# 'Discarding un-processed msg:\n'
# f'{maybe_err_msg_or_res}'
# )
# except trio.WouldBlock:
# # no queued msgs that might be another remote
# # error, so just raise the original EoC
# pass
# raise eoc
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
# a `ClosedResourceError` indicates that the internal feeder
# memory receive channel was closed likely by the runtime
# after the associated transport-channel disconnected or
# broke.
except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
src_err = cre
log.warning(
@@ -218,14 +198,15 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose()
if drained:
# ?TODO? pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.wait_for_result()` call?
#
# from .devx import pause
# await pause()
log.warning(
'Drained context msgs during closure:\n'
'Drained context msgs during closure\n\n'
f'{drained}'
)
# TODO: pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.result()` call?
# NOTE XXX: if the context was cancelled or remote-errored
# but we received the stream close msg first, we
@@ -238,28 +219,36 @@
from_src_exc=src_err,
)
# propagate any error but hide low-level frame details
# from the caller by default for debug noise reduction.
# propagate any error but hide low-level frame details from
# the caller by default for console/debug-REPL noise
# reduction.
if (
hide_tb
and (
# XXX NOTE XXX don't reraise on certain
# stream-specific internal error types like,
#
# - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`.
and not self._eoc
# XXX NOTE special conditions: don't reraise on
# certain stream-specific internal error types like,
#
# - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`.
not self._eoc
# - `RemoteActorError` (or `ContextCancelled`) if it gets
# raised from `_raise_from_no_key_in_msg()` since we
# want the same (as the above bullet) for any
# `.open_context()` block bubbled error raised by
# any nearby ctx API remote-failures.
# and not isinstance(src_err, RemoteActorError)
# - `RemoteActorError` (or subtypes like ctxc)
# since we want to present the error as though it is
# "sourced" directly from this `.receive()` call and
# generally NOT include the stack frames raised from
# inside the `PldRx` and/or the transport stack
# layers.
or isinstance(src_err, RemoteActorError)
)
):
raise type(src_err)(*src_err.args) from src_err
else:
# for any non-graceful-EOC we want to NOT hide this frame
if not self._eoc:
__tracebackhide__: bool = False
raise src_err
async def aclose(self) -> list[Exception|dict]:
@@ -385,6 +374,8 @@ class MsgStream(trio.abc.Channel):
if not self._eoc:
message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f'|_{self}\n'
)
log.cancel(message)

View File

@@ -299,7 +299,6 @@ class Lock:
@pdbp.hideframe
def release(
cls,
force: bool = False,
raise_on_thread: bool = True,
) -> bool:
@@ -347,12 +346,9 @@
lock: trio.StrictFIFOLock = cls._debug_lock
owner: Task = lock.statistics().owner
if (
(lock.locked() or force)
# ^-TODO-NOTE-^ should we just remove this, since the
# RTE case above will always happen when you force
# from the wrong task?
and (owner is task)
lock.locked()
and
(owner is task)
# ^-NOTE-^ if we do NOT ensure this, `trio` will
# raise a RTE when a non-owner tries to release the
# lock.
@@ -553,6 +549,7 @@ async def lock_stdio_for_peer(
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
we_finished = Lock.req_handler_finished = trio.Event()
lock_blocked: bool = False
try:
if ctx.cid in Lock._blocked:
raise RuntimeError(
@@ -565,7 +562,8 @@
'Consider that an internal bug exists given the TTY '
'`Lock`ing IPC dialog..\n'
)
Lock._blocked.add(ctx.cid)
lock_blocked = True
root_task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked:
log.warning(
@@ -575,7 +573,11 @@
)
ctx._enter_debugger_on_cancel: bool = False
message: str = (
f'Debug lock blocked for {subactor_uid}\n'
f'Debug lock blocked for subactor\n\n'
f'x)<= {subactor_uid}\n\n'
f'Likely because the root actor already started shutdown and is '
'closing IPC connections for this child!\n\n'
'Cancelling debug request!\n'
)
log.cancel(message)
@@ -589,7 +591,6 @@
f'remote task: {subactor_task_uid}\n'
)
DebugStatus.shield_sigint()
Lock._blocked.add(ctx.cid)
# NOTE: we use the IPC ctx's cancel scope directly in order to
# ensure that on any transport failure, or cancellation request
@@ -648,31 +649,34 @@
)
except BaseException as req_err:
message: str = (
f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
'Forcing `Lock.release()` for req-ctx since likely an '
'internal error!\n\n'
f'{ctx}'
fail_reason: str = (
f'on behalf of peer\n\n'
f'x)<=\n'
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
'Forcing `Lock.release()` due to acquire failure!\n\n'
f'x)=> {ctx}\n'
)
if isinstance(req_err, trio.Cancelled):
message = (
'Cancelled during root TTY-lock dialog\n'
fail_reason = (
'Cancelled during stdio-mutex request '
+
message
fail_reason
)
else:
message = (
'Errored during root TTY-lock dialog\n'
fail_reason = (
'Failed to deliver stdio-mutex request '
+
message
fail_reason
)
log.exception(message)
Lock.release() #force=True)
log.exception(fail_reason)
Lock.release()
raise
finally:
Lock._blocked.remove(ctx.cid)
if lock_blocked:
Lock._blocked.remove(ctx.cid)
# wakeup any waiters since the lock was (presumably)
# released, possibly only temporarily.
@@ -1167,7 +1171,7 @@ async def request_root_stdio_lock(
):
log.cancel(
'Debug lock request was CANCELLED?\n\n'
f'{req_ctx}\n'
f'<=c) {req_ctx}\n'
# f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
# f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n'
)
@@ -1179,22 +1183,26 @@
message: str = (
'Failed during debug request dialog with root actor?\n\n'
)
if req_ctx:
if (req_ctx := DebugStatus.req_ctx):
message += (
f'{req_ctx}\n'
f'<=x) {req_ctx}\n\n'
f'Cancelling IPC ctx!\n'
)
await req_ctx.cancel()
try:
await req_ctx.cancel()
except trio.ClosedResourceError as terr:
ctx_err.add_note(
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
)
else:
message += 'Failed during `Portal.open_context()` ?\n'
message += 'Failed in `Portal.open_context()` call ??\n'
log.exception(message)
ctx_err.add_note(message)
raise ctx_err
except (
tractor.ContextCancelled,
trio.Cancelled,
@@ -1218,9 +1226,10 @@
# -[ ]FURTHER, after we 'continue', we should be able to
# ctl-c out of the currently hanging task!
raise DebugRequestError(
'Failed to lock stdio from subactor IPC ctx!\n\n'
'Failed during stdio-locking dialog from root actor\n\n'
f'req_ctx: {DebugStatus.req_ctx}\n'
f'<=x)\n'
f'|_{DebugStatus.req_ctx}\n'
) from req_err
finally:
@@ -1998,10 +2007,10 @@ async def _pause(
# sanity, for when hackin on all this?
if not isinstance(pause_err, trio.Cancelled):
req_ctx: Context = DebugStatus.req_ctx
if req_ctx:
# XXX, bc the child-task in root might cancel it?
# assert req_ctx._scope.cancel_called
assert req_ctx.maybe_error
# if req_ctx:
# # XXX, bc the child-task in root might cancel it?
# # assert req_ctx._scope.cancel_called
# assert req_ctx.maybe_error
raise
@@ -2041,11 +2050,12 @@ def _set_trace(
# root here? Bo
log.pdb(
f'{_pause_msg}\n'
'|\n'
# TODO: more compact pformating?
# '|\n'
f'>(\n'
f' |_ {task} @ {actor.uid}\n'
# ^-TODO-^ more compact pformating?
# -[ ] make an `Actor.__repr()__`
# -[ ] should we use `log.pformat_task_uid()`?
f'|_ {task} @ {actor.uid}\n'
)
# presuming the caller passed in the "api frame"
# (the last frame before user code - like `.pause()`)
@@ -2541,9 +2551,9 @@ def _post_mortem(
# here! Bo
log.pdb(
f'{_crash_msg}\n'
'|\n'
# f'|_ {current_task()}\n'
f'|_ {current_task()} @ {actor.uid}\n'
# '|\n'
f'x>(\n'
f' |_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__`