Compare commits

..

No commits in common. "10adf34be582a92f55587259845b09a5054f2265" and "54a0a0000d15210590fab1ba79ce18d85e940e74" have entirely different histories.

9 changed files with 225 additions and 529 deletions

View File

@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a6dev0', # alpha zone version='0.1.0a6dev0', # alpha zone
description='structured concurrent `trio`-"actors"', description='structured concurrrent `trio`-"actors"',
long_description=readme, long_description=readme,
license='AGPLv3', license='AGPLv3',
author='Tyler Goodlet', author='Tyler Goodlet',
@ -50,7 +50,6 @@ setup(
'exceptiongroup', 'exceptiongroup',
# tooling # tooling
'stackscope',
'tricycle', 'tricycle',
'trio_typing', 'trio_typing',
'colorlog', 'colorlog',
@ -62,15 +61,16 @@ setup(
# debug mode REPL # debug mode REPL
'pdbp', 'pdbp',
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2',
# pip ref docs on these specs: # pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep: # and pep:
# https://peps.python.org/pep-0440/#version-specifiers # https://peps.python.org/pep-0440/#version-specifiers
# windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.10", python_requires=">=3.10",

View File

@ -8,9 +8,7 @@ sync-opening a ``tractor.Context`` beforehand.
# from contextlib import asynccontextmanager as acm # from contextlib import asynccontextmanager as acm
from itertools import count from itertools import count
import platform import platform
from typing import ( from typing import Optional
Callable,
)
import pytest import pytest
import trio import trio
@ -71,7 +69,7 @@ _state: bool = False
@tractor.context @tractor.context
async def too_many_starteds( async def too_many_starteds(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
''' '''
Call ``Context.started()`` more then once (an error). Call ``Context.started()`` more then once (an error).
@ -86,7 +84,7 @@ async def too_many_starteds(
@tractor.context @tractor.context
async def not_started_but_stream_opened( async def not_started_but_stream_opened(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
''' '''
Enter ``Context.open_stream()`` without calling ``.started()``. Enter ``Context.open_stream()`` without calling ``.started()``.
@ -107,15 +105,11 @@ async def not_started_but_stream_opened(
], ],
ids='misuse_type={}'.format, ids='misuse_type={}'.format,
) )
def test_started_misuse( def test_started_misuse(target):
target: Callable,
debug_mode: bool,
):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, portal = await n.start_actor(
) as an:
portal = await an.start_actor(
target.__name__, target.__name__,
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -130,7 +124,7 @@ def test_started_misuse(
@tractor.context @tractor.context
async def simple_setup_teardown( async def simple_setup_teardown(
ctx: Context, ctx: tractor.Context,
data: int, data: int,
block_forever: bool = False, block_forever: bool = False,
@ -176,7 +170,6 @@ def test_simple_context(
error_parent, error_parent,
callee_blocks_forever, callee_blocks_forever,
pointlessly_open_stream, pointlessly_open_stream,
debug_mode: bool,
): ):
timeout = 1.5 if not platform.system() == 'Windows' else 4 timeout = 1.5 if not platform.system() == 'Windows' else 4
@ -184,10 +177,9 @@ def test_simple_context(
async def main(): async def main():
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with tractor.open_nursery( async with tractor.open_nursery() as nursery:
debug_mode=debug_mode,
) as an: portal = await nursery.start_actor(
portal = await an.start_actor(
'simple_context', 'simple_context',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -268,7 +260,6 @@ def test_caller_cancels(
cancel_method: str, cancel_method: str,
chk_ctx_result_before_exit: bool, chk_ctx_result_before_exit: bool,
callee_returns_early: bool, callee_returns_early: bool,
debug_mode: bool,
): ):
''' '''
Verify that when the opening side of a context (aka the caller) Verify that when the opening side of a context (aka the caller)
@ -277,7 +268,7 @@ def test_caller_cancels(
''' '''
async def check_canceller( async def check_canceller(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
# should not raise yet return the remote # should not raise yet return the remote
# context cancelled error. # context cancelled error.
@ -296,10 +287,8 @@ def test_caller_cancels(
) )
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as nursery:
debug_mode=debug_mode, portal = await nursery.start_actor(
) as an:
portal = await an.start_actor(
'simple_context', 'simple_context',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -349,7 +338,7 @@ def test_caller_cancels(
@tractor.context @tractor.context
async def close_ctx_immediately( async def close_ctx_immediately(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
@ -361,33 +350,17 @@ async def close_ctx_immediately(
@tractor_test @tractor_test
async def test_callee_closes_ctx_after_stream_open( async def test_callee_closes_ctx_after_stream_open():
debug_mode: bool, 'callee context closes without using stream'
):
'''
callee context closes without using stream.
This should result in a msg sequence async with tractor.open_nursery() as n:
|_<root>_
|_<fast_stream_closer>
<= {'started': <Any>, 'cid': <str>} portal = await n.start_actor(
<= {'stop': True, 'cid': <str>}
<= {'result': Any, ..}
(ignored by child)
=> {'stop': True, 'cid': <str>}
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'fast_stream_closer', 'fast_stream_closer',
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(0.5): with trio.fail_after(2):
async with portal.open_context( async with portal.open_context(
close_ctx_immediately, close_ctx_immediately,
@ -395,9 +368,10 @@ async def test_callee_closes_ctx_after_stream_open(
# cancel_on_exit=True, # cancel_on_exit=True,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
with trio.fail_after(0.4): with trio.fail_after(0.5):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
@ -405,14 +379,11 @@ async def test_callee_closes_ctx_after_stream_open(
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream: async for _ in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0 assert 0
else: else:
# verify stream is now closed # verify stream is now closed
try: try:
with trio.fail_after(0.3):
await stream.receive() await stream.receive()
except trio.EndOfChannel: except trio.EndOfChannel:
pass pass
@ -434,7 +405,7 @@ async def test_callee_closes_ctx_after_stream_open(
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
global _state global _state
@ -463,15 +434,11 @@ async def expect_cancelled(
@tractor_test @tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream( async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool, use_ctx_cancel_method: bool,
debug_mode: bool,
): ):
''' 'caller context closes without using stream'
caller context closes without using/opening stream
async with tractor.open_nursery() as an:
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
@ -555,13 +522,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test @tractor_test
async def test_multitask_caller_cancels_from_nonroot_task( async def test_multitask_caller_cancels_from_nonroot_task():
debug_mode: bool,
): async with tractor.open_nursery() as n:
async with tractor.open_nursery(
debug_mode=debug_mode, portal = await n.start_actor(
) as an:
portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -608,7 +573,7 @@ async def test_multitask_caller_cancels_from_nonroot_task(
@tractor.context @tractor.context
async def cancel_self( async def cancel_self(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
global _state global _state
@ -645,20 +610,16 @@ async def cancel_self(
raise RuntimeError('Context didnt cancel itself?!') raise RuntimeError('Context didnt cancel itself?!')
@tractor_test @tractor_test
async def test_callee_cancels_before_started( async def test_callee_cancels_before_started():
debug_mode: bool,
):
''' '''
Callee calls `Context.cancel()` while streaming and caller Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`. sees stream terminated in `ContextCancelled`.
''' '''
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode,
) as an: portal = await n.start_actor(
portal = await an.start_actor(
'cancels_self', 'cancels_self',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -684,7 +645,7 @@ async def test_callee_cancels_before_started(
@tractor.context @tractor.context
async def never_open_stream( async def never_open_stream(
ctx: Context, ctx: tractor.Context,
) -> None: ) -> None:
''' '''
@ -698,8 +659,8 @@ async def never_open_stream(
@tractor.context @tractor.context
async def keep_sending_from_callee( async def keep_sending_from_callee(
ctx: Context, ctx: tractor.Context,
msg_buffer_size: int|None = None, msg_buffer_size: Optional[int] = None,
) -> None: ) -> None:
''' '''
@ -724,10 +685,7 @@ async def keep_sending_from_callee(
], ],
ids='overrun_condition={}'.format, ids='overrun_condition={}'.format,
) )
def test_one_end_stream_not_opened( def test_one_end_stream_not_opened(overrun_by):
overrun_by: tuple[str, int, Callable],
debug_mode: bool,
):
''' '''
This should exemplify the bug from: This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265 https://github.com/goodboy/tractor/issues/265
@ -738,10 +696,8 @@ def test_one_end_stream_not_opened(
buf_size = buf_size_increase + Actor.msg_buffer_size buf_size = buf_size_increase + Actor.msg_buffer_size
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, portal = await n.start_actor(
) as an:
portal = await an.start_actor(
entrypoint.__name__, entrypoint.__name__,
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -798,7 +754,7 @@ def test_one_end_stream_not_opened(
@tractor.context @tractor.context
async def echo_back_sequence( async def echo_back_sequence(
ctx: Context, ctx: tractor.Context,
seq: list[int], seq: list[int],
wait_for_cancel: bool, wait_for_cancel: bool,
allow_overruns_side: str, allow_overruns_side: str,
@ -881,7 +837,6 @@ def test_maybe_allow_overruns_stream(
slow_side: str, slow_side: str,
allow_overruns_side: str, allow_overruns_side: str,
loglevel: str, loglevel: str,
debug_mode: bool,
): ):
''' '''
Demonstrate small overruns of each task back and forth Demonstrate small overruns of each task back and forth
@ -900,14 +855,13 @@ def test_maybe_allow_overruns_stream(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, portal = await n.start_actor(
) as an:
portal = await an.start_actor(
'callee_sends_forever', 'callee_sends_forever',
enable_modules=[__name__], enable_modules=[__name__],
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode,
# debug_mode=True,
) )
seq = list(range(10)) seq = list(range(10))
async with portal.open_context( async with portal.open_context(

View File

@ -123,9 +123,7 @@ async def error_before_started(
await peer_ctx.cancel() await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled( def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
debug_mode: bool,
):
''' '''
Verify that an error raised in a remote context which itself Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not opens YET ANOTHER remote context, which it then cancels, does not
@ -134,9 +132,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode,
) as n:
portal = await n.start_actor( portal = await n.start_actor(
'errorer', 'errorer',
enable_modules=[__name__], enable_modules=[__name__],
@ -229,16 +225,13 @@ async def stream_from_peer(
# NOTE: cancellation of the (sleeper) peer should always # NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming # cause a `ContextCancelled` raise in this streaming
# actor. # actor.
except ContextCancelled as ctxc: except ContextCancelled as ctxerr:
ctxerr = ctxc err = ctxerr
assert peer_ctx._remote_error is ctxerr assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
assert peer_ctx.canceller == ctxerr.canceller assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester # caller peer should not be the cancel requester
assert not ctx.cancel_called assert not ctx.cancel_called
# XXX can never be true since `._invoke` only # XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task # sets this AFTER the nursery block this task
# was started in, exits. # was started in, exits.
@ -276,7 +269,9 @@ async def stream_from_peer(
# assert ctx.canceller[0] == 'root' # assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper' # assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError('Never triggered local `ContextCancelled` ?!?') raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -285,7 +280,6 @@ async def stream_from_peer(
) )
def test_peer_canceller( def test_peer_canceller(
error_during_ctxerr_handling: bool, error_during_ctxerr_handling: bool,
debug_mode: bool,
): ):
''' '''
Verify that a cancellation triggered by an in-actor-tree peer Verify that a cancellation triggered by an in-actor-tree peer
@ -342,7 +336,7 @@ def test_peer_canceller(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, # debug_mode=True
) as an: ) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(
'canceller', 'canceller',
@ -383,8 +377,7 @@ def test_peer_canceller(
try: try:
print('PRE CONTEXT RESULT') print('PRE CONTEXT RESULT')
res = await sleeper_ctx.result() await sleeper_ctx.result()
assert res
# should never get here # should never get here
pytest.fail( pytest.fail(
@ -394,10 +387,7 @@ def test_peer_canceller(
# should always raise since this root task does # should always raise since this root task does
# not request the sleeper cancellation ;) # not request the sleeper cancellation ;)
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
print( print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
f'{ctxerr}'
)
# canceller and caller peers should not # canceller and caller peers should not
# have been remotely cancelled. # have been remotely cancelled.
@ -420,31 +410,16 @@ def test_peer_canceller(
# XXX SHOULD NEVER EVER GET HERE XXX # XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr: except BaseException as berr:
raise err = berr
pytest.fail('did not rx ctx-cancelled error?')
# XXX if needed to debug failure
# _err = berr
# await tractor.pause()
# await trio.sleep_forever()
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{berr}\n'
)
else: else:
pytest.fail( pytest.fail('did not rx ctx-cancelled error?')
'did not rx ctxc ?!?\n\n'
f'{ctxs}\n'
)
except ( except (
ContextCancelled, ContextCancelled,
RuntimeError, RuntimeError,
)as loc_err: )as ctxerr:
_loc_err = loc_err _err = ctxerr
# NOTE: the main state to check on `Context` is: # NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs) # - `.cancelled_caught` (maps to nursery cs)
@ -461,7 +436,7 @@ def test_peer_canceller(
# `ContextCancelled` inside `.open_context()` # `ContextCancelled` inside `.open_context()`
# block # block
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
assert isinstance(loc_err, RuntimeError) assert isinstance(ctxerr, RuntimeError)
# NOTE: this root actor task should have # NOTE: this root actor task should have
# called `Context.cancel()` on the # called `Context.cancel()` on the
@ -497,10 +472,9 @@ def test_peer_canceller(
# CASE: standard teardown inside in `.open_context()` block # CASE: standard teardown inside in `.open_context()` block
else: else:
assert isinstance(loc_err, ContextCancelled) assert ctxerr.canceller == sleeper_ctx.canceller
assert loc_err.canceller == sleeper_ctx.canceller
assert ( assert (
loc_err.canceller[0] ctxerr.canceller[0]
== ==
sleeper_ctx.canceller[0] sleeper_ctx.canceller[0]
== ==
@ -510,7 +484,7 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
re = sleeper_ctx._remote_error re = sleeper_ctx._remote_error
assert re is loc_err assert re is ctxerr
for ctx in ctxs: for ctx in ctxs:
re: BaseException | None = ctx._remote_error re: BaseException | None = ctx._remote_error
@ -580,14 +554,3 @@ def test_peer_canceller(
assert excinfo.value.type == ContextCancelled assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller' assert excinfo.value.canceller[0] == 'canceller'
def test_client_tree_spawns_and_cancels_service_subactor():
...
# TODO: test for the modden `mod wks open piker` bug!
# -> start actor-tree (server) that offers sub-actor spawns via
# context API
# -> start another full actor-tree (client) which requests to the first to
# spawn over its `@context` ep / api.
# -> client actor cancels the context and should exit gracefully
# and the server's spawned child should cancel and terminate!

View File

@ -33,15 +33,12 @@ import exceptiongroup as eg
import trio import trio
from ._state import current_actor from ._state import current_actor
from .log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._context import Context from ._context import Context
from ._stream import MsgStream from ._stream import MsgStream
from .log import StackLevelAdapter from .log import StackLevelAdapter
log = get_logger('tractor')
_this_mod = importlib.import_module(__name__) _this_mod = importlib.import_module(__name__)
@ -115,36 +112,11 @@ class ContextCancelled(RemoteActorError):
''' '''
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str] | None:
'''
Return the (maybe) `Actor.uid` for the requesting-author
of this ctxc.
Emit a warning msg when `.canceller` has not been set,
which usually idicates that a `None` msg-loop setinel was
sent before expected in the runtime. This can happen in
a few situations:
- (simulating) an IPC transport network outage
- a (malicious) pkt sent specifically to cancel an actor's
runtime non-gracefully without ensuring ongoing RPC tasks are
incrementally cancelled as is done with:
`Actor`
|_`.cancel()`
|_`.cancel_soon()`
|_`._cancel_task()`
'''
value = self.msgdata.get('canceller') value = self.msgdata.get('canceller')
if value: if value:
return tuple(value) return tuple(value)
log.warning(
'IPC Context cancelled without a requesting actor?\n'
'Maybe the IPC transport ended abruptly?\n\n'
f'{self}'
)
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use" "Underlying channel transport was closed prior to use"
@ -227,6 +199,7 @@ def pack_error(
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
pkt: dict = {'error': error_msg} pkt: dict = {'error': error_msg}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid
@ -237,10 +210,8 @@ def pack_error(
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError, err_type=RemoteActorError,
hide_tb: bool = True, hide_tb: bool = True,
) -> None|Exception: ) -> None|Exception:
@ -316,61 +287,37 @@ def _raise_from_no_key_in_msg(
msg: dict, msg: dict,
src_err: KeyError, src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield', expect_key: str = 'yield',
stream: MsgStream | None = None, stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
) -> bool: ) -> bool:
''' '''
Raise an appopriate local error when a Raise an appopriate local error when a `MsgStream` msg arrives
`MsgStream` msg arrives which does not which does not contain the expected (under normal operation)
contain the expected (at least under normal `'yield'` field.
operation) `'yield'` field.
`Context` and any embedded `MsgStream` termination,
as well as remote task errors are handled in order
of priority as:
- any 'error' msg is re-boxed and raised locally as
-> `RemoteActorError`|`ContextCancelled`
- a `MsgStream` 'stop' msg is constructed, assigned
and raised locally as -> `trio.EndOfChannel`
- All other mis-keyed msgss (like say a "final result"
'return' msg, normally delivered from `Context.result()`)
are re-boxed inside a `MessagingError` with an explicit
exc content describing the missing IPC-msg-key.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = True
# an internal error should never get here # internal error should never get here
try: try:
cid: str = msg['cid'] cid: str = msg['cid']
except KeyError as src_err: except KeyError as src_err:
raise MessagingError( raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n\n' f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) from src_err ) from src_err
# TODO: test that shows stream raising an expected error!!! # TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type!
if msg.get('error'): if msg.get('error'):
# raise the error message
raise unpack_error( raise unpack_error(
msg, msg,
ctx.chan, ctx.chan,
hide_tb=hide_tb,
) from None ) from None
# `MsgStream` termination msg.
elif ( elif (
msg.get('stop') msg.get('stop')
or ( or (
@ -383,26 +330,29 @@ def _raise_from_no_key_in_msg(
f'cid: {cid}\n' f'cid: {cid}\n'
) )
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on # TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the # a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that # rx-chan, we close the chan and set state ensuring that
# an eoc is raised! # an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to # XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch # raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``. # block below it will trigger ``.aclose()``.
eoc = trio.EndOfChannel( raise trio.EndOfChannel(
f'Context stream ended due to msg:\n\n' f'Context stream ended due to msg:\n'
f'{pformat(msg)}\n' f'{pformat(msg)}'
) ) from src_err
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
raise eoc from src_err
if ( if (
stream stream

View File

@ -138,19 +138,13 @@ async def open_root_actor(
) )
assert registry_addrs assert registry_addrs
loglevel = ( loglevel = (loglevel or log._default_loglevel).upper()
loglevel
or log._default_loglevel
).upper()
if ( if debug_mode and _spawn._spawn_method == 'trio':
debug_mode
and _spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = True _state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing for # expose internal debug module to every actor allowing
# use of ``await tractor.pause()`` # for use of ``await tractor.breakpoint()``
enable_modules.append('tractor.devx._debug') enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of # if debug mode get's enabled *at least* use that level of
@ -169,20 +163,7 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!" "Debug mode is only supported for the `trio` backend!"
) )
assert loglevel log.get_console_log(loglevel)
_log = log.get_console_log(loglevel)
assert _log
# TODO: factor this into `.devx._stackscope`!!
if debug_mode:
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = [] ponged_addrs: list[tuple[str, int]] = []

View File

@ -48,12 +48,15 @@ import trio
from trio import ( from trio import (
CancelScope, CancelScope,
) )
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import ( from trio_typing import (
Nursery, Nursery,
TaskStatus, TaskStatus,
) )
from .msg import NamespacePath
from ._ipc import Channel from ._ipc import Channel
from ._context import ( from ._context import (
mk_context, mk_context,
@ -142,9 +145,8 @@ async def _invoke(
cs: CancelScope | None = None cs: CancelScope | None = None
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan,
cid=cid, cid,
nsf=NamespacePath.from_ref(func),
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
# open the stream with this option. # open the stream with this option.
@ -274,8 +276,8 @@ async def _invoke(
# TODO: should would be nice to have our # TODO: should would be nice to have our
# `TaskMngr` nursery here! # `TaskMngr` nursery here!
res: Any = await coro # res: Any = await coro
ctx._result = res res = await coro
# deliver final result to caller side. # deliver final result to caller side.
await chan.send({ await chan.send({
@ -312,18 +314,11 @@ async def _invoke(
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop( ctx: Context = actor._contexts.pop((chan.uid, cid))
(chan.uid, cid)
)
res_str: str = (
'error: {ctx._local_error}'
if ctx._local_error
else f'result: {ctx._result}'
)
log.cancel( log.cancel(
f'IPC context terminated with final {res_str}\n' f'Context task was terminated:\n'
f'|_{pformat(ctx)}\n' f'func: {func}\n'
f'ctx: {pformat(ctx)}'
) )
if ctx.cancelled_caught: if ctx.cancelled_caught:
@ -336,6 +331,7 @@ async def _invoke(
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
# fname: str = func.__name__ # fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope cs: CancelScope = ctx._scope
if cs.cancel_called: if cs.cancel_called:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
@ -382,16 +378,16 @@ async def _invoke(
div_str + div_str +
f'<= canceller: {canceller}\n' f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n' f'=> uid: {our_uid}\n'
f' |_{ctx._task}()\n' f' |_ task: `{task.name}()`'
) )
# TODO: does this ever get set any more or can # TODO: does this ever get set any more or can
# we remove it? # we remove it?
if ctx._cancel_msg: if ctx._cancel_msg:
msg += ( msg += (
# '------ - ------\n' '------ - ------\n'
# 'IPC msg:\n' 'IPC msg:\n'
f'\n{ctx._cancel_msg}' f'{ctx._cancel_msg}'
) )
# task-contex was either cancelled by request using # task-contex was either cancelled by request using
@ -439,12 +435,7 @@ async def _invoke(
task_status.started(ctx) task_status.started(ctx)
result = await coro result = await coro
fname: str = func.__name__ fname: str = func.__name__
log.runtime( log.runtime(f'{fname}() result: {result}')
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down # NOTE: only send result if we know IPC isn't down
if ( if (
@ -974,7 +965,7 @@ class Actor:
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.warning( log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}' f'Draining lingering msgs from stream {chan.msgstream}'
) )
@ -986,24 +977,13 @@ class Actor:
# making sure any RPC response to that call is # making sure any RPC response to that call is
# delivered the local calling task. # delivered the local calling task.
# TODO: factor this into a helper? # TODO: factor this into a helper?
log.warning( log.runtime(f'drained {msg} for {chan.uid}')
'Draining msg from disconnected\n'
f'peer: {chan.uid}]\n\n'
f'{pformat(msg)}\n'
)
cid = msg.get('cid') cid = msg.get('cid')
if cid: if cid:
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._push_result( await self._push_result(chan, cid, msg)
chan,
cid,
msg,
)
log.runtime( log.runtime('Waiting on actor nursery to exit..')
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
await local_nursery.exited.wait() await local_nursery.exited.wait()
if disconnected: if disconnected:
@ -1187,7 +1167,6 @@ class Actor:
self, self,
chan: Channel, chan: Channel,
cid: str, cid: str,
nsf: NamespacePath,
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
@ -1201,15 +1180,11 @@ class Actor:
task-as-function invocation. task-as-function invocation.
''' '''
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
ctx = self._contexts[(actor_uid, cid)] ctx = self._contexts[(actor_uid, cid)]
log.runtime(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
)
ctx._allow_overruns = allow_overruns ctx._allow_overruns = allow_overruns
# adjust buffer size if specified # adjust buffer size if specified
@ -1218,15 +1193,9 @@ class Actor:
state.max_buffer_size = msg_buffer_size state.max_buffer_size = msg_buffer_size
except KeyError: except KeyError:
log.runtime(
f'Creating NEW IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid: {cid}\n'
)
ctx = mk_context( ctx = mk_context(
chan, chan,
cid, cid,
nsf=nsf,
msg_buffer_size=msg_buffer_size or self.msg_buffer_size, msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns, _allow_overruns=allow_overruns,
) )
@ -1237,13 +1206,11 @@ class Actor:
async def start_remote_task( async def start_remote_task(
self, self,
chan: Channel, chan: Channel,
nsf: NamespacePath, ns: str,
func: str,
kwargs: dict, kwargs: dict,
# IPC channel config
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False,
) -> Context: ) -> Context:
''' '''
@ -1258,43 +1225,20 @@ class Actor:
cid = str(uuid.uuid4()) cid = str(uuid.uuid4())
assert chan.uid assert chan.uid
ctx = self.get_context( ctx = self.get_context(
chan=chan, chan,
cid=cid, cid,
nsf=nsf,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
if (
'self' in nsf
or not load_nsf
):
ns, _, func = nsf.partition(':')
else:
# TODO: pass nsf directly over wire!
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()
log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send( await chan.send(
{'cmd': ( {'cmd': (ns, func, kwargs, self.uid, cid)}
ns,
func,
kwargs,
self.uid,
cid,
)}
) )
# Wait on first response msg and validate; this should be # Wait on first response msg and validate; this should be
# immediate. # immediate.
first_msg: dict = await ctx._recv_chan.receive() first_msg = await ctx._recv_chan.receive()
functype: str = first_msg.get('functype') functype = first_msg.get('functype')
if 'error' in first_msg: if 'error' in first_msg:
raise unpack_error(first_msg, chan) raise unpack_error(first_msg, chan)
@ -1336,19 +1280,14 @@ class Actor:
parent_data: dict[str, Any] parent_data: dict[str, Any]
parent_data = await chan.recv() parent_data = await chan.recv()
log.runtime( log.runtime(
'Received state from parent:\n\n' "Received state from parent:\n"
# TODO: eventually all these msgs as f"{parent_data}"
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
) )
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars') rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']: if rvs['_debug_mode']:
try: try:
log.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError: except ImportError:
@ -1429,8 +1368,7 @@ class Actor:
for listener in listeners for listener in listeners
] ]
log.runtime( log.runtime(
'Started TCP server(s)\n' f'Started tcp server(s) on {sockets}'
f'|_{sockets}\n'
) )
self._listeners.extend(listeners) self._listeners.extend(listeners)
@ -1542,20 +1480,8 @@ class Actor:
# be cancelled was indeed spawned by a request from this channel # be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)] ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: CancelScope = ctx._scope scope: CancelScope = ctx._scope
except KeyError: except KeyError:
# NOTE: during msging race conditions this will often log.cancel(f"{cid} has already completed/terminated?")
# emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n'
f'<= canceller: {requesting_uid}\n'
f' |_{chan}\n\n'
f'=> ctx id: {cid}\n'
)
return True return True
log.cancel( log.cancel(
@ -1997,7 +1923,7 @@ async def process_messages(
log.runtime( log.runtime(
'Entering IPC msg loop:\n' 'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'|_{chan}\n' f'|_{chan}'
) )
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: dict | None = None msg: dict | None = None
@ -2034,10 +1960,8 @@ async def process_messages(
log.transport( # type: ignore log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n' f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending # TODO: conditionally avoid fmting depending
# on log level (for perf)? # on log level (for perf)?
# => specifically `pformat()` sub-call..?
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
@ -2045,35 +1969,19 @@ async def process_messages(
if cid: if cid:
# deliver response to local caller/waiter # deliver response to local caller/waiter
# via its per-remote-context memory channel. # via its per-remote-context memory channel.
await actor._push_result( await actor._push_result(chan, cid, msg)
chan,
cid,
msg,
)
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' f'Waiting on next IPC msg from {chan.uid}:\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n'
# f'last msg: {msg}\n' # f'last msg: {msg}\n'
f'|_{chan}'
) )
continue continue
# process a 'cmd' request-msg upack # TODO: implement with ``match:`` syntax?
# TODO: impl with native `msgspec.Struct` support !! # process command request
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
try: try:
( ns, funcname, kwargs, actorid, cid = msg['cmd']
ns,
funcname,
kwargs,
actorid,
cid,
) = msg['cmd']
except KeyError: except KeyError:
# This is the non-rpc error case, that is, an # This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()`` # error **not** raised inside a call to ``_invoke()``
@ -2086,33 +1994,29 @@ async def process_messages(
raise exc raise exc
log.runtime( log.runtime(
'Handling RPC cmd from\n' f"Processing request from {actorid}\n"
f'peer: {actorid}\n' f"{ns}.{funcname}({kwargs})")
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
if ns == 'self': if ns == 'self':
uid: tuple = chan.uid
if funcname == 'cancel': if funcname == 'cancel':
func: Callable = actor.cancel func: Callable = actor.cancel
kwargs['requesting_uid'] = uid kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation # don't start entire actor runtime cancellation
# if this actor is currently in debug mode! # if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
if pdb_complete: if pdb_complete:
await pdb_complete.wait() await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()` # we immediately start the runtime machinery
# was called, so terminate this IPC msg # shutdown
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True): with CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
log.cancel( log.cancel(
f'Cancel request for `Actor` runtime\n' "Actor runtime for was remotely cancelled "
f'<= canceller: {uid}\n' f"by {chan.uid}"
# f'=> uid: {actor.uid}\n'
) )
await _invoke( await _invoke(
actor, actor,
@ -2139,10 +2043,9 @@ async def process_messages(
target_cid = kwargs['cid'] target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid kwargs['requesting_uid'] = chan.uid
log.cancel( log.cancel(
f'Rx task cancel request\n' f'Remote request to cancel task\n'
f'<= canceller: {chan.uid}\n' f'remote actor: {chan.uid}\n'
f'=> uid: {actor.uid}\n' f'task: {target_cid}'
f' |_cid: {target_cid}\n'
) )
try: try:
await _invoke( await _invoke(
@ -2202,18 +2105,17 @@ async def process_messages(
# in the lone case where a ``Context`` is not # in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally # delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself. # scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception): if isinstance(ctx, Exception):
log.warning( log.warning(
'Task for RPC failed?' f"Task for RPC func {func} failed with"
f'|_ {func}()\n\n' f"{ctx}"
f'{err}'
) )
continue continue
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event() actor._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
@ -2224,10 +2126,7 @@ async def process_messages(
) )
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' f"Waiting on next msg for {chan} from {chan.uid}")
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
# end of async for, channel disconnect vis # end of async for, channel disconnect vis
# ``trio.EndOfChannel`` # ``trio.EndOfChannel``
@ -2244,12 +2143,9 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean # the message loop and expect the teardown sequence to clean
# up. # up.
# TODO: don't show this msg if it's an emphemeral
# discovery ep call?
log.runtime( log.runtime(
f'channel closed abruptly with\n' f'channel from {chan.uid} closed abruptly:\n'
f'peer: {chan.uid}\n' f'-> {chan.raddr}\n'
f'|_{chan.raddr}\n'
) )
# transport **was** disconnected # transport **was** disconnected
@ -2291,11 +2187,9 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
log.runtime( log.runtime(
'Exiting IPC msg loop with\n' f'Exiting IPC msg loop with {chan.uid} '
f'peer: {chan.uid}\n' f'final msg: {msg}\n'
f'|_{chan}\n\n' f'|_{chan}'
'final msg:\n'
f'{pformat(msg)}\n'
) )
# transport **was not** disconnected # transport **was not** disconnected

View File

@ -400,7 +400,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
else: else:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "
"errored with:" "errored with\n"
# TODO: same thing as in # TODO: same thing as in
# `._invoke()` to compute how to # `._invoke()` to compute how to

View File

@ -1,19 +1,18 @@
# tractor: structured concurrent "actors". # tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet. # Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or # This program is free software: you can redistribute it and/or modify
# modify it under the terms of the GNU Affero General Public License # it under the terms of the GNU Affero General Public License as published by
# as published by the Free Software Foundation, either version 3 of # the Free Software Foundation, either version 3 of the License, or
# the License, or (at your option) any later version. # (at your option) any later version.
# This program is distributed in the hope that it will be useful, but # This program is distributed in the hope that it will be useful,
# WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Affero General Public License for more details. # GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public # You should have received a copy of the GNU Affero General Public License
# License along with this program. If not, see # along with this program. If not, see <https://www.gnu.org/licenses/>.
# <https://www.gnu.org/licenses/>.
""" """
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
@ -44,7 +43,6 @@ from types import FrameType
import pdbp import pdbp
import tractor import tractor
import trio import trio
from trio.lowlevel import current_task
from trio_typing import ( from trio_typing import (
TaskStatus, TaskStatus,
# Task, # Task,
@ -52,7 +50,6 @@ from trio_typing import (
from ..log import get_logger from ..log import get_logger
from .._state import ( from .._state import (
current_actor,
is_root_process, is_root_process,
debug_mode, debug_mode,
) )
@ -241,7 +238,7 @@ async def _acquire_debug_lock_from_root_task(
to the ``pdb`` repl. to the ``pdb`` repl.
''' '''
task_name: str = current_task().name task_name: str = trio.lowlevel.current_task().name
we_acquired: bool = False we_acquired: bool = False
log.runtime( log.runtime(
@ -326,7 +323,8 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete! highly reliable at releasing the mutex complete!
''' '''
task_name: str = current_task().name task_name = trio.lowlevel.current_task().name
if tuple(subactor_uid) in Lock._blocked: if tuple(subactor_uid) in Lock._blocked:
log.warning( log.warning(
f'Actor {subactor_uid} is blocked from acquiring debug lock\n' f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
@ -409,13 +407,11 @@ async def wait_for_parent_stdin_hijack(
assert val == 'Locked' assert val == 'Locked'
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
try:
# unblock local caller # unblock local caller
try:
assert Lock.local_pdb_complete assert Lock.local_pdb_complete
task_status.started(cs) task_status.started(cs)
# wait for local task to exit and
# release the REPL
await Lock.local_pdb_complete.wait() await Lock.local_pdb_complete.wait()
finally: finally:
@ -472,7 +468,7 @@ def shield_sigint_handler(
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
actor = current_actor() actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ') # print(f'{actor.uid} in HANDLER with ')
def do_cancel(): def do_cancel():
@ -617,7 +613,7 @@ def _set_trace(
shield: bool = False, shield: bool = False,
): ):
__tracebackhide__: bool = True __tracebackhide__: bool = True
actor: tractor.Actor = actor or current_actor() actor: tractor.Actor = actor or tractor.current_actor()
# start 2 levels up in user code # start 2 levels up in user code
frame: FrameType | None = sys._getframe() frame: FrameType | None = sys._getframe()
@ -687,9 +683,9 @@ async def pause(
''' '''
# __tracebackhide__ = True # __tracebackhide__ = True
actor = current_actor() actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
task_name: str = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
if ( if (
not Lock.local_pdb_complete not Lock.local_pdb_complete
@ -840,7 +836,7 @@ async def pause(
# runtime aware version which takes care of all . # runtime aware version which takes care of all .
def pause_from_sync() -> None: def pause_from_sync() -> None:
print("ENTER SYNC PAUSE") print("ENTER SYNC PAUSE")
actor: tractor.Actor = current_actor( actor: tractor.Actor = tractor.current_actor(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
if actor: if actor:
@ -975,10 +971,9 @@ async def acquire_debug_lock(
''' '''
Grab root's debug lock on entry, release on exit. Grab root's debug lock on entry, release on exit.
This helper is for actor's who don't actually need to acquired This helper is for actor's who don't actually need
the debugger but want to wait until the lock is free in the to acquired the debugger but want to wait until the
process-tree root such that they don't clobber an ongoing pdb lock is free in the process-tree root.
REPL session in some peer or child!
''' '''
if not debug_mode(): if not debug_mode():
@ -1018,71 +1013,43 @@ async def maybe_wait_for_debugger(
# tearing down. # tearing down.
sub_in_debug: tuple[str, str] | None = None sub_in_debug: tuple[str, str] | None = None
for istep in range(poll_steps): for _ in range(poll_steps):
if sub_in_debug := Lock.global_actor_in_debug: if Lock.global_actor_in_debug:
log.pdb( sub_in_debug = tuple(Lock.global_actor_in_debug)
f'Lock in use by {sub_in_debug}'
) log.debug('Root polling for debug')
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be with trio.CancelScope(shield=True):
# scheduled and grab the tty lock on the next await trio.sleep(poll_delay)
# tick?
# XXX => but it doesn't seem to work.. # TODO: could this make things more deterministic? wait
# to see if a sub-actor task will be scheduled and grab
# the tty lock on the next tick?
# XXX: doesn't seem to work
# await trio.testing.wait_all_tasks_blocked(cushion=0) # await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete: trio.Event|None = Lock.no_remote_has_tty debug_complete = Lock.no_remote_has_tty
if ( if (
debug_complete debug_complete
and not debug_complete.is_set()
and sub_in_debug is not None and sub_in_debug is not None
and not debug_complete.is_set()
): ):
log.pdb( log.pdb(
'Root has errored but pdb is in use by child\n' 'Root has errored but pdb is in use by '
'Waiting on tty lock to release..\n' f'child {sub_in_debug}\n'
f'uid: {sub_in_debug}\n' 'Waiting on tty lock to release..'
) )
await debug_complete.wait() await debug_complete.wait()
log.pdb(
f'Child subactor released debug lock!\n'
f'uid: {sub_in_debug}\n'
)
if debug_complete.is_set():
break
# is no subactor locking debugger currently?
elif (
debug_complete is None
or sub_in_debug is None
):
log.pdb(
'Root acquired debug TTY LOCK from child\n'
f'uid: {sub_in_debug}'
)
break
else:
# TODO: don't need this right?
# await trio.lowlevel.checkpoint()
log.debug(
'Root polling for debug:\n'
f'poll step: {istep}\n'
f'poll delya: {poll_delay}'
)
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay) await trio.sleep(poll_delay)
continue continue
else: else:
log.pdb('Root acquired debug TTY LOCK') log.debug(
'Root acquired TTY LOCK'
)
# else:
# # TODO: non-root call for #320?
# this_uid: tuple[str, str] = current_actor().uid
# async with acquire_debug_lock(
# subactor_uid=this_uid,
# ):
# pass
# TODO: better naming and what additionals? # TODO: better naming and what additionals?
# - [ ] optional runtime plugging? # - [ ] optional runtime plugging?

View File

@ -58,11 +58,6 @@ class NamespacePath(str):
''' '''
_ref: object | type | None = None _ref: object | type | None = None
# TODO: support providing the ns instance in
# order to support 'self.<meth>` style to make
# `Portal.run_from_ns()` work!
# _ns: ModuleType|type|None = None
def load_ref(self) -> object | type: def load_ref(self) -> object | type:
if self._ref is None: if self._ref is None:
self._ref = resolve_name(self) self._ref = resolve_name(self)
@ -105,13 +100,5 @@ class NamespacePath(str):
fqnp: tuple[str, str] = cls._mk_fqnp(ref) fqnp: tuple[str, str] = cls._mk_fqnp(ref)
return cls(':'.join(fqnp)) return cls(':'.join(fqnp))
def to_tuple( def to_tuple(self) -> tuple[str, str]:
self, return self._mk_fqnp(self.load_ref())
# TODO: could this work re `self:<meth>` case from above?
# load_ref: bool = True,
) -> tuple[str, str]:
return self._mk_fqnp(
self.load_ref()
)