Compare commits
10 Commits
54a0a0000d
...
10adf34be5
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 10adf34be5 | |
Tyler Goodlet | 82dcaff8db | |
Tyler Goodlet | 621b252b0c | |
Tyler Goodlet | 20a089c331 | |
Tyler Goodlet | df50d78042 | |
Tyler Goodlet | 114ec36436 | |
Tyler Goodlet | 179d7d2b04 | |
Tyler Goodlet | f568fca98f | |
Tyler Goodlet | 6c9bc627d8 | |
Tyler Goodlet | 1d7cf7d1dd |
12
setup.py
12
setup.py
|
@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
|
|||
setup(
|
||||
name="tractor",
|
||||
version='0.1.0a6dev0', # alpha zone
|
||||
description='structured concurrrent `trio`-"actors"',
|
||||
description='structured concurrent `trio`-"actors"',
|
||||
long_description=readme,
|
||||
license='AGPLv3',
|
||||
author='Tyler Goodlet',
|
||||
|
@ -50,6 +50,7 @@ setup(
|
|||
'exceptiongroup',
|
||||
|
||||
# tooling
|
||||
'stackscope',
|
||||
'tricycle',
|
||||
'trio_typing',
|
||||
'colorlog',
|
||||
|
@ -61,16 +62,15 @@ setup(
|
|||
# debug mode REPL
|
||||
'pdbp',
|
||||
|
||||
# TODO: distributed transport using
|
||||
# linux kernel networking
|
||||
# 'pyroute2',
|
||||
|
||||
# pip ref docs on these specs:
|
||||
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
|
||||
# and pep:
|
||||
# https://peps.python.org/pep-0440/#version-specifiers
|
||||
|
||||
# windows deps workaround for ``pdbpp``
|
||||
# https://github.com/pdbpp/pdbpp/issues/498
|
||||
# https://github.com/pdbpp/fancycompleter/issues/37
|
||||
'pyreadline3 ; platform_system == "Windows"',
|
||||
|
||||
],
|
||||
tests_require=['pytest'],
|
||||
python_requires=">=3.10",
|
||||
|
|
|
@ -8,7 +8,9 @@ sync-opening a ``tractor.Context`` beforehand.
|
|||
# from contextlib import asynccontextmanager as acm
|
||||
from itertools import count
|
||||
import platform
|
||||
from typing import Optional
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
@ -69,7 +71,7 @@ _state: bool = False
|
|||
|
||||
@tractor.context
|
||||
async def too_many_starteds(
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
'''
|
||||
Call ``Context.started()`` more then once (an error).
|
||||
|
@ -84,7 +86,7 @@ async def too_many_starteds(
|
|||
|
||||
@tractor.context
|
||||
async def not_started_but_stream_opened(
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
'''
|
||||
Enter ``Context.open_stream()`` without calling ``.started()``.
|
||||
|
@ -105,11 +107,15 @@ async def not_started_but_stream_opened(
|
|||
],
|
||||
ids='misuse_type={}'.format,
|
||||
)
|
||||
def test_started_misuse(target):
|
||||
|
||||
def test_started_misuse(
|
||||
target: Callable,
|
||||
debug_mode: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
target.__name__,
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -124,7 +130,7 @@ def test_started_misuse(target):
|
|||
@tractor.context
|
||||
async def simple_setup_teardown(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
data: int,
|
||||
block_forever: bool = False,
|
||||
|
||||
|
@ -170,6 +176,7 @@ def test_simple_context(
|
|||
error_parent,
|
||||
callee_blocks_forever,
|
||||
pointlessly_open_stream,
|
||||
debug_mode: bool,
|
||||
):
|
||||
|
||||
timeout = 1.5 if not platform.system() == 'Windows' else 4
|
||||
|
@ -177,9 +184,10 @@ def test_simple_context(
|
|||
async def main():
|
||||
|
||||
with trio.fail_after(timeout):
|
||||
async with tractor.open_nursery() as nursery:
|
||||
|
||||
portal = await nursery.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'simple_context',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -260,6 +268,7 @@ def test_caller_cancels(
|
|||
cancel_method: str,
|
||||
chk_ctx_result_before_exit: bool,
|
||||
callee_returns_early: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that when the opening side of a context (aka the caller)
|
||||
|
@ -268,7 +277,7 @@ def test_caller_cancels(
|
|||
|
||||
'''
|
||||
async def check_canceller(
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
# should not raise yet return the remote
|
||||
# context cancelled error.
|
||||
|
@ -287,8 +296,10 @@ def test_caller_cancels(
|
|||
)
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as nursery:
|
||||
portal = await nursery.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'simple_context',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -338,7 +349,7 @@ def test_caller_cancels(
|
|||
@tractor.context
|
||||
async def close_ctx_immediately(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -350,17 +361,33 @@ async def close_ctx_immediately(
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_closes_ctx_after_stream_open():
|
||||
'callee context closes without using stream'
|
||||
async def test_callee_closes_ctx_after_stream_open(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
callee context closes without using stream.
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
This should result in a msg sequence
|
||||
|_<root>_
|
||||
|_<fast_stream_closer>
|
||||
|
||||
portal = await n.start_actor(
|
||||
<= {'started': <Any>, 'cid': <str>}
|
||||
<= {'stop': True, 'cid': <str>}
|
||||
<= {'result': Any, ..}
|
||||
|
||||
(ignored by child)
|
||||
=> {'stop': True, 'cid': <str>}
|
||||
|
||||
'''
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'fast_stream_closer',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
with trio.fail_after(2):
|
||||
with trio.fail_after(0.5):
|
||||
async with portal.open_context(
|
||||
close_ctx_immediately,
|
||||
|
||||
|
@ -368,10 +395,9 @@ async def test_callee_closes_ctx_after_stream_open():
|
|||
# cancel_on_exit=True,
|
||||
|
||||
) as (ctx, sent):
|
||||
|
||||
assert sent is None
|
||||
|
||||
with trio.fail_after(0.5):
|
||||
with trio.fail_after(0.4):
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
# should fall through since ``StopAsyncIteration``
|
||||
|
@ -379,12 +405,15 @@ async def test_callee_closes_ctx_after_stream_open():
|
|||
# a ``trio.EndOfChannel`` by
|
||||
# ``trio.abc.ReceiveChannel.__anext__()``
|
||||
async for _ in stream:
|
||||
# trigger failure if we DO NOT
|
||||
# get an EOC!
|
||||
assert 0
|
||||
else:
|
||||
|
||||
# verify stream is now closed
|
||||
try:
|
||||
await stream.receive()
|
||||
with trio.fail_after(0.3):
|
||||
await stream.receive()
|
||||
except trio.EndOfChannel:
|
||||
pass
|
||||
|
||||
|
@ -405,7 +434,7 @@ async def test_callee_closes_ctx_after_stream_open():
|
|||
@tractor.context
|
||||
async def expect_cancelled(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
global _state
|
||||
|
@ -434,11 +463,15 @@ async def expect_cancelled(
|
|||
@tractor_test
|
||||
async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||
use_ctx_cancel_method: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'caller context closes without using stream'
|
||||
|
||||
async with tractor.open_nursery() as an:
|
||||
'''
|
||||
caller context closes without using/opening stream
|
||||
|
||||
'''
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
root: Actor = current_actor()
|
||||
|
||||
portal = await an.start_actor(
|
||||
|
@ -522,11 +555,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_multitask_caller_cancels_from_nonroot_task():
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
async def test_multitask_caller_cancels_from_nonroot_task(
|
||||
debug_mode: bool,
|
||||
):
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'ctx_cancelled',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -573,7 +608,7 @@ async def test_multitask_caller_cancels_from_nonroot_task():
|
|||
@tractor.context
|
||||
async def cancel_self(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
global _state
|
||||
|
@ -610,16 +645,20 @@ async def cancel_self(
|
|||
|
||||
raise RuntimeError('Context didnt cancel itself?!')
|
||||
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_cancels_before_started():
|
||||
async def test_callee_cancels_before_started(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Callee calls `Context.cancel()` while streaming and caller
|
||||
sees stream terminated in `ContextCancelled`.
|
||||
|
||||
'''
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'cancels_self',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -645,7 +684,7 @@ async def test_callee_cancels_before_started():
|
|||
@tractor.context
|
||||
async def never_open_stream(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -659,8 +698,8 @@ async def never_open_stream(
|
|||
@tractor.context
|
||||
async def keep_sending_from_callee(
|
||||
|
||||
ctx: tractor.Context,
|
||||
msg_buffer_size: Optional[int] = None,
|
||||
ctx: Context,
|
||||
msg_buffer_size: int|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -685,7 +724,10 @@ async def keep_sending_from_callee(
|
|||
],
|
||||
ids='overrun_condition={}'.format,
|
||||
)
|
||||
def test_one_end_stream_not_opened(overrun_by):
|
||||
def test_one_end_stream_not_opened(
|
||||
overrun_by: tuple[str, int, Callable],
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
This should exemplify the bug from:
|
||||
https://github.com/goodboy/tractor/issues/265
|
||||
|
@ -696,8 +738,10 @@ def test_one_end_stream_not_opened(overrun_by):
|
|||
buf_size = buf_size_increase + Actor.msg_buffer_size
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
entrypoint.__name__,
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -754,7 +798,7 @@ def test_one_end_stream_not_opened(overrun_by):
|
|||
@tractor.context
|
||||
async def echo_back_sequence(
|
||||
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
seq: list[int],
|
||||
wait_for_cancel: bool,
|
||||
allow_overruns_side: str,
|
||||
|
@ -837,6 +881,7 @@ def test_maybe_allow_overruns_stream(
|
|||
slow_side: str,
|
||||
allow_overruns_side: str,
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demonstrate small overruns of each task back and forth
|
||||
|
@ -855,13 +900,14 @@ def test_maybe_allow_overruns_stream(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'callee_sends_forever',
|
||||
enable_modules=[__name__],
|
||||
loglevel=loglevel,
|
||||
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
)
|
||||
seq = list(range(10))
|
||||
async with portal.open_context(
|
||||
|
|
|
@ -123,7 +123,9 @@ async def error_before_started(
|
|||
await peer_ctx.cancel()
|
||||
|
||||
|
||||
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
||||
def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that an error raised in a remote context which itself
|
||||
opens YET ANOTHER remote context, which it then cancels, does not
|
||||
|
@ -132,7 +134,9 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
|||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as n:
|
||||
portal = await n.start_actor(
|
||||
'errorer',
|
||||
enable_modules=[__name__],
|
||||
|
@ -225,13 +229,16 @@ async def stream_from_peer(
|
|||
# NOTE: cancellation of the (sleeper) peer should always
|
||||
# cause a `ContextCancelled` raise in this streaming
|
||||
# actor.
|
||||
except ContextCancelled as ctxerr:
|
||||
err = ctxerr
|
||||
except ContextCancelled as ctxc:
|
||||
ctxerr = ctxc
|
||||
|
||||
assert peer_ctx._remote_error is ctxerr
|
||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||
assert peer_ctx.canceller == ctxerr.canceller
|
||||
|
||||
# caller peer should not be the cancel requester
|
||||
assert not ctx.cancel_called
|
||||
|
||||
# XXX can never be true since `._invoke` only
|
||||
# sets this AFTER the nursery block this task
|
||||
# was started in, exits.
|
||||
|
@ -269,9 +276,7 @@ async def stream_from_peer(
|
|||
# assert ctx.canceller[0] == 'root'
|
||||
# assert peer_ctx.canceller[0] == 'sleeper'
|
||||
|
||||
raise RuntimeError(
|
||||
'peer never triggered local `ContextCancelled`?'
|
||||
)
|
||||
raise RuntimeError('Never triggered local `ContextCancelled` ?!?')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -280,6 +285,7 @@ async def stream_from_peer(
|
|||
)
|
||||
def test_peer_canceller(
|
||||
error_during_ctxerr_handling: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that a cancellation triggered by an in-actor-tree peer
|
||||
|
@ -336,7 +342,7 @@ def test_peer_canceller(
|
|||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||
# debug_mode=True
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
canceller: Portal = await an.start_actor(
|
||||
'canceller',
|
||||
|
@ -377,7 +383,8 @@ def test_peer_canceller(
|
|||
|
||||
try:
|
||||
print('PRE CONTEXT RESULT')
|
||||
await sleeper_ctx.result()
|
||||
res = await sleeper_ctx.result()
|
||||
assert res
|
||||
|
||||
# should never get here
|
||||
pytest.fail(
|
||||
|
@ -387,7 +394,10 @@ def test_peer_canceller(
|
|||
# should always raise since this root task does
|
||||
# not request the sleeper cancellation ;)
|
||||
except ContextCancelled as ctxerr:
|
||||
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
|
||||
print(
|
||||
'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
|
||||
f'{ctxerr}'
|
||||
)
|
||||
|
||||
# canceller and caller peers should not
|
||||
# have been remotely cancelled.
|
||||
|
@ -410,16 +420,31 @@ def test_peer_canceller(
|
|||
|
||||
# XXX SHOULD NEVER EVER GET HERE XXX
|
||||
except BaseException as berr:
|
||||
err = berr
|
||||
pytest.fail('did not rx ctx-cancelled error?')
|
||||
raise
|
||||
|
||||
# XXX if needed to debug failure
|
||||
# _err = berr
|
||||
# await tractor.pause()
|
||||
# await trio.sleep_forever()
|
||||
|
||||
pytest.fail(
|
||||
'did not rx ctxc ?!?\n\n'
|
||||
|
||||
f'{berr}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
pytest.fail('did not rx ctx-cancelled error?')
|
||||
pytest.fail(
|
||||
'did not rx ctxc ?!?\n\n'
|
||||
|
||||
f'{ctxs}\n'
|
||||
)
|
||||
|
||||
except (
|
||||
ContextCancelled,
|
||||
RuntimeError,
|
||||
)as ctxerr:
|
||||
_err = ctxerr
|
||||
)as loc_err:
|
||||
_loc_err = loc_err
|
||||
|
||||
# NOTE: the main state to check on `Context` is:
|
||||
# - `.cancelled_caught` (maps to nursery cs)
|
||||
|
@ -436,7 +461,7 @@ def test_peer_canceller(
|
|||
# `ContextCancelled` inside `.open_context()`
|
||||
# block
|
||||
if error_during_ctxerr_handling:
|
||||
assert isinstance(ctxerr, RuntimeError)
|
||||
assert isinstance(loc_err, RuntimeError)
|
||||
|
||||
# NOTE: this root actor task should have
|
||||
# called `Context.cancel()` on the
|
||||
|
@ -472,9 +497,10 @@ def test_peer_canceller(
|
|||
|
||||
# CASE: standard teardown inside in `.open_context()` block
|
||||
else:
|
||||
assert ctxerr.canceller == sleeper_ctx.canceller
|
||||
assert isinstance(loc_err, ContextCancelled)
|
||||
assert loc_err.canceller == sleeper_ctx.canceller
|
||||
assert (
|
||||
ctxerr.canceller[0]
|
||||
loc_err.canceller[0]
|
||||
==
|
||||
sleeper_ctx.canceller[0]
|
||||
==
|
||||
|
@ -484,7 +510,7 @@ def test_peer_canceller(
|
|||
# the sleeper's remote error is the error bubbled
|
||||
# out of the context-stack above!
|
||||
re = sleeper_ctx._remote_error
|
||||
assert re is ctxerr
|
||||
assert re is loc_err
|
||||
|
||||
for ctx in ctxs:
|
||||
re: BaseException | None = ctx._remote_error
|
||||
|
@ -554,3 +580,14 @@ def test_peer_canceller(
|
|||
|
||||
assert excinfo.value.type == ContextCancelled
|
||||
assert excinfo.value.canceller[0] == 'canceller'
|
||||
|
||||
|
||||
def test_client_tree_spawns_and_cancels_service_subactor():
|
||||
...
|
||||
# TODO: test for the modden `mod wks open piker` bug!
|
||||
# -> start actor-tree (server) that offers sub-actor spawns via
|
||||
# context API
|
||||
# -> start another full actor-tree (client) which requests to the first to
|
||||
# spawn over its `@context` ep / api.
|
||||
# -> client actor cancels the context and should exit gracefully
|
||||
# and the server's spawned child should cancel and terminate!
|
||||
|
|
|
@ -33,12 +33,15 @@ import exceptiongroup as eg
|
|||
import trio
|
||||
|
||||
from ._state import current_actor
|
||||
from .log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._context import Context
|
||||
from ._stream import MsgStream
|
||||
from .log import StackLevelAdapter
|
||||
|
||||
log = get_logger('tractor')
|
||||
|
||||
_this_mod = importlib.import_module(__name__)
|
||||
|
||||
|
||||
|
@ -112,11 +115,36 @@ class ContextCancelled(RemoteActorError):
|
|||
|
||||
'''
|
||||
@property
|
||||
def canceller(self) -> tuple[str, str] | None:
|
||||
def canceller(self) -> tuple[str, str]|None:
|
||||
'''
|
||||
Return the (maybe) `Actor.uid` for the requesting-author
|
||||
of this ctxc.
|
||||
|
||||
Emit a warning msg when `.canceller` has not been set,
|
||||
which usually idicates that a `None` msg-loop setinel was
|
||||
sent before expected in the runtime. This can happen in
|
||||
a few situations:
|
||||
|
||||
- (simulating) an IPC transport network outage
|
||||
- a (malicious) pkt sent specifically to cancel an actor's
|
||||
runtime non-gracefully without ensuring ongoing RPC tasks are
|
||||
incrementally cancelled as is done with:
|
||||
`Actor`
|
||||
|_`.cancel()`
|
||||
|_`.cancel_soon()`
|
||||
|_`._cancel_task()`
|
||||
|
||||
'''
|
||||
value = self.msgdata.get('canceller')
|
||||
if value:
|
||||
return tuple(value)
|
||||
|
||||
log.warning(
|
||||
'IPC Context cancelled without a requesting actor?\n'
|
||||
'Maybe the IPC transport ended abruptly?\n\n'
|
||||
f'{self}'
|
||||
)
|
||||
|
||||
|
||||
class TransportClosed(trio.ClosedResourceError):
|
||||
"Underlying channel transport was closed prior to use"
|
||||
|
@ -199,7 +227,6 @@ def pack_error(
|
|||
):
|
||||
error_msg.update(exc.msgdata)
|
||||
|
||||
|
||||
pkt: dict = {'error': error_msg}
|
||||
if cid:
|
||||
pkt['cid'] = cid
|
||||
|
@ -210,8 +237,10 @@ def pack_error(
|
|||
def unpack_error(
|
||||
|
||||
msg: dict[str, Any],
|
||||
|
||||
chan=None,
|
||||
err_type=RemoteActorError,
|
||||
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None|Exception:
|
||||
|
@ -287,37 +316,61 @@ def _raise_from_no_key_in_msg(
|
|||
msg: dict,
|
||||
src_err: KeyError,
|
||||
log: StackLevelAdapter, # caller specific `log` obj
|
||||
|
||||
expect_key: str = 'yield',
|
||||
stream: MsgStream | None = None,
|
||||
|
||||
# allow "deeper" tbs when debugging B^o
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Raise an appopriate local error when a `MsgStream` msg arrives
|
||||
which does not contain the expected (under normal operation)
|
||||
`'yield'` field.
|
||||
Raise an appopriate local error when a
|
||||
`MsgStream` msg arrives which does not
|
||||
contain the expected (at least under normal
|
||||
operation) `'yield'` field.
|
||||
|
||||
`Context` and any embedded `MsgStream` termination,
|
||||
as well as remote task errors are handled in order
|
||||
of priority as:
|
||||
|
||||
- any 'error' msg is re-boxed and raised locally as
|
||||
-> `RemoteActorError`|`ContextCancelled`
|
||||
|
||||
- a `MsgStream` 'stop' msg is constructed, assigned
|
||||
and raised locally as -> `trio.EndOfChannel`
|
||||
|
||||
- All other mis-keyed msgss (like say a "final result"
|
||||
'return' msg, normally delivered from `Context.result()`)
|
||||
are re-boxed inside a `MessagingError` with an explicit
|
||||
exc content describing the missing IPC-msg-key.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# internal error should never get here
|
||||
# an internal error should never get here
|
||||
try:
|
||||
cid: str = msg['cid']
|
||||
except KeyError as src_err:
|
||||
raise MessagingError(
|
||||
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
|
||||
f'cid: {cid}\n'
|
||||
'received msg:\n'
|
||||
f'cid: {cid}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
) from src_err
|
||||
|
||||
# TODO: test that shows stream raising an expected error!!!
|
||||
|
||||
# raise the error message in a boxed exception type!
|
||||
if msg.get('error'):
|
||||
# raise the error message
|
||||
raise unpack_error(
|
||||
msg,
|
||||
ctx.chan,
|
||||
hide_tb=hide_tb,
|
||||
|
||||
) from None
|
||||
|
||||
# `MsgStream` termination msg.
|
||||
elif (
|
||||
msg.get('stop')
|
||||
or (
|
||||
|
@ -330,29 +383,26 @@ def _raise_from_no_key_in_msg(
|
|||
f'cid: {cid}\n'
|
||||
)
|
||||
|
||||
# XXX: important to set so that a new ``.receive()``
|
||||
# call (likely by another task using a broadcast receiver)
|
||||
# doesn't accidentally pull the ``return`` message
|
||||
# value out of the underlying feed mem chan!
|
||||
stream._eoc: bool = True
|
||||
|
||||
# TODO: if the a local task is already blocking on
|
||||
# a `Context.result()` and thus a `.receive()` on the
|
||||
# rx-chan, we close the chan and set state ensuring that
|
||||
# an eoc is raised!
|
||||
|
||||
# # when the send is closed we assume the stream has
|
||||
# # terminated and signal this local iterator to stop
|
||||
# await stream.aclose()
|
||||
|
||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||
# block below it will trigger ``.aclose()``.
|
||||
raise trio.EndOfChannel(
|
||||
f'Context stream ended due to msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
) from src_err
|
||||
eoc = trio.EndOfChannel(
|
||||
f'Context stream ended due to msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# XXX: important to set so that a new `.receive()`
|
||||
# call (likely by another task using a broadcast receiver)
|
||||
# doesn't accidentally pull the `return` message
|
||||
# value out of the underlying feed mem chan which is
|
||||
# destined for the `Context.result()` call during ctx-exit!
|
||||
stream._eoc: Exception = eoc
|
||||
|
||||
raise eoc from src_err
|
||||
|
||||
if (
|
||||
stream
|
||||
|
|
|
@ -138,13 +138,19 @@ async def open_root_actor(
|
|||
)
|
||||
assert registry_addrs
|
||||
|
||||
loglevel = (loglevel or log._default_loglevel).upper()
|
||||
loglevel = (
|
||||
loglevel
|
||||
or log._default_loglevel
|
||||
).upper()
|
||||
|
||||
if debug_mode and _spawn._spawn_method == 'trio':
|
||||
if (
|
||||
debug_mode
|
||||
and _spawn._spawn_method == 'trio'
|
||||
):
|
||||
_state._runtime_vars['_debug_mode'] = True
|
||||
|
||||
# expose internal debug module to every actor allowing
|
||||
# for use of ``await tractor.breakpoint()``
|
||||
# expose internal debug module to every actor allowing for
|
||||
# use of ``await tractor.pause()``
|
||||
enable_modules.append('tractor.devx._debug')
|
||||
|
||||
# if debug mode get's enabled *at least* use that level of
|
||||
|
@ -163,7 +169,20 @@ async def open_root_actor(
|
|||
"Debug mode is only supported for the `trio` backend!"
|
||||
)
|
||||
|
||||
log.get_console_log(loglevel)
|
||||
assert loglevel
|
||||
_log = log.get_console_log(loglevel)
|
||||
assert _log
|
||||
|
||||
# TODO: factor this into `.devx._stackscope`!!
|
||||
if debug_mode:
|
||||
try:
|
||||
logger.info('Enabling `stackscope` traces on SIGUSR1')
|
||||
from .devx import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
'`stackscope` not installed for use in debug mode!'
|
||||
)
|
||||
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[tuple[str, int]] = []
|
||||
|
|
|
@ -48,15 +48,12 @@ import trio
|
|||
from trio import (
|
||||
CancelScope,
|
||||
)
|
||||
from trio.lowlevel import (
|
||||
current_task,
|
||||
Task,
|
||||
)
|
||||
from trio_typing import (
|
||||
Nursery,
|
||||
TaskStatus,
|
||||
)
|
||||
|
||||
from .msg import NamespacePath
|
||||
from ._ipc import Channel
|
||||
from ._context import (
|
||||
mk_context,
|
||||
|
@ -145,8 +142,9 @@ async def _invoke(
|
|||
cs: CancelScope | None = None
|
||||
|
||||
ctx = actor.get_context(
|
||||
chan,
|
||||
cid,
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
nsf=NamespacePath.from_ref(func),
|
||||
# We shouldn't ever need to pass this through right?
|
||||
# it's up to the soon-to-be called rpc task to
|
||||
# open the stream with this option.
|
||||
|
@ -276,8 +274,8 @@ async def _invoke(
|
|||
|
||||
# TODO: should would be nice to have our
|
||||
# `TaskMngr` nursery here!
|
||||
# res: Any = await coro
|
||||
res = await coro
|
||||
res: Any = await coro
|
||||
ctx._result = res
|
||||
|
||||
# deliver final result to caller side.
|
||||
await chan.send({
|
||||
|
@ -314,11 +312,18 @@ async def _invoke(
|
|||
# don't pop the local context until we know the
|
||||
# associated child isn't in debug any more
|
||||
await maybe_wait_for_debugger()
|
||||
ctx: Context = actor._contexts.pop((chan.uid, cid))
|
||||
ctx: Context = actor._contexts.pop(
|
||||
(chan.uid, cid)
|
||||
)
|
||||
|
||||
res_str: str = (
|
||||
'error: {ctx._local_error}'
|
||||
if ctx._local_error
|
||||
else f'result: {ctx._result}'
|
||||
)
|
||||
log.cancel(
|
||||
f'Context task was terminated:\n'
|
||||
f'func: {func}\n'
|
||||
f'ctx: {pformat(ctx)}'
|
||||
f'IPC context terminated with final {res_str}\n'
|
||||
f'|_{pformat(ctx)}\n'
|
||||
)
|
||||
|
||||
if ctx.cancelled_caught:
|
||||
|
@ -331,7 +336,6 @@ async def _invoke(
|
|||
ctx._maybe_raise_remote_err(re)
|
||||
|
||||
# fname: str = func.__name__
|
||||
task: Task = current_task()
|
||||
cs: CancelScope = ctx._scope
|
||||
if cs.cancel_called:
|
||||
our_uid: tuple = actor.uid
|
||||
|
@ -378,16 +382,16 @@ async def _invoke(
|
|||
div_str +
|
||||
f'<= canceller: {canceller}\n'
|
||||
f'=> uid: {our_uid}\n'
|
||||
f' |_ task: `{task.name}()`'
|
||||
f' |_{ctx._task}()\n'
|
||||
)
|
||||
|
||||
# TODO: does this ever get set any more or can
|
||||
# we remove it?
|
||||
if ctx._cancel_msg:
|
||||
msg += (
|
||||
'------ - ------\n'
|
||||
'IPC msg:\n'
|
||||
f'{ctx._cancel_msg}'
|
||||
# '------ - ------\n'
|
||||
# 'IPC msg:\n'
|
||||
f'\n{ctx._cancel_msg}'
|
||||
)
|
||||
|
||||
# task-contex was either cancelled by request using
|
||||
|
@ -435,7 +439,12 @@ async def _invoke(
|
|||
task_status.started(ctx)
|
||||
result = await coro
|
||||
fname: str = func.__name__
|
||||
log.runtime(f'{fname}() result: {result}')
|
||||
log.runtime(
|
||||
'RPC complete:\n'
|
||||
f'task: {ctx._task}\n'
|
||||
f'|_cid={ctx.cid}\n'
|
||||
f'|_{fname}() -> {pformat(result)}\n'
|
||||
)
|
||||
|
||||
# NOTE: only send result if we know IPC isn't down
|
||||
if (
|
||||
|
@ -965,7 +974,7 @@ class Actor:
|
|||
# and bail after timeout (2-generals on closure).
|
||||
assert chan.msgstream
|
||||
|
||||
log.runtime(
|
||||
log.warning(
|
||||
f'Draining lingering msgs from stream {chan.msgstream}'
|
||||
)
|
||||
|
||||
|
@ -977,13 +986,24 @@ class Actor:
|
|||
# making sure any RPC response to that call is
|
||||
# delivered the local calling task.
|
||||
# TODO: factor this into a helper?
|
||||
log.runtime(f'drained {msg} for {chan.uid}')
|
||||
log.warning(
|
||||
'Draining msg from disconnected\n'
|
||||
f'peer: {chan.uid}]\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
cid = msg.get('cid')
|
||||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
await self._push_result(chan, cid, msg)
|
||||
await self._push_result(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
|
||||
log.runtime('Waiting on actor nursery to exit..')
|
||||
log.runtime(
|
||||
'Waiting on local actor nursery to exit..\n'
|
||||
f'|_{local_nursery}\n'
|
||||
)
|
||||
await local_nursery.exited.wait()
|
||||
|
||||
if disconnected:
|
||||
|
@ -1167,6 +1187,7 @@ class Actor:
|
|||
self,
|
||||
chan: Channel,
|
||||
cid: str,
|
||||
nsf: NamespacePath,
|
||||
|
||||
msg_buffer_size: int | None = None,
|
||||
allow_overruns: bool = False,
|
||||
|
@ -1180,11 +1201,15 @@ class Actor:
|
|||
task-as-function invocation.
|
||||
|
||||
'''
|
||||
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
|
||||
actor_uid = chan.uid
|
||||
assert actor_uid
|
||||
try:
|
||||
ctx = self._contexts[(actor_uid, cid)]
|
||||
log.runtime(
|
||||
f'Retreived cached IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'cid:{cid}\n'
|
||||
)
|
||||
ctx._allow_overruns = allow_overruns
|
||||
|
||||
# adjust buffer size if specified
|
||||
|
@ -1193,9 +1218,15 @@ class Actor:
|
|||
state.max_buffer_size = msg_buffer_size
|
||||
|
||||
except KeyError:
|
||||
log.runtime(
|
||||
f'Creating NEW IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'cid: {cid}\n'
|
||||
)
|
||||
ctx = mk_context(
|
||||
chan,
|
||||
cid,
|
||||
nsf=nsf,
|
||||
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
||||
_allow_overruns=allow_overruns,
|
||||
)
|
||||
|
@ -1206,11 +1237,13 @@ class Actor:
|
|||
async def start_remote_task(
|
||||
self,
|
||||
chan: Channel,
|
||||
ns: str,
|
||||
func: str,
|
||||
nsf: NamespacePath,
|
||||
kwargs: dict,
|
||||
|
||||
# IPC channel config
|
||||
msg_buffer_size: int | None = None,
|
||||
allow_overruns: bool = False,
|
||||
load_nsf: bool = False,
|
||||
|
||||
) -> Context:
|
||||
'''
|
||||
|
@ -1225,20 +1258,43 @@ class Actor:
|
|||
cid = str(uuid.uuid4())
|
||||
assert chan.uid
|
||||
ctx = self.get_context(
|
||||
chan,
|
||||
cid,
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
nsf=nsf,
|
||||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||
|
||||
if (
|
||||
'self' in nsf
|
||||
or not load_nsf
|
||||
):
|
||||
ns, _, func = nsf.partition(':')
|
||||
else:
|
||||
# TODO: pass nsf directly over wire!
|
||||
# -[ ] but, how to do `self:<Actor.meth>`??
|
||||
ns, func = nsf.to_tuple()
|
||||
|
||||
log.runtime(
|
||||
'Sending cmd to\n'
|
||||
f'peer: {chan.uid} => \n'
|
||||
'\n'
|
||||
f'=> {ns}.{func}({kwargs})\n'
|
||||
)
|
||||
await chan.send(
|
||||
{'cmd': (ns, func, kwargs, self.uid, cid)}
|
||||
{'cmd': (
|
||||
ns,
|
||||
func,
|
||||
kwargs,
|
||||
self.uid,
|
||||
cid,
|
||||
)}
|
||||
)
|
||||
|
||||
# Wait on first response msg and validate; this should be
|
||||
# immediate.
|
||||
first_msg = await ctx._recv_chan.receive()
|
||||
functype = first_msg.get('functype')
|
||||
first_msg: dict = await ctx._recv_chan.receive()
|
||||
functype: str = first_msg.get('functype')
|
||||
|
||||
if 'error' in first_msg:
|
||||
raise unpack_error(first_msg, chan)
|
||||
|
@ -1280,14 +1336,19 @@ class Actor:
|
|||
parent_data: dict[str, Any]
|
||||
parent_data = await chan.recv()
|
||||
log.runtime(
|
||||
"Received state from parent:\n"
|
||||
f"{parent_data}"
|
||||
'Received state from parent:\n\n'
|
||||
# TODO: eventually all these msgs as
|
||||
# `msgspec.Struct` with a special mode that
|
||||
# pformats them in multi-line mode, BUT only
|
||||
# if "trace"/"util" mode is enabled?
|
||||
f'{pformat(parent_data)}\n'
|
||||
)
|
||||
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
|
||||
rvs = parent_data.pop('_runtime_vars')
|
||||
|
||||
if rvs['_debug_mode']:
|
||||
try:
|
||||
log.info('Enabling `stackscope` traces on SIGUSR1')
|
||||
from .devx import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
except ImportError:
|
||||
|
@ -1368,7 +1429,8 @@ class Actor:
|
|||
for listener in listeners
|
||||
]
|
||||
log.runtime(
|
||||
f'Started tcp server(s) on {sockets}'
|
||||
'Started TCP server(s)\n'
|
||||
f'|_{sockets}\n'
|
||||
)
|
||||
self._listeners.extend(listeners)
|
||||
|
||||
|
@ -1480,8 +1542,20 @@ class Actor:
|
|||
# be cancelled was indeed spawned by a request from this channel
|
||||
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
||||
scope: CancelScope = ctx._scope
|
||||
|
||||
except KeyError:
|
||||
log.cancel(f"{cid} has already completed/terminated?")
|
||||
# NOTE: during msging race conditions this will often
|
||||
# emit, some examples:
|
||||
# - callee returns a result before cancel-msg/ctxc-raised
|
||||
# - callee self raises ctxc before caller send request,
|
||||
# - callee errors prior to cancel req.
|
||||
log.cancel(
|
||||
'Cancel request invalid, RPC task already completed?\n'
|
||||
f'<= canceller: {requesting_uid}\n'
|
||||
f' |_{chan}\n\n'
|
||||
|
||||
f'=> ctx id: {cid}\n'
|
||||
)
|
||||
return True
|
||||
|
||||
log.cancel(
|
||||
|
@ -1923,7 +1997,7 @@ async def process_messages(
|
|||
log.runtime(
|
||||
'Entering IPC msg loop:\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
nursery_cancelled_before_task: bool = False
|
||||
msg: dict | None = None
|
||||
|
@ -1960,8 +2034,10 @@ async def process_messages(
|
|||
|
||||
log.transport( # type: ignore
|
||||
f'<= IPC msg from peer: {chan.uid}\n\n'
|
||||
|
||||
# TODO: conditionally avoid fmting depending
|
||||
# on log level (for perf)?
|
||||
# => specifically `pformat()` sub-call..?
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
|
||||
|
@ -1969,19 +2045,35 @@ async def process_messages(
|
|||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
# via its per-remote-context memory channel.
|
||||
await actor._push_result(chan, cid, msg)
|
||||
await actor._push_result(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
f'Waiting on next IPC msg from {chan.uid}:\n'
|
||||
'Waiting on next IPC msg from\n'
|
||||
f'peer: {chan.uid}:\n'
|
||||
f'|_{chan}\n'
|
||||
|
||||
# f'last msg: {msg}\n'
|
||||
f'|_{chan}'
|
||||
)
|
||||
continue
|
||||
|
||||
# TODO: implement with ``match:`` syntax?
|
||||
# process command request
|
||||
# process a 'cmd' request-msg upack
|
||||
# TODO: impl with native `msgspec.Struct` support !!
|
||||
# -[ ] implement with ``match:`` syntax?
|
||||
# -[ ] discard un-authed msgs as per,
|
||||
# <TODO put issue for typed msging structs>
|
||||
try:
|
||||
ns, funcname, kwargs, actorid, cid = msg['cmd']
|
||||
(
|
||||
ns,
|
||||
funcname,
|
||||
kwargs,
|
||||
actorid,
|
||||
cid,
|
||||
) = msg['cmd']
|
||||
|
||||
except KeyError:
|
||||
# This is the non-rpc error case, that is, an
|
||||
# error **not** raised inside a call to ``_invoke()``
|
||||
|
@ -1994,29 +2086,33 @@ async def process_messages(
|
|||
raise exc
|
||||
|
||||
log.runtime(
|
||||
f"Processing request from {actorid}\n"
|
||||
f"{ns}.{funcname}({kwargs})")
|
||||
|
||||
'Handling RPC cmd from\n'
|
||||
f'peer: {actorid}\n'
|
||||
'\n'
|
||||
f'=> {ns}.{funcname}({kwargs})\n'
|
||||
)
|
||||
if ns == 'self':
|
||||
uid: tuple = chan.uid
|
||||
if funcname == 'cancel':
|
||||
func: Callable = actor.cancel
|
||||
kwargs['requesting_uid'] = chan.uid
|
||||
kwargs['requesting_uid'] = uid
|
||||
|
||||
# don't start entire actor runtime cancellation
|
||||
# if this actor is currently in debug mode!
|
||||
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
|
||||
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
|
||||
if pdb_complete:
|
||||
await pdb_complete.wait()
|
||||
|
||||
# we immediately start the runtime machinery
|
||||
# shutdown
|
||||
# Either of `Actor.cancel()`/`.cancel_soon()`
|
||||
# was called, so terminate this IPC msg
|
||||
# loop, exit back out into `async_main()`,
|
||||
# and immediately start the core runtime
|
||||
# machinery shutdown!
|
||||
with CancelScope(shield=True):
|
||||
# actor.cancel() was called so kill this
|
||||
# msg loop and break out into
|
||||
# ``async_main()``
|
||||
log.cancel(
|
||||
"Actor runtime for was remotely cancelled "
|
||||
f"by {chan.uid}"
|
||||
f'Cancel request for `Actor` runtime\n'
|
||||
f'<= canceller: {uid}\n'
|
||||
# f'=> uid: {actor.uid}\n'
|
||||
)
|
||||
await _invoke(
|
||||
actor,
|
||||
|
@ -2043,9 +2139,10 @@ async def process_messages(
|
|||
target_cid = kwargs['cid']
|
||||
kwargs['requesting_uid'] = chan.uid
|
||||
log.cancel(
|
||||
f'Remote request to cancel task\n'
|
||||
f'remote actor: {chan.uid}\n'
|
||||
f'task: {target_cid}'
|
||||
f'Rx task cancel request\n'
|
||||
f'<= canceller: {chan.uid}\n'
|
||||
f'=> uid: {actor.uid}\n'
|
||||
f' |_cid: {target_cid}\n'
|
||||
)
|
||||
try:
|
||||
await _invoke(
|
||||
|
@ -2105,17 +2202,18 @@ async def process_messages(
|
|||
# in the lone case where a ``Context`` is not
|
||||
# delivered, it's likely going to be a locally
|
||||
# scoped exception from ``_invoke()`` itself.
|
||||
if isinstance(ctx, Exception):
|
||||
if isinstance(err := ctx, Exception):
|
||||
log.warning(
|
||||
f"Task for RPC func {func} failed with"
|
||||
f"{ctx}"
|
||||
'Task for RPC failed?'
|
||||
f'|_ {func}()\n\n'
|
||||
|
||||
f'{err}'
|
||||
)
|
||||
continue
|
||||
|
||||
else:
|
||||
# mark that we have ongoing rpc tasks
|
||||
actor._ongoing_rpc_tasks = trio.Event()
|
||||
log.runtime(f"RPC func is {func}")
|
||||
|
||||
# store cancel scope such that the rpc task can be
|
||||
# cancelled gracefully if requested
|
||||
|
@ -2126,7 +2224,10 @@ async def process_messages(
|
|||
)
|
||||
|
||||
log.runtime(
|
||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||
'Waiting on next IPC msg from\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
|
||||
# end of async for, channel disconnect vis
|
||||
# ``trio.EndOfChannel``
|
||||
|
@ -2143,9 +2244,12 @@ async def process_messages(
|
|||
# handshake for them (yet) and instead we simply bail out of
|
||||
# the message loop and expect the teardown sequence to clean
|
||||
# up.
|
||||
# TODO: don't show this msg if it's an emphemeral
|
||||
# discovery ep call?
|
||||
log.runtime(
|
||||
f'channel from {chan.uid} closed abruptly:\n'
|
||||
f'-> {chan.raddr}\n'
|
||||
f'channel closed abruptly with\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan.raddr}\n'
|
||||
)
|
||||
|
||||
# transport **was** disconnected
|
||||
|
@ -2187,9 +2291,11 @@ async def process_messages(
|
|||
finally:
|
||||
# msg debugging for when he machinery is brokey
|
||||
log.runtime(
|
||||
f'Exiting IPC msg loop with {chan.uid} '
|
||||
f'final msg: {msg}\n'
|
||||
f'|_{chan}'
|
||||
'Exiting IPC msg loop with\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}\n\n'
|
||||
'final msg:\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
|
||||
# transport **was not** disconnected
|
||||
|
|
|
@ -400,7 +400,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
else:
|
||||
log.exception(
|
||||
f"Nursery for {current_actor().uid} "
|
||||
"errored with\n"
|
||||
"errored with:"
|
||||
|
||||
# TODO: same thing as in
|
||||
# `._invoke()` to compute how to
|
||||
|
|
|
@ -1,18 +1,19 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
# This program is free software: you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Affero General Public License
|
||||
# as published by the Free Software Foundation, either version 3 of
|
||||
# the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
# This program is distributed in the hope that it will be useful, but
|
||||
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
# You should have received a copy of the GNU Affero General Public
|
||||
# License along with this program. If not, see
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Multi-core debugging for da peeps!
|
||||
|
@ -43,6 +44,7 @@ from types import FrameType
|
|||
import pdbp
|
||||
import tractor
|
||||
import trio
|
||||
from trio.lowlevel import current_task
|
||||
from trio_typing import (
|
||||
TaskStatus,
|
||||
# Task,
|
||||
|
@ -50,6 +52,7 @@ from trio_typing import (
|
|||
|
||||
from ..log import get_logger
|
||||
from .._state import (
|
||||
current_actor,
|
||||
is_root_process,
|
||||
debug_mode,
|
||||
)
|
||||
|
@ -238,7 +241,7 @@ async def _acquire_debug_lock_from_root_task(
|
|||
to the ``pdb`` repl.
|
||||
|
||||
'''
|
||||
task_name: str = trio.lowlevel.current_task().name
|
||||
task_name: str = current_task().name
|
||||
we_acquired: bool = False
|
||||
|
||||
log.runtime(
|
||||
|
@ -323,8 +326,7 @@ async def lock_tty_for_child(
|
|||
highly reliable at releasing the mutex complete!
|
||||
|
||||
'''
|
||||
task_name = trio.lowlevel.current_task().name
|
||||
|
||||
task_name: str = current_task().name
|
||||
if tuple(subactor_uid) in Lock._blocked:
|
||||
log.warning(
|
||||
f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
|
||||
|
@ -407,11 +409,13 @@ async def wait_for_parent_stdin_hijack(
|
|||
assert val == 'Locked'
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
# unblock local caller
|
||||
|
||||
try:
|
||||
# unblock local caller
|
||||
assert Lock.local_pdb_complete
|
||||
task_status.started(cs)
|
||||
|
||||
# wait for local task to exit and
|
||||
# release the REPL
|
||||
await Lock.local_pdb_complete.wait()
|
||||
|
||||
finally:
|
||||
|
@ -468,7 +472,7 @@ def shield_sigint_handler(
|
|||
|
||||
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
|
||||
|
||||
actor = tractor.current_actor()
|
||||
actor = current_actor()
|
||||
# print(f'{actor.uid} in HANDLER with ')
|
||||
|
||||
def do_cancel():
|
||||
|
@ -613,7 +617,7 @@ def _set_trace(
|
|||
shield: bool = False,
|
||||
):
|
||||
__tracebackhide__: bool = True
|
||||
actor: tractor.Actor = actor or tractor.current_actor()
|
||||
actor: tractor.Actor = actor or current_actor()
|
||||
|
||||
# start 2 levels up in user code
|
||||
frame: FrameType | None = sys._getframe()
|
||||
|
@ -683,9 +687,9 @@ async def pause(
|
|||
|
||||
'''
|
||||
# __tracebackhide__ = True
|
||||
actor = tractor.current_actor()
|
||||
actor = current_actor()
|
||||
pdb, undo_sigint = mk_mpdb()
|
||||
task_name = trio.lowlevel.current_task().name
|
||||
task_name: str = trio.lowlevel.current_task().name
|
||||
|
||||
if (
|
||||
not Lock.local_pdb_complete
|
||||
|
@ -836,7 +840,7 @@ async def pause(
|
|||
# runtime aware version which takes care of all .
|
||||
def pause_from_sync() -> None:
|
||||
print("ENTER SYNC PAUSE")
|
||||
actor: tractor.Actor = tractor.current_actor(
|
||||
actor: tractor.Actor = current_actor(
|
||||
err_on_no_runtime=False,
|
||||
)
|
||||
if actor:
|
||||
|
@ -971,9 +975,10 @@ async def acquire_debug_lock(
|
|||
'''
|
||||
Grab root's debug lock on entry, release on exit.
|
||||
|
||||
This helper is for actor's who don't actually need
|
||||
to acquired the debugger but want to wait until the
|
||||
lock is free in the process-tree root.
|
||||
This helper is for actor's who don't actually need to acquired
|
||||
the debugger but want to wait until the lock is free in the
|
||||
process-tree root such that they don't clobber an ongoing pdb
|
||||
REPL session in some peer or child!
|
||||
|
||||
'''
|
||||
if not debug_mode():
|
||||
|
@ -1013,43 +1018,71 @@ async def maybe_wait_for_debugger(
|
|||
# tearing down.
|
||||
sub_in_debug: tuple[str, str] | None = None
|
||||
|
||||
for _ in range(poll_steps):
|
||||
for istep in range(poll_steps):
|
||||
|
||||
if Lock.global_actor_in_debug:
|
||||
sub_in_debug = tuple(Lock.global_actor_in_debug)
|
||||
|
||||
log.debug('Root polling for debug')
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(poll_delay)
|
||||
|
||||
# TODO: could this make things more deterministic? wait
|
||||
# to see if a sub-actor task will be scheduled and grab
|
||||
# the tty lock on the next tick?
|
||||
# XXX: doesn't seem to work
|
||||
if sub_in_debug := Lock.global_actor_in_debug:
|
||||
log.pdb(
|
||||
f'Lock in use by {sub_in_debug}'
|
||||
)
|
||||
# TODO: could this make things more deterministic?
|
||||
# wait to see if a sub-actor task will be
|
||||
# scheduled and grab the tty lock on the next
|
||||
# tick?
|
||||
# XXX => but it doesn't seem to work..
|
||||
# await trio.testing.wait_all_tasks_blocked(cushion=0)
|
||||
|
||||
debug_complete = Lock.no_remote_has_tty
|
||||
if (
|
||||
debug_complete
|
||||
and sub_in_debug is not None
|
||||
and not debug_complete.is_set()
|
||||
):
|
||||
log.pdb(
|
||||
'Root has errored but pdb is in use by '
|
||||
f'child {sub_in_debug}\n'
|
||||
'Waiting on tty lock to release..'
|
||||
)
|
||||
debug_complete: trio.Event|None = Lock.no_remote_has_tty
|
||||
if (
|
||||
debug_complete
|
||||
and not debug_complete.is_set()
|
||||
and sub_in_debug is not None
|
||||
):
|
||||
log.pdb(
|
||||
'Root has errored but pdb is in use by child\n'
|
||||
'Waiting on tty lock to release..\n'
|
||||
f'uid: {sub_in_debug}\n'
|
||||
)
|
||||
await debug_complete.wait()
|
||||
log.pdb(
|
||||
f'Child subactor released debug lock!\n'
|
||||
f'uid: {sub_in_debug}\n'
|
||||
)
|
||||
if debug_complete.is_set():
|
||||
break
|
||||
|
||||
await debug_complete.wait()
|
||||
# is no subactor locking debugger currently?
|
||||
elif (
|
||||
debug_complete is None
|
||||
or sub_in_debug is None
|
||||
):
|
||||
log.pdb(
|
||||
'Root acquired debug TTY LOCK from child\n'
|
||||
f'uid: {sub_in_debug}'
|
||||
)
|
||||
break
|
||||
|
||||
await trio.sleep(poll_delay)
|
||||
continue
|
||||
else:
|
||||
# TODO: don't need this right?
|
||||
# await trio.lowlevel.checkpoint()
|
||||
|
||||
log.debug(
|
||||
'Root polling for debug:\n'
|
||||
f'poll step: {istep}\n'
|
||||
f'poll delya: {poll_delay}'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(poll_delay)
|
||||
continue
|
||||
else:
|
||||
log.debug(
|
||||
'Root acquired TTY LOCK'
|
||||
)
|
||||
log.pdb('Root acquired debug TTY LOCK')
|
||||
|
||||
# else:
|
||||
# # TODO: non-root call for #320?
|
||||
# this_uid: tuple[str, str] = current_actor().uid
|
||||
# async with acquire_debug_lock(
|
||||
# subactor_uid=this_uid,
|
||||
# ):
|
||||
# pass
|
||||
|
||||
# TODO: better naming and what additionals?
|
||||
# - [ ] optional runtime plugging?
|
||||
|
|
|
@ -58,6 +58,11 @@ class NamespacePath(str):
|
|||
'''
|
||||
_ref: object | type | None = None
|
||||
|
||||
# TODO: support providing the ns instance in
|
||||
# order to support 'self.<meth>` style to make
|
||||
# `Portal.run_from_ns()` work!
|
||||
# _ns: ModuleType|type|None = None
|
||||
|
||||
def load_ref(self) -> object | type:
|
||||
if self._ref is None:
|
||||
self._ref = resolve_name(self)
|
||||
|
@ -100,5 +105,13 @@ class NamespacePath(str):
|
|||
fqnp: tuple[str, str] = cls._mk_fqnp(ref)
|
||||
return cls(':'.join(fqnp))
|
||||
|
||||
def to_tuple(self) -> tuple[str, str]:
|
||||
return self._mk_fqnp(self.load_ref())
|
||||
def to_tuple(
|
||||
self,
|
||||
|
||||
# TODO: could this work re `self:<meth>` case from above?
|
||||
# load_ref: bool = True,
|
||||
|
||||
) -> tuple[str, str]:
|
||||
return self._mk_fqnp(
|
||||
self.load_ref()
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue