Compare commits
4 Commits: af3745684c ... 9be821a5cf

| Author | SHA1 | Date |
|---|---|---|
| Tyler Goodlet | 9be821a5cf | |
| Tyler Goodlet | b46400a86f | |
| Tyler Goodlet | 02812b9f51 | |
| Tyler Goodlet | 3c5816c977 | |
@@ -933,13 +933,14 @@ class Context:
         self.cancel_called = True
         header: str = (
-            f'Cancelling ctx with peer from {side.upper()} side\n\n'
+            f'Cancelling ctx from {side.upper()}-side\n'
         )
         reminfo: str = (
             # ' =>\n'
-            f'Context.cancel() => {self.chan.uid}\n'
+            # f'Context.cancel() => {self.chan.uid}\n'
+            f'c)=> {self.chan.uid}\n'
             # f'{self.chan.uid}\n'
             f' |_ @{self.dst_maddr}\n'
             f' >> {self.repr_rpc}\n'
             # f' >> {self._nsf}() -> {codec}[dict]:\n\n'
             # TODO: pull msg-type from spec re #320
@@ -1267,6 +1268,12 @@ class Context:

     @property
     def maybe_error(self) -> BaseException|None:
+        '''
+        Return the (remote) error as outcome or `None`.
+
+        Remote errors take precedence over local ones.
+
+        '''
         le: BaseException|None = self._local_error
         re: RemoteActorError|ContextCancelled|None = self._remote_error
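Note: the docstring added above encodes a simple precedence rule for the ctx's "outcome". A minimal standalone sketch of just that rule (a hypothetical `_Ctx` stand-in, not tractor's actual `Context` type):

```python
class _Ctx:
    # stand-ins for `Context._local_error` / `._remote_error`
    _local_error: BaseException|None = None
    _remote_error: BaseException|None = None

    @property
    def maybe_error(self) -> BaseException|None:
        # remote errors take precedence over local ones
        return self._remote_error or self._local_error


ctx = _Ctx()
ctx._local_error = RuntimeError('local crash')
assert ctx.maybe_error is ctx._local_error

ctx._remote_error = ConnectionError('peer died')
assert ctx.maybe_error is ctx._remote_error  # remote wins
```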
@@ -2182,9 +2189,16 @@ async def open_context_from_portal(
             # handled in the block above ^^^ !!
             # await _debug.pause()
             # log.cancel(
-            log.exception(
-                f'{ctx.side}-side of `Context` terminated with '
-                f'.outcome => {ctx.repr_outcome()}\n'
+            match scope_err:
+                case trio.Cancelled:
+                    logmeth = log.cancel
+
+                # XXX explicitly report on any non-graceful-taskc cases
+                case _:
+                    logmeth = log.exception
+
+            logmeth(
+                f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
             )

             if debug_mode():
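Note: the rewrite above swaps a hard-coded `log.exception()` for a `match`-dispatched severity so a graceful task-cancel no longer logs at error level. A standalone sketch of the pattern using stdlib `logging` (stand-ins noted inline; also note an instance pattern `Type()` matches instances, whereas a bare dotted name compares against the class object itself):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('ctx-exit')


def report_exit(scope_err: BaseException|None, outcome: str) -> None:
    match scope_err:
        case None:
            logmeth = log.info  # graceful return
        case KeyboardInterrupt():  # stand-in for `trio.Cancelled`
            logmeth = log.warning  # stand-in for tractor's `log.cancel()`
        # explicitly report any non-graceful-cancel case as a crash
        case _:
            logmeth = log.error

    logmeth(f'ctx exited with {outcome!r}')


report_exit(None, 'Return(pld=42)')
report_exit(KeyboardInterrupt(), 'Cancelled')
report_exit(RuntimeError('boom'), 'Error')
```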
@@ -265,7 +265,7 @@ def _trio_main(
     except BaseException as err:
         logmeth = log.error
         exit_status: str = (
-            'Main actor task crashed during exit?\n'
+            'Main actor task exited due to crash?\n'
             +
             nest_from_op(
                 input_op='x)>', # closed by error
@@ -97,7 +97,7 @@ class Portal:
         channel: Channel,

     ) -> None:

-        self.chan = channel
+        self._chan: Channel = channel
         # during the portal's lifetime
         self._final_result_pld: Any|None = None
         self._final_result_msg: PayloadMsg|None = None
@@ -109,6 +109,10 @@ class Portal:
         self._streams: set[MsgStream] = set()
         self.actor: Actor = current_actor()

+    @property
+    def chan(self) -> Channel:
+        return self._chan
+
     @property
     def channel(self) -> Channel:
         '''
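Note: the new `.chan` property pairs with the `self._chan` rename in the `__init__()` hunk above: the attribute moves behind a private name while both the new and legacy accessor names resolve to it read-only. A toy sketch of the pattern (stub `Channel`, not the real IPC type):

```python
class Channel:  # stub for the IPC transport type
    ...


class Portal:
    def __init__(self, channel: Channel) -> None:
        self._chan: Channel = channel

    @property
    def chan(self) -> Channel:
        return self._chan

    @property
    def channel(self) -> Channel:
        # legacy name kept as a read-only alias
        return self._chan


p = Portal(Channel())
assert p.chan is p.channel  # both names reach the same object
```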
@@ -66,10 +66,11 @@ from trio import (
 )

 from tractor.msg import (
-    pretty_struct,
-    NamespacePath,
-    types as msgtypes,
     MsgType,
+    NamespacePath,
+    Stop,
+    pretty_struct,
+    types as msgtypes,
 )
 from ._ipc import Channel
 from ._context import (
@@ -545,7 +546,8 @@ class Actor:
         ):
             log.cancel(
                 'Waiting on cancel request to peer\n'
-                f'`Portal.cancel_actor()` => {chan.uid}\n'
+                f'c)=>\n'
+                f' |_{chan.uid}\n'
             )

             # XXX: this is a soft wait on the channel (and its
@@ -642,12 +644,14 @@ class Actor:
                 # and
                 an_exit_cs.cancelled_caught
             ):
-                log.warning(
+                report: str = (
                     'Timed out waiting on local actor-nursery to exit?\n'
                     f'{local_nursery}\n'
-                    f' |_{pformat(local_nursery._children)}\n'
                 )
-                # await _debug.pause()
+                if children := local_nursery._children:
+                    report += f' |_{pformat(children)}\n'
+
+                log.warning(report)

             if disconnected:
                 # if the transport died and this actor is still
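Note: the report-building rewrite above uses an assignment expression so the child table is only rendered when non-empty. A cut-down sketch of that shape (stub nursery type):

```python
from pprint import pformat


class Nursery:  # stub with a child table like `local_nursery._children`
    _children: dict = {}


def timeout_report(nursery: Nursery) -> str:
    report: str = (
        'Timed out waiting on local actor-nursery to exit?\n'
        f'{nursery}\n'
    )
    # bind-and-test in one expression: only append the child
    # table when it is non-empty
    if children := nursery._children:
        report += f' |_{pformat(children)}\n'
    return report


print(timeout_report(Nursery()))
```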
@@ -819,14 +823,17 @@ class Actor:
                 # side,
             )]
         except KeyError:
-            log.warning(
+            report: str = (
                 'Ignoring invalid IPC ctx msg!\n\n'
-                f'<= sender: {uid}\n\n'
-                # XXX don't need right since it's always in msg?
-                # f'=> cid: {cid}\n\n'
-
-                f'{pretty_struct.pformat(msg)}\n'
+                f'<=? {uid}\n\n'
+                f' |_{pretty_struct.pformat(msg)}\n'
             )
+            match msg:
+                case Stop():
+                    log.runtime(report)
+                case _:
+                    log.warning(report)
+
             return

         # if isinstance(msg, MsgTypeError):
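Note: the `match msg:` block above demotes a late `Stop` msg (for an already-reaped ctx) to runtime-level while anything else still warns. The same type-dispatch in standalone form, with a plain class standing in for tractor's msgspec-defined `Stop` type and `log.debug` for its custom `.runtime()` level:

```python
import logging

log = logging.getLogger('ipc')


class Stop:
    '''Graceful stream-end msg (stand-in).'''


def log_invalid_ctx_msg(msg: object, report: str) -> None:
    match msg:
        case Stop():
            # expected race: the ctx was already reaped
            log.debug(report)
        case _:
            log.warning(report)
```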
@@ -1338,10 +1345,11 @@ class Actor:
             return True

         log.cancel(
-            'Cancel request for RPC task\n\n'
-            f'<= Actor._cancel_task(): {requesting_uid}\n\n'
-            f'=> {ctx._task}\n'
-            f' |_ >> {ctx.repr_rpc}\n'
+            'Rxed cancel request for RPC task\n'
+            f'<=c) {requesting_uid}\n'
+            f' |_{ctx._task}\n'
+            f'   >> {ctx.repr_rpc}\n'
+            # f'=> {ctx._task}\n'
             # f' >> Actor._cancel_task() => {ctx._task}\n'
             # f' |_ {ctx._task}\n\n'
@@ -246,8 +246,9 @@ async def hard_kill(

    '''
    log.cancel(
-        'Terminating sub-proc:\n'
-        f'|_{proc}\n'
+        'Terminating sub-proc\n'
+        f'>x)\n'
+        f' |_{proc}\n'
    )
    # NOTE: this timeout used to do nothing since we were shielding
    # the ``.wait()`` inside ``new_proc()`` which will pretty much
@@ -293,8 +294,8 @@ async def hard_kill(
        log.critical(
            # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
            '#T-800 deployed to collect zombie B0\n'
-            f'|\n'
-            f'|_{proc}\n'
+            f'>x)\n'
+            f' |_{proc}\n'
        )
        proc.kill()
@@ -322,8 +323,9 @@ async def soft_kill(
    uid: tuple[str, str] = portal.channel.uid
    try:
        log.cancel(
-            'Soft killing sub-actor via `Portal.cancel_actor()`\n'
-            f'|_{proc}\n'
+            'Soft killing sub-actor via portal request\n'
+            f'c)> {portal.chan.uid}\n'
+            f' |_{proc}\n'
        )
        # wait on sub-proc to signal termination
        await wait_func(proc)
@@ -552,8 +554,9 @@ async def trio_proc(
            # cancel result waiter that may have been spawned in
            # tandem if not done already
            log.cancel(
-                'Cancelling existing result waiter task for '
-                f'{subactor.uid}'
+                'Cancelling portal result reaper task\n'
+                f'>c)\n'
+                f' |_{subactor.uid}\n'
            )
            nursery.cancel_scope.cancel()
@@ -562,7 +565,11 @@ async def trio_proc(
        # allowed! Do this **after** cancellation/teardown to avoid
        # killing the process too early.
        if proc:
-            log.cancel(f'Hard reap sequence starting for {subactor.uid}')
+            log.cancel(
+                f'Hard reap sequence starting for subactor\n'
+                f'>x)\n'
+                f' |_{subactor}@{subactor.uid}\n'
+            )

            with trio.CancelScope(shield=True):
                # don't clobber an ongoing pdb
@@ -36,8 +36,8 @@ import warnings
 import trio

 from ._exceptions import (
-    # _raise_from_no_key_in_msg,
     ContextCancelled,
+    RemoteActorError,
 )
 from .log import get_logger
 from .trionics import (
@@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel):
    @property
    def ctx(self) -> Context:
        '''
-        This stream's IPC `Context` ref.
+        A read-only ref to this stream's inter-actor-task `Context`.

        '''
        return self._ctx
@@ -145,9 +145,8 @@ class MsgStream(trio.abc.Channel):
        '''
        __tracebackhide__: bool = hide_tb

-        # NOTE: `trio.ReceiveChannel` implements
-        # EOC handling as follows (aka uses it
-        # to gracefully exit async for loops):
+        # NOTE FYI: `trio.ReceiveChannel` implements EOC handling as
+        # follows (aka uses it to gracefully exit async for loops):
        #
        # async def __anext__(self) -> ReceiveType:
        #     try:
@@ -165,48 +164,29 @@ class MsgStream(trio.abc.Channel):

        src_err: Exception|None = None  # orig tb
        try:

            ctx: Context = self._ctx
            return await ctx._pld_rx.recv_pld(ipc=self)

        # XXX: the stream terminates on either of:
-        # - via `self._rx_chan.receive()` raising after manual closure
-        #   by the rpc-runtime OR,
-        # - via a received `{'stop': ...}` msg from remote side.
-        #   |_ NOTE: previously this was triggered by calling
-        #   ``._rx_chan.aclose()`` on the send side of the channel inside
-        #   `Actor._deliver_ctx_payload()`, but now the 'stop' message handling
-        #   has been put just above inside `_raise_from_no_key_in_msg()`.
-        except (
-            trio.EndOfChannel,
-        ) as eoc:
-            src_err = eoc
+        # - `self._rx_chan.receive()` raising after manual closure
+        #   by the rpc-runtime,
+        #   OR
+        # - via a `Stop`-msg received from remote peer task.
+        #   NOTE
+        #   |_ previously this was triggered by calling
+        #   ``._rx_chan.aclose()`` on the send side of the channel
+        #   inside `Actor._deliver_ctx_payload()`, but now the 'stop'
+        #   message handling gets delegated to `PldRFx.recv_pld()`
+        #   internals.
+        except trio.EndOfChannel as eoc:
+            # a graceful stream finished signal
            self._eoc = eoc
+            src_err = eoc

-        # TODO: Locally, we want to close this stream gracefully, by
-        # terminating any local consumers tasks deterministically.
-        # Once we have broadcast support, we **don't** want to be
-        # closing this stream and not flushing a final value to
-        # remaining (clone) consumers who may not have been
-        # scheduled to receive it yet.
-        # try:
-        #     maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
-        #     if maybe_err_msg_or_res:
-        #         log.warning(
-        #             'Discarding un-processed msg:\n'
-        #             f'{maybe_err_msg_or_res}'
-        #         )
-        # except trio.WouldBlock:
-        #     # no queued msgs that might be another remote
-        #     # error, so just raise the original EoC
-        #     pass
-
-        # raise eoc
-
-        # a ``ClosedResourceError`` indicates that the internal
-        # feeder memory receive channel was closed likely by the
-        # runtime after the associated transport-channel
-        # disconnected or broke.
+        # a `ClosedResourceError` indicates that the internal feeder
+        # memory receive channel was closed likely by the runtime
+        # after the associated transport-channel disconnected or
+        # broke.
        except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
            src_err = cre
            log.warning(
@@ -218,14 +198,15 @@ class MsgStream(trio.abc.Channel):
            # terminated and signal this local iterator to stop
            drained: list[Exception|dict] = await self.aclose()
            if drained:
+                # ?TODO? pass these to the `._ctx._drained_msgs: deque`
+                # and then iterate them as part of any `.wait_for_result()` call?
+                #
                # from .devx import pause
                # await pause()
                log.warning(
-                    'Drained context msgs during closure:\n'
+                    'Drained context msgs during closure\n\n'
                    f'{drained}'
                )
-                # TODO: pass these to the `._ctx._drained_msgs: deque`
-                # and then iterate them as part of any `.result()` call?

            # NOTE XXX: if the context was cancelled or remote-errored
            # but we received the stream close msg first, we
@@ -238,28 +219,36 @@ class MsgStream(trio.abc.Channel):
                from_src_exc=src_err,
            )

-            # propagate any error but hide low-level frame details
-            # from the caller by default for debug noise reduction.
+            # propagate any error but hide low-level frame details from
+            # the caller by default for console/debug-REPL noise
+            # reduction.
            if (
                hide_tb
+                and (

-                # XXX NOTE XXX don't reraise on certain
-                # stream-specific internal error types like,
-                #
-                # - `trio.EoC` since we want to use the exact instance
-                #   to ensure that it is the error that bubbles upward
-                #   for silent absorption by `Context.open_stream()`.
-                and not self._eoc
+                    # XXX NOTE special conditions: don't reraise on
+                    # certain stream-specific internal error types like,
+                    #
+                    # - `trio.EoC` since we want to use the exact instance
+                    #   to ensure that it is the error that bubbles upward
+                    #   for silent absorption by `Context.open_stream()`.
+                    not self._eoc

-                # - `RemoteActorError` (or `ContextCancelled`) if it gets
-                #   raised from `_raise_from_no_key_in_msg()` since we
-                #   want the same (as the above bullet) for any
-                #   `.open_context()` block bubbled error raised by
-                #   any nearby ctx API remote-failures.
-                # and not isinstance(src_err, RemoteActorError)
+                    # - `RemoteActorError` (or subtypes like ctxc)
+                    #   since we want to present the error as though it is
+                    #   "sourced" directly from this `.receive()` call and
+                    #   generally NOT include the stack frames raised from
+                    #   inside the `PldRx` and/or the transport stack
+                    #   layers.
+                    or isinstance(src_err, RemoteActorError)
+                )
            ):
                raise type(src_err)(*src_err.args) from src_err
            else:
+                # for any non-graceful-EOC we want to NOT hide this frame
+                if not self._eoc:
+                    __tracebackhide__: bool = False
+
                raise src_err

    async def aclose(self) -> list[Exception|dict]:
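Note: the `__tracebackhide__` toggling above follows the frame-filter convention honored by `pdbp` and pytest: a truthy local by that name marks the frame as skippable in rendered tracebacks. A rough sketch of the conditional un-hiding, with a `graceful` flag standing in for the real EoC check:

```python
def receive(
    hide_tb: bool = True,
    graceful: bool = False,
):
    # frame-filter flag read by `pdbp`/pytest tb formatters
    __tracebackhide__: bool = hide_tb
    try:
        raise RuntimeError('transport failure')
    except RuntimeError:
        # for any non-graceful end, re-expose this frame so the
        # crash handler shows where the error surfaced
        if not graceful:
            __tracebackhide__ = False
        raise
```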
@@ -385,6 +374,8 @@ class MsgStream(trio.abc.Channel):
        if not self._eoc:
            message: str = (
                f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
+                # } bc a stream is a "scope"/msging-phase inside an IPC
+                f'x}}>\n'
                f'|_{self}\n'
            )
            log.cancel(message)
@@ -299,7 +299,6 @@ class Lock:
    @pdbp.hideframe
    def release(
        cls,
-        force: bool = False,
        raise_on_thread: bool = True,

    ) -> bool:
@@ -347,12 +346,9 @@ class Lock:
        lock: trio.StrictFIFOLock = cls._debug_lock
        owner: Task = lock.statistics().owner
        if (
-            (lock.locked() or force)
-            # ^-TODO-NOTE-^ should we just remove this, since the
-            # RTE case above will always happen when you force
-            # from the wrong task?
-
-            and (owner is task)
+            lock.locked()
+            and
+            (owner is task)
            # ^-NOTE-^ if we do NOT ensure this, `trio` will
            # raise a RTE when a non-owner tries to releasee the
            # lock.
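Note: dropping `force` leaves only the ownership-guarded release, which matters because `trio` raises a `RuntimeError` when a non-owner task calls `.release()`. A minimal runnable sketch of the same guard against the real `trio` lock API:

```python
import trio
from trio.lowlevel import current_task


def maybe_release(lock: trio.StrictFIFOLock) -> bool:
    if (
        lock.locked()
        and
        (lock.statistics().owner is current_task())
    ):
        lock.release()
        return True
    return False


async def main():
    lock = trio.StrictFIFOLock()
    await lock.acquire()
    assert maybe_release(lock)      # owner: release succeeds
    assert not maybe_release(lock)  # already free: harmless no-op


trio.run(main)
```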
@@ -553,6 +549,7 @@ async def lock_stdio_for_peer(
    # can try to avoid clobbering any connection from a child
    # that's currently relying on it.
    we_finished = Lock.req_handler_finished = trio.Event()
+    lock_blocked: bool = False
    try:
        if ctx.cid in Lock._blocked:
            raise RuntimeError(
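Note: the new `lock_blocked` flag exists so the `finally:` block (in the hunk further down) only undoes a registration that actually happened; the old unconditional `Lock._blocked.remove(ctx.cid)` could `KeyError` when the request failed before the `.add()`. The guard pattern in isolation (hypothetical `_blocked` registry):

```python
_blocked: set[str] = set()


def handle_request(cid: str) -> None:
    registered: bool = False
    try:
        if cid in _blocked:
            raise RuntimeError(f'{cid!r} already being served!')
        _blocked.add(cid)
        registered = True
        ...  # serve the request
    finally:
        # an unconditional `.remove()` would raise if we bailed
        # before the `.add()` above
        if registered:
            _blocked.remove(cid)
```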
@@ -565,7 +562,8 @@ async def lock_stdio_for_peer(
                'Consider that an internal bug exists given the TTY '
                '`Lock`ing IPC dialog..\n'
            )
+        Lock._blocked.add(ctx.cid)
+        lock_blocked = True
        root_task_name: str = current_task().name
        if tuple(subactor_uid) in Lock._blocked:
            log.warning(
@@ -575,7 +573,11 @@ async def lock_stdio_for_peer(
            )
            ctx._enter_debugger_on_cancel: bool = False
            message: str = (
-                f'Debug lock blocked for {subactor_uid}\n'
+                f'Debug lock blocked for subactor\n\n'
+                f'x)<= {subactor_uid}\n\n'
+
+                f'Likely because the root actor already started shutdown and is '
+                'closing IPC connections for this child!\n\n'
                'Cancelling debug request!\n'
            )
            log.cancel(message)
@@ -589,7 +591,6 @@ async def lock_stdio_for_peer(
            f'remote task: {subactor_task_uid}\n'
        )
        DebugStatus.shield_sigint()
-        Lock._blocked.add(ctx.cid)

        # NOTE: we use the IPC ctx's cancel scope directly in order to
        # ensure that on any transport failure, or cancellation request
@@ -648,31 +649,34 @@ async def lock_stdio_for_peer(
        )

    except BaseException as req_err:
-        message: str = (
-            f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
-            'Forcing `Lock.release()` for req-ctx since likely an '
-            'internal error!\n\n'
-            f'{ctx}'
+        fail_reason: str = (
+            f'on behalf of peer\n\n'
+            f'x)<=\n'
+            f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
+
+            'Forcing `Lock.release()` due to acquire failure!\n\n'
+            f'x)=> {ctx}\n'
        )
        if isinstance(req_err, trio.Cancelled):
-            message = (
-                'Cancelled during root TTY-lock dialog\n'
+            fail_reason = (
+                'Cancelled during stdio-mutex request '
                +
-                message
+                fail_reason
            )
        else:
-            message = (
-                'Errored during root TTY-lock dialog\n'
+            fail_reason = (
+                'Failed to deliver stdio-mutex request '
                +
-                message
+                fail_reason
            )

-        log.exception(message)
-        Lock.release() #force=True)
+        log.exception(fail_reason)
+        Lock.release()
        raise

    finally:
-        Lock._blocked.remove(ctx.cid)
+        if lock_blocked:
+            Lock._blocked.remove(ctx.cid)

        # wakeup any waiters since the lock was (presumably)
        # released, possibly only temporarily.
@@ -1167,7 +1171,7 @@ async def request_root_stdio_lock(
        ):
            log.cancel(
                'Debug lock request was CANCELLED?\n\n'
-                f'{req_ctx}\n'
+                f'<=c) {req_ctx}\n'
                # f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
                # f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n'
            )
@@ -1179,22 +1183,26 @@ async def request_root_stdio_lock(
            message: str = (
                'Failed during debug request dialog with root actor?\n\n'
            )
-            if req_ctx:
+            if (req_ctx := DebugStatus.req_ctx):
                message += (
-                    f'{req_ctx}\n'
+                    f'<=x) {req_ctx}\n\n'
                    f'Cancelling IPC ctx!\n'
                )
-                await req_ctx.cancel()
+                try:
+                    await req_ctx.cancel()
+                except trio.ClosedResourceError as terr:
+                    ctx_err.add_note(
+                        # f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
+                        f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
+                    )

            else:
-                message += 'Failed during `Portal.open_context()` ?\n'
+                message += 'Failed in `Portal.open_context()` call ??\n'

            log.exception(message)
            ctx_err.add_note(message)
            raise ctx_err


        except (
            tractor.ContextCancelled,
            trio.Cancelled,
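Note: wrapping `req_ctx.cancel()` above keeps a teardown-time transport failure from masking the original error; the secondary failure is attached via `BaseException.add_note()` (Python 3.11+) instead of raising. The shape of that pattern, with a stub in place of `trio.ClosedResourceError`:

```python
class ClosedResourceError(Exception):  # stub for `trio`'s type
    ...


def cancel_ctx() -> None:
    raise ClosedResourceError('transport already closed')


def teardown(ctx_err: Exception) -> None:
    try:
        cancel_ctx()
    except ClosedResourceError as terr:
        # annotate rather than raise, so `ctx_err` stays primary
        ctx_err.add_note(f'req-ctx cancel failed: {type(terr)!r}')
    raise ctx_err
```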
@@ -1218,9 +1226,10 @@ async def request_root_stdio_lock(
        # -[ ]FURTHER, after we 'continue', we should be able to
        #   ctl-c out of the currently hanging task!
        raise DebugRequestError(
-            'Failed to lock stdio from subactor IPC ctx!\n\n'
+            'Failed during stdio-locking dialog from root actor\n\n'

-            f'req_ctx: {DebugStatus.req_ctx}\n'
+            f'<=x)\n'
+            f'|_{DebugStatus.req_ctx}\n'
        ) from req_err

    finally:
@@ -1998,10 +2007,10 @@ async def _pause(
            # sanity, for when hackin on all this?
            if not isinstance(pause_err, trio.Cancelled):
                req_ctx: Context = DebugStatus.req_ctx
-                if req_ctx:
-                    # XXX, bc the child-task in root might cancel it?
-                    # assert req_ctx._scope.cancel_called
-                    assert req_ctx.maybe_error
+                # if req_ctx:
+                #     # XXX, bc the child-task in root might cancel it?
+                #     # assert req_ctx._scope.cancel_called
+                #     assert req_ctx.maybe_error

            raise
@@ -2041,11 +2050,12 @@ def _set_trace(
    # root here? Bo
    log.pdb(
        f'{_pause_msg}\n'
-        '|\n'
-        # TODO: more compact pformating?
+        # '|\n'
+        f'>(\n'
+        f' |_ {task} @ {actor.uid}\n'
+        # ^-TODO-^ more compact pformating?
        # -[ ] make an `Actor.__repr()__`
        # -[ ] should we use `log.pformat_task_uid()`?
-        f'|_ {task} @ {actor.uid}\n'
    )
    # presuming the caller passed in the "api frame"
    # (the last frame before user code - like `.pause()`)
@@ -2541,9 +2551,9 @@ def _post_mortem(
    # here! Bo
    log.pdb(
        f'{_crash_msg}\n'
-        '|\n'
-        # f'|_ {current_task()}\n'
-        f'|_ {current_task()} @ {actor.uid}\n'
+        # '|\n'
+        f'x>(\n'
+        f' |_ {current_task()} @ {actor.uid}\n'

        # f'|_ @{actor.uid}\n'
        # TODO: make an `Actor.__repr()__`