Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 10adf34be5 Set any `._eoc` to the err in `_raise_from_no_key_in_msg()`
Since that's what we're now doing in `MsgStream._eoc` internal
assignments (coming in future patch), do the same in this exception
re-raise-helper and include more extensive doc string detailing all
the msg-type-to-raised-error cases. Also expose a `hide_tb: bool` like
we have already in `unpack_error()`.
2024-02-21 13:17:37 -05:00
Tyler Goodlet 82dcaff8db Better logging for cancel requests in IPC msg loop
As similarly improved in other parts of the runtime, adds much more
pedantic (`.cancel()`) logging content to indicate the src of remote
cancellation request particularly for `Actor.cancel()` and
`._cancel_task()` cases prior to `._invoke()` task scheduling. Also add
detailed case comments and much more info to the
"request-to-cancel-already-terminated-RPC-task" log emission to include
the `Channel` and `Context.cid` deats.

This helped me find the src of a race condition causing a test to fail
where a callee ctx task was returning a result *before* an expected
`ctx.cancel()` request arrived B). Adding much more pedantic
`.cancel()` msg contents around the requester's deats should ensure
these cases are much easier to detect going forward!

Also, simplify the `._invoke()` final result/error log msg to only put
*one of either* the final error or returned result above the `Context`
pprint.
2024-02-21 13:05:22 -05:00
Tyler Goodlet 621b252b0c Use `NamespacePath` in `Context` mgmt internals
The only case where we can't is in `Portal.run_from_ns()` usage (since we
pass a path with `self:<Actor.meth>`) and because `.to_tuple()`
internally uses `.load_ref()` which will of course fail on such a path..

