Compare commits

..

No commits in common. "10adf34be582a92f55587259845b09a5054f2265" and "54a0a0000d15210590fab1ba79ce18d85e940e74" have entirely different histories.

9 changed files with 225 additions and 529 deletions

View File

@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrent `trio`-"actors"',
description='structured concurrrent `trio`-"actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
@ -50,7 +50,6 @@ setup(
'exceptiongroup',
# tooling
'stackscope',
'tricycle',
'trio_typing',
'colorlog',
@ -62,15 +61,16 @@ setup(
# debug mode REPL
'pdbp',
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep:
# https://peps.python.org/pep-0440/#version-specifiers
# windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
],
tests_require=['pytest'],
python_requires=">=3.10",

View File

@ -8,9 +8,7 @@ sync-opening a ``tractor.Context`` beforehand.
# from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from typing import (
Callable,
)
from typing import Optional
import pytest
import trio
@ -71,7 +69,7 @@ _state: bool = False
@tractor.context
async def too_many_starteds(
ctx: Context,
ctx: tractor.Context,
) -> None:
'''
Call ``Context.started()`` more then once (an error).
@ -86,7 +84,7 @@ async def too_many_starteds(
@tractor.context
async def not_started_but_stream_opened(
ctx: Context,
ctx: tractor.Context,
) -> None:
'''
Enter ``Context.open_stream()`` without calling ``.started()``.
@ -107,15 +105,11 @@ async def not_started_but_stream_opened(
],
ids='misuse_type={}'.format,
)
def test_started_misuse(
target: Callable,
debug_mode: bool,
):
def test_started_misuse(target):
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as n:
portal = await n.start_actor(
target.__name__,
enable_modules=[__name__],
)
@ -130,7 +124,7 @@ def test_started_misuse(
@tractor.context
async def simple_setup_teardown(
ctx: Context,
ctx: tractor.Context,
data: int,
block_forever: bool = False,
@ -176,7 +170,6 @@ def test_simple_context(
error_parent,
callee_blocks_forever,
pointlessly_open_stream,
debug_mode: bool,
):
timeout = 1.5 if not platform.system() == 'Windows' else 4
@ -184,10 +177,9 @@ def test_simple_context(
async def main():
with trio.fail_after(timeout):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
@ -268,7 +260,6 @@ def test_caller_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
debug_mode: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
@ -277,7 +268,7 @@ def test_caller_cancels(
'''
async def check_canceller(
ctx: Context,
ctx: tractor.Context,
) -> None:
# should not raise yet return the remote
# context cancelled error.
@ -296,10 +287,8 @@ def test_caller_cancels(
)
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
@ -349,7 +338,7 @@ def test_caller_cancels(
@tractor.context
async def close_ctx_immediately(
ctx: Context,
ctx: tractor.Context,
) -> None:
@ -361,33 +350,17 @@ async def close_ctx_immediately(
@tractor_test
async def test_callee_closes_ctx_after_stream_open(
debug_mode: bool,
):
'''
callee context closes without using stream.
async def test_callee_closes_ctx_after_stream_open():
'callee context closes without using stream'
This should result in a msg sequence
|_<root>_
|_<fast_stream_closer>
async with tractor.open_nursery() as n:
<= {'started': <Any>, 'cid': <str>}
<= {'stop': True, 'cid': <str>}
<= {'result': Any, ..}
(ignored by child)
=> {'stop': True, 'cid': <str>}
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
portal = await n.start_actor(
'fast_stream_closer',
enable_modules=[__name__],
)
with trio.fail_after(0.5):
with trio.fail_after(2):
async with portal.open_context(
close_ctx_immediately,
@ -395,9 +368,10 @@ async def test_callee_closes_ctx_after_stream_open(
# cancel_on_exit=True,
) as (ctx, sent):
assert sent is None
with trio.fail_after(0.4):
with trio.fail_after(0.5):
async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration``
@ -405,14 +379,11 @@ async def test_callee_closes_ctx_after_stream_open(
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0
else:
# verify stream is now closed
try:
with trio.fail_after(0.3):
await stream.receive()
except trio.EndOfChannel:
pass
@ -434,7 +405,7 @@ async def test_callee_closes_ctx_after_stream_open(
@tractor.context
async def expect_cancelled(
ctx: Context,
ctx: tractor.Context,
) -> None:
global _state
@ -463,15 +434,11 @@ async def expect_cancelled(
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
debug_mode: bool,
):
'''
caller context closes without using/opening stream
'caller context closes without using stream'
async with tractor.open_nursery() as an:
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
root: Actor = current_actor()
portal = await an.start_actor(
@ -555,13 +522,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task(
debug_mode: bool,
):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async def test_multitask_caller_cancels_from_nonroot_task():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
@ -608,7 +573,7 @@ async def test_multitask_caller_cancels_from_nonroot_task(
@tractor.context
async def cancel_self(
ctx: Context,
ctx: tractor.Context,
) -> None:
global _state
@ -645,20 +610,16 @@ async def cancel_self(
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test
async def test_callee_cancels_before_started(
debug_mode: bool,
):
async def test_callee_cancels_before_started():
'''
Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`.
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'cancels_self',
enable_modules=[__name__],
)
@ -684,7 +645,7 @@ async def test_callee_cancels_before_started(
@tractor.context
async def never_open_stream(
ctx: Context,
ctx: tractor.Context,
) -> None:
'''
@ -698,8 +659,8 @@ async def never_open_stream(
@tractor.context
async def keep_sending_from_callee(
ctx: Context,
msg_buffer_size: int|None = None,
ctx: tractor.Context,
msg_buffer_size: Optional[int] = None,
) -> None:
'''
@ -724,10 +685,7 @@ async def keep_sending_from_callee(
],
ids='overrun_condition={}'.format,
)
def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable],
debug_mode: bool,
):
def test_one_end_stream_not_opened(overrun_by):
'''
This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265
@ -738,10 +696,8 @@ def test_one_end_stream_not_opened(
buf_size = buf_size_increase + Actor.msg_buffer_size
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as n:
portal = await n.start_actor(
entrypoint.__name__,
enable_modules=[__name__],
)
@ -798,7 +754,7 @@ def test_one_end_stream_not_opened(
@tractor.context
async def echo_back_sequence(
ctx: Context,
ctx: tractor.Context,
seq: list[int],
wait_for_cancel: bool,
allow_overruns_side: str,
@ -881,7 +837,6 @@ def test_maybe_allow_overruns_stream(
slow_side: str,
allow_overruns_side: str,
loglevel: str,
debug_mode: bool,
):
'''
Demonstrate small overruns of each task back and forth
@ -900,14 +855,13 @@ def test_maybe_allow_overruns_stream(
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
debug_mode=debug_mode,
# debug_mode=True,
)
seq = list(range(10))
async with portal.open_context(

View File

@ -123,9 +123,7 @@ async def error_before_started(
await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
debug_mode: bool,
):
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not
@ -134,9 +132,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
@ -229,16 +225,13 @@ async def stream_from_peer(
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxc:
ctxerr = ctxc
except ContextCancelled as ctxerr:
err = ctxerr
assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
@ -276,7 +269,9 @@ async def stream_from_peer(
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError('Never triggered local `ContextCancelled` ?!?')
raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
@pytest.mark.parametrize(
@ -285,7 +280,6 @@ async def stream_from_peer(
)
def test_peer_canceller(
error_during_ctxerr_handling: bool,
debug_mode: bool,
):
'''
Verify that a cancellation triggered by an in-actor-tree peer
@ -342,7 +336,7 @@ def test_peer_canceller(
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
# debug_mode=True
) as an:
canceller: Portal = await an.start_actor(
'canceller',
@ -383,8 +377,7 @@ def test_peer_canceller(
try:
print('PRE CONTEXT RESULT')
res = await sleeper_ctx.result()
assert res
await sleeper_ctx.result()
# should never get here
pytest.fail(
@ -394,10 +387,7 @@ def test_peer_canceller(
# should always raise since this root task does
# not request the sleeper cancellation ;)
except ContextCancelled as ctxerr:
print(
'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
f'{ctxerr}'
)
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
@ -420,31 +410,16 @@ def test_peer_canceller(
# XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr:
raise
# XXX if needed to debug failure
# _err = berr
# await tractor.pause()
# await trio.sleep_forever()
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{berr}\n'
)
err = berr
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{ctxs}\n'
)
pytest.fail('did not rx ctx-cancelled error?')
except (
ContextCancelled,
RuntimeError,
)as loc_err:
_loc_err = loc_err
)as ctxerr:
_err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
@ -461,7 +436,7 @@ def test_peer_canceller(
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling:
assert isinstance(loc_err, RuntimeError)
assert isinstance(ctxerr, RuntimeError)
# NOTE: this root actor task should have
# called `Context.cancel()` on the
@ -497,10 +472,9 @@ def test_peer_canceller(
# CASE: standard teardown inside in `.open_context()` block
else:
assert isinstance(loc_err, ContextCancelled)
assert loc_err.canceller == sleeper_ctx.canceller
assert ctxerr.canceller == sleeper_ctx.canceller
assert (
loc_err.canceller[0]
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
@ -510,7 +484,7 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
re = sleeper_ctx._remote_error
assert re is loc_err
assert re is ctxerr
for ctx in ctxs:
re: BaseException | None = ctx._remote_error
@ -580,14 +554,3 @@ def test_peer_canceller(
assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'
def test_client_tree_spawns_and_cancels_service_subactor():
...
# TODO: test for the modden `mod wks open piker` bug!
# -> start actor-tree (server) that offers sub-actor spawns via
# context API
# -> start another full actor-tree (client) which requests to the first to
# spawn over its `@context` ep / api.
# -> client actor cancels the context and should exit gracefully
# and the server's spawned child should cancel and terminate!

View File

@ -33,15 +33,12 @@ import exceptiongroup as eg
import trio
from ._state import current_actor
from .log import get_logger
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
log = get_logger('tractor')
_this_mod = importlib.import_module(__name__)
@ -116,35 +113,10 @@ class ContextCancelled(RemoteActorError):
'''
@property
def canceller(self) -> tuple[str, str] | None:
'''
Return the (maybe) `Actor.uid` for the requesting-author
of this ctxc.
Emit a warning msg when `.canceller` has not been set,
which usually idicates that a `None` msg-loop setinel was
sent before expected in the runtime. This can happen in
a few situations:
- (simulating) an IPC transport network outage
- a (malicious) pkt sent specifically to cancel an actor's
runtime non-gracefully without ensuring ongoing RPC tasks are
incrementally cancelled as is done with:
`Actor`
|_`.cancel()`
|_`.cancel_soon()`
|_`._cancel_task()`
'''
value = self.msgdata.get('canceller')
if value:
return tuple(value)
log.warning(
'IPC Context cancelled without a requesting actor?\n'
'Maybe the IPC transport ended abruptly?\n\n'
f'{self}'
)
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
@ -227,6 +199,7 @@ def pack_error(
):
error_msg.update(exc.msgdata)
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
@ -237,10 +210,8 @@ def pack_error(
def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError,
hide_tb: bool = True,
) -> None|Exception:
@ -316,61 +287,37 @@ def _raise_from_no_key_in_msg(
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
) -> bool:
'''
Raise an appopriate local error when a
`MsgStream` msg arrives which does not
contain the expected (at least under normal
operation) `'yield'` field.
`Context` and any embedded `MsgStream` termination,
as well as remote task errors are handled in order
of priority as:
- any 'error' msg is re-boxed and raised locally as
-> `RemoteActorError`|`ContextCancelled`
- a `MsgStream` 'stop' msg is constructed, assigned
and raised locally as -> `trio.EndOfChannel`
- All other mis-keyed msgss (like say a "final result"
'return' msg, normally delivered from `Context.result()`)
are re-boxed inside a `MessagingError` with an explicit
exc content describing the missing IPC-msg-key.
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__: bool = True
# an internal error should never get here
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
# raise the error message in a boxed exception type!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
hide_tb=hide_tb,
) from None
# `MsgStream` termination msg.
elif (
msg.get('stop')
or (
@ -383,26 +330,29 @@ def _raise_from_no_key_in_msg(
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
eoc = trio.EndOfChannel(
f'Context stream ended due to msg:\n\n'
f'{pformat(msg)}\n'
)
# XXX: important to set so that a new `.receive()`
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the `return` message
# value out of the underlying feed mem chan which is
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
raise trio.EndOfChannel(
f'Context stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
raise eoc from src_err
if (
stream

View File

@ -138,19 +138,13 @@ async def open_root_actor(
)
assert registry_addrs
loglevel = (
loglevel
or log._default_loglevel
).upper()
loglevel = (loglevel or log._default_loglevel).upper()
if (
debug_mode
and _spawn._spawn_method == 'trio'
):
if debug_mode and _spawn._spawn_method == 'trio':
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing for
# use of ``await tractor.pause()``
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of
@ -169,20 +163,7 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!"
)
assert loglevel
_log = log.get_console_log(loglevel)
assert _log
# TODO: factor this into `.devx._stackscope`!!
if debug_mode:
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
log.get_console_log(loglevel)
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []

View File

@ -48,12 +48,15 @@ import trio
from trio import (
CancelScope,
)
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import (
Nursery,
TaskStatus,
)
from .msg import NamespacePath
from ._ipc import Channel
from ._context import (
mk_context,
@ -142,9 +145,8 @@ async def _invoke(
cs: CancelScope | None = None
ctx = actor.get_context(
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
chan,
cid,
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
@ -274,8 +276,8 @@ async def _invoke(
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
res: Any = await coro
ctx._result = res
# res: Any = await coro
res = await coro
# deliver final result to caller side.
await chan.send({
@ -312,18 +314,11 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop(
(chan.uid, cid)
)
res_str: str = (
'error: {ctx._local_error}'
if ctx._local_error
else f'result: {ctx._result}'
)
ctx: Context = actor._contexts.pop((chan.uid, cid))
log.cancel(
f'IPC context terminated with final {res_str}\n'
f'|_{pformat(ctx)}\n'
f'Context task was terminated:\n'
f'func: {func}\n'
f'ctx: {pformat(ctx)}'
)
if ctx.cancelled_caught:
@ -336,6 +331,7 @@ async def _invoke(
ctx._maybe_raise_remote_err(re)
# fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
@ -382,16 +378,16 @@ async def _invoke(
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_{ctx._task}()\n'
f' |_ task: `{task.name}()`'
)
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
# '------ - ------\n'
# 'IPC msg:\n'
f'\n{ctx._cancel_msg}'
'------ - ------\n'
'IPC msg:\n'
f'{ctx._cancel_msg}'
)
# task-contex was either cancelled by request using
@ -439,12 +435,7 @@ async def _invoke(
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
log.runtime(f'{fname}() result: {result}')
# NOTE: only send result if we know IPC isn't down
if (
@ -974,7 +965,7 @@ class Actor:
# and bail after timeout (2-generals on closure).
assert chan.msgstream
log.warning(
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
@ -986,24 +977,13 @@ class Actor:
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected\n'
f'peer: {chan.uid}]\n\n'
f'{pformat(msg)}\n'
)
log.runtime(f'drained {msg} for {chan.uid}')
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
await self._push_result(
chan,
cid,
msg,
)
await self._push_result(chan, cid, msg)
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
log.runtime('Waiting on actor nursery to exit..')
await local_nursery.exited.wait()
if disconnected:
@ -1187,7 +1167,6 @@ class Actor:
self,
chan: Channel,
cid: str,
nsf: NamespacePath,
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
@ -1201,15 +1180,11 @@ class Actor:
task-as-function invocation.
'''
log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
actor_uid = chan.uid
assert actor_uid
try:
ctx = self._contexts[(actor_uid, cid)]
log.runtime(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
)
ctx._allow_overruns = allow_overruns
# adjust buffer size if specified
@ -1218,15 +1193,9 @@ class Actor:
state.max_buffer_size = msg_buffer_size
except KeyError:
log.runtime(
f'Creating NEW IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid: {cid}\n'
)
ctx = mk_context(
chan,
cid,
nsf=nsf,
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns,
)
@ -1237,13 +1206,11 @@ class Actor:
async def start_remote_task(
self,
chan: Channel,
nsf: NamespacePath,
ns: str,
func: str,
kwargs: dict,
# IPC channel config
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
) -> Context:
'''
@ -1258,43 +1225,20 @@ class Actor:
cid = str(uuid.uuid4())
assert chan.uid
ctx = self.get_context(
chan=chan,
cid=cid,
nsf=nsf,
chan,
cid,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
if (
'self' in nsf
or not load_nsf
):
ns, _, func = nsf.partition(':')
else:
# TODO: pass nsf directly over wire!
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()
log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send(
{'cmd': (
ns,
func,
kwargs,
self.uid,
cid,
)}
{'cmd': (ns, func, kwargs, self.uid, cid)}
)
# Wait on first response msg and validate; this should be
# immediate.
first_msg: dict = await ctx._recv_chan.receive()
functype: str = first_msg.get('functype')
first_msg = await ctx._recv_chan.receive()
functype = first_msg.get('functype')
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
@ -1336,19 +1280,14 @@ class Actor:
parent_data: dict[str, Any]
parent_data = await chan.recv()
log.runtime(
'Received state from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pformat(parent_data)}\n'
"Received state from parent:\n"
f"{parent_data}"
)
accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
try:
log.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
@ -1429,8 +1368,7 @@ class Actor:
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
f'Started tcp server(s) on {sockets}'
)
self._listeners.extend(listeners)
@ -1542,20 +1480,8 @@ class Actor:
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: CancelScope = ctx._scope
except KeyError:
# NOTE: during msging race conditions this will often
# emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n'
f'<= canceller: {requesting_uid}\n'
f' |_{chan}\n\n'
f'=> ctx id: {cid}\n'
)
log.cancel(f"{cid} has already completed/terminated?")
return True
log.cancel(
@ -1997,7 +1923,7 @@ async def process_messages(
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
f'|_{chan}'
)
nursery_cancelled_before_task: bool = False
msg: dict | None = None
@ -2034,10 +1960,8 @@ async def process_messages(
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
# => specifically `pformat()` sub-call..?
f'{pformat(msg)}\n'
)
@ -2045,35 +1969,19 @@ async def process_messages(
if cid:
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(
chan,
cid,
msg,
)
await actor._push_result(chan, cid, msg)
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n'
f'Waiting on next IPC msg from {chan.uid}:\n'
# f'last msg: {msg}\n'
f'|_{chan}'
)
continue
# process a 'cmd' request-msg upack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
# TODO: implement with ``match:`` syntax?
# process command request
try:
(
ns,
funcname,
kwargs,
actorid,
cid,
) = msg['cmd']
ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
@ -2086,16 +1994,13 @@ async def process_messages(
raise exc
log.runtime(
'Handling RPC cmd from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
uid: tuple = chan.uid
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs['requesting_uid'] = uid
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
@ -2103,16 +2008,15 @@ async def process_messages(
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
# we immediately start the runtime machinery
# shutdown
with CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
log.cancel(
f'Cancel request for `Actor` runtime\n'
f'<= canceller: {uid}\n'
# f'=> uid: {actor.uid}\n'
"Actor runtime for was remotely cancelled "
f"by {chan.uid}"
)
await _invoke(
actor,
@ -2139,10 +2043,9 @@ async def process_messages(
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
log.cancel(
f'Rx task cancel request\n'
f'<= canceller: {chan.uid}\n'
f'=> uid: {actor.uid}\n'
f' |_cid: {target_cid}\n'
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {target_cid}'
)
try:
await _invoke(
@ -2202,18 +2105,17 @@ async def process_messages(
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
if isinstance(ctx, Exception):
log.warning(
'Task for RPC failed?'
f'|_ {func}()\n\n'
f'{err}'
f"Task for RPC func {func} failed with"
f"{ctx}"
)
continue
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
@ -2224,10 +2126,7 @@ async def process_messages(
)
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
f"Waiting on next msg for {chan} from {chan.uid}")
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
@ -2244,12 +2143,9 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
# TODO: don't show this msg if it's an emphemeral
# discovery ep call?
log.runtime(
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'|_{chan.raddr}\n'
f'channel from {chan.uid} closed abruptly:\n'
f'-> {chan.raddr}\n'
)
# transport **was** disconnected
@ -2291,11 +2187,9 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
'Exiting IPC msg loop with\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n\n'
'final msg:\n'
f'{pformat(msg)}\n'
f'Exiting IPC msg loop with {chan.uid} '
f'final msg: {msg}\n'
f'|_{chan}'
)
# transport **was not** disconnected

View File

@ -400,7 +400,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
"errored with:"
"errored with\n"
# TODO: same thing as in
# `._invoke()` to compute how to

View File

@ -1,19 +1,18 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Multi-core debugging for da peeps!
@ -44,7 +43,6 @@ from types import FrameType
import pdbp
import tractor
import trio
from trio.lowlevel import current_task
from trio_typing import (
TaskStatus,
# Task,
@ -52,7 +50,6 @@ from trio_typing import (
from ..log import get_logger
from .._state import (
current_actor,
is_root_process,
debug_mode,
)
@ -241,7 +238,7 @@ async def _acquire_debug_lock_from_root_task(
to the ``pdb`` repl.
'''
task_name: str = current_task().name
task_name: str = trio.lowlevel.current_task().name
we_acquired: bool = False
log.runtime(
@ -326,7 +323,8 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete!
'''
task_name: str = current_task().name
task_name = trio.lowlevel.current_task().name
if tuple(subactor_uid) in Lock._blocked:
log.warning(
f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
@ -409,13 +407,11 @@ async def wait_for_parent_stdin_hijack(
assert val == 'Locked'
async with ctx.open_stream() as stream:
try:
# unblock local caller
try:
assert Lock.local_pdb_complete
task_status.started(cs)
# wait for local task to exit and
# release the REPL
await Lock.local_pdb_complete.wait()
finally:
@ -472,7 +468,7 @@ def shield_sigint_handler(
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
actor = current_actor()
actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel():
@ -617,7 +613,7 @@ def _set_trace(
shield: bool = False,
):
__tracebackhide__: bool = True
actor: tractor.Actor = actor or current_actor()
actor: tractor.Actor = actor or tractor.current_actor()
# start 2 levels up in user code
frame: FrameType | None = sys._getframe()
@ -687,9 +683,9 @@ async def pause(
'''
# __tracebackhide__ = True
actor = current_actor()
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
task_name: str = trio.lowlevel.current_task().name
task_name = trio.lowlevel.current_task().name
if (
not Lock.local_pdb_complete
@ -840,7 +836,7 @@ async def pause(
# runtime aware version which takes care of all .
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
actor: tractor.Actor = current_actor(
actor: tractor.Actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor:
@ -975,10 +971,9 @@ async def acquire_debug_lock(
'''
Grab root's debug lock on entry, release on exit.
This helper is for actor's who don't actually need to acquired
the debugger but want to wait until the lock is free in the
process-tree root such that they don't clobber an ongoing pdb
REPL session in some peer or child!
This helper is for actor's who don't actually need
to acquired the debugger but want to wait until the
lock is free in the process-tree root.
'''
if not debug_mode():
@ -1018,71 +1013,43 @@ async def maybe_wait_for_debugger(
# tearing down.
sub_in_debug: tuple[str, str] | None = None
for istep in range(poll_steps):
for _ in range(poll_steps):
if sub_in_debug := Lock.global_actor_in_debug:
log.pdb(
f'Lock in use by {sub_in_debug}'
)
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# XXX => but it doesn't seem to work..
if Lock.global_actor_in_debug:
sub_in_debug = tuple(Lock.global_actor_in_debug)
log.debug('Root polling for debug')
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
# TODO: could this make things more deterministic? wait
# to see if a sub-actor task will be scheduled and grab
# the tty lock on the next tick?
# XXX: doesn't seem to work
# await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete: trio.Event|None = Lock.no_remote_has_tty
debug_complete = Lock.no_remote_has_tty
if (
debug_complete
and not debug_complete.is_set()
and sub_in_debug is not None
and not debug_complete.is_set()
):
log.pdb(
'Root has errored but pdb is in use by child\n'
'Waiting on tty lock to release..\n'
f'uid: {sub_in_debug}\n'
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..'
)
await debug_complete.wait()
log.pdb(
f'Child subactor released debug lock!\n'
f'uid: {sub_in_debug}\n'
)
if debug_complete.is_set():
break
# is no subactor locking debugger currently?
elif (
debug_complete is None
or sub_in_debug is None
):
log.pdb(
'Root acquired debug TTY LOCK from child\n'
f'uid: {sub_in_debug}'
)
break
else:
# TODO: don't need this right?
# await trio.lowlevel.checkpoint()
log.debug(
'Root polling for debug:\n'
f'poll step: {istep}\n'
f'poll delya: {poll_delay}'
)
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
continue
else:
log.pdb('Root acquired debug TTY LOCK')
log.debug(
'Root acquired TTY LOCK'
)
# else:
# # TODO: non-root call for #320?
# this_uid: tuple[str, str] = current_actor().uid
# async with acquire_debug_lock(
# subactor_uid=this_uid,
# ):
# pass
# TODO: better naming and what additionals?
# - [ ] optional runtime plugging?

View File

@ -58,11 +58,6 @@ class NamespacePath(str):
'''
_ref: object | type | None = None
# TODO: support providing the ns instance in
# order to support 'self.<meth>` style to make
# `Portal.run_from_ns()` work!
# _ns: ModuleType|type|None = None
def load_ref(self) -> object | type:
if self._ref is None:
self._ref = resolve_name(self)
@ -105,13 +100,5 @@ class NamespacePath(str):
fqnp: tuple[str, str] = cls._mk_fqnp(ref)
return cls(':'.join(fqnp))
def to_tuple(
self,
# TODO: could this work re `self:<meth>` case from above?
# load_ref: bool = True,
) -> tuple[str, str]:
return self._mk_fqnp(
self.load_ref()
)
def to_tuple(self) -> tuple[str, str]:
return self._mk_fqnp(self.load_ref())