Compare commits: 7c22f76274 ... c025761f15 (8 commits)

Author | SHA1 | Date
---|---|---
Tyler Goodlet | c025761f15 |
Tyler Goodlet | 2e797ef7ee |
Tyler Goodlet | c36deb1f4d |
Tyler Goodlet | fa7e37d6ed |
Tyler Goodlet | 364ea91983 |
Tyler Goodlet | 7ae9b5319b |
Tyler Goodlet | 6156ff95f8 |
Tyler Goodlet | 9e3f41a5b1 |
@@ -1,6 +1,7 @@
"""
``tractor`` testing!!

"""
from contextlib import asynccontextmanager as acm
import sys
import subprocess
import os

@@ -292,3 +293,26 @@ def daemon(
    time.sleep(_PROC_SPAWN_WAIT)
    yield proc
    sig_prog(proc, _INT_SIGNAL)


@acm
async def expect_ctxc(
    yay: bool,
    reraise: bool = False,
) -> None:
    '''
    Small acm to catch `ContextCancelled` errors when expected
    below it in a `async with ()` block.

    '''
    if yay:
        try:
            yield
            raise RuntimeError('Never raised ctxc?')
        except tractor.ContextCancelled:
            if reraise:
                raise
            else:
                return
    else:
        yield

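The new `expect_ctxc()` helper above is what the updated tests lean on. A rough usage sketch follows (illustration only: `sleep_forever` is a made-up target function, the real tests use their own context functions; it assumes a local `tractor` install and that `enable_modules=[__name__]` makes the target importable in the subactor, mirroring the test code in this diff):

```python
import trio
import tractor
from conftest import expect_ctxc  # helper added in this changeset


@tractor.context
async def sleep_forever(
    ctx: tractor.Context,
) -> None:
    # illustrative target task: ack the open then park until cancelled
    await ctx.started('start')
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'ctxc_demo',
            enable_modules=[__name__],
        )
        async with (
            # a `ContextCancelled` ack is the *expected* outcome since
            # we cancel the whole subactor below
            expect_ctxc(yay=True),
            portal.open_context(sleep_forever) as (ctx, first),
        ):
            assert first == 'start'
            await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```

With `yay=True` the block only passes if a `ContextCancelled` actually arrives (otherwise the helper raises `RuntimeError`); passing `reraise=True` propagates the ctxc instead of swallowing it.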
@@ -5,7 +5,6 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.

'''
from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from pprint import pformat

@@ -26,7 +25,10 @@ from tractor._exceptions import (
    ContextCancelled,
)

from conftest import tractor_test
from conftest import (
    tractor_test,
    expect_ctxc,
)

# ``Context`` semantics are as follows,
# ------------------------------------

@@ -194,12 +196,13 @@ def test_simple_context(
        )

        try:
            async with portal.open_context(
            async with (
                portal.open_context(
                    simple_setup_teardown,
                    data=10,
                    block_forever=callee_blocks_forever,
            ) as (ctx, sent):

                ) as (ctx, sent),
            ):
                assert sent == 11

                if callee_blocks_forever:

@@ -250,17 +253,6 @@ def test_simple_context(
    trio.run(main)


@acm
async def expect_ctxc(yay: bool) -> None:
    if yay:
        try:
            yield
        except ContextCancelled:
            return
    else:
        yield


@pytest.mark.parametrize(
    'callee_returns_early',
    [True, False],

@ -293,6 +285,7 @@ def test_caller_cancels(
|
|||
) -> None:
|
||||
actor: Actor = current_actor()
|
||||
uid: tuple = actor.uid
|
||||
_ctxc: ContextCancelled|None = None
|
||||
|
||||
if (
|
||||
cancel_method == 'portal'
|
||||
|
@ -303,6 +296,9 @@ def test_caller_cancels(
|
|||
assert 0, 'Portal cancel should raise!'
|
||||
|
||||
except ContextCancelled as ctxc:
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await tractor.pause()
|
||||
_ctxc = ctxc
|
||||
assert ctx.chan._cancel_called
|
||||
assert ctxc.canceller == uid
|
||||
assert ctxc is ctx.maybe_error
|
||||
|
@ -311,7 +307,10 @@ def test_caller_cancels(
|
|||
# case since self-cancellation should swallow the ctxc
|
||||
# silently!
|
||||
else:
|
||||
try:
|
||||
res = await ctx.result()
|
||||
except ContextCancelled as ctxc:
|
||||
pytest.fail(f'should not have raised ctxc\n{ctxc}')
|
||||
|
||||
# we actually get a result
|
||||
if callee_returns_early:
|
||||
|
@ -342,6 +341,10 @@ def test_caller_cancels(
|
|||
# await tractor.pause()
|
||||
# assert ctx._local_error is None
|
||||
|
||||
# TODO: don't need this right?
|
||||
# if _ctxc:
|
||||
# raise _ctxc
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
|
@ -352,11 +355,19 @@ def test_caller_cancels(
|
|||
'simple_context',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
timeout = 0.5 if not callee_returns_early else 2
|
||||
timeout: float = (
|
||||
0.5
|
||||
if not callee_returns_early
|
||||
else 2
|
||||
)
|
||||
with trio.fail_after(timeout):
|
||||
async with (
|
||||
|
||||
expect_ctxc(yay=cancel_method == 'portal'),
|
||||
expect_ctxc(
|
||||
yay=(
|
||||
not callee_returns_early
|
||||
and cancel_method == 'portal'
|
||||
)
|
||||
),
|
||||
|
||||
portal.open_context(
|
||||
simple_setup_teardown,
|
||||
|
@ -372,10 +383,18 @@ def test_caller_cancels(
|
|||
await trio.sleep(0.5)
|
||||
|
||||
if cancel_method == 'ctx':
|
||||
print('cancelling with `Context.cancel()`')
|
||||
await ctx.cancel()
|
||||
else:
|
||||
|
||||
elif cancel_method == 'portal':
|
||||
print('cancelling with `Portal.cancel_actor()`')
|
||||
await portal.cancel_actor()
|
||||
|
||||
else:
|
||||
pytest.fail(
|
||||
f'Unknown `cancel_method={cancel_method} ?'
|
||||
)
|
||||
|
||||
if chk_ctx_result_before_exit:
|
||||
await check_canceller(ctx)
|
||||
|
||||
|
@ -385,15 +404,22 @@ def test_caller_cancels(
|
|||
if cancel_method != 'portal':
|
||||
await portal.cancel_actor()
|
||||
|
||||
# since the `.cancel_actor()` call just above
|
||||
# will cause the `.open_context().__aexit__()` raise
|
||||
# a ctxc which should in turn cause `ctx._scope` to
|
||||
# XXX NOTE XXX: non-normal yet purposeful
|
||||
# test-specific ctxc suppression is implemented!
|
||||
#
|
||||
# WHY: the `.cancel_actor()` case (cancel_method='portal')
|
||||
# will cause both:
|
||||
# * the `ctx.result()` inside `.open_context().__aexit__()`
|
||||
# * AND the `ctx.result()` inside `check_canceller()`
|
||||
# to raise ctxc.
|
||||
#
|
||||
# which should in turn cause `ctx._scope` to
|
||||
# catch any cancellation?
|
||||
if (
|
||||
not callee_returns_early
|
||||
and cancel_method == 'portal'
|
||||
and cancel_method != 'portal'
|
||||
):
|
||||
assert ctx._scope.cancelled_caught
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
@ -511,6 +537,23 @@ async def expect_cancelled(
|
|||
await stream.send(msg) # echo server
|
||||
|
||||
except trio.Cancelled:
|
||||
|
||||
# on ctx.cancel() the internal RPC scope is cancelled but
|
||||
# never caught until the func exits.
|
||||
assert ctx._scope.cancel_called
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
||||
# should be the RPC cmd request for `._cancel_task()`
|
||||
assert ctx._cancel_msg
|
||||
# which, has not yet resolved to an error outcome
|
||||
# since this rpc func has not yet exited.
|
||||
assert not ctx.maybe_error
|
||||
assert not ctx._final_result_is_set()
|
||||
|
||||
# debug REPL if needed
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await tractor.pause()
|
||||
|
||||
# expected case
|
||||
_state = False
|
||||
raise
|
||||
|
@@ -594,16 +637,16 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
            with trio.fail_after(0.2):
                await ctx.result()
                assert 0, "Callee should have blocked!?"

        except trio.TooSlowError:
            # NO-OP -> since already called above
            await ctx.cancel()

        # NOTE: local scope should have absorbed the cancellation since
        # in this case we call `ctx.cancel()` and the local
        # `._scope` gets `.cancel_called` on the ctxc ack.
        # `._scope` does not get `.cancel_called` and thus
        # `.cancelled_caught` neither will ever bet set.
        if use_ctx_cancel_method:
            assert ctx._scope.cancelled_caught
            assert not ctx._scope.cancelled_caught

            # rxed ctxc response from far end
            assert ctx.cancel_acked
|
|
@ -19,6 +19,8 @@ from tractor import (
|
|||
)
|
||||
from tractor.trionics import BroadcastReceiver
|
||||
|
||||
from conftest import expect_ctxc
|
||||
|
||||
|
||||
async def sleep_and_err(
|
||||
sleep_for: float = 0.1,
|
||||
|
@ -190,7 +192,8 @@ async def trio_ctx(
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'parent_cancels', [False, True],
|
||||
'parent_cancels',
|
||||
['context', 'actor', False],
|
||||
ids='parent_actor_cancels_child={}'.format
|
||||
)
|
||||
def test_context_spawns_aio_task_that_errors(
|
||||
|
@ -214,18 +217,36 @@ def test_context_spawns_aio_task_that_errors(
|
|||
# debug_mode=True,
|
||||
loglevel='cancel',
|
||||
)
|
||||
async with p.open_context(
|
||||
async with (
|
||||
expect_ctxc(
|
||||
yay=parent_cancels == 'actor',
|
||||
),
|
||||
p.open_context(
|
||||
trio_ctx,
|
||||
) as (ctx, first):
|
||||
) as (ctx, first),
|
||||
):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
if parent_cancels:
|
||||
if parent_cancels == 'actor':
|
||||
await p.cancel_actor()
|
||||
|
||||
elif parent_cancels == 'context':
|
||||
await ctx.cancel()
|
||||
|
||||
else:
|
||||
await trio.sleep_forever()
|
||||
|
||||
return await ctx.result()
|
||||
async with expect_ctxc(
|
||||
yay=parent_cancels == 'actor',
|
||||
):
|
||||
await ctx.result()
|
||||
|
||||
if parent_cancels == 'context':
|
||||
# to tear down sub-acor
|
||||
await p.cancel_actor()
|
||||
|
||||
return ctx.outcome
|
||||
|
||||
if parent_cancels:
|
||||
# bc the parent made the cancel request,
|
||||
|
|
|
@ -220,11 +220,12 @@ async def stream_from_peer(
|
|||
# - what about IPC-transport specific errors, should
|
||||
# they bubble from the async for and trigger
|
||||
# other special cases?
|
||||
#
|
||||
# NOTE: current ctl flow:
|
||||
# - stream raises `trio.EndOfChannel` and
|
||||
# exits the loop
|
||||
# - `.open_context()` will raise the ctxcanc
|
||||
# received from the sleeper.
|
||||
# - `.open_context()` will raise the ctxc received
|
||||
# from the sleeper.
|
||||
async for msg in stream:
|
||||
assert msg is not None
|
||||
print(msg)
|
||||
|
@ -237,7 +238,12 @@ async def stream_from_peer(
|
|||
|
||||
assert peer_ctx._remote_error is ctxerr
|
||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||
assert peer_ctx.canceller == ctxerr.canceller
|
||||
|
||||
# the peer ctx is the canceller even though it's canceller
|
||||
# is the "canceller" XD
|
||||
assert peer_name in peer_ctx.canceller
|
||||
|
||||
assert "canceller" in ctxerr.canceller
|
||||
|
||||
# caller peer should not be the cancel requester
|
||||
assert not ctx.cancel_called
|
||||
|
@ -271,7 +277,6 @@ async def stream_from_peer(
|
|||
|
||||
# root/parent actor task should NEVER HAVE cancelled us!
|
||||
assert not ctx.canceller
|
||||
assert 'canceller' in peer_ctx.canceller
|
||||
|
||||
raise
|
||||
# TODO: IN THEORY we could have other cases depending on
|
||||
|
@ -291,6 +296,7 @@ async def stream_from_peer(
|
|||
@pytest.mark.parametrize(
|
||||
'error_during_ctxerr_handling',
|
||||
[False, True],
|
||||
ids=lambda item: f'rte_during_ctxerr={item}',
|
||||
)
|
||||
def test_peer_canceller(
|
||||
error_during_ctxerr_handling: bool,
|
||||
|
@ -383,11 +389,11 @@ def test_peer_canceller(
|
|||
) as (canceller_ctx, sent),
|
||||
|
||||
):
|
||||
ctxs: list[Context] = [
|
||||
sleeper_ctx,
|
||||
caller_ctx,
|
||||
canceller_ctx,
|
||||
]
|
||||
ctxs: dict[str, Context] = {
|
||||
'sleeper': sleeper_ctx,
|
||||
'caller': caller_ctx,
|
||||
'canceller': canceller_ctx,
|
||||
}
|
||||
|
||||
try:
|
||||
print('PRE CONTEXT RESULT')
|
||||
|
@ -491,6 +497,15 @@ def test_peer_canceller(
|
|||
# should be cancelled by US.
|
||||
#
|
||||
if error_during_ctxerr_handling:
|
||||
# since we do a rte reraise above, the
|
||||
# `.open_context()` error handling should have
|
||||
# raised a local rte, thus the internal
|
||||
# `.open_context()` enterer task's
|
||||
# cancel-scope should have raised the RTE, NOT
|
||||
# a `trio.Cancelled` due to a local
|
||||
# `._scope.cancel()` call.
|
||||
assert not sleeper_ctx._scope.cancelled_caught
|
||||
|
||||
assert isinstance(loc_err, RuntimeError)
|
||||
print(f'_loc_err: {_loc_err}\n')
|
||||
# assert sleeper_ctx._local_error is _loc_err
|
||||
|
@ -505,35 +520,35 @@ def test_peer_canceller(
|
|||
# NOTE: this root actor task should have
|
||||
# called `Context.cancel()` on the
|
||||
# `.__aexit__()` to every opened ctx.
|
||||
for ctx in ctxs:
|
||||
assert ctx.cancel_called
|
||||
for name, ctx in ctxs.items():
|
||||
|
||||
# this root actor task should have
|
||||
# cancelled all opened contexts except the
|
||||
# sleeper which is obvi by the "canceller"
|
||||
# peer.
|
||||
re = ctx._remote_error
|
||||
if (
|
||||
ctx is sleeper_ctx
|
||||
or ctx is caller_ctx
|
||||
):
|
||||
assert (
|
||||
re.canceller
|
||||
==
|
||||
ctx.canceller
|
||||
==
|
||||
canceller.channel.uid
|
||||
)
|
||||
le = ctx._local_error
|
||||
|
||||
else:
|
||||
assert ctx.cancel_called
|
||||
|
||||
if ctx is sleeper_ctx:
|
||||
assert 'canceller' in re.canceller
|
||||
assert 'sleeper' in ctx.canceller
|
||||
|
||||
if ctx is canceller_ctx:
|
||||
assert (
|
||||
re.canceller
|
||||
==
|
||||
ctx.canceller
|
||||
==
|
||||
root.uid
|
||||
)
|
||||
|
||||
else: # the other 2 ctxs
|
||||
assert (
|
||||
re.canceller
|
||||
==
|
||||
canceller.channel.uid
|
||||
)
|
||||
|
||||
# since the sleeper errors while handling a
|
||||
# peer-cancelled (by ctxc) scenario, we expect
|
||||
# that the `.open_context()` block DOES call
|
||||
|
@ -554,44 +569,64 @@ def test_peer_canceller(
|
|||
# propagated
|
||||
#
|
||||
else:
|
||||
# since sleeper_ctx.result() IS called above
|
||||
# we should have (silently) absorbed the
|
||||
# corresponding `ContextCancelled` for it and
|
||||
# `._scope.cancel()` should never have been
|
||||
# called.
|
||||
assert not sleeper_ctx._scope.cancelled_caught
|
||||
|
||||
assert isinstance(loc_err, ContextCancelled)
|
||||
assert loc_err.canceller == sleeper_ctx.canceller
|
||||
assert (
|
||||
loc_err.canceller[0]
|
||||
==
|
||||
sleeper_ctx.canceller[0]
|
||||
==
|
||||
'canceller'
|
||||
)
|
||||
|
||||
# the received remote error's `.canceller`
|
||||
# will of course be the "canceller" actor BUT
|
||||
# the canceller set on the local handle to
|
||||
# `sleeper_ctx` will be the "sleeper" uid
|
||||
# since it's the actor that relayed us the
|
||||
# error which was **caused** by the
|
||||
# "canceller".
|
||||
assert 'sleeper' in sleeper_ctx.canceller
|
||||
assert 'canceller' == loc_err.canceller[0]
|
||||
|
||||
# the sleeper's remote error is the error bubbled
|
||||
# out of the context-stack above!
|
||||
re = sleeper_ctx.outcome
|
||||
final_err = sleeper_ctx.outcome
|
||||
assert (
|
||||
re is loc_err
|
||||
final_err is loc_err
|
||||
is sleeper_ctx.maybe_error
|
||||
is sleeper_ctx._remote_error
|
||||
)
|
||||
|
||||
for ctx in ctxs:
|
||||
for name, ctx in ctxs.items():
|
||||
|
||||
re: BaseException|None = ctx._remote_error
|
||||
re: BaseException|None = ctx.outcome
|
||||
assert (
|
||||
re and
|
||||
(
|
||||
re is ctx.maybe_error
|
||||
is ctx._remote_error
|
||||
)
|
||||
)
|
||||
le: trio.MultiError = ctx._local_error
|
||||
le: BaseException|None = ctx._local_error
|
||||
err = ctx.maybe_error
|
||||
out = ctx.outcome
|
||||
|
||||
# every ctx should error!
|
||||
assert out is err
|
||||
|
||||
# the recorded local erro should always be
|
||||
# the same as the one raised by the
|
||||
# `sleeper_ctx.result()` call
|
||||
assert (
|
||||
le
|
||||
and ctx._local_error
|
||||
and
|
||||
le is loc_err
|
||||
)
|
||||
|
||||
# root doesn't cancel sleeper since it's
|
||||
# cancelled by its peer.
|
||||
if ctx is sleeper_ctx:
|
||||
assert re
|
||||
assert (
|
||||
ctx._remote_error
|
||||
is ctx.maybe_error
|
||||
is ctx.outcome
|
||||
is ctx._local_error
|
||||
)
|
||||
|
||||
assert not ctx.cancel_called
|
||||
assert not ctx.cancel_acked
|
||||
|
||||
|
@ -601,21 +636,49 @@ def test_peer_canceller(
|
|||
# `ContextCancelled` for it and thus
|
||||
# the logic inside `.cancelled_caught`
|
||||
# should trigger!
|
||||
assert ctx._scope.cancelled_caught
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
||||
elif ctx is caller_ctx:
|
||||
# since its context was remotely
|
||||
# cancelled, we never needed to
|
||||
# call `Context.cancel()` bc it was
|
||||
# done by the peer and also we never
|
||||
assert ctx.cancel_called
|
||||
elif ctx in (
|
||||
caller_ctx,
|
||||
canceller_ctx,
|
||||
):
|
||||
|
||||
assert not ctx._remote_error
|
||||
|
||||
# neither of the `caller/canceller_ctx` should
|
||||
# have called `ctx.cancel()` bc the
|
||||
# canceller's task internally issues
|
||||
# a `Portal.cancel_actor()` to the
|
||||
# sleeper and thus never should call
|
||||
# `ctx.cancel()` per say UNLESS the
|
||||
# sleeper's `.result()` call above
|
||||
# ctxc exception results in the
|
||||
# canceller's
|
||||
# `.open_context().__aexit__()` error
|
||||
# handling to kick in BEFORE a remote
|
||||
# error is delivered - which since
|
||||
# we're asserting what we are above,
|
||||
# that should normally be the case
|
||||
# right?
|
||||
#
|
||||
assert not ctx.cancel_called
|
||||
#
|
||||
# assert ctx.cancel_called
|
||||
# orig ^
|
||||
|
||||
# TODO: figure out the details of this..?
|
||||
# if you look the `._local_error` here
|
||||
# is a multi of ctxc + 2 Cancelleds?
|
||||
# assert not ctx.cancelled_caught
|
||||
# assert not ctx._scope.cancelled_caught
|
||||
|
||||
elif ctx is canceller_ctx:
|
||||
assert (
|
||||
not ctx.cancel_called
|
||||
and not ctx.cancel_acked
|
||||
)
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
||||
# elif ctx is canceller_ctx:
|
||||
# assert not ctx._remote_error
|
||||
|
||||
# XXX NOTE XXX: ONLY the canceller
|
||||
# will get a self-cancelled outcome
|
||||
|
@ -626,11 +689,6 @@ def test_peer_canceller(
|
|||
# .cancel() whenever an interpeer
|
||||
# cancel takes place since each
|
||||
# reception of a ctxc
|
||||
assert (
|
||||
ctx.cancel_called
|
||||
and ctx.cancel_acked
|
||||
)
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
||||
else:
|
||||
pytest.fail(
|
||||
|
@ -663,7 +721,7 @@ def test_peer_canceller(
|
|||
# `.open_context()` block has exited and should be
|
||||
# set in both outcomes including the case where
|
||||
# ctx-cancel handling itself errors.
|
||||
assert sleeper_ctx._scope.cancelled_caught
|
||||
assert not sleeper_ctx._scope.cancelled_caught
|
||||
assert _loc_err is sleeper_ctx._local_error
|
||||
assert (
|
||||
sleeper_ctx.outcome
|
||||
|
|
|
@ -364,6 +364,9 @@ class Context:
|
|||
'''
|
||||
chan: Channel
|
||||
cid: str # "context id", more or less a unique linked-task-pair id
|
||||
|
||||
_actor: Actor
|
||||
|
||||
# the "feeder" channels for delivering message values to the
|
||||
# local task from the runtime's msg processing loop.
|
||||
_recv_chan: trio.MemoryReceiveChannel
|
||||
|
@ -486,6 +489,13 @@ class Context:
|
|||
f' {stream}\n'
|
||||
)
|
||||
|
||||
outcome_str: str = self.repr_outcome(
|
||||
show_error_fields=True
|
||||
)
|
||||
outcome_typ_str: str = self.repr_outcome(
|
||||
type_only=True
|
||||
)
|
||||
|
||||
return (
|
||||
f'<Context(\n'
|
||||
# f'\n'
|
||||
|
@ -505,8 +515,16 @@ class Context:
|
|||
# f' ---\n'
|
||||
f'\n'
|
||||
# f' -----\n'
|
||||
f' |_state: {self.repr_outcome()}\n'
|
||||
f' outcome{ds}{self.repr_outcome(show_error_fields=True)}\n'
|
||||
#
|
||||
# TODO: better state `str`ids?
|
||||
# -[ ] maybe map err-types to strs like 'cancelled',
|
||||
# 'errored', 'streaming', 'started', .. etc.
|
||||
# -[ ] as well as a final result wrapper like
|
||||
# `outcome.Value`?
|
||||
#
|
||||
f' |_state: {outcome_typ_str}\n'
|
||||
|
||||
f' outcome{ds}{outcome_str}\n'
|
||||
f' result{ds}{self._result}\n'
|
||||
f' cancel_called{ds}{self.cancel_called}\n'
|
||||
f' cancel_acked{ds}{self.cancel_acked}\n'
|
||||
|
@@ -552,7 +570,39 @@ class Context:
        (side of the) context to also be cancelled.

        '''
        return self._canceller
        if canc := self._canceller:
            return tuple(canc)

        return None

    def _is_self_cancelled(
        self,
        remote_error: Exception|None = None,

    ) -> bool:

        if not self._cancel_called:
            return False

        re: BaseException|None = (
            remote_error
            or self._remote_error
        )
        if not re:
            return False

        if from_uid := re.src_actor_uid:
            from_uid: tuple = tuple(from_uid)

        our_uid: tuple = self._actor.uid
        our_canceller = self.canceller

        return bool(
            isinstance(re, ContextCancelled)
            and from_uid == self.chan.uid
            and re.canceller == our_uid
            and our_canceller == from_uid
        )

    @property
    def cancel_acked(self) -> bool:

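Loosely, the new `_is_self_cancelled()` predicate reduces to the following truth test: "this side requested the cancel AND the received ctxc points back at this actor as the canceller". A hand-rolled illustration only (not the actual method, uids are stand-ins):

```python
# illustrative-only reduction of the self-cancel check above
def is_self_cancel(
    we_called_cancel: bool,           # mirrors Context._cancel_called
    ctxc_canceller: tuple[str, str],  # uid boxed in the received ctxc
    our_uid: tuple[str, str],         # this actor's uid
) -> bool:
    # "we asked for it AND the peer says *we* were the canceller"
    return we_called_cancel and ctxc_canceller == our_uid


assert is_self_cancel(True, ('root', 'beef'), ('root', 'beef'))
assert not is_self_cancel(False, ('peer', 'dead'), ('root', 'beef'))
```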
@ -568,22 +618,7 @@ class Context:
|
|||
equal to the uid of the calling task's actor.
|
||||
|
||||
'''
|
||||
portal: Portal|None = self._portal
|
||||
if portal:
|
||||
our_uid: tuple = portal.actor.uid
|
||||
|
||||
return bool(
|
||||
self._cancel_called
|
||||
and (re := self._remote_error)
|
||||
and isinstance(re, ContextCancelled)
|
||||
and (
|
||||
re.canceller
|
||||
==
|
||||
self.canceller
|
||||
==
|
||||
our_uid
|
||||
)
|
||||
)
|
||||
return self._is_self_cancelled()
|
||||
|
||||
@property
|
||||
def cancelled_caught(self) -> bool:
|
||||
|
@ -762,30 +797,15 @@ class Context:
|
|||
# self-cancel (ack) or,
|
||||
# peer propagated remote cancellation.
|
||||
if isinstance(error, ContextCancelled):
|
||||
ctxc_src: tuple = error.canceller
|
||||
|
||||
whom: str = (
|
||||
'us' if ctxc_src == current_actor().uid
|
||||
'us' if error.canceller == self._actor.uid
|
||||
else 'peer'
|
||||
)
|
||||
log.cancel(
|
||||
f'IPC context cancelled by {whom}!\n\n'
|
||||
f'{error}'
|
||||
)
|
||||
# always record the cancelling actor's uid since its
|
||||
# cancellation state is linked and we want to know
|
||||
# which process was the cause / requester of the
|
||||
# cancellation.
|
||||
self._canceller = ctxc_src
|
||||
|
||||
|
||||
if self._cancel_called:
|
||||
# this is an expected cancel request response
|
||||
# message and we **don't need to raise it** in the
|
||||
# local cancel `._scope` since it will potentially
|
||||
# override a real error. After this returns
|
||||
# `.cancel_acked == True`.
|
||||
return
|
||||
|
||||
else:
|
||||
log.error(
|
||||
|
@ -794,7 +814,23 @@ class Context:
|
|||
f'{error}\n'
|
||||
f'{pformat(self)}\n'
|
||||
)
|
||||
self._canceller = self.chan.uid
|
||||
|
||||
# always record the cancelling actor's uid since its
|
||||
# cancellation state is linked and we want to know
|
||||
# which process was the cause / requester of the
|
||||
# cancellation.
|
||||
maybe_error_src: tuple = getattr(
|
||||
error,
|
||||
'src_actor_uid',
|
||||
None,
|
||||
)
|
||||
self._canceller = (
|
||||
maybe_error_src
|
||||
or
|
||||
# XXX: in the case we get a non-boxed error?
|
||||
# -> wait but this should never happen right?
|
||||
self.chan.uid
|
||||
)
|
||||
|
||||
# Cancel the local `._scope`, catch that
|
||||
# `._scope.cancelled_caught` and re-raise any remote error
|
||||
|
@ -803,6 +839,15 @@ class Context:
|
|||
cs: trio.CancelScope = self._scope
|
||||
if (
|
||||
cs
|
||||
|
||||
# XXX this is an expected cancel request response
|
||||
# message and we **don't need to raise it** in the
|
||||
# local cancel `._scope` since it will potentially
|
||||
# override a real error. After this method returns
|
||||
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
||||
# always should be set.
|
||||
and not self._is_self_cancelled()
|
||||
|
||||
and not cs.cancel_called
|
||||
and not cs.cancelled_caught
|
||||
):
|
||||
|
@ -840,9 +885,13 @@ class Context:
|
|||
) -> str:
|
||||
# TODO: how to show the transport interchange fmt?
|
||||
# codec: str = self.chan.transport.codec_key
|
||||
outcome_str: str = self.repr_outcome(
|
||||
show_error_fields=True,
|
||||
type_only=True,
|
||||
)
|
||||
return (
|
||||
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
|
||||
f'{self._nsf}() -> {self.repr_outcome()}:'
|
||||
f'{self._nsf}() -> {outcome_str}:'
|
||||
)
|
||||
|
||||
async def cancel(
|
||||
|
@@ -851,10 +900,32 @@ class Context:

    ) -> None:
        '''
        Cancel this inter-actor-task context.
        Cancel this inter-actor IPC context by requestng the
        remote side's cancel-scope-linked `trio.Task` by calling
        `._scope.cancel()` and delivering an `ContextCancelled`
        ack msg in reponse.

        Request that the far side cancel it's current linked context,
        Timeout quickly in an attempt to sidestep 2-generals...
        Behaviour:
        ---------
        - after the far end cancels, the `.cancel()` calling side
          should receive a `ContextCancelled` with the
          `.canceller: tuple` uid set to the current `Actor.uid`.

        - timeout (quickly) on failure to rx this ACK error-msg in
          an attempt to sidestep 2-generals when the transport
          layer fails.

        Note, that calling this method DOES NOT also necessarily
        result in `Context._scope.cancel()` being called
        **locally**!

        => That is, an IPC `Context` (this) **does not**
        have the same semantics as a `trio.CancelScope`.

        If the caller (who entered the `Portal.open_context()`)
        desires that the internal block's cancel-scope be
        cancelled it should open its own `trio.CancelScope` and
        manage it as needed.

        '''
        side: str = self.side

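Since, per the docstring above, `Context.cancel()` does not cancel the local `._scope`, a caller that also wants its own `.open_context()` block torn down has to manage a local cancel scope itself. A rough caller-side sketch under those assumptions (hypothetical names, not code from this changeset):

```python
import trio
import tractor


async def caller(portal: tractor.Portal, target) -> None:
    # local teardown is *our* responsibility; `ctx.cancel()` only
    # requests cancellation of the remote task
    with trio.CancelScope() as local_cs:
        async with portal.open_context(target) as (ctx, first):
            await ctx.cancel()           # far side gets the cancel request
            local_cs.cancel()            # explicitly unwind our own block
            await trio.sleep_forever()   # never reached after the cancel
```

The incoming `ContextCancelled` ack is absorbed as a self-cancel (see `_is_self_cancelled()` above), so the exit path here is the local `trio.Cancelled`, not a re-raised ctxc.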
@ -976,7 +1047,7 @@ class Context:
|
|||
``trio``'s cancellation system.
|
||||
|
||||
'''
|
||||
actor: Actor = current_actor()
|
||||
actor: Actor = self._actor
|
||||
|
||||
# If the surrounding context has been cancelled by some
|
||||
# task with a handle to THIS, we error here immediately
|
||||
|
@ -1149,32 +1220,30 @@ class Context:
|
|||
a cancellation (if any).
|
||||
|
||||
'''
|
||||
if ((
|
||||
# NOTE: whenever the context's "opener" side (task) **is**
|
||||
# the side which requested the cancellation (likekly via
|
||||
# ``Context.cancel()``), we don't want to re-raise that
|
||||
# cancellation signal locally (would be akin to
|
||||
# a ``trio.Nursery`` nursery raising ``trio.Cancelled``
|
||||
# whenever ``CancelScope.cancel()`` was called) and
|
||||
# instead silently reap the expected cancellation
|
||||
# "error"-msg-as-ack. In this case the `err:
|
||||
# ContextCancelled` must have a `.canceller` set to the
|
||||
# uid of the requesting task's actor and we only do NOT
|
||||
# raise that error locally if WE ARE THAT ACTOR which
|
||||
# requested the cancellation.
|
||||
our_uid: tuple = self.chan.uid
|
||||
|
||||
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
||||
# for "graceful cancellation" case:
|
||||
#
|
||||
# Whenever a "side" of a context (a `trio.Task` running in
|
||||
# an actor) **is** the side which requested ctx
|
||||
# cancellation (likekly via ``Context.cancel()``), we
|
||||
# **don't** want to re-raise any eventually received
|
||||
# `ContextCancelled` response locally (would be akin to
|
||||
# a `trio.Nursery` nursery raising `trio.Cancelled`
|
||||
# whenever `CancelScope.cancel()` was called).
|
||||
#
|
||||
# Instead, silently reap the remote delivered ctxc
|
||||
# (`ContextCancelled`) as an expected
|
||||
# error-msg-is-cancellation-ack IFF said
|
||||
# `remote_error: ContextCancelled` has `.canceller`
|
||||
# set to the `Actor.uid` of THIS task (i.e. the
|
||||
# cancellation requesting task's actor is the actor
|
||||
# checking whether it should absorb the ctxc).
|
||||
if (
|
||||
not raise_ctxc_from_self_call
|
||||
and isinstance(remote_error, ContextCancelled)
|
||||
and (
|
||||
self._cancel_called
|
||||
and self._is_self_cancelled(remote_error)
|
||||
|
||||
# or self.chan._cancel_called
|
||||
# TODO: ^ should we have a special separate case
|
||||
# for this ^ ?
|
||||
)
|
||||
and ( # one of,
|
||||
|
||||
(portal := self._portal)
|
||||
and (our_uid := portal.actor.uid)
|
||||
# TODO: ?potentially it is useful to emit certain
|
||||
# warning/cancel logs for the cases where the
|
||||
# cancellation is due to a lower level cancel
|
||||
|
@ -1182,12 +1251,11 @@ class Context:
|
|||
# that case it's not actually this specific ctx that
|
||||
# made a `.cancel()` call, but it is the same
|
||||
# actor-process?
|
||||
and tuple(remote_error.canceller) == our_uid
|
||||
or self.chan._cancel_called
|
||||
or self.canceller == our_uid
|
||||
)
|
||||
) or (
|
||||
# or self.chan._cancel_called
|
||||
# XXX: ^ should we have a special separate case
|
||||
# for this ^, NO right?
|
||||
|
||||
) or (
|
||||
# NOTE: whenever this context is the cause of an
|
||||
# overrun on the remote side (aka we sent msgs too
|
||||
# fast that the remote task was overrun according
|
||||
|
@ -1204,7 +1272,6 @@ class Context:
|
|||
and isinstance(remote_error, RemoteActorError)
|
||||
and remote_error.msgdata['type_str'] == 'StreamOverrun'
|
||||
and tuple(remote_error.msgdata['sender']) == our_uid
|
||||
)
|
||||
):
|
||||
# NOTE: we set the local scope error to any "self
|
||||
# cancellation" error-response thus "absorbing"
|
||||
|
@ -1236,7 +1303,7 @@ class Context:
|
|||
# TODO: change to `.wait_for_result()`?
|
||||
async def result(
|
||||
self,
|
||||
hide_tb: bool = True,
|
||||
hide_tb: bool = False,
|
||||
|
||||
) -> Any|Exception:
|
||||
'''
|
||||
|
@ -1378,7 +1445,20 @@ class Context:
|
|||
if error:
|
||||
return error
|
||||
|
||||
assert not self._cancel_msg
|
||||
if cancmsg := self._cancel_msg:
|
||||
# NOTE: means we're prolly in the process of
|
||||
# processing the cancellation caused by
|
||||
# this msg (eg. logging from `Actor._cancel_task()`
|
||||
# method after receiving a `Context.cancel()` RPC)
|
||||
# though there shouldn't ever be a `._cancel_msg`
|
||||
# without it eventually resulting in this property
|
||||
# delivering a value!
|
||||
log.debug(
|
||||
'`Context._cancel_msg` is set but has not yet resolved to `.maybe_error`?\n\n'
|
||||
f'{cancmsg}\n'
|
||||
)
|
||||
|
||||
# assert not self._cancel_msg
|
||||
return None
|
||||
|
||||
def _final_result_is_set(self) -> bool:
|
||||
|
@ -1411,6 +1491,7 @@ class Context:
|
|||
def repr_outcome(
|
||||
self,
|
||||
show_error_fields: bool = False,
|
||||
type_only: bool = False,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
|
@ -1420,6 +1501,9 @@ class Context:
|
|||
'''
|
||||
merr: Exception|None = self.maybe_error
|
||||
if merr:
|
||||
if type_only:
|
||||
return type(merr).__name__
|
||||
|
||||
# if the error-type is one of ours and has the custom
|
||||
# defined "repr-(in)-one-line" method call it, ow
|
||||
# just deliver the type name.
|
||||
|
@ -1616,8 +1700,6 @@ class Context:
|
|||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# NOTE: if an error is deteced we should always still
|
||||
# send it through the feeder-mem-chan and expect
|
||||
|
@ -1666,7 +1748,7 @@ class Context:
|
|||
# overrun state and that msg isn't stuck in an
|
||||
# overflow queue what happens?!?
|
||||
|
||||
local_uid = current_actor().uid
|
||||
local_uid = self._actor.uid
|
||||
txt: str = (
|
||||
'on IPC context:\n'
|
||||
|
||||
|
@ -1765,6 +1847,7 @@ def mk_context(
|
|||
ctx = Context(
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
_actor=current_actor(),
|
||||
_send_chan=send_chan,
|
||||
_recv_chan=recv_chan,
|
||||
_nsf=nsf,
|
||||
|
|
|
@ -302,7 +302,7 @@ async def _errors_relayed_via_ipc(
|
|||
)
|
||||
)
|
||||
):
|
||||
# await pause()
|
||||
# await _debug.pause()
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
# => I can't think of a reason that inspecting this
|
||||
|
@ -322,6 +322,12 @@ async def _errors_relayed_via_ipc(
|
|||
cid=ctx.cid,
|
||||
)
|
||||
|
||||
# NOTE: the src actor should always be packed into the
|
||||
# error.. but how should we verify this?
|
||||
# assert err_msg['src_actor_uid']
|
||||
# if not err_msg['error'].get('src_actor_uid'):
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
if is_rpc:
|
||||
try:
|
||||
await chan.send(err_msg)
|
||||
|
@ -566,6 +572,7 @@ async def _invoke(
|
|||
# inside ._context._drain_to_final_msg()`..
|
||||
# # TODO: remove this ^ right?
|
||||
if ctx._scope.cancelled_caught:
|
||||
our_uid: tuple = actor.uid
|
||||
|
||||
# first check for and raise any remote error
|
||||
# before raising any context cancelled case
|
||||
|
@ -575,8 +582,9 @@ async def _invoke(
|
|||
ctx._maybe_raise_remote_err(re)
|
||||
|
||||
cs: CancelScope = ctx._scope
|
||||
|
||||
if cs.cancel_called:
|
||||
our_uid: tuple = actor.uid
|
||||
|
||||
canceller: tuple = ctx.canceller
|
||||
msg: str = (
|
||||
'actor was cancelled by '
|
||||
|
@ -632,15 +640,6 @@ async def _invoke(
|
|||
# f' |_{ctx}'
|
||||
)
|
||||
|
||||
# TODO: does this ever get set any more or can
|
||||
# we remove it?
|
||||
if ctx._cancel_msg:
|
||||
msg += (
|
||||
# '------ - ------\n'
|
||||
# 'IPC msg:\n'
|
||||
f'\n\n{ctx._cancel_msg}'
|
||||
)
|
||||
|
||||
# task-contex was either cancelled by request using
|
||||
# ``Portal.cancel_actor()`` or ``Context.cancel()``
|
||||
# on the far end, or it was cancelled by the local
|
||||
|
@ -1753,7 +1752,9 @@ class Actor:
|
|||
self,
|
||||
cid: str,
|
||||
parent_chan: Channel,
|
||||
|
||||
requesting_uid: tuple[str, str]|None = None,
|
||||
ipc_msg: dict|None|bool = False,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
|
@ -1764,16 +1765,13 @@ class Actor:
|
|||
in the signature (for now).
|
||||
|
||||
'''
|
||||
# this ctx based lookup ensures the requested task to
|
||||
# be cancelled was indeed spawned by a request from
|
||||
# this channel
|
||||
|
||||
# this ctx based lookup ensures the requested task to be
|
||||
# cancelled was indeed spawned by a request from its
|
||||
# parent (or some grandparent's) channel
|
||||
ctx: Context
|
||||
func: Callable
|
||||
is_complete: trio.Event
|
||||
|
||||
# NOTE: right now this is only implicitly called by
|
||||
# streaming IPC but it should be called
|
||||
# to cancel any remotely spawned task
|
||||
try:
|
||||
(
|
||||
ctx,
|
||||
|
@ -1801,20 +1799,23 @@ class Actor:
|
|||
|
||||
log.cancel(
|
||||
'Cancel request for RPC task\n\n'
|
||||
f'<= ._cancel_task(): {requesting_uid}\n'
|
||||
f' |_ @{ctx.dmaddr}\n\n'
|
||||
f'<= Actor.cancel_task(): {requesting_uid}\n\n'
|
||||
f'=> {ctx._task}\n'
|
||||
f' |_ >> {ctx.repr_rpc}\n'
|
||||
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
||||
# f' |_ {ctx._task}\n\n'
|
||||
|
||||
# TODO: better ascii repr for "supervisor" like
|
||||
# a nursery or context scope?
|
||||
# f'=> {parent_chan}\n'
|
||||
f'=> {ctx._task}\n'
|
||||
# f' |_{ctx._task}\n'
|
||||
# TODO: simplified `Context.__repr__()` fields output
|
||||
# shows only application state-related stuff like,
|
||||
# - ._stream
|
||||
# - .closed
|
||||
# - .started_called
|
||||
# - .. etc.
|
||||
f' >> {ctx.repr_rpc}\n'
|
||||
# f' >> {ctx.repr_rpc}\n'
|
||||
# f' |_ctx: {cid}\n'
|
||||
# f' >> {ctx._nsf}()\n'
|
||||
)
|
||||
|
@ -1824,6 +1825,16 @@ class Actor:
|
|||
):
|
||||
ctx._canceller: tuple = requesting_uid
|
||||
|
||||
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
|
||||
# then raise and pack it here?
|
||||
if (
|
||||
ipc_msg
|
||||
and ctx._cancel_msg is None
|
||||
):
|
||||
# assign RPC msg directly from the loop which usually
|
||||
# the case with `ctx.cancel()` on the other side.
|
||||
ctx._cancel_msg = ipc_msg
|
||||
|
||||
# don't allow cancelling this function mid-execution
|
||||
# (is this necessary?)
|
||||
if func is self._cancel_task:
|
||||
|
@ -1904,10 +1915,15 @@ class Actor:
|
|||
else
|
||||
"IPC channel's "
|
||||
)
|
||||
|
||||
rent_chan_repr: str = (
|
||||
f'|_{parent_chan}'
|
||||
if parent_chan
|
||||
else ''
|
||||
)
|
||||
log.cancel(
|
||||
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
|
||||
f'<= .cancel_rpc_tasks(): {req_uid}\n'
|
||||
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
|
||||
f' {rent_chan_repr}\n'
|
||||
# f'{self}\n'
|
||||
# f'{tasks_str}'
|
||||
)
|
||||
|
@ -1927,9 +1943,6 @@ class Actor:
|
|||
):
|
||||
continue
|
||||
|
||||
# if func == self._cancel_task:
|
||||
# continue
|
||||
|
||||
# TODO: this maybe block on the task cancellation
|
||||
# and so should really done in a nursery batch?
|
||||
await self._cancel_task(
|
||||
|
@ -2339,6 +2352,8 @@ async def process_messages(
|
|||
await actor._cancel_task(
|
||||
cid,
|
||||
channel,
|
||||
|
||||
ipc_msg=msg,
|
||||
)
|
||||
break
|
||||
|
||||
|
@ -2449,6 +2464,7 @@ async def process_messages(
|
|||
# cancel it!
|
||||
'parent_chan': chan,
|
||||
'requesting_uid': chan.uid,
|
||||
'ipc_msg': msg,
|
||||
}
|
||||
# TODO: remove? already have emit in meth.
|
||||
# log.runtime(
|
||||
|
@ -2737,7 +2753,7 @@ class Arbiter(Actor):
|
|||
sockaddr: tuple[str, int]
|
||||
|
||||
for (aname, _), sockaddr in self._registry.items():
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Actor mailbox info:\n'
|
||||
f'aname: {aname}\n'
|
||||
f'sockaddr: {sockaddr}\n'
|
||||
|
|
|
@ -619,13 +619,15 @@ def _set_trace(
|
|||
actor: tractor.Actor | None = None,
|
||||
pdb: MultiActorPdb | None = None,
|
||||
shield: bool = False,
|
||||
|
||||
extra_frames_up_when_async: int = 1,
|
||||
):
|
||||
__tracebackhide__: bool = True
|
||||
actor: tractor.Actor = actor or current_actor()
|
||||
|
||||
# start 2 levels up in user code
|
||||
frame: FrameType | None = sys._getframe()
|
||||
if frame:
|
||||
# always start 1 level up from THIS in user code.
|
||||
frame: FrameType|None
|
||||
if frame := sys._getframe():
|
||||
frame: FrameType = frame.f_back # type: ignore
|
||||
|
||||
if (
|
||||
|
@ -633,23 +635,39 @@ def _set_trace(
|
|||
and (
|
||||
pdb
|
||||
and actor is not None
|
||||
) or shield
|
||||
)
|
||||
# or shield
|
||||
):
|
||||
msg: str = _pause_msg
|
||||
if shield:
|
||||
# log.warning(
|
||||
msg = (
|
||||
'\n\n'
|
||||
' ------ - ------\n'
|
||||
'Debugger invoked with `shield=True` so an extra\n'
|
||||
'`trio.CancelScope.__exit__()` frame is shown..\n'
|
||||
'\n'
|
||||
'Try going up one frame to see your pause point!\n'
|
||||
'\n'
|
||||
' SORRY we need to fix this!\n'
|
||||
' ------ - ------\n\n'
|
||||
) + msg
|
||||
|
||||
# pdbp.set_trace()
|
||||
# TODO: maybe print the actor supervion tree up to the
|
||||
# root here? Bo
|
||||
log.pdb(
|
||||
f'{_pause_msg}\n'
|
||||
f'{msg}\n'
|
||||
'|\n'
|
||||
f'|_ {actor.uid}\n'
|
||||
)
|
||||
# no f!#$&* idea, but when we're in async land
|
||||
# we need 2x frames up?
|
||||
frame = frame.f_back
|
||||
# frame = frame.f_back
|
||||
|
||||
# if shield:
|
||||
# frame = frame.f_back
|
||||
for i in range(extra_frames_up_when_async):
|
||||
frame: FrameType = frame.f_back
|
||||
log.debug(
|
||||
f'Going up frame {i} -> {frame}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
pdb, undo_sigint = mk_mpdb()
|
||||
|
@ -659,10 +677,9 @@ def _set_trace(
|
|||
Lock.local_task_in_debug = 'sync'
|
||||
|
||||
pdb.set_trace(frame=frame)
|
||||
# undo_
|
||||
|
||||
|
||||
async def pause(
|
||||
async def _pause(
|
||||
|
||||
debug_func: Callable = _set_trace,
|
||||
release_lock_signal: trio.Event | None = None,
|
||||
|
@ -676,27 +693,19 @@ async def pause(
|
|||
# be no way to override it?..
|
||||
# shield: bool = False,
|
||||
|
||||
# TODO:
|
||||
# shield: bool = False
|
||||
shield: bool = False,
|
||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
A pause point (more commonly known as a "breakpoint") interrupt
|
||||
instruction for engaging a blocking debugger instance to
|
||||
conduct manual console-based-REPL-interaction from within
|
||||
`tractor`'s async runtime, normally from some single-threaded
|
||||
and currently executing actor-hosted-`trio`-task in some
|
||||
(remote) process.
|
||||
Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
|
||||
stack frame when not shielded (since apparently i can't figure out
|
||||
how to hide it using the normal mechanisms..)
|
||||
|
||||
NOTE: we use the semantics "pause" since it better encompasses
|
||||
the entirety of the necessary global-runtime-state-mutation any
|
||||
actor-task must access and lock in order to get full isolated
|
||||
control over the process tree's root TTY:
|
||||
https://en.wikipedia.org/wiki/Breakpoint
|
||||
Hopefully we won't need this in the long run.
|
||||
|
||||
'''
|
||||
# __tracebackhide__ = True
|
||||
__tracebackhide__: bool = True
|
||||
actor = current_actor()
|
||||
pdb, undo_sigint = mk_mpdb()
|
||||
task_name: str = trio.lowlevel.current_task().name
|
||||
|
@ -707,23 +716,10 @@ async def pause(
|
|||
):
|
||||
Lock.local_pdb_complete = trio.Event()
|
||||
|
||||
# if shield:
|
||||
debug_func = partial(
|
||||
debug_func,
|
||||
# shield=shield,
|
||||
)
|
||||
|
||||
# def _exit(self, *args, **kwargs):
|
||||
# __tracebackhide__: bool = True
|
||||
# super().__exit__(*args, **kwargs)
|
||||
|
||||
# trio.CancelScope.__exit__.__tracebackhide__ = True
|
||||
|
||||
# import types
|
||||
# with trio.CancelScope(shield=shield) as cs:
|
||||
# cs.__exit__ = types.MethodType(_exit, cs)
|
||||
# cs.__exit__.__tracebackhide__ = True
|
||||
|
||||
# TODO: need a more robust check for the "root" actor
|
||||
if (
|
||||
not is_root_process()
|
||||
|
@ -812,27 +808,46 @@ async def pause(
|
|||
Lock.repl = pdb
|
||||
|
||||
try:
|
||||
if debug_func is None:
|
||||
# TODO: do we want to support using this **just** for the
|
||||
# locking / common code (prolly to help address #320)?
|
||||
#
|
||||
# if debug_func is None:
|
||||
# assert release_lock_signal, (
|
||||
# 'Must pass `release_lock_signal: trio.Event` if no '
|
||||
# 'trace func provided!'
|
||||
# )
|
||||
print(f"{actor.uid} ENTERING WAIT")
|
||||
task_status.started()
|
||||
|
||||
# print(f"{actor.uid} ENTERING WAIT")
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await release_lock_signal.wait()
|
||||
|
||||
else:
|
||||
# else:
|
||||
# block here one (at the appropriate frame *up*) where
|
||||
# ``breakpoint()`` was awaited and begin handling stdio.
|
||||
log.debug("Entering the synchronous world of pdb")
|
||||
debug_func(actor, pdb)
|
||||
log.debug('Entering sync world of the `pdb` REPL..')
|
||||
try:
|
||||
debug_func(
|
||||
actor,
|
||||
pdb,
|
||||
extra_frames_up_when_async=2,
|
||||
shield=shield,
|
||||
)
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to invoke internal `debug_func = '
|
||||
f'{debug_func.func.__name__}`\n'
|
||||
)
|
||||
raise
|
||||
|
||||
except bdb.BdbQuit:
|
||||
Lock.release()
|
||||
raise
|
||||
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to engage debugger via `_pause()` ??\n'
|
||||
)
|
||||
raise
|
||||
|
||||
# XXX: apparently we can't do this without showing this frame
|
||||
# in the backtrace on first entry to the REPL? Seems like an odd
|
||||
# behaviour that should have been fixed by now. This is also why
|
||||
|
@ -845,6 +860,81 @@ async def pause(
|
|||
# # signal.signal = pdbp.hideframe(signal.signal)
|
||||
|
||||
|
||||
async def pause(
|
||||
|
||||
debug_func: Callable = _set_trace,
|
||||
release_lock_signal: trio.Event | None = None,
|
||||
|
||||
# TODO: allow caller to pause despite task cancellation,
|
||||
# exactly the same as wrapping with:
|
||||
# with CancelScope(shield=True):
|
||||
# await pause()
|
||||
# => the REMAINING ISSUE is that the scope's .__exit__() frame
|
||||
# is always show in the debugger on entry.. and there seems to
|
||||
# be no way to override it?..
|
||||
# shield: bool = False,
|
||||
|
||||
shield: bool = False,
|
||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
A pause point (more commonly known as a "breakpoint") interrupt
|
||||
instruction for engaging a blocking debugger instance to
|
||||
conduct manual console-based-REPL-interaction from within
|
||||
`tractor`'s async runtime, normally from some single-threaded
|
||||
and currently executing actor-hosted-`trio`-task in some
|
||||
(remote) process.
|
||||
|
||||
NOTE: we use the semantics "pause" since it better encompasses
|
||||
the entirety of the necessary global-runtime-state-mutation any
|
||||
actor-task must access and lock in order to get full isolated
|
||||
control over the process tree's root TTY:
|
||||
https://en.wikipedia.org/wiki/Breakpoint
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
|
||||
if shield:
|
||||
# NOTE XXX: even hard coding this inside the `class CancelScope:`
|
||||
# doesn't seem to work for me!?
|
||||
# ^ XXX ^
|
||||
|
||||
# def _exit(self, *args, **kwargs):
|
||||
# __tracebackhide__: bool = True
|
||||
# super().__exit__(*args, **kwargs)
|
||||
|
||||
trio.CancelScope.__enter__.__tracebackhide__ = True
|
||||
trio.CancelScope.__exit__.__tracebackhide__ = True
|
||||
|
||||
# import types
|
||||
# with trio.CancelScope(shield=shield) as cs:
|
||||
# cs.__exit__ = types.MethodType(_exit, cs)
|
||||
# cs.__exit__.__tracebackhide__ = True
|
||||
|
||||
with trio.CancelScope(shield=shield) as cs:
|
||||
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
|
||||
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
|
||||
|
||||
# NOTE: so the caller can always cancel even if shielded
|
||||
task_status.started(cs)
|
||||
return await _pause(
|
||||
debug_func=debug_func,
|
||||
release_lock_signal=release_lock_signal,
|
||||
shield=True,
|
||||
task_status=task_status,
|
||||
)
|
||||
else:
|
||||
return await _pause(
|
||||
debug_func=debug_func,
|
||||
release_lock_signal=release_lock_signal,
|
||||
shield=False,
|
||||
task_status=task_status,
|
||||
)
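For reference, the shielded branch above is what lets user code pause even while its surrounding scope is being cancelled. A rough usage sketch (assumptions: `debug_mode=True` is accepted by the root nursery as in the commented-out test code earlier in this diff, and the actor tree is otherwise healthy):

```python
import trio
import tractor


async def main():
    # root actor with the debugger machinery enabled
    async with tractor.open_nursery(debug_mode=True):
        try:
            await trio.sleep_forever()
        finally:
            # even during teardown/cancellation we can still drop into
            # the REPL, thanks to the `shield=True` path of `pause()`
            await tractor.pause(shield=True)


if __name__ == '__main__':
    trio.run(main)
```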
|
||||
|
||||
|
||||
|
||||
|
||||
# TODO: allow pausing from sync code.
|
||||
# normally by remapping python's builtin breakpoint() hook to this
|
||||
# runtime aware version which takes care of all .
|
||||
|
@ -929,6 +1019,10 @@ _crash_msg: str = (
|
|||
def _post_mortem(
|
||||
actor: tractor.Actor,
|
||||
pdb: MultiActorPdb,
|
||||
shield: bool = False,
|
||||
|
||||
# only for compat with `._set_trace()`..
|
||||
extra_frames_up_when_async=0,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -957,7 +1051,7 @@ def _post_mortem(
|
|||
|
||||
post_mortem = partial(
|
||||
pause,
|
||||
_post_mortem,
|
||||
debug_func=_post_mortem,
|
||||
)
|
||||
|
||||
|
||||
|
@ -1043,12 +1137,20 @@ async def maybe_wait_for_debugger(
|
|||
# will make the pdb repl unusable.
|
||||
# Instead try to wait for pdb to be released before
|
||||
# tearing down.
|
||||
sub_in_debug: tuple[str, str]|None = Lock.global_actor_in_debug
|
||||
in_debug: tuple[str, str]|None = Lock.global_actor_in_debug
|
||||
debug_complete: trio.Event|None = Lock.no_remote_has_tty
|
||||
|
||||
if sub_in_debug := Lock.global_actor_in_debug:
|
||||
if in_debug == current_actor().uid:
|
||||
log.debug(
|
||||
msg
|
||||
+
|
||||
'Root already owns the TTY LOCK'
|
||||
)
|
||||
return True
|
||||
|
||||
elif in_debug:
|
||||
msg += (
|
||||
f'Debug `Lock` in use by subactor: {sub_in_debug}\n'
|
||||
f'Debug `Lock` in use by subactor: {in_debug}\n'
|
||||
)
|
||||
# TODO: could this make things more deterministic?
|
||||
# wait to see if a sub-actor task will be
|
||||
|
@ -1065,27 +1167,26 @@ async def maybe_wait_for_debugger(
|
|||
return False
|
||||
|
||||
for istep in range(poll_steps):
|
||||
|
||||
|
||||
if (
|
||||
debug_complete
|
||||
and not debug_complete.is_set()
|
||||
and sub_in_debug is not None
|
||||
and in_debug is not None
|
||||
):
|
||||
log.pdb(
|
||||
msg
|
||||
+
|
||||
'Root is waiting on tty lock to release..\n'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await debug_complete.wait()
|
||||
log.pdb(
|
||||
f'Child subactor released debug lock:'
|
||||
f'|_{sub_in_debug}\n'
|
||||
f'Child subactor released debug lock\n'
|
||||
f'|_{in_debug}\n'
|
||||
)
|
||||
|
||||
# is no subactor locking debugger currently?
|
||||
if (
|
||||
sub_in_debug is None
|
||||
in_debug is None
|
||||
and (
|
||||
debug_complete is None
|
||||
or debug_complete.is_set()
|
||||
|
|