So or now impl as,
- mk `Actor.start_remote_task()` take a `nsf: NamespacePath` but also
  offer a `load_nsf: bool = False` such that by default we bypass ref
  loading (maybe this is fine for perf long run as well?) for the
  `Actor`/'self:'` case mentioned above.
- mk `.get_context()` take an instance `nsf` obvi.

More logging msg format tweaks:
- change msg-flow related content to show the `Context._nsf`, which,
  right, is coming follow up commit..
- bunch more `.runtime()` format updates to show `msg: dict` contents
  and internal primitives with trailing `'\n'` for easier reading.
- report import loading `stackscope` in subactors.
2024-02-20 16:15:48 -05:00
Tyler Goodlet 20a089c331 Drop extra "
" when logging actor nursery errors
2024-02-20 15:58:11 -05:00
Tyler Goodlet df50d78042 Fix `.devx.maybe_wait_for_debugger()` polling deats
When entered by the root actor avoid excessive polling cycles by,
- blocking on the `Lock.no_remote_has_tty: trio.Event` and breaking
  *immediately* when set (though we should really also lock
  it from the root right?) to avoid extra loops..
- shielding the `await trio.sleep(poll_delay)` call to avoid any local
  cancellation causing the (presumably root-actor task) caller to move
  on (possibly to cancel its children) and instead to continue
  poll-blocking until the lock is actually released by its user.
- `break` the poll loop immediately if no remote locker is detected.
- use `.pdb()` level for reporting lock state changes.

Also add a #TODO to handle calls by non-root actors as it pertains to
2024-02-20 15:57:31 -05:00
Tyler Goodlet 114ec36436 Add `stackscope` as dep, drop legacy `pdb` issue cruft 2024-02-20 15:29:31 -05:00
Tyler Goodlet 179d7d2b04 Add `NamespacePath._ns` todo for `self:<ns.meth>` support 2024-02-20 15:28:11 -05:00
Tyler Goodlet f568fca98f Emit warning on any `ContextCancelled.canceller == None` 2024-02-20 15:26:14 -05:00
Tyler Goodlet 6c9bc627d8 Make ctx tests support `debug_mode: bool` fixture
Such that with `--tpdb` passed (sub)actors will engage the `pdbp` REPL
automatically and so that we can use the new `stackscope` support when
complex cases hang Bo

Also,
- simplified some type-annots (ns paths),
- doc-ed an inter-peer test func with some ascii msg flows,
- added a bottom #TODO for replicating the scenario i hit in `modden`
  where a separate client actor-tree was hanging on cancelling a `bigd`
  sub-workspace..
2024-02-20 15:14:58 -05:00
Tyler Goodlet 1d7cf7d1dd Enable `stackscope` render via root in debug mode
If `stackscope` is importable and debug_mode is enabled then we by
default call and report `.devx.enable_stack_on_sig()` is set B)

This makes debugging unexpected (SIGINT ignoring) hangs a cinch!
2024-02-20 13:23:16 -05:00
9 changed files with 529 additions and 225 deletions

View File

@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"',
description='structured concurrent `trio`-"actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
@ -50,6 +50,7 @@ setup(
'exceptiongroup',
# tooling
'stackscope',
'tricycle',
'trio_typing',
'colorlog',
@ -61,16 +62,15 @@ setup(
# debug mode REPL
'pdbp',
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep:
# https://peps.python.org/pep-0440/#version-specifiers
# windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
],
tests_require=['pytest'],
python_requires=">=3.10",

View File

@ -8,7 +8,9 @@ sync-opening a ``tractor.Context`` beforehand.
# from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from typing import Optional
from typing import (
Callable,
)
import pytest
import trio
@ -69,7 +71,7 @@ _state: bool = False
@tractor.context
async def too_many_starteds(
ctx: tractor.Context,
ctx: Context,
) -> None:
'''
Call ``Context.started()`` more then once (an error).
@ -84,7 +86,7 @@ async def too_many_starteds(
@tractor.context
async def not_started_but_stream_opened(
ctx: tractor.Context,
ctx: Context,
) -> None:
'''
Enter ``Context.open_stream()`` without calling ``.started()``.
@ -105,11 +107,15 @@ async def not_started_but_stream_opened(
],
ids='misuse_type={}'.format,
)
def test_started_misuse(target):
def test_started_misuse(
target: Callable,
debug_mode: bool,
):
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
target.__name__,
enable_modules=[__name__],
)
@ -124,7 +130,7 @@ def test_started_misuse(target):
@tractor.context
async def simple_setup_teardown(
ctx: tractor.Context,
ctx: Context,
data: int,
block_forever: bool = False,
@ -170,6 +176,7 @@ def test_simple_context(
error_parent,
callee_blocks_forever,
pointlessly_open_stream,
debug_mode: bool,
):
timeout = 1.5 if not platform.system() == 'Windows' else 4
@ -177,9 +184,10 @@ def test_simple_context(
async def main():
with trio.fail_after(timeout):
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'simple_context',
enable_modules=[__name__],
)
@ -260,6 +268,7 @@ def test_caller_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
debug_mode: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
@ -268,7 +277,7 @@ def test_caller_cancels(
'''
async def check_canceller(
ctx: tractor.Context,
ctx: Context,
) -> None:
# should not raise yet return the remote
# context cancelled error.
@ -287,8 +296,10 @@ def test_caller_cancels(
)
async def main():
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'simple_context',
enable_modules=[__name__],
)
@ -338,7 +349,7 @@ def test_caller_cancels(
@tractor.context
async def close_ctx_immediately(
ctx: tractor.Context,
ctx: Context,
) -> None:
@ -350,17 +361,33 @@ async def close_ctx_immediately(
@tractor_test
async def test_callee_closes_ctx_after_stream_open():
'callee context closes without using stream'
async def test_callee_closes_ctx_after_stream_open(
debug_mode: bool,
):
'''
callee context closes without using stream.
async with tractor.open_nursery() as n:
This should result in a msg sequence
|_<root>_
|_<fast_stream_closer>
portal = await n.start_actor(
<= {'started': <Any>, 'cid': <str>}
<= {'stop': True, 'cid': <str>}
<= {'result': Any, ..}
(ignored by child)
=> {'stop': True, 'cid': <str>}
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'fast_stream_closer',
enable_modules=[__name__],
)
with trio.fail_after(2):
with trio.fail_after(0.5):
async with portal.open_context(
close_ctx_immediately,
@ -368,10 +395,9 @@ async def test_callee_closes_ctx_after_stream_open():
# cancel_on_exit=True,
) as (ctx, sent):
assert sent is None
with trio.fail_after(0.5):
with trio.fail_after(0.4):
async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration``
@ -379,12 +405,15 @@ async def test_callee_closes_ctx_after_stream_open():
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0
else:
# verify stream is now closed
try:
await stream.receive()
with trio.fail_after(0.3):
await stream.receive()
except trio.EndOfChannel:
pass
@ -405,7 +434,7 @@ async def test_callee_closes_ctx_after_stream_open():
@tractor.context
async def expect_cancelled(
ctx: tractor.Context,
ctx: Context,
) -> None:
global _state
@ -434,11 +463,15 @@ async def expect_cancelled(
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
debug_mode: bool,
):
'caller context closes without using stream'
async with tractor.open_nursery() as an:
'''
caller context closes without using/opening stream
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
root: Actor = current_actor()
portal = await an.start_actor(
@ -522,11 +555,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
async def test_multitask_caller_cancels_from_nonroot_task(
debug_mode: bool,
):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
@ -573,7 +608,7 @@ async def test_multitask_caller_cancels_from_nonroot_task():
@tractor.context
async def cancel_self(
ctx: tractor.Context,
ctx: Context,
) -> None:
global _state
@ -610,16 +645,20 @@ async def cancel_self(
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test
async def test_callee_cancels_before_started():
async def test_callee_cancels_before_started(
debug_mode: bool,
):
'''
Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`.
'''
async with tractor.open_nursery() as n:
portal = await n.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'cancels_self',
enable_modules=[__name__],
)
@ -645,7 +684,7 @@ async def test_callee_cancels_before_started():
@tractor.context
async def never_open_stream(
ctx: tractor.Context,
ctx: Context,
) -> None:
'''
@ -659,8 +698,8 @@ async def never_open_stream(
@tractor.context
async def keep_sending_from_callee(
ctx: tractor.Context,
msg_buffer_size: Optional[int] = None,
ctx: Context,
msg_buffer_size: int|None = None,
) -> None:
'''
@ -685,7 +724,10 @@ async def keep_sending_from_callee(
],
ids='overrun_condition={}'.format,
)
def test_one_end_stream_not_opened(overrun_by):
def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable],
debug_mode: bool,
):
'''
This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265
@ -696,8 +738,10 @@ def test_one_end_stream_not_opened(overrun_by):
buf_size = buf_size_increase + Actor.msg_buffer_size
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
entrypoint.__name__,
enable_modules=[__name__],
)
@ -754,7 +798,7 @@ def test_one_end_stream_not_opened(overrun_by):
@tractor.context
async def echo_back_sequence(
ctx: tractor.Context,
ctx: Context,
seq: list[int],
wait_for_cancel: bool,
allow_overruns_side: str,
@ -837,6 +881,7 @@ def test_maybe_allow_overruns_stream(
slow_side: str,
allow_overruns_side: str,
loglevel: str,
debug_mode: bool,
):
'''
Demonstrate small overruns of each task back and forth
@ -855,13 +900,14 @@ def test_maybe_allow_overruns_stream(
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
# debug_mode=True,
debug_mode=debug_mode,
)
seq = list(range(10))
async with portal.open_context(

View File

@ -123,7 +123,9 @@ async def error_before_started(
await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
debug_mode: bool,
):
'''
Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not
@ -132,7 +134,9 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
async def main():
async with tractor.open_nursery() as n:
async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
@ -225,13 +229,16 @@ async def stream_from_peer(
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
err = ctxerr
except ContextCancelled as ctxc:
ctxerr = ctxc
assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
@ -269,9 +276,7 @@ async def stream_from_peer(
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
raise RuntimeError('Never triggered local `ContextCancelled` ?!?')
@pytest.mark.parametrize(
@ -280,6 +285,7 @@ async def stream_from_peer(
)
def test_peer_canceller(
error_during_ctxerr_handling: bool,
debug_mode: bool,
):
'''
Verify that a cancellation triggered by an in-actor-tree peer
@ -336,7 +342,7 @@ def test_peer_canceller(
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
debug_mode=debug_mode,
) as an:
canceller: Portal = await an.start_actor(
'canceller',
@ -377,7 +383,8 @@ def test_peer_canceller(
try:
print('PRE CONTEXT RESULT')
await sleeper_ctx.result()
res = await sleeper_ctx.result()
assert res
# should never get here
pytest.fail(
@ -387,7 +394,10 @@ def test_peer_canceller(
# should always raise since this root task does
# not request the sleeper cancellation ;)
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
print(
'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
f'{ctxerr}'
)
# canceller and caller peers should not
# have been remotely cancelled.
@ -410,16 +420,31 @@ def test_peer_canceller(
# XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr:
err = berr
pytest.fail('did not rx ctx-cancelled error?')
raise
# XXX if needed to debug failure
# _err = berr
# await tractor.pause()
# await trio.sleep_forever()
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{berr}\n'
)
else:
pytest.fail('did not rx ctx-cancelled error?')
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{ctxs}\n'
)
except (
ContextCancelled,
RuntimeError,
)as ctxerr:
_err = ctxerr
)as loc_err:
_loc_err = loc_err
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
@ -436,7 +461,7 @@ def test_peer_canceller(
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
assert isinstance(loc_err, RuntimeError)
# NOTE: this root actor task should have
# called `Context.cancel()` on the
@ -472,9 +497,10 @@ def test_peer_canceller(
# CASE: standard teardown inside in `.open_context()` block
else:
assert ctxerr.canceller == sleeper_ctx.canceller
assert isinstance(loc_err, ContextCancelled)
assert loc_err.canceller == sleeper_ctx.canceller
assert (
ctxerr.canceller[0]
loc_err.canceller[0]
==
sleeper_ctx.canceller[0]
==
@ -484,7 +510,7 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
re = sleeper_ctx._remote_error
assert re is ctxerr
assert re is loc_err
for ctx in ctxs:
re: BaseException | None = ctx._remote_error
@ -554,3 +580,14 @@ def test_peer_canceller(
assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'
def test_client_tree_spawns_and_cancels_service_subactor():
...
# TODO: test for the modden `mod wks open piker` bug!
# -> start actor-tree (server) that offers sub-actor spawns via
# context API
# -> start another full actor-tree (client) which requests to the first to
# spawn over its `@context` ep / api.
# -> client actor cancels the context and should exit gracefully
# and the server's spawned child should cancel and terminate!

View File

@ -33,12 +33,15 @@ import exceptiongroup as eg
import trio
from ._state import current_actor
from .log import get_logger
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
log = get_logger('tractor')
_this_mod = importlib.import_module(__name__)
@ -112,11 +115,36 @@ class ContextCancelled(RemoteActorError):
'''
@property
def canceller(self) -> tuple[str, str] | None:
def canceller(self) -> tuple[str, str]|None:
'''
Return the (maybe) `Actor.uid` for the requesting-author
of this ctxc.
Emit a warning msg when `.canceller` has not been set,
which usually idicates that a `None` msg-loop setinel was
sent before expected in the runtime. This can happen in
a few situations:
- (simulating) an IPC transport network outage
- a (malicious) pkt sent specifically to cancel an actor's
runtime non-gracefully without ensuring ongoing RPC tasks are
incrementally cancelled as is done with:
`Actor`
|_`.cancel()`
|_`.cancel_soon()`
|_`._cancel_task()`
'''
value = self.msgdata.get('canceller')
if value:
return tuple(value)
log.warning(
'IPC Context cancelled without a requesting actor?\n'
'Maybe the IPC transport ended abruptly?\n\n'
f'{self}'
)
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
@ -199,7 +227,6 @@ def pack_error(
):
error_msg.update(exc.msgdata)
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
@ -210,8 +237,10 @@ def pack_error(
def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError,
hide_tb: bool = True,
) -> None|Exception:
@ -287,37 +316,61 @@ def _raise_from_no_key_in_msg(
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
Raise an appopriate local error when a
`MsgStream` msg arrives which does not
contain the expected (at least under normal
operation) `'yield'` field.
`Context` and any embedded `MsgStream` termination,
as well as remote task errors are handled in order
of priority as:
- any 'error' msg is re-boxed and raised locally as
-> `RemoteActorError`|`ContextCancelled`
- a `MsgStream` 'stop' msg is constructed, assigned
and raised locally as -> `trio.EndOfChannel`
- All other mis-keyed msgss (like say a "final result"
'return' msg, normally delivered from `Context.result()`)
are re-boxed inside a `MessagingError` with an explicit
exc content describing the missing IPC-msg-key.
'''
__tracebackhide__: bool = True
__tracebackhide__: bool = hide_tb
# internal error should never get here
# an internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'cid: {cid}\n\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
hide_tb=hide_tb,
) from None
# `MsgStream` termination msg.
elif (
msg.get('stop')
or (
@ -330,29 +383,26 @@ def _raise_from_no_key_in_msg(
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
f'Context stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
eoc = trio.EndOfChannel(
f'Context stream ended due to msg:\n\n'
f'{pformat(msg)}\n'
)
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
raise eoc from src_err
if (
stream

View File

@ -138,13 +138,19 @@ async def open_root_actor(
)
assert registry_addrs
loglevel = (loglevel or log._default_loglevel).upper()
loglevel = (
loglevel
or log._default_loglevel
).upper()
if debug_mode and _spawn._spawn_method == 'trio':
if (
debug_mode
and _spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
# expose internal debug module to every actor allowing for
# use of ``await tractor.pause()``
enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of
@ -163,7 +169,20 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!"
)
log.get_console_log(loglevel)
assert loglevel
_log = log.get_console_log(loglevel)
assert _log
# TODO: factor this into `.devx._stackscope`!!
if debug_mode:
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []

View File

@ -48,15 +48,12 @@ import trio
from trio import (
CancelScope,
)
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import (
Nursery,
TaskStatus,
)
from .msg import NamespacePath
from ._ipc import Channel
from ._context import (
mk_context,
@ -145,8 +142,9 @@ async def _invoke(
cs: CancelScope | None = None
ctx = actor.get_context(
chan,
cid,
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
@ -276,8 +274,8 @@ async def _invoke(
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
# res: Any = await coro
res = await coro
res: Any = await coro
ctx._result = res
# deliver final result to caller side.
await chan.send({
@ -314,11 +312,18 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((chan.uid, cid))
ctx: Context = actor._contexts.pop(
(chan.uid, cid)
)
res_str: str = (
'error: {ctx._local_error}'
if ctx._local_error
else f'result: {ctx._result}'
)
log.cancel(
f'Context task was terminated:\n'
f'func: {func}\n'
f'ctx: {pformat(ctx)}'
f'IPC context terminated with final {res_str}\n'
f'|_{pformat(ctx)}\n'
)
if ctx.cancelled_caught:
@ -331,7 +336,6 @@ async def _invoke(
ctx._maybe_raise_remote_err(re)
# fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
@ -378,16 +382,16 @@ async def _invoke(
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_ task: `{task.name}()`'
f' |_{ctx._task}()\n'
)
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
'------ - ------\n'
'IPC msg:\n'
f'{ctx._cancel_msg}'
# '------ - ------\n'
# 'IPC msg:\n'
f'\n{ctx._cancel_msg}'
)
# task-contex was either cancelled by request using
@ -435,7 +439,12 @@ async def _invoke(
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(f'{fname}() result: {result}')
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down
if (
@ -965,7 +974,7 @@ class Actor:
# and bail after timeout (2-generals on closure).
assert chan.msgstream
log.runtime(
log.warning(
f'Draining lingering msgs from stream {chan.msgstream}'
)
@ -977,13 +986,24 @@ class Actor:
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.runtime(f'drained {msg} for {chan.uid}')
log.warning(
'Draining msg from disconnected\n'
f'peer: {chan.uid}]\n\n'
f'{pformat(msg)}\n'
)
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)
await self._push_result(
chan,
cid,
msg,
)
log.runtime('Waiting on actor nursery to exit..')
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
await local_nursery.exited.wait()
if disconnected:
@ -1167,6 +1187,7 @@ class Actor:
self,
chan: Channel,
cid: str,
nsf: NamespacePath,
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
@ -1180,11 +1201,15 @@ class Actor:
task-as-function invocation.
'''
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
actor_uid = chan.uid
assert actor_uid
try:
ctx = self._contexts[(actor_uid, cid)]
log.runtime(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
)
ctx._allow_overruns = allow_overruns
# adjust buffer size if specified
@ -1193,9 +1218,15 @@ class Actor:
state.max_buffer_size = msg_buffer_size
except KeyError:
log.runtime(
f'Creating NEW IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid: {cid}\n'
)
ctx = mk_context(
chan,
cid,
nsf=nsf,
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns,
)
@ -1206,11 +1237,13 @@ class Actor:
async def start_remote_task(
self,
chan: Channel,
ns: str,
func: str,
nsf: NamespacePath,
kwargs: dict,
# IPC channel config
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
) -> Context:
'''
@ -1225,20 +1258,43 @@ class Actor:
cid = str(uuid.uuid4())
assert chan.uid
ctx = self.get_context(
chan,
cid,
chan=chan,
cid=cid,
nsf=nsf,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
if (
'self' in nsf
or not load_nsf
):
ns, _, func = nsf.partition(':')
else:
# TODO: pass nsf directly over wire!
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()
log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
{'cmd': (ns, func, kwargs, self.uid, cid)}
{'cmd': (
ns,
func,
kwargs,
self.uid,
cid,
)}
)
# Wait on first response msg and validate; this should be
# immediate.
first_msg = await ctx._recv_chan.receive()
functype = first_msg.get('functype')
first_msg: dict = await ctx._recv_chan.receive()
functype: str = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
@ -1280,14 +1336,19 @@ class Actor:
parent_data: dict[str, Any]
parent_data = await chan.recv()
log.runtime(
"Received state from parent:\n"
f"{parent_data}"
'Received state from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
try:
log.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
@ -1368,7 +1429,8 @@ class Actor:
for listener in listeners
]
log.runtime(
f'Started tcp server(s) on {sockets}'
'Started TCP server(s)\n'
f'|_{sockets}\n'
)
self._listeners.extend(listeners)
@ -1480,8 +1542,20 @@ class Actor:
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: CancelScope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
# NOTE: during msging race conditions this will often
# emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n'
f'<= canceller: {requesting_uid}\n'
f' |_{chan}\n\n'
f'=> ctx id: {cid}\n'
)
return True
log.cancel(
@ -1923,7 +1997,7 @@ async def process_messages(
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}'
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: dict | None = None
@ -1960,8 +2034,10 @@ async def process_messages(
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
# => specifically `pformat()` sub-call..?
f'{pformat(msg)}\n'
)
@ -1969,19 +2045,35 @@ async def process_messages(
if cid:
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(chan, cid, msg)
await actor._push_result(
chan,
cid,
msg,
)
log.runtime(
f'Waiting on next IPC msg from {chan.uid}:\n'
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n'
# f'last msg: {msg}\n'
f'|_{chan}'
)
continue
# TODO: implement with ``match:`` syntax?
# process command request
# process a 'cmd' request-msg upack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
(
ns,
funcname,
kwargs,
actorid,
cid,
) = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
@ -1994,29 +2086,33 @@ async def process_messages(
raise exc
log.runtime(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
'Handling RPC cmd from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
if ns == 'self':
uid: tuple = chan.uid
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs['requesting_uid'] = chan.uid
kwargs['requesting_uid'] = uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
log.cancel(
"Actor runtime for was remotely cancelled "
f"by {chan.uid}"
f'Cancel request for `Actor` runtime\n'
f'<= canceller: {uid}\n'
# f'=> uid: {actor.uid}\n'
)
await _invoke(
actor,
@ -2043,9 +2139,10 @@ async def process_messages(
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
log.cancel(
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {target_cid}'
f'Rx task cancel request\n'
f'<= canceller: {chan.uid}\n'
f'=> uid: {actor.uid}\n'
f' |_cid: {target_cid}\n'
)
try:
await _invoke(
@ -2105,17 +2202,18 @@ async def process_messages(
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(ctx, Exception):
if isinstance(err := ctx, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{ctx}"
'Task for RPC failed?'
f'|_ {func}()\n\n'
f'{err}'
)
continue
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
@ -2126,7 +2224,10 @@ async def process_messages(
)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
@ -2143,9 +2244,12 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
# TODO: don't show this msg if it's an emphemeral
# discovery ep call?
log.runtime(
f'channel from {chan.uid} closed abruptly:\n'
f'-> {chan.raddr}\n'
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'|_{chan.raddr}\n'
)
# transport **was** disconnected
@ -2187,9 +2291,11 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
f'Exiting IPC msg loop with {chan.uid} '
f'final msg: {msg}\n'
f'|_{chan}'
'Exiting IPC msg loop with\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n\n'
'final msg:\n'
f'{pformat(msg)}\n'
)
# transport **was not** disconnected

View File

@ -400,7 +400,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
"errored with\n"
"errored with:"
# TODO: same thing as in
# `._invoke()` to compute how to

View File

@ -1,18 +1,19 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
"""
Multi-core debugging for da peeps!
@ -43,6 +44,7 @@ from types import FrameType
import pdbp
import tractor
import trio
from trio.lowlevel import current_task
from trio_typing import (
TaskStatus,
# Task,
@ -50,6 +52,7 @@ from trio_typing import (
from ..log import get_logger
from .._state import (
current_actor,
is_root_process,
debug_mode,
)
@ -238,7 +241,7 @@ async def _acquire_debug_lock_from_root_task(
to the ``pdb`` repl.
'''
task_name: str = trio.lowlevel.current_task().name
task_name: str = current_task().name
we_acquired: bool = False
log.runtime(
@ -323,8 +326,7 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete!
'''
task_name = trio.lowlevel.current_task().name
task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked:
log.warning(
f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
@ -407,11 +409,13 @@ async def wait_for_parent_stdin_hijack(
assert val == 'Locked'
async with ctx.open_stream() as stream:
# unblock local caller
try:
# unblock local caller
assert Lock.local_pdb_complete
task_status.started(cs)
# wait for local task to exit and
# release the REPL
await Lock.local_pdb_complete.wait()
finally:
@ -468,7 +472,7 @@ def shield_sigint_handler(
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
actor = tractor.current_actor()
actor = current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel():
@ -613,7 +617,7 @@ def _set_trace(
shield: bool = False,
):
__tracebackhide__: bool = True
actor: tractor.Actor = actor or tractor.current_actor()
actor: tractor.Actor = actor or current_actor()
# start 2 levels up in user code
frame: FrameType | None = sys._getframe()
@ -683,9 +687,9 @@ async def pause(
'''
# __tracebackhide__ = True
actor = tractor.current_actor()
actor = current_actor()
pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name
task_name: str = trio.lowlevel.current_task().name
if (
not Lock.local_pdb_complete
@ -836,7 +840,7 @@ async def pause(
# runtime aware version which takes care of all .
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
actor: tractor.Actor = tractor.current_actor(
actor: tractor.Actor = current_actor(
err_on_no_runtime=False,
)
if actor:
@ -971,9 +975,10 @@ async def acquire_debug_lock(
'''
Grab root's debug lock on entry, release on exit.
This helper is for actor's who don't actually need
to acquired the debugger but want to wait until the
lock is free in the process-tree root.
This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the
process-tree root such that they don't clobber an ongoing pdb
REPL session in some peer or child!
'''
if not debug_mode():
@ -1013,43 +1018,71 @@ async def maybe_wait_for_debugger(
# tearing down.
sub_in_debug: tuple[str, str] | None = None
for _ in range(poll_steps):
for istep in range(poll_steps):
if Lock.global_actor_in_debug:
sub_in_debug = tuple(Lock.global_actor_in_debug)
log.debug('Root polling for debug')
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
# TODO: could this make things more deterministic? wait
# to see if a sub-actor task will be scheduled and grab
# the tty lock on the next tick?
# XXX: doesn't seem to work
if sub_in_debug := Lock.global_actor_in_debug:
log.pdb(
f'Lock in use by {sub_in_debug}'
)
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# XXX => but it doesn't seem to work..
# await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete = Lock.no_remote_has_tty
if (
debug_complete
and sub_in_debug is not None
and not debug_complete.is_set()
):
log.pdb(
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..'
)
debug_complete: trio.Event|None = Lock.no_remote_has_tty
if (
debug_complete
and not debug_complete.is_set()
and sub_in_debug is not None
):
log.pdb(
'Root has errored but pdb is in use by child\n'
'Waiting on tty lock to release..\n'
f'uid: {sub_in_debug}\n'
)
await debug_complete.wait()
log.pdb(
f'Child subactor released debug lock!\n'
f'uid: {sub_in_debug}\n'
)
if debug_complete.is_set():
break
await debug_complete.wait()
# is no subactor locking debugger currently?
elif (
debug_complete is None
or sub_in_debug is None
):
log.pdb(
'Root acquired debug TTY LOCK from child\n'
f'uid: {sub_in_debug}'
)
break
await trio.sleep(poll_delay)
continue
else:
# TODO: don't need this right?
# await trio.lowlevel.checkpoint()
log.debug(
'Root polling for debug:\n'
f'poll step: {istep}\n'
f'poll delya: {poll_delay}'
)
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
continue
else:
log.debug(
'Root acquired TTY LOCK'
)
log.pdb('Root acquired debug TTY LOCK')
# else:
# # TODO: non-root call for #320?
# this_uid: tuple[str, str] = current_actor().uid
# async with acquire_debug_lock(
# subactor_uid=this_uid,
# ):
# pass
# TODO: better naming and what additionals?
# - [ ] optional runtime plugging?

View File

@ -58,6 +58,11 @@ class NamespacePath(str):
'''
_ref: object | type | None = None
# TODO: support providing the ns instance in
# order to support 'self.<meth>` style to make
# `Portal.run_from_ns()` work!
# _ns: ModuleType|type|None = None
def load_ref(self) -> object | type:
if self._ref is None:
self._ref = resolve_name(self)
@ -100,5 +105,13 @@ class NamespacePath(str):
fqnp: tuple[str, str] = cls._mk_fqnp(ref)
return cls(':'.join(fqnp))
def to_tuple(self) -> tuple[str, str]:
return self._mk_fqnp(self.load_ref())
def to_tuple(
self,
# TODO: could this work re `self:<meth>` case from above?
# load_ref: bool = True,
) -> tuple[str, str]:
return self._mk_fqnp(
self.load_ref()
)