Compare commits

8 Commits

Author SHA1 Message Date
Tyler Goodlet c025761f15 Adjust `asyncio` test for stricter ctx-self-cancels
Use `expect_ctxc()` around the portal cancellation case, toss in
a `'context'` parametrization and return just the `Context.outcome` from
`main()` B)
2024-03-07 21:33:07 -05:00
Tyler Goodlet 2e797ef7ee Update ctx test suites to stricter semantics
Including mostly tweaking asserts on relayed `ContextCancelled`s and
the new pub ctx properties: `.outcome`, `.maybe_error`, etc. as it
pertains to graceful (absorbed) remote cancellation vs. loud ctxc cases
expected to be raised by any `Portal.cancel_actor()` style teardown.

Start checking a variety of internals like `._remote/local_error`,
`._is_self_cancelled()`, `._is_final_result_set()`, `._cancel_msg`
where applicable.

Also factor out the new `expect_ctxc()` checker to our `conftest.py` for
use in other suites.
2024-03-07 21:26:57 -05:00
Tyler Goodlet c36deb1f4d Woops, fix `_post_mortem()` type sig..
We're passing an `extra_frames_up_when_async=2` now (from prior attempt
to hide `CancelScope.__exit__()` when `shield=True`) and thus both
`debug_func`s must accept it 🤦

On the brighter side found out that the `TypeError` from the call-sig
mismatch was actually being swallowed entirely so add some
`.exception()` msgs for such cases to at least alert the dev they broke
stuff XD
2024-03-07 21:24:34 -05:00
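A minimal sketch of the failure mode described in the `_post_mortem()` signature fix above. It uses hypothetical stand-ins (`old_debug_func`, `new_debug_func`, `invoke_debug_func`), not the real `tractor` debugger internals. The caller passes `extra_frames_up_when_async=2`, one callee doesn't accept it, and the resulting `TypeError` is logged via `log.exception()` before it propagates. Whether the real code re-raises after logging is an assumption here.

```python
import logging

log = logging.getLogger('debug_func_sketch')


def old_debug_func(actor_name: str) -> str:
    # hypothetical callee with the stale signature: it has no
    # `extra_frames_up_when_async` param.
    return f'REPL for {actor_name}'


def new_debug_func(
    actor_name: str,
    extra_frames_up_when_async: int = 2,
) -> str:
    # hypothetical callee updated to accept the new kwarg.
    return f'REPL for {actor_name} (+{extra_frames_up_when_async} frames)'


def invoke_debug_func(debug_func, actor_name: str) -> str:
    try:
        return debug_func(
            actor_name,
            extra_frames_up_when_async=2,
        )
    except TypeError:
        # alert the dev about the call-sig mismatch so it can't
        # vanish silently; re-raising is this sketch's choice.
        log.exception(
            f'`{debug_func.__name__}()` has a mismatched signature!'
        )
        raise
```

Calling `invoke_debug_func(old_debug_func, 'root')` logs the traceback and then raises, while the updated callee returns normally.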
Tyler Goodlet fa7e37d6ed (Event) more pedantic `.cancel_acked: bool` def
Changes the condition logic to be more strict and moves it to a private
`._is_self_cancelled() -> bool` predicate which can be used elsewhere
(instead of having almost similar duplicate checks all over the
place..) and allows taking in a specific `remote_error` just for
verification purposes (like for tests).

Main strictness distinctions are now:
- obvi that `.cancel_called` is set (this filters any
  `Portal.cancel_actor()` or other out-of-band RPC),
- the received `ContextCancelled` **must** have its `.canceller` set to
  this side's `Actor.uid` (indicating we are the requester).
- `.src_actor_uid` **must** be the same as the `.chan.uid` (so the error
  must have originated from the opposite side's task).
- `Context.canceller` should already be set to the `.chan.uid`
  indicating we received the msg via the runtime calling
  `._deliver_msg()` -> `_maybe_cancel_and_set_remote_error()` which
  ensures the error is specifically destined for this ctx-task exactly
  the same as how `Actor._cancel_task()` sets it from an input
  `requesting_uid` arg.
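
The strictness rules above can be sketched as a plain predicate. This is a hedged illustration only: `FakeCtxc`, `FakeCtx` and `is_self_cancelled()` are hypothetical stand-ins for `ContextCancelled`, `Context` and the private `._is_self_cancelled()` method, and the `tuple()` casts assume uids may arrive from the wire as lists.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class FakeCtxc:
    # stand-in for a received `ContextCancelled`
    canceller: Optional[Sequence[str]]      # who requested the cancel
    src_actor_uid: Optional[Sequence[str]]  # which actor raised it


@dataclass
class FakeCtx:
    # stand-in for the few `Context` fields the rules read
    cancel_called: bool
    our_uid: tuple                # this side's `Actor.uid`
    chan_uid: tuple               # the far end's uid, i.e. `.chan.uid`
    canceller: Optional[tuple]    # as recorded on the local ctx


def is_self_cancelled(ctx: FakeCtx, err: object) -> bool:
    # rule 1: this side must have requested the cancel.
    if not ctx.cancel_called:
        return False

    # only a ctxc can ever be a cancel "ack".
    if not isinstance(err, FakeCtxc):
        return False

    src = tuple(err.src_actor_uid) if err.src_actor_uid else None
    requester = tuple(err.canceller) if err.canceller else None
    return (
        requester == ctx.our_uid       # rule 2: we are the requester
        and src == ctx.chan_uid        # rule 3: raised by the far end
        and ctx.canceller == src       # rule 4: recorded via the runtime
    )
```

A ctxc relayed on behalf of some other peer fails the second rule, which is what separates a graceful self-cancel ack from an inter-peer cancellation.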

In support of the above adjust some impl deats:
- add `Context._actor: Actor` which is set once in `mk_context()` to
  avoid issues (particularly in testing) where `current_actor()` raises
  after the root actor / runtime is already exited. Use `._actor.uid` in
  both `.cancel_acked` (obvi) and `_maybe_cancel_and_set_remote_error()`
  when deciding whether to call `._scope.cancel()`.
- always cast `.canceller` to `tuple` if not null.
- delegate `.cancel_acked` directly to new private predicate (obvi).
- always set `._canceller` from any `RemoteActorError.src_actor_uid` or
  failing over to the `.chan.uid` when a non-remote error (tho that
  shouldn't ever happen right?).
- more extensive doc-string for `.cancel()` detailing the new strictness
  rules about whether an eventual `.cancel_acked` might be set.

Also tossed in even more logging format tweaks by adding a
`type_only: bool` to `.repr_outcome()` as desired for simpler output in
the `state: <outcome-repr-here>` and `.repr_rpc()` sections of the
`.__str__()`.
2024-03-07 20:35:43 -05:00
Tyler Goodlet 364ea91983 Set `._cancel_msg` to RPC `{cmd: 'self._cancel_task', ..}` msg
Like how we set `Context._cancel_msg` in `._deliver_msg()` (in
which case normally it's an `{'error': ..}` msg), do the same when any
RPC task is remotely cancelled via `Actor._cancel_task` where that task
doesn't have a cancel msg set yet.

This makes it much easier to distinguish between ctx cancellations due
to some remote error vs. explicit remote requests via any of
`Actor.cancel()`, `Portal.cancel_actor()` or `Context.cancel()`.
2024-03-07 18:24:00 -05:00
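A rough sketch of how the recorded cancel msg could be used to tell the two causes apart. It assumes the msg shapes are exactly as abbreviated in the commit above (an `'error'` key vs. a `'cmd'` key naming `self._cancel_task`); the real wire format may well differ, and `classify_cancel_msg()` is a hypothetical helper, not part of `tractor`.

```python
from typing import Optional, Union


def classify_cancel_msg(
    cancel_msg: Optional[Union[str, dict]],
) -> str:
    # nothing recorded: the ctx was never cancelled via a msg.
    if not cancel_msg:
        return 'not-cancelled'

    # this sketch can't interpret anything other than a dict.
    if not isinstance(cancel_msg, dict):
        return 'unknown'

    # a relayed remote error, as set from `._deliver_msg()`.
    if 'error' in cancel_msg:
        return 'remote-error'

    # an explicit cancel request delivered as an RPC cmd.
    if cancel_msg.get('cmd') == 'self._cancel_task':
        return 'cancel-request'

    return 'unknown'
```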
Tyler Goodlet 7ae9b5319b Tweak inter-peer `._scope` state asserts
We don't expect `._scope.cancelled_caught` to be set really ever on
inter-peer cancellation since no ctx ever cancels itself; a peer
cancels some other peer and that then bubbles back to all other peers.

Also add `ids: lambda` for `error_during_ctxerr_handling` param to
`test_peer_canceller()`
2024-03-06 16:09:38 -05:00
Tyler Goodlet 6156ff95f8 Add `shield: bool` support to `.pause()`
It's been on the todo for a while and I've given up trying to properly
hide the `trio.CancelScope.__exit__()` frame for now instead opting to
just `log.pdb()` a big apology XD

Users can obvi still just not use the flag and wrap `tractor.pause()` in
their own cs block if they want to avoid having to hit `'up'` in the pdb
REPL if needed in a cancelled task-scope.

Impl deatz:
- factor orig `.pause()` impl into new `._pause()` so that we can more tersely
  wrap the original content depending on `shield: bool` input; only open
  the cancel-scope when shield is set to avoid the aforementioned extra stack
  frame annoyance.
- pass through `shield` to underlying `_pause` and `debug_func()` so we
  can actually know when to log our apology.
- add a buncha notes to new `.pause()` wrapper regarding the inability
  to hide the cancel-scope `.__exit__()`, including that overriding the
  code in `trio._core._run.CancelScope` doesn't seem to solve the issue
  either..

Unrelated `maybe_wait_for_debugger()` tweaks:
- don't read `Lock.global_actor_in_debug` more than needed, rename local
  read var to `in_debug` (since it can also hold the root actor uid, not
  just sub-actors).
- shield the `await debug_complete.wait()` since ideally we avoid the
  root cancelling child-actors in debug even when the root calls this
  func in a cancelled scope.
2024-03-06 14:37:54 -05:00
Tyler Goodlet 9e3f41a5b1 Tweak inter-peer tests for new/refined semantics
Buncha subtle details changed mostly to do with when `Context.cancel()`
gets called on "real" remote errors vs. (peer requested) cancellation
and then local side handling of `ContextCancelled`.

Specific changes to make tests pass:
- due to raciness with `sleeper_ctx.result()` raising the ctxc locally
  vs. the child-peers receiving similar ctxcs themselves (and then
  erroring and propagating back to the root parent), we might not see
  `._remote_error` set during the sub-ctx loops (except for the sleeper
  itself obvi).
- do not expect `.cancel_called`/`.cancel_caught` to be set on any
  sub-ctx since currently `Context.cancel()` is only called non-shielded
  and thus is not invoked when `._scope.cancel()` is called as part
  of each root-side ctx ref/block handling the inter-peer ctxc.
- do not expect `Context._scope.cancelled_caught` to be set in most cases
  (even the sleeper)

TODO Outstanding adjustments not fixed yet:
-[ ] `_scope.cancelled_caught` checks outside the `.open_context()`
  blocks.
2024-03-06 10:13:41 -05:00
7 changed files with 646 additions and 300 deletions

@ -1,6 +1,7 @@
""" """
``tractor`` testing!! ``tractor`` testing!!
""" """
from contextlib import asynccontextmanager as acm
import sys import sys
import subprocess import subprocess
import os import os
@ -292,3 +293,26 @@ def daemon(
time.sleep(_PROC_SPAWN_WAIT) time.sleep(_PROC_SPAWN_WAIT)
yield proc yield proc
sig_prog(proc, _INT_SIGNAL) sig_prog(proc, _INT_SIGNAL)
@acm
async def expect_ctxc(
    yay: bool,
    reraise: bool = False,
) -> None:
    '''
    Small acm to catch `ContextCancelled` errors when expected
    below it in a `async with ()` block.
    '''
    if yay:
        try:
            yield
            raise RuntimeError('Never raised ctxc?')
        except tractor.ContextCancelled:
            if reraise:
                raise
            else:
                return
    else:
        yield
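
The control flow of the `expect_ctxc()` helper added to `conftest.py` above can be exercised without a running actor tree. The sketch below copies that flow but swaps in a local stand-in `ContextCancelled` class and drives it with `asyncio` instead of `trio`; `run_case()` is a hypothetical driver added purely for illustration.

```python
import asyncio
from contextlib import asynccontextmanager as acm


class ContextCancelled(Exception):
    '''Stand-in for `tractor.ContextCancelled`.'''


@acm
async def expect_ctxc(yay: bool, reraise: bool = False):
    # same branches as the conftest helper shown in the diff
    if yay:
        try:
            yield
            raise RuntimeError('Never raised ctxc?')
        except ContextCancelled:
            if reraise:
                raise
            else:
                return
    else:
        yield


async def run_case(
    yay: bool,
    raise_ctxc: bool,
    reraise: bool = False,
) -> str:
    try:
        async with expect_ctxc(yay=yay, reraise=reraise):
            if raise_ctxc:
                raise ContextCancelled('cancelled by far end')
        return 'clean exit'
    except ContextCancelled:
        return 'ctxc propagated'
    except RuntimeError:
        return 'expected ctxc never raised'
```

With `yay=True` the block must raise a ctxc, which is then absorbed unless `reraise=True`; with `yay=False` the helper is a pass-through.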

@ -5,7 +5,6 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand. sync-opening a ``tractor.Context`` beforehand.
''' '''
from contextlib import asynccontextmanager as acm
from itertools import count from itertools import count
import platform import platform
from pprint import pformat from pprint import pformat
@ -26,7 +25,10 @@ from tractor._exceptions import (
ContextCancelled, ContextCancelled,
) )
from conftest import tractor_test from conftest import (
tractor_test,
expect_ctxc,
)
# ``Context`` semantics are as follows, # ``Context`` semantics are as follows,
# ------------------------------------ # ------------------------------------
@ -194,12 +196,13 @@ def test_simple_context(
) )
try: try:
async with portal.open_context( async with (
portal.open_context(
simple_setup_teardown, simple_setup_teardown,
data=10, data=10,
block_forever=callee_blocks_forever, block_forever=callee_blocks_forever,
) as (ctx, sent): ) as (ctx, sent),
):
assert sent == 11 assert sent == 11
if callee_blocks_forever: if callee_blocks_forever:
@ -250,17 +253,6 @@ def test_simple_context(
trio.run(main) trio.run(main)
@acm
async def expect_ctxc(yay: bool) -> None:
if yay:
try:
yield
except ContextCancelled:
return
else:
yield
@pytest.mark.parametrize( @pytest.mark.parametrize(
'callee_returns_early', 'callee_returns_early',
[True, False], [True, False],
@ -293,6 +285,7 @@ def test_caller_cancels(
) -> None: ) -> None:
actor: Actor = current_actor() actor: Actor = current_actor()
uid: tuple = actor.uid uid: tuple = actor.uid
_ctxc: ContextCancelled|None = None
if ( if (
cancel_method == 'portal' cancel_method == 'portal'
@ -303,6 +296,9 @@ def test_caller_cancels(
assert 0, 'Portal cancel should raise!' assert 0, 'Portal cancel should raise!'
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
# with trio.CancelScope(shield=True):
# await tractor.pause()
_ctxc = ctxc
assert ctx.chan._cancel_called assert ctx.chan._cancel_called
assert ctxc.canceller == uid assert ctxc.canceller == uid
assert ctxc is ctx.maybe_error assert ctxc is ctx.maybe_error
@ -311,7 +307,10 @@ def test_caller_cancels(
# case since self-cancellation should swallow the ctxc # case since self-cancellation should swallow the ctxc
# silently! # silently!
else: else:
try:
res = await ctx.result() res = await ctx.result()
except ContextCancelled as ctxc:
pytest.fail(f'should not have raised ctxc\n{ctxc}')
# we actually get a result # we actually get a result
if callee_returns_early: if callee_returns_early:
@ -342,6 +341,10 @@ def test_caller_cancels(
# await tractor.pause() # await tractor.pause()
# assert ctx._local_error is None # assert ctx._local_error is None
# TODO: don't need this right?
# if _ctxc:
# raise _ctxc
async def main(): async def main():
@ -352,11 +355,19 @@ def test_caller_cancels(
'simple_context', 'simple_context',
enable_modules=[__name__], enable_modules=[__name__],
) )
timeout = 0.5 if not callee_returns_early else 2 timeout: float = (
0.5
if not callee_returns_early
else 2
)
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with ( async with (
expect_ctxc(
expect_ctxc(yay=cancel_method == 'portal'), yay=(
not callee_returns_early
and cancel_method == 'portal'
)
),
portal.open_context( portal.open_context(
simple_setup_teardown, simple_setup_teardown,
@ -372,10 +383,18 @@ def test_caller_cancels(
await trio.sleep(0.5) await trio.sleep(0.5)
if cancel_method == 'ctx': if cancel_method == 'ctx':
print('cancelling with `Context.cancel()`')
await ctx.cancel() await ctx.cancel()
else:
elif cancel_method == 'portal':
print('cancelling with `Portal.cancel_actor()`')
await portal.cancel_actor() await portal.cancel_actor()
else:
pytest.fail(
f'Unknown `cancel_method={cancel_method} ?'
)
if chk_ctx_result_before_exit: if chk_ctx_result_before_exit:
await check_canceller(ctx) await check_canceller(ctx)
@ -385,15 +404,22 @@ def test_caller_cancels(
if cancel_method != 'portal': if cancel_method != 'portal':
await portal.cancel_actor() await portal.cancel_actor()
# since the `.cancel_actor()` call just above # XXX NOTE XXX: non-normal yet purposeful
# will cause the `.open_context().__aexit__()` raise # test-specific ctxc suppression is implemented!
# a ctxc which should in turn cause `ctx._scope` to #
# WHY: the `.cancel_actor()` case (cancel_method='portal')
# will cause both:
# * the `ctx.result()` inside `.open_context().__aexit__()`
# * AND the `ctx.result()` inside `check_canceller()`
# to raise ctxc.
#
# which should in turn cause `ctx._scope` to
# catch any cancellation? # catch any cancellation?
if ( if (
not callee_returns_early not callee_returns_early
and cancel_method == 'portal' and cancel_method != 'portal'
): ):
assert ctx._scope.cancelled_caught assert not ctx._scope.cancelled_caught
trio.run(main) trio.run(main)
@ -511,6 +537,23 @@ async def expect_cancelled(
await stream.send(msg) # echo server await stream.send(msg) # echo server
except trio.Cancelled: except trio.Cancelled:
# on ctx.cancel() the internal RPC scope is cancelled but
# never caught until the func exits.
assert ctx._scope.cancel_called
assert not ctx._scope.cancelled_caught
# should be the RPC cmd request for `._cancel_task()`
assert ctx._cancel_msg
# which, has not yet resolved to an error outcome
# since this rpc func has not yet exited.
assert not ctx.maybe_error
assert not ctx._final_result_is_set()
# debug REPL if needed
# with trio.CancelScope(shield=True):
# await tractor.pause()
# expected case # expected case
_state = False _state = False
raise raise
@ -594,16 +637,16 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
with trio.fail_after(0.2): with trio.fail_after(0.2):
await ctx.result() await ctx.result()
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already called above # NO-OP -> since already called above
await ctx.cancel() await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since # NOTE: local scope should have absorbed the cancellation since
# in this case we call `ctx.cancel()` and the local # in this case we call `ctx.cancel()` and the local
# `._scope` gets `.cancel_called` on the ctxc ack. # `._scope` does not get `.cancel_called` and thus
# `.cancelled_caught` neither will ever be set.
if use_ctx_cancel_method: if use_ctx_cancel_method:
assert ctx._scope.cancelled_caught assert not ctx._scope.cancelled_caught
# rxed ctxc response from far end # rxed ctxc response from far end
assert ctx.cancel_acked assert ctx.cancel_acked

@ -19,6 +19,8 @@ from tractor import (
) )
from tractor.trionics import BroadcastReceiver from tractor.trionics import BroadcastReceiver
from conftest import expect_ctxc
async def sleep_and_err( async def sleep_and_err(
sleep_for: float = 0.1, sleep_for: float = 0.1,
@ -190,7 +192,8 @@ async def trio_ctx(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'parent_cancels', [False, True], 'parent_cancels',
['context', 'actor', False],
ids='parent_actor_cancels_child={}'.format ids='parent_actor_cancels_child={}'.format
) )
def test_context_spawns_aio_task_that_errors( def test_context_spawns_aio_task_that_errors(
@ -214,18 +217,36 @@ def test_context_spawns_aio_task_that_errors(
# debug_mode=True, # debug_mode=True,
loglevel='cancel', loglevel='cancel',
) )
async with p.open_context( async with (
expect_ctxc(
yay=parent_cancels == 'actor',
),
p.open_context(
trio_ctx, trio_ctx,
) as (ctx, first): ) as (ctx, first),
):
assert first == 'start' assert first == 'start'
if parent_cancels: if parent_cancels == 'actor':
await p.cancel_actor() await p.cancel_actor()
elif parent_cancels == 'context':
await ctx.cancel()
else:
await trio.sleep_forever() await trio.sleep_forever()
return await ctx.result() async with expect_ctxc(
yay=parent_cancels == 'actor',
):
await ctx.result()
if parent_cancels == 'context':
# to tear down sub-actor
await p.cancel_actor()
return ctx.outcome
if parent_cancels: if parent_cancels:
# bc the parent made the cancel request, # bc the parent made the cancel request,

@ -220,11 +220,12 @@ async def stream_from_peer(
# - what about IPC-transport specific errors, should # - what about IPC-transport specific errors, should
# they bubble from the async for and trigger # they bubble from the async for and trigger
# other special cases? # other special cases?
#
# NOTE: current ctl flow: # NOTE: current ctl flow:
# - stream raises `trio.EndOfChannel` and # - stream raises `trio.EndOfChannel` and
# exits the loop # exits the loop
# - `.open_context()` will raise the ctxcanc # - `.open_context()` will raise the ctxc received
# received from the sleeper. # from the sleeper.
async for msg in stream: async for msg in stream:
assert msg is not None assert msg is not None
print(msg) print(msg)
@ -237,7 +238,12 @@ async def stream_from_peer(
assert peer_ctx._remote_error is ctxerr assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
assert peer_ctx.canceller == ctxerr.canceller
# the peer ctx is the canceller even though its canceller
# is the "canceller" XD
assert peer_name in peer_ctx.canceller
assert "canceller" in ctxerr.canceller
# caller peer should not be the cancel requester # caller peer should not be the cancel requester
assert not ctx.cancel_called assert not ctx.cancel_called
@ -271,7 +277,6 @@ async def stream_from_peer(
# root/parent actor task should NEVER HAVE cancelled us! # root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
raise raise
# TODO: IN THEORY we could have other cases depending on # TODO: IN THEORY we could have other cases depending on
@ -291,6 +296,7 @@ async def stream_from_peer(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'error_during_ctxerr_handling', 'error_during_ctxerr_handling',
[False, True], [False, True],
ids=lambda item: f'rte_during_ctxerr={item}',
) )
def test_peer_canceller( def test_peer_canceller(
error_during_ctxerr_handling: bool, error_during_ctxerr_handling: bool,
@ -383,11 +389,11 @@ def test_peer_canceller(
) as (canceller_ctx, sent), ) as (canceller_ctx, sent),
): ):
ctxs: list[Context] = [ ctxs: dict[str, Context] = {
sleeper_ctx, 'sleeper': sleeper_ctx,
caller_ctx, 'caller': caller_ctx,
canceller_ctx, 'canceller': canceller_ctx,
] }
try: try:
print('PRE CONTEXT RESULT') print('PRE CONTEXT RESULT')
@ -491,6 +497,15 @@ def test_peer_canceller(
# should be cancelled by US. # should be cancelled by US.
# #
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
# since we do a rte reraise above, the
# `.open_context()` error handling should have
# raised a local rte, thus the internal
# `.open_context()` enterer task's
# cancel-scope should have raised the RTE, NOT
# a `trio.Cancelled` due to a local
# `._scope.cancel()` call.
assert not sleeper_ctx._scope.cancelled_caught
assert isinstance(loc_err, RuntimeError) assert isinstance(loc_err, RuntimeError)
print(f'_loc_err: {_loc_err}\n') print(f'_loc_err: {_loc_err}\n')
# assert sleeper_ctx._local_error is _loc_err # assert sleeper_ctx._local_error is _loc_err
@ -505,35 +520,35 @@ def test_peer_canceller(
# NOTE: this root actor task should have # NOTE: this root actor task should have
# called `Context.cancel()` on the # called `Context.cancel()` on the
# `.__aexit__()` to every opened ctx. # `.__aexit__()` to every opened ctx.
for ctx in ctxs: for name, ctx in ctxs.items():
assert ctx.cancel_called
# this root actor task should have # this root actor task should have
# cancelled all opened contexts except the # cancelled all opened contexts except the
# sleeper which is obvi by the "canceller" # sleeper which is obvi by the "canceller"
# peer. # peer.
re = ctx._remote_error re = ctx._remote_error
if ( le = ctx._local_error
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
else: assert ctx.cancel_called
if ctx is sleeper_ctx:
assert 'canceller' in re.canceller
assert 'sleeper' in ctx.canceller
if ctx is canceller_ctx:
assert ( assert (
re.canceller re.canceller
== ==
ctx.canceller
==
root.uid root.uid
) )
else: # the other 2 ctxs
assert (
re.canceller
==
canceller.channel.uid
)
# since the sleeper errors while handling a # since the sleeper errors while handling a
# peer-cancelled (by ctxc) scenario, we expect # peer-cancelled (by ctxc) scenario, we expect
# that the `.open_context()` block DOES call # that the `.open_context()` block DOES call
@ -554,44 +569,64 @@ def test_peer_canceller(
# propagated # propagated
# #
else: else:
# since sleeper_ctx.result() IS called above
# we should have (silently) absorbed the
# corresponding `ContextCancelled` for it and
# `._scope.cancel()` should never have been
# called.
assert not sleeper_ctx._scope.cancelled_caught
assert isinstance(loc_err, ContextCancelled) assert isinstance(loc_err, ContextCancelled)
assert loc_err.canceller == sleeper_ctx.canceller
assert ( # the received remote error's `.canceller`
loc_err.canceller[0] # will of course be the "canceller" actor BUT
== # the canceller set on the local handle to
sleeper_ctx.canceller[0] # `sleeper_ctx` will be the "sleeper" uid
== # since it's the actor that relayed us the
'canceller' # error which was **caused** by the
) # "canceller".
assert 'sleeper' in sleeper_ctx.canceller
assert 'canceller' == loc_err.canceller[0]
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
re = sleeper_ctx.outcome final_err = sleeper_ctx.outcome
assert ( assert (
re is loc_err final_err is loc_err
is sleeper_ctx.maybe_error is sleeper_ctx.maybe_error
is sleeper_ctx._remote_error is sleeper_ctx._remote_error
) )
for ctx in ctxs: for name, ctx in ctxs.items():
re: BaseException|None = ctx._remote_error re: BaseException|None = ctx._remote_error
re: BaseException|None = ctx.outcome le: BaseException|None = ctx._local_error
assert ( err = ctx.maybe_error
re and out = ctx.outcome
(
re is ctx.maybe_error # every ctx should error!
is ctx._remote_error assert out is err
)
) # the recorded local error should always be
le: trio.MultiError = ctx._local_error # the same as the one raised by the
# `sleeper_ctx.result()` call
assert ( assert (
le le
and ctx._local_error and
le is loc_err
) )
# root doesn't cancel sleeper since it's # root doesn't cancel sleeper since it's
# cancelled by its peer. # cancelled by its peer.
if ctx is sleeper_ctx: if ctx is sleeper_ctx:
assert re
assert (
ctx._remote_error
is ctx.maybe_error
is ctx.outcome
is ctx._local_error
)
assert not ctx.cancel_called assert not ctx.cancel_called
assert not ctx.cancel_acked assert not ctx.cancel_acked
@ -601,21 +636,49 @@ def test_peer_canceller(
# `ContextCancelled` for it and thus # `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught` # the logic inside `.cancelled_caught`
# should trigger! # should trigger!
assert ctx._scope.cancelled_caught assert not ctx._scope.cancelled_caught
elif ctx is caller_ctx: elif ctx in (
# since its context was remotely caller_ctx,
# cancelled, we never needed to canceller_ctx,
# call `Context.cancel()` bc it was ):
# done by the peer and also we never
assert ctx.cancel_called assert not ctx._remote_error
# neither of the `caller/canceller_ctx` should
# have called `ctx.cancel()` bc the
# canceller's task internally issues
# a `Portal.cancel_actor()` to the
# sleeper and thus never should call
# `ctx.cancel()` per se UNLESS the
# sleeper's `.result()` call above
# ctxc exception results in the
# canceller's
# `.open_context().__aexit__()` error
# handling to kick in BEFORE a remote
# error is delivered - which since
# we're asserting what we are above,
# that should normally be the case
# right?
#
assert not ctx.cancel_called
#
# assert ctx.cancel_called
# orig ^
# TODO: figure out the details of this..? # TODO: figure out the details of this..?
# if you look the `._local_error` here # if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds? # is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught # assert not ctx._scope.cancelled_caught
elif ctx is canceller_ctx: assert (
not ctx.cancel_called
and not ctx.cancel_acked
)
assert not ctx._scope.cancelled_caught
# elif ctx is canceller_ctx:
# assert not ctx._remote_error
# XXX NOTE XXX: ONLY the canceller # XXX NOTE XXX: ONLY the canceller
# will get a self-cancelled outcome # will get a self-cancelled outcome
@ -626,11 +689,6 @@ def test_peer_canceller(
# .cancel() whenever an interpeer # .cancel() whenever an interpeer
# cancel takes place since each # cancel takes place since each
# reception of a ctxc # reception of a ctxc
assert (
ctx.cancel_called
and ctx.cancel_acked
)
assert not ctx._scope.cancelled_caught
else: else:
pytest.fail( pytest.fail(
@ -663,7 +721,7 @@ def test_peer_canceller(
# `.open_context()` block has exited and should be # `.open_context()` block has exited and should be
# set in both outcomes including the case where # set in both outcomes including the case where
# ctx-cancel handling itself errors. # ctx-cancel handling itself errors.
assert sleeper_ctx._scope.cancelled_caught assert not sleeper_ctx._scope.cancelled_caught
assert _loc_err is sleeper_ctx._local_error assert _loc_err is sleeper_ctx._local_error
assert ( assert (
sleeper_ctx.outcome sleeper_ctx.outcome

@ -364,6 +364,9 @@ class Context:
''' '''
chan: Channel chan: Channel
cid: str # "context id", more or less a unique linked-task-pair id cid: str # "context id", more or less a unique linked-task-pair id
_actor: Actor
# the "feeder" channels for delivering message values to the # the "feeder" channels for delivering message values to the
# local task from the runtime's msg processing loop. # local task from the runtime's msg processing loop.
_recv_chan: trio.MemoryReceiveChannel _recv_chan: trio.MemoryReceiveChannel
@ -429,7 +432,7 @@ class Context:
# there's always going to be an "underlying reason" that any # there's always going to be an "underlying reason" that any
# context was closed due to either a remote side error or # context was closed due to either a remote side error or
# a call to `.cancel()` which triggers `ContextCancelled`. # a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str | dict | None = None _cancel_msg: str|dict|None = None
# NOTE: this state var used by the runtime to determine if the # NOTE: this state var used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via # `pdbp` REPL is allowed to engage on contexts terminated via
@ -486,6 +489,13 @@ class Context:
f' {stream}\n' f' {stream}\n'
) )
outcome_str: str = self.repr_outcome(
show_error_fields=True
)
outcome_typ_str: str = self.repr_outcome(
type_only=True
)
return ( return (
f'<Context(\n' f'<Context(\n'
# f'\n' # f'\n'
@ -505,8 +515,16 @@ class Context:
# f' ---\n' # f' ---\n'
f'\n' f'\n'
# f' -----\n' # f' -----\n'
f' |_state: {self.repr_outcome()}\n' #
f' outcome{ds}{self.repr_outcome(show_error_fields=True)}\n' # TODO: better state `str`ids?
# -[ ] maybe map err-types to strs like 'cancelled',
# 'errored', 'streaming', 'started', .. etc.
# -[ ] as well as a final result wrapper like
# `outcome.Value`?
#
f' |_state: {outcome_typ_str}\n'
f' outcome{ds}{outcome_str}\n'
f' result{ds}{self._result}\n' f' result{ds}{self._result}\n'
f' cancel_called{ds}{self.cancel_called}\n' f' cancel_called{ds}{self.cancel_called}\n'
f' cancel_acked{ds}{self.cancel_acked}\n' f' cancel_acked{ds}{self.cancel_acked}\n'
@ -545,14 +563,46 @@ class Context:
return self._cancel_called return self._cancel_called
@property @property
def canceller(self) -> tuple[str, str] | None: def canceller(self) -> tuple[str, str]|None:
''' '''
``Actor.uid: tuple[str, str]`` of the (remote) ``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled. (side of the) context to also be cancelled.
''' '''
return self._canceller if canc := self._canceller:
return tuple(canc)
return None
    def _is_self_cancelled(
        self,
        remote_error: Exception|None = None,
    ) -> bool:
        if not self._cancel_called:
            return False
        re: BaseException|None = (
            remote_error
            or self._remote_error
        )
        if not re:
            return False
        if from_uid := re.src_actor_uid:
            from_uid: tuple = tuple(from_uid)
        our_uid: tuple = self._actor.uid
        our_canceller = self.canceller
        return bool(
            isinstance(re, ContextCancelled)
            and from_uid == self.chan.uid
            and re.canceller == our_uid
            and our_canceller == from_uid
        )
@property @property
def cancel_acked(self) -> bool: def cancel_acked(self) -> bool:
@ -568,22 +618,7 @@ class Context:
equal to the uid of the calling task's actor. equal to the uid of the calling task's actor.
''' '''
portal: Portal|None = self._portal return self._is_self_cancelled()
if portal:
our_uid: tuple = portal.actor.uid
return bool(
self._cancel_called
and (re := self._remote_error)
and isinstance(re, ContextCancelled)
and (
re.canceller
==
self.canceller
==
our_uid
)
)
@property @property
def cancelled_caught(self) -> bool: def cancelled_caught(self) -> bool:
@ -762,30 +797,15 @@ class Context:
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
ctxc_src: tuple = error.canceller
whom: str = ( whom: str = (
'us' if ctxc_src == current_actor().uid 'us' if error.canceller == self._actor.uid
else 'peer' else 'peer'
) )
log.cancel( log.cancel(
f'IPC context cancelled by {whom}!\n\n' f'IPC context cancelled by {whom}!\n\n'
f'{error}' f'{error}'
) )
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
self._canceller = ctxc_src
if self._cancel_called:
# this is an expected cancel request response
# message and we **don't need to raise it** in the
# local cancel `._scope` since it will potentially
# override a real error. After this returns
# `.cancel_acked == True`.
return
else: else:
log.error( log.error(
@ -794,7 +814,23 @@ class Context:
f'{error}\n' f'{error}\n'
f'{pformat(self)}\n' f'{pformat(self)}\n'
) )
self._canceller = self.chan.uid
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_actor_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
@ -803,6 +839,15 @@ class Context:
cs: trio.CancelScope = self._scope cs: trio.CancelScope = self._scope
if ( if (
cs cs
# XXX this is an expected cancel request response
# message and we **don't need to raise it** in the
# local cancel `._scope` since it will potentially
# override a real error. After this method returns
# if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set.
and not self._is_self_cancelled()
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
): ):
@ -840,9 +885,13 @@ class Context:
) -> str: ) -> str:
# TODO: how to show the transport interchange fmt? # TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key # codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome(
show_error_fields=True,
type_only=True,
)
return ( return (
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
f'{self._nsf}() -> {self.repr_outcome()}:' f'{self._nsf}() -> {outcome_str}:'
) )
async def cancel( async def cancel(
@ -851,10 +900,32 @@ class Context:
) -> None: ) -> None:
''' '''
Cancel this inter-actor-task context. Cancel this inter-actor IPC context by requesting the
remote side's cancel-scope-linked `trio.Task` by calling
`._scope.cancel()` and delivering a `ContextCancelled`
ack msg in response.
Request that the far side cancel its current linked context, Behaviour:
Timeout quickly in an attempt to sidestep 2-generals... ---------
- after the far end cancels, the `.cancel()` calling side
should receive a `ContextCancelled` with the
`.canceller: tuple` uid set to the current `Actor.uid`.
- timeout (quickly) on failure to rx this ACK error-msg in
an attempt to sidestep 2-generals when the transport
layer fails.
Note, that calling this method DOES NOT also necessarily
result in `Context._scope.cancel()` being called
**locally**!
=> That is, an IPC `Context` (this) **does not**
have the same semantics as a `trio.CancelScope`.
If the caller (who entered the `Portal.open_context()`)
desires that the internal block's cancel-scope be
cancelled it should open its own `trio.CancelScope` and
manage it as needed.
''' '''
side: str = self.side side: str = self.side
@ -976,7 +1047,7 @@ class Context:
``trio``'s cancellation system. ``trio``'s cancellation system.
''' '''
actor: Actor = current_actor() actor: Actor = self._actor
# If the surrounding context has been cancelled by some # If the surrounding context has been cancelled by some
# task with a handle to THIS, we error here immediately # task with a handle to THIS, we error here immediately
@ -1149,32 +1220,30 @@ class Context:
a cancellation (if any). a cancellation (if any).
''' '''
if (( our_uid: tuple = self.chan.uid
# NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likely via # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# ``Context.cancel()``), we don't want to re-raise that # for "graceful cancellation" case:
# cancellation signal locally (would be akin to #
# a ``trio.Nursery`` nursery raising ``trio.Cancelled`` # Whenever a "side" of a context (a `trio.Task` running in
# whenever ``CancelScope.cancel()`` was called) and # an actor) **is** the side which requested ctx
# instead silently reap the expected cancellation # cancellation (likely via ``Context.cancel()``), we
# "error"-msg-as-ack. In this case the `err: # **don't** want to re-raise any eventually received
# ContextCancelled` must have a `.canceller` set to the # `ContextCancelled` response locally (would be akin to
# uid of the requesting task's actor and we only do NOT # a `trio.Nursery` nursery raising `trio.Cancelled`
# raise that error locally if WE ARE THAT ACTOR which # whenever `CancelScope.cancel()` was called).
# requested the cancellation. #
# Instead, silently reap the remote delivered ctxc
# (`ContextCancelled`) as an expected
# error-msg-is-cancellation-ack IFF said
# `remote_error: ContextCancelled` has `.canceller`
# set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc).
if (
not raise_ctxc_from_self_call not raise_ctxc_from_self_call
and isinstance(remote_error, ContextCancelled) and self._is_self_cancelled(remote_error)
and (
self._cancel_called
# or self.chan._cancel_called
# TODO: ^ should we have a special separate case
# for this ^ ?
)
and ( # one of,
(portal := self._portal)
and (our_uid := portal.actor.uid)
# TODO: ?potentially it is useful to emit certain # TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the # warning/cancel logs for the cases where the
# cancellation is due to a lower level cancel # cancellation is due to a lower level cancel
@ -1182,12 +1251,11 @@ class Context:
# that case it's not actually this specific ctx that # that case it's not actually this specific ctx that
# made a `.cancel()` call, but it is the same # made a `.cancel()` call, but it is the same
# actor-process? # actor-process?
and tuple(remote_error.canceller) == our_uid # or self.chan._cancel_called
or self.chan._cancel_called # XXX: ^ should we have a special separate case
or self.canceller == our_uid # for this ^, NO right?
)
) or (
) or (
# NOTE: whenever this context is the cause of an # NOTE: whenever this context is the cause of an
# overrun on the remote side (aka we sent msgs too # overrun on the remote side (aka we sent msgs too
# fast that the remote task was overrun according # fast that the remote task was overrun according
@ -1204,7 +1272,6 @@ class Context:
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['type_str'] == 'StreamOverrun' and remote_error.msgdata['type_str'] == 'StreamOverrun'
and tuple(remote_error.msgdata['sender']) == our_uid and tuple(remote_error.msgdata['sender']) == our_uid
)
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1236,7 +1303,7 @@ class Context:
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(
self, self,
hide_tb: bool = True, hide_tb: bool = False,
) -> Any|Exception: ) -> Any|Exception:
''' '''
@ -1378,7 +1445,20 @@ class Context:
if error: if error:
return error return error
assert not self._cancel_msg if cancmsg := self._cancel_msg:
# NOTE: means we're prolly in the process of
# processing the cancellation caused by
# this msg (eg. logging from `Actor._cancel_task()`
# method after receiving a `Context.cancel()` RPC)
# though there shouldn't ever be a `._cancel_msg`
# without it eventually resulting in this property
# delivering a value!
log.debug(
'`Context._cancel_msg` is set but has not yet resolved to `.maybe_error`?\n\n'
f'{cancmsg}\n'
)
# assert not self._cancel_msg
return None return None
def _final_result_is_set(self) -> bool: def _final_result_is_set(self) -> bool:
@ -1411,6 +1491,7 @@ class Context:
def repr_outcome( def repr_outcome(
self, self,
show_error_fields: bool = False, show_error_fields: bool = False,
type_only: bool = False,
) -> str: ) -> str:
''' '''
@ -1420,6 +1501,9 @@ class Context:
''' '''
merr: Exception|None = self.maybe_error merr: Exception|None = self.maybe_error
if merr: if merr:
if type_only:
return type(merr).__name__
# if the error-type is one of ours and has the custom # if the error-type is one of ours and has the custom
# defined "repr-(in)-one-line" method call it, ow # defined "repr-(in)-one-line" method call it, ow
# just deliver the type name. # just deliver the type name.
@ -1616,8 +1700,6 @@ class Context:
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
# from .devx._debug import pause
# await pause()
# NOTE: if an error is detected we should always still # NOTE: if an error is detected we should always still
# send it through the feeder-mem-chan and expect # send it through the feeder-mem-chan and expect
@ -1666,7 +1748,7 @@ class Context:
# overrun state and that msg isn't stuck in an # overrun state and that msg isn't stuck in an
# overflow queue what happens?!? # overflow queue what happens?!?
local_uid = current_actor().uid local_uid = self._actor.uid
txt: str = ( txt: str = (
'on IPC context:\n' 'on IPC context:\n'
@ -1765,6 +1847,7 @@ def mk_context(
ctx = Context( ctx = Context(
chan=chan, chan=chan,
cid=cid, cid=cid,
_actor=current_actor(),
_send_chan=send_chan, _send_chan=send_chan,
_recv_chan=recv_chan, _recv_chan=recv_chan,
_nsf=nsf, _nsf=nsf,
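The stricter absorption rule in the hunk that checks `raise_ctxc_from_self_call` can be modeled in isolation. What follows is a minimal sketch under stated assumptions, not the code from this changeset: `ToyContext`, `is_self_cancelled()` and `maybe_raise()` are hypothetical stand-ins, and the `ContextCancelled` class is a local dummy that only carries a `.canceller` uid.

```python
from dataclasses import dataclass


class ContextCancelled(Exception):
    '''Local dummy (assumption): only carries the `.canceller` uid.'''
    def __init__(self, canceller: tuple):
        super().__init__(f'cancelled by {canceller}')
        self.canceller = canceller


@dataclass
class ToyContext:
    # hypothetical model of one "side" of an IPC context
    our_uid: tuple
    cancel_called: bool = False

    def is_self_cancelled(self, remote_error: Exception) -> bool:
        # all three conditions must hold for a "graceful" cancel ack
        return (
            self.cancel_called
            and isinstance(remote_error, ContextCancelled)
            and tuple(remote_error.canceller) == self.our_uid
        )

    def maybe_raise(
        self,
        remote_error: Exception,
        raise_ctxc_from_self_call: bool = False,
    ) -> None:
        if (
            not raise_ctxc_from_self_call
            and self.is_self_cancelled(remote_error)
        ):
            # absorb: the error is just the ack for our own request
            return None
        raise remote_error
```

In this model a cancel ack whose `.canceller` names some other actor, or one that arrives when this side never requested cancellation, is always raised rather than absorbed.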

@ -302,7 +302,7 @@ async def _errors_relayed_via_ipc(
) )
) )
): ):
# await pause() # await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this # => I can't think of a reason that inspecting this
@ -322,6 +322,12 @@ async def _errors_relayed_via_ipc(
cid=ctx.cid, cid=ctx.cid,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
if is_rpc: if is_rpc:
try: try:
await chan.send(err_msg) await chan.send(err_msg)
@ -566,6 +572,7 @@ async def _invoke(
# inside ._context._drain_to_final_msg()`.. # inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right? # # TODO: remove this ^ right?
if ctx._scope.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error # first check for and raise any remote error
# before raising any context cancelled case # before raising any context cancelled case
@ -575,8 +582,9 @@ async def _invoke(
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
cs: CancelScope = ctx._scope cs: CancelScope = ctx._scope
if cs.cancel_called: if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
msg: str = ( msg: str = (
'actor was cancelled by ' 'actor was cancelled by '
@ -632,15 +640,6 @@ async def _invoke(
# f' |_{ctx}' # f' |_{ctx}'
) )
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
# '------ - ------\n'
# 'IPC msg:\n'
f'\n\n{ctx._cancel_msg}'
)
# task-context was either cancelled by request using # task-context was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()`` # ``Portal.cancel_actor()`` or ``Context.cancel()``
# on the far end, or it was cancelled by the local # on the far end, or it was cancelled by the local
@ -1753,7 +1752,9 @@ class Actor:
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str] | None = None,
requesting_uid: tuple[str, str]|None = None,
ipc_msg: dict|None|bool = False,
) -> bool: ) -> bool:
''' '''
@ -1764,16 +1765,13 @@ class Actor:
in the signature (for now). in the signature (for now).
''' '''
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from # this ctx based lookup ensures the requested task to be
# this channel # cancelled was indeed spawned by a request from its
# parent (or some grandparent's) channel
ctx: Context ctx: Context
func: Callable func: Callable
is_complete: trio.Event is_complete: trio.Event
# NOTE: right now this is only implicitly called by
# streaming IPC but it should be called
# to cancel any remotely spawned task
try: try:
( (
ctx, ctx,
@ -1801,20 +1799,23 @@ class Actor:
log.cancel( log.cancel(
'Cancel request for RPC task\n\n' 'Cancel request for RPC task\n\n'
f'<= ._cancel_task(): {requesting_uid}\n' f'<= Actor.cancel_task(): {requesting_uid}\n\n'
f' |_ @{ctx.dmaddr}\n\n' f'=> {ctx._task}\n'
f' |_ >> {ctx.repr_rpc}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n'
# TODO: better ascii repr for "supervisor" like # TODO: better ascii repr for "supervisor" like
# a nursery or context scope? # a nursery or context scope?
# f'=> {parent_chan}\n' # f'=> {parent_chan}\n'
f'=> {ctx._task}\n' # f' |_{ctx._task}\n'
# TODO: simplified `Context.__repr__()` fields output # TODO: simplified `Context.__repr__()` fields output
# shows only application state-related stuff like, # shows only application state-related stuff like,
# - ._stream # - ._stream
# - .closed # - .closed
# - .started_called # - .started_called
# - .. etc. # - .. etc.
f' >> {ctx.repr_rpc}\n' # f' >> {ctx.repr_rpc}\n'
# f' |_ctx: {cid}\n' # f' |_ctx: {cid}\n'
# f' >> {ctx._nsf}()\n' # f' >> {ctx._nsf}()\n'
) )
@ -1824,6 +1825,16 @@ class Actor:
): ):
ctx._canceller: tuple = requesting_uid ctx._canceller: tuple = requesting_uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here?
if (
ipc_msg
and ctx._cancel_msg is None
):
# assign RPC msg directly from the loop, which is usually
# the case with `ctx.cancel()` on the other side.
ctx._cancel_msg = ipc_msg
# don't allow cancelling this function mid-execution # don't allow cancelling this function mid-execution
# (is this necessary?) # (is this necessary?)
if func is self._cancel_task: if func is self._cancel_task:
@ -1904,10 +1915,15 @@ class Actor:
else else
"IPC channel's " "IPC channel's "
) )
rent_chan_repr: str = (
f'|_{parent_chan}'
if parent_chan
else ''
)
log.cancel( log.cancel(
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
f'<= .cancel_rpc_tasks(): {req_uid}\n' f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
f' {rent_chan_repr}\n'
# f'{self}\n' # f'{self}\n'
# f'{tasks_str}' # f'{tasks_str}'
) )
@ -1927,9 +1943,6 @@ class Actor:
): ):
continue continue
# if func == self._cancel_task:
# continue
# TODO: this may block on the task cancellation # TODO: this may block on the task cancellation
# and so should really be done in a nursery batch? # and so should really be done in a nursery batch?
await self._cancel_task( await self._cancel_task(
@ -2339,6 +2352,8 @@ async def process_messages(
await actor._cancel_task( await actor._cancel_task(
cid, cid,
channel, channel,
ipc_msg=msg,
) )
break break
@ -2449,6 +2464,7 @@ async def process_messages(
# cancel it! # cancel it!
'parent_chan': chan, 'parent_chan': chan,
'requesting_uid': chan.uid, 'requesting_uid': chan.uid,
'ipc_msg': msg,
} }
# TODO: remove? already have emit in meth. # TODO: remove? already have emit in meth.
# log.runtime( # log.runtime(
@ -2737,7 +2753,7 @@ class Arbiter(Actor):
sockaddr: tuple[str, int] sockaddr: tuple[str, int]
for (aname, _), sockaddr in self._registry.items(): for (aname, _), sockaddr in self._registry.items():
log.info( log.runtime(
f'Actor mailbox info:\n' f'Actor mailbox info:\n'
f'aname: {aname}\n' f'aname: {aname}\n'
f'sockaddr: {sockaddr}\n' f'sockaddr: {sockaddr}\n'

@ -95,12 +95,12 @@ class Lock:
# and must be cancelled if this actor is cancelled via IPC # and must be cancelled if this actor is cancelled via IPC
# request-message otherwise deadlocks with the parent actor may # request-message otherwise deadlocks with the parent actor may
# ensue # ensue
_debugger_request_cs: trio.CancelScope | None = None _debugger_request_cs: trio.CancelScope|None = None
# NOTE: set only in the root actor for the **local** root spawned task # NOTE: set only in the root actor for the **local** root spawned task
# which has acquired the lock (i.e. this is on the callee side of # which has acquired the lock (i.e. this is on the callee side of
# the `lock_tty_for_child()` context entry). # the `lock_tty_for_child()` context entry).
_root_local_task_cs_in_debug: trio.CancelScope | None = None _root_local_task_cs_in_debug: trio.CancelScope|None = None
# actor tree-wide actor uid that supposedly has the tty lock # actor tree-wide actor uid that supposedly has the tty lock
global_actor_in_debug: tuple[str, str] = None global_actor_in_debug: tuple[str, str] = None
@ -619,13 +619,15 @@ def _set_trace(
actor: tractor.Actor | None = None, actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None, pdb: MultiActorPdb | None = None,
shield: bool = False, shield: bool = False,
extra_frames_up_when_async: int = 1,
): ):
__tracebackhide__: bool = True __tracebackhide__: bool = True
actor: tractor.Actor = actor or current_actor() actor: tractor.Actor = actor or current_actor()
# start 2 levels up in user code # always start 1 level up from THIS in user code.
frame: FrameType | None = sys._getframe() frame: FrameType|None
if frame: if frame := sys._getframe():
frame: FrameType = frame.f_back # type: ignore frame: FrameType = frame.f_back # type: ignore
if ( if (
@ -633,23 +635,39 @@ def _set_trace(
and ( and (
pdb pdb
and actor is not None and actor is not None
) or shield )
# or shield
): ):
msg: str = _pause_msg
if shield:
# log.warning(
msg = (
'\n\n'
' ------ - ------\n'
'Debugger invoked with `shield=True` so an extra\n'
'`trio.CancelScope.__exit__()` frame is shown..\n'
'\n'
'Try going up one frame to see your pause point!\n'
'\n'
' SORRY we need to fix this!\n'
' ------ - ------\n\n'
) + msg
# pdbp.set_trace() # pdbp.set_trace()
# TODO: maybe print the actor supervision tree up to the # TODO: maybe print the actor supervision tree up to the
# root here? Bo # root here? Bo
log.pdb( log.pdb(
f'{_pause_msg}\n' f'{msg}\n'
'|\n' '|\n'
f'|_ {actor.uid}\n' f'|_ {actor.uid}\n'
) )
# no f!#$&* idea, but when we're in async land # no f!#$&* idea, but when we're in async land
# we need 2x frames up? # we need 2x frames up?
frame = frame.f_back for i in range(extra_frames_up_when_async):
# frame = frame.f_back frame: FrameType = frame.f_back
log.debug(
# if shield: f'Going up frame {i} -> {frame}\n'
# frame = frame.f_back )
else: else:
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
@ -659,10 +677,9 @@ def _set_trace(
Lock.local_task_in_debug = 'sync' Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame) pdb.set_trace(frame=frame)
# undo_
async def pause( async def _pause(
debug_func: Callable = _set_trace, debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None, release_lock_signal: trio.Event | None = None,
@ -676,27 +693,19 @@ async def pause(
# be no way to override it?.. # be no way to override it?..
# shield: bool = False, # shield: bool = False,
# TODO: shield: bool = False,
# shield: bool = False
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
''' '''
A pause point (more commonly known as a "breakpoint") interrupt Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
instruction for engaging a blocking debugger instance to stack frame when not shielded (since apparently i can't figure out
conduct manual console-based-REPL-interaction from within how to hide it using the normal mechanisms..)
`tractor`'s async runtime, normally from some single-threaded
and currently executing actor-hosted-`trio`-task in some
(remote) process.
NOTE: we use the semantics "pause" since it better encompasses Hopefully we won't need this in the long run.
the entirety of the necessary global-runtime-state-mutation any
actor-task must access and lock in order to get full isolated
control over the process tree's root TTY:
https://en.wikipedia.org/wiki/Breakpoint
''' '''
# __tracebackhide__ = True __tracebackhide__: bool = True
actor = current_actor() actor = current_actor()
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
task_name: str = trio.lowlevel.current_task().name task_name: str = trio.lowlevel.current_task().name
@ -707,23 +716,10 @@ async def pause(
): ):
Lock.local_pdb_complete = trio.Event() Lock.local_pdb_complete = trio.Event()
# if shield:
debug_func = partial( debug_func = partial(
debug_func, debug_func,
# shield=shield,
) )
# def _exit(self, *args, **kwargs):
# __tracebackhide__: bool = True
# super().__exit__(*args, **kwargs)
# trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if ( if (
not is_root_process() not is_root_process()
@ -812,37 +808,131 @@ async def pause(
Lock.repl = pdb Lock.repl = pdb
try: try:
if debug_func is None: # TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)?
#
# if debug_func is None:
# assert release_lock_signal, ( # assert release_lock_signal, (
# 'Must pass `release_lock_signal: trio.Event` if no ' # 'Must pass `release_lock_signal: trio.Event` if no '
# 'trace func provided!' # 'trace func provided!'
# ) # )
print(f"{actor.uid} ENTERING WAIT") # print(f"{actor.uid} ENTERING WAIT")
task_status.started()
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# await release_lock_signal.wait() # await release_lock_signal.wait()
else: # else:
# block here one (at the appropriate frame *up*) where # block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio. # ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb") log.debug('Entering sync world of the `pdb` REPL..')
debug_func(actor, pdb) try:
debug_func(
actor,
pdb,
extra_frames_up_when_async=2,
shield=shield,
)
except BaseException:
log.exception(
'Failed to invoke internal `debug_func = '
f'{debug_func.func.__name__}`\n'
)
raise
except bdb.BdbQuit: except bdb.BdbQuit:
Lock.release() Lock.release()
raise raise
# XXX: apparently we can't do this without showing this frame except BaseException:
# in the backtrace on first entry to the REPL? Seems like an odd log.exception(
# behaviour that should have been fixed by now. This is also why 'Failed to engage debugger via `_pause()` ??\n'
# we scrapped all the @cm approaches that were tried previously. )
# finally: raise
# __tracebackhide__ = True
# # frame = sys._getframe() # XXX: apparently we can't do this without showing this frame
# # last_f = frame.f_back # in the backtrace on first entry to the REPL? Seems like an odd
# # last_f.f_globals['__tracebackhide__'] = True # behaviour that should have been fixed by now. This is also why
# # signal.signal = pdbp.hideframe(signal.signal) # we scrapped all the @cm approaches that were tried previously.
# finally:
# __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal)
async def pause(
debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None,
# TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with:
# with CancelScope(shield=True):
# await pause()
# => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always shown in the debugger on entry.. and there seems to
# be no way to override it?..
# shield: bool = False,
shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None:
'''
A pause point (more commonly known as a "breakpoint") interrupt
instruction for engaging a blocking debugger instance to
conduct manual console-based-REPL-interaction from within
`tractor`'s async runtime, normally from some single-threaded
and currently executing actor-hosted-`trio`-task in some
(remote) process.
NOTE: we use the semantics "pause" since it better encompasses
the entirety of the necessary global-runtime-state-mutation any
actor-task must access and lock in order to get full isolated
control over the process tree's root TTY:
https://en.wikipedia.org/wiki/Breakpoint
'''
__tracebackhide__: bool = True
if shield:
# NOTE XXX: even hard coding this inside the `class CancelScope:`
# doesn't seem to work for me!?
# ^ XXX ^
# def _exit(self, *args, **kwargs):
# __tracebackhide__: bool = True
# super().__exit__(*args, **kwargs)
trio.CancelScope.__enter__.__tracebackhide__ = True
trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
with trio.CancelScope(shield=shield) as cs:
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
# NOTE: so the caller can always cancel even if shielded
task_status.started(cs)
return await _pause(
debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=True,
task_status=task_status,
)
else:
return await _pause(
debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=False,
task_status=task_status,
)
# TODO: allow pausing from sync code. # TODO: allow pausing from sync code.
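The `extra_frames_up_when_async` loop in `_set_trace()` boils down to walking `frame.f_back` a configurable number of times before handing the frame to the debugger. A minimal standalone sketch of that idea, assuming CPython's `sys._getframe()`; the helper `frame_n_up()` and the `direct()`/`inner()`/`outer()` callers are hypothetical names used only for illustration:

```python
import sys
from types import FrameType


def frame_n_up(extra_frames_up: int = 0) -> FrameType:
    # start at the frame of whoever called `frame_n_up()`
    frame = sys._getframe().f_back
    for _ in range(extra_frames_up):
        if frame.f_back is None:
            # already at the outermost frame, stop walking
            break
        frame = frame.f_back
    return frame


def direct() -> FrameType:
    # no extra hops: we get our own frame back
    return frame_n_up()


def inner() -> FrameType:
    # one extra hop: skips `inner()` and lands in its caller
    return frame_n_up(extra_frames_up=1)


def outer() -> FrameType:
    return inner()
```

Making the hop count a parameter, rather than hard-coding repeated `frame.f_back` lines, lets each call site state how many wrapper frames sit between the user's code and the debugger entry point.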
@ -929,6 +1019,10 @@ _crash_msg: str = (
def _post_mortem( def _post_mortem(
actor: tractor.Actor, actor: tractor.Actor,
pdb: MultiActorPdb, pdb: MultiActorPdb,
shield: bool = False,
# only for compat with `._set_trace()`..
extra_frames_up_when_async=0,
) -> None: ) -> None:
''' '''
@ -957,7 +1051,7 @@ def _post_mortem(
post_mortem = partial( post_mortem = partial(
pause, pause,
_post_mortem, debug_func=_post_mortem,
) )
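The signature fix above matters because `_pause()` now always passes `extra_frames_up_when_async` and `shield` as keywords, so every `debug_func` has to accept both. A simplified, synchronous sketch of that calling convention follows; every function here is a hypothetical stand-in that returns a tuple instead of engaging a REPL:

```python
from functools import partial


def set_trace(actor, pdb, shield=False, extra_frames_up_when_async=1):
    # stand-in: reports how many frames it would walk up
    return ('set_trace', extra_frames_up_when_async)


def post_mortem_func(actor, pdb, shield=False, extra_frames_up_when_async=0):
    # accepts the extra kwarg only for call compatibility, then ignores it
    return ('post_mortem', None)


def old_style(actor, pdb):
    # a debug func that was NOT updated for the new keywords
    return ('old_style', None)


def pause(debug_func=set_trace, shield=False):
    # the caller always passes both keywords, whatever `debug_func` is
    return debug_func(
        'actor',
        'pdb',
        extra_frames_up_when_async=2,
        shield=shield,
    )


# binding by keyword keeps the partial robust if `pause()` ever
# reorders or adds positional parameters
post_mortem = partial(pause, debug_func=post_mortem_func)
```

Calling `pause(debug_func=old_style)` in this sketch raises `TypeError`, which is the kind of mismatch the new `log.exception()` calls are meant to surface.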
@ -1043,12 +1137,20 @@ async def maybe_wait_for_debugger(
# will make the pdb repl unusable. # will make the pdb repl unusable.
# Instead try to wait for pdb to be released before # Instead try to wait for pdb to be released before
# tearing down. # tearing down.
sub_in_debug: tuple[str, str]|None = Lock.global_actor_in_debug in_debug: tuple[str, str]|None = Lock.global_actor_in_debug
debug_complete: trio.Event|None = Lock.no_remote_has_tty debug_complete: trio.Event|None = Lock.no_remote_has_tty
if sub_in_debug := Lock.global_actor_in_debug: if in_debug == current_actor().uid:
log.debug(
msg
+
'Root already owns the TTY LOCK'
)
return True
elif in_debug:
msg += ( msg += (
f'Debug `Lock` in use by subactor: {sub_in_debug}\n' f'Debug `Lock` in use by subactor: {in_debug}\n'
) )
# TODO: could this make things more deterministic? # TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be # wait to see if a sub-actor task will be
@ -1065,27 +1167,26 @@ async def maybe_wait_for_debugger(
return False return False
for istep in range(poll_steps): for istep in range(poll_steps):
if ( if (
debug_complete debug_complete
and not debug_complete.is_set() and not debug_complete.is_set()
and sub_in_debug is not None and in_debug is not None
): ):
log.pdb( log.pdb(
msg msg
+ +
'Root is waiting on tty lock to release..\n' 'Root is waiting on tty lock to release..\n'
) )
with trio.CancelScope(shield=True):
await debug_complete.wait() await debug_complete.wait()
log.pdb( log.pdb(
f'Child subactor released debug lock:' f'Child subactor released debug lock\n'
f'|_{sub_in_debug}\n' f'|_{in_debug}\n'
) )
# is no subactor locking debugger currently? # is no subactor locking debugger currently?
if ( if (
sub_in_debug is None in_debug is None
and ( and (
debug_complete is None debug_complete is None
or debug_complete.is_set() or debug_complete.is_set()