Compare commits

No commits in common. "10adf34be582a92f55587259845b09a5054f2265" and "54a0a0000d15210590fab1ba79ce18d85e940e74" have entirely different histories.

10adf34be5 ... 54a0a0000d

setup.py — 12 changed lines
@@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
 setup(
     name="tractor",
     version='0.1.0a6dev0',  # alpha zone
-    description='structured concurrent `trio`-"actors"',
+    description='structured concurrrent `trio`-"actors"',
     long_description=readme,
     license='AGPLv3',
     author='Tyler Goodlet',
@@ -50,7 +50,6 @@ setup(
         'exceptiongroup',

         # tooling
-        'stackscope',
         'tricycle',
         'trio_typing',
         'colorlog',
@@ -62,15 +61,16 @@ setup(
         # debug mode REPL
         'pdbp',

-        # TODO: distributed transport using
-        # linux kernel networking
-        # 'pyroute2',
-
         # pip ref docs on these specs:
         # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
         # and pep:
         # https://peps.python.org/pep-0440/#version-specifiers

+        # windows deps workaround for ``pdbpp``
+        # https://github.com/pdbpp/pdbpp/issues/498
+        # https://github.com/pdbpp/fancycompleter/issues/37
+        'pyreadline3 ; platform_system == "Windows"',
+
     ],
     tests_require=['pytest'],
     python_requires=">=3.10",
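Aside on the `pyreadline3` line in the incoming side above: it uses a pip environment marker (syntax covered by the requirement-specifier and PEP 440 links kept in the hunk), so the dependency is only installed on Windows. A minimal, hedged sketch of that pattern in isolation:

# sketch: a conditional dependency via an environment marker, as in
# the hunk above; only resolved/installed when the marker is true.
from setuptools import setup

setup(
    name="tractor",
    install_requires=[
        'pdbp',
        # '<requirement> ; <marker>' => Windows-only install
        'pyreadline3 ; platform_system == "Windows"',
    ],
)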
@@ -8,9 +8,7 @@ sync-opening a ``tractor.Context`` beforehand.
 # from contextlib import asynccontextmanager as acm
 from itertools import count
 import platform
-from typing import (
-    Callable,
-)
+from typing import Optional

 import pytest
 import trio
@@ -71,7 +69,7 @@ _state: bool = False

 @tractor.context
 async def too_many_starteds(
-    ctx: Context,
+    ctx: tractor.Context,
 ) -> None:
     '''
     Call ``Context.started()`` more then once (an error).
@@ -86,7 +84,7 @@ async def too_many_starteds(

 @tractor.context
 async def not_started_but_stream_opened(
-    ctx: Context,
+    ctx: tractor.Context,
 ) -> None:
     '''
     Enter ``Context.open_stream()`` without calling ``.started()``.
@@ -107,15 +105,11 @@ async def not_started_but_stream_opened(
     ],
     ids='misuse_type={}'.format,
 )
-def test_started_misuse(
-    target: Callable,
-    debug_mode: bool,
-):
+def test_started_misuse(target):
     async def main():
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as an:
-            portal = await an.start_actor(
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
                 target.__name__,
                 enable_modules=[__name__],
             )
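For orientation while reading the misuse tests above: a hedged sketch of the protocol they enforce — exactly one `Context.started()` call on the callee side before any other context traffic. Only names that appear in this diff are used (`tractor.context`, `Context.started()`, `open_nursery()`, `Portal.open_context()`); the actor and function names are made up for illustration:

import tractor
import trio


@tractor.context
async def well_behaved(
    ctx: tractor.Context,
) -> None:
    # exactly one `.started()` call: the rule which
    # `too_many_starteds()` above breaks on purpose.
    await ctx.started('setup-done')


async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'well_behaved_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(well_behaved) as (ctx, first):
            # `first` is the value the callee passed to `.started()`
            assert first == 'setup-done'
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)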
@@ -130,7 +124,7 @@ def test_started_misuse(

 @tractor.context
 async def simple_setup_teardown(

-    ctx: Context,
+    ctx: tractor.Context,
     data: int,
     block_forever: bool = False,

@@ -176,7 +170,6 @@ def test_simple_context(
     error_parent,
     callee_blocks_forever,
     pointlessly_open_stream,
-    debug_mode: bool,
 ):

     timeout = 1.5 if not platform.system() == 'Windows' else 4
@@ -184,10 +177,9 @@ def test_simple_context(
     async def main():

         with trio.fail_after(timeout):
-            async with tractor.open_nursery(
-                debug_mode=debug_mode,
-            ) as an:
-                portal = await an.start_actor(
+            async with tractor.open_nursery() as nursery:
+                portal = await nursery.start_actor(
                     'simple_context',
                     enable_modules=[__name__],
                 )
@@ -268,7 +260,6 @@ def test_caller_cancels(
     cancel_method: str,
     chk_ctx_result_before_exit: bool,
     callee_returns_early: bool,
-    debug_mode: bool,
 ):
     '''
     Verify that when the opening side of a context (aka the caller)
@@ -277,7 +268,7 @@ def test_caller_cancels(

     '''
     async def check_canceller(
-        ctx: Context,
+        ctx: tractor.Context,
     ) -> None:
         # should not raise yet return the remote
         # context cancelled error.
@@ -296,10 +287,8 @@ def test_caller_cancels(
     )

     async def main():
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as an:
-            portal = await an.start_actor(
+        async with tractor.open_nursery() as nursery:
+            portal = await nursery.start_actor(
                 'simple_context',
                 enable_modules=[__name__],
             )
@@ -349,7 +338,7 @@ def test_caller_cancels(
 @tractor.context
 async def close_ctx_immediately(

-    ctx: Context,
+    ctx: tractor.Context,

 ) -> None:

@@ -361,33 +350,17 @@ async def close_ctx_immediately(


 @tractor_test
-async def test_callee_closes_ctx_after_stream_open(
-    debug_mode: bool,
-):
-    '''
-    callee context closes without using stream.
-
-    This should result in a msg sequence
-    |_<root>_
-     |_<fast_stream_closer>
-
-     <= {'started': <Any>, 'cid': <str>}
-     <= {'stop': True, 'cid': <str>}
-     <= {'result': Any, ..}
-
-     (ignored by child)
-     => {'stop': True, 'cid': <str>}
-
-    '''
-    async with tractor.open_nursery(
-        debug_mode=debug_mode,
-    ) as an:
-        portal = await an.start_actor(
+async def test_callee_closes_ctx_after_stream_open():
+    'callee context closes without using stream'
+
+    async with tractor.open_nursery() as n:
+        portal = await n.start_actor(
             'fast_stream_closer',
             enable_modules=[__name__],
         )

-        with trio.fail_after(0.5):
+        with trio.fail_after(2):
             async with portal.open_context(
                 close_ctx_immediately,

@@ -395,9 +368,10 @@ async def test_callee_closes_ctx_after_stream_open(
                 # cancel_on_exit=True,

             ) as (ctx, sent):
+
                 assert sent is None

-                with trio.fail_after(0.4):
+                with trio.fail_after(0.5):
                     async with ctx.open_stream() as stream:

                         # should fall through since ``StopAsyncIteration``
@@ -405,15 +379,12 @@ async def test_callee_closes_ctx_after_stream_open(
                         # a ``trio.EndOfChannel`` by
                         # ``trio.abc.ReceiveChannel.__anext__()``
                         async for _ in stream:
-                            # trigger failure if we DO NOT
-                            # get an EOC!
                             assert 0
                         else:

                             # verify stream is now closed
                             try:
-                                with trio.fail_after(0.3):
-                                    await stream.receive()
+                                await stream.receive()
                             except trio.EndOfChannel:
                                 pass

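The docstring stripped in the hunk above enumerates the per-context wire msgs for this close-right-away scenario. Restated as a hedged sketch (field values are placeholders; the dict keys come straight from the removed docstring):

# hedged restatement of the removed docstring's msg sequence;
# every msg carries the 'cid' tying it to one `Context`.
cid = '<uuid4-str>'
started_msg = {'started': 'first-value', 'cid': cid}  # callee's `.started()` value
stop_msg = {'stop': True, 'cid': cid}                 # stream termination
result_msg = {'result': 'final-value', 'cid': cid}    # final fn return
# a trailing caller-side {'stop': ...} msg is ignored by the child.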
@@ -434,7 +405,7 @@ async def test_callee_closes_ctx_after_stream_open(
 @tractor.context
 async def expect_cancelled(

-    ctx: Context,
+    ctx: tractor.Context,

 ) -> None:
     global _state
@@ -463,15 +434,11 @@ async def expect_cancelled(
 @tractor_test
 async def test_caller_closes_ctx_after_callee_opens_stream(
     use_ctx_cancel_method: bool,
-    debug_mode: bool,
 ):
-    '''
-    caller context closes without using/opening stream
-
-    '''
-    async with tractor.open_nursery(
-        debug_mode=debug_mode,
-    ) as an:
+    'caller context closes without using stream'
+
+    async with tractor.open_nursery() as an:
         root: Actor = current_actor()

         portal = await an.start_actor(
@@ -555,13 +522,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(


 @tractor_test
-async def test_multitask_caller_cancels_from_nonroot_task(
-    debug_mode: bool,
-):
-    async with tractor.open_nursery(
-        debug_mode=debug_mode,
-    ) as an:
-        portal = await an.start_actor(
+async def test_multitask_caller_cancels_from_nonroot_task():
+    async with tractor.open_nursery() as n:
+        portal = await n.start_actor(
             'ctx_cancelled',
             enable_modules=[__name__],
         )
@@ -608,7 +573,7 @@ async def test_multitask_caller_cancels_from_nonroot_task(
 @tractor.context
 async def cancel_self(

-    ctx: Context,
+    ctx: tractor.Context,

 ) -> None:
     global _state
@@ -645,20 +610,16 @@ async def cancel_self(

     raise RuntimeError('Context didnt cancel itself?!')


 @tractor_test
-async def test_callee_cancels_before_started(
-    debug_mode: bool,
-):
+async def test_callee_cancels_before_started():
     '''
     Callee calls `Context.cancel()` while streaming and caller
     sees stream terminated in `ContextCancelled`.

     '''
-    async with tractor.open_nursery(
-        debug_mode=debug_mode,
-    ) as an:
-        portal = await an.start_actor(
+    async with tractor.open_nursery() as n:
+        portal = await n.start_actor(
             'cancels_self',
             enable_modules=[__name__],
         )
@@ -684,7 +645,7 @@ async def test_callee_cancels_before_started(
 @tractor.context
 async def never_open_stream(

-    ctx: Context,
+    ctx: tractor.Context,

 ) -> None:
     '''
@@ -698,8 +659,8 @@ async def never_open_stream(
 @tractor.context
 async def keep_sending_from_callee(

-    ctx: Context,
-    msg_buffer_size: int|None = None,
+    ctx: tractor.Context,
+    msg_buffer_size: Optional[int] = None,

 ) -> None:
     '''
@@ -724,10 +685,7 @@ async def keep_sending_from_callee(
     ],
     ids='overrun_condition={}'.format,
 )
-def test_one_end_stream_not_opened(
-    overrun_by: tuple[str, int, Callable],
-    debug_mode: bool,
-):
+def test_one_end_stream_not_opened(overrun_by):
     '''
     This should exemplify the bug from:
     https://github.com/goodboy/tractor/issues/265
@@ -738,10 +696,8 @@ def test_one_end_stream_not_opened(
     buf_size = buf_size_increase + Actor.msg_buffer_size

     async def main():
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as an:
-            portal = await an.start_actor(
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
                 entrypoint.__name__,
                 enable_modules=[__name__],
             )
@@ -798,7 +754,7 @@ def test_one_end_stream_not_opened(
 @tractor.context
 async def echo_back_sequence(

-    ctx: Context,
+    ctx: tractor.Context,
     seq: list[int],
     wait_for_cancel: bool,
     allow_overruns_side: str,
@@ -881,7 +837,6 @@ def test_maybe_allow_overruns_stream(
     slow_side: str,
     allow_overruns_side: str,
     loglevel: str,
-    debug_mode: bool,
 ):
     '''
     Demonstrate small overruns of each task back and forth
@@ -900,14 +855,13 @@ def test_maybe_allow_overruns_stream(

     '''
     async def main():
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as an:
-            portal = await an.start_actor(
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
                 'callee_sends_forever',
                 enable_modules=[__name__],
                 loglevel=loglevel,
-                debug_mode=debug_mode,
+                # debug_mode=True,
             )
             seq = list(range(10))
             async with portal.open_context(
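Nearly every test hunk above makes the same mechanical change: the left side threads a `debug_mode: bool` argument (presumably a pytest fixture defined elsewhere in the repo) into `tractor.open_nursery()`, while the right side predates it. A hedged sketch of the left-hand pattern on its own:

# sketch of the left-hand-side parametrization pattern: a (presumed)
# `debug_mode` pytest fixture is threaded into each actor nursery.
import tractor
import trio


def test_something(debug_mode: bool):  # fixture-injected flag

    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
        ) as an:
            portal = await an.start_actor(
                'some_actor',
                enable_modules=[__name__],
            )
            await portal.cancel_actor()

    trio.run(main)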
@@ -123,9 +123,7 @@ async def error_before_started(
     await peer_ctx.cancel()


-def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
-    debug_mode: bool,
-):
+def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
     '''
     Verify that an error raised in a remote context which itself
     opens YET ANOTHER remote context, which it then cancels, does not
@@ -134,9 +132,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(

     '''
     async def main():
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as n:
+        async with tractor.open_nursery() as n:
             portal = await n.start_actor(
                 'errorer',
                 enable_modules=[__name__],
@@ -229,16 +225,13 @@ async def stream_from_peer(
         # NOTE: cancellation of the (sleeper) peer should always
         # cause a `ContextCancelled` raise in this streaming
         # actor.
-        except ContextCancelled as ctxc:
-            ctxerr = ctxc
+        except ContextCancelled as ctxerr:
+            err = ctxerr

             assert peer_ctx._remote_error is ctxerr
-            assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
             assert peer_ctx.canceller == ctxerr.canceller

             # caller peer should not be the cancel requester
             assert not ctx.cancel_called

             # XXX can never be true since `._invoke` only
             # sets this AFTER the nursery block this task
             # was started in, exits.
@@ -276,7 +269,9 @@ async def stream_from_peer(
     # assert ctx.canceller[0] == 'root'
     # assert peer_ctx.canceller[0] == 'sleeper'

-    raise RuntimeError('Never triggered local `ContextCancelled` ?!?')
+    raise RuntimeError(
+        'peer never triggered local `ContextCancelled`?'
+    )


 @pytest.mark.parametrize(
@@ -285,7 +280,6 @@ async def stream_from_peer(
 )
 def test_peer_canceller(
     error_during_ctxerr_handling: bool,
-    debug_mode: bool,
 ):
     '''
     Verify that a cancellation triggered by an in-actor-tree peer
@@ -342,7 +336,7 @@ def test_peer_canceller(
     async def main():
         async with tractor.open_nursery(
             # NOTE: to halt the peer tasks on ctxc, uncomment this.
-            debug_mode=debug_mode,
+            # debug_mode=True
         ) as an:
             canceller: Portal = await an.start_actor(
                 'canceller',
@@ -383,8 +377,7 @@ def test_peer_canceller(

                 try:
                     print('PRE CONTEXT RESULT')
-                    res = await sleeper_ctx.result()
-                    assert res
+                    await sleeper_ctx.result()

                     # should never get here
                     pytest.fail(
@@ -394,10 +387,7 @@ def test_peer_canceller(
                 # should always raise since this root task does
                 # not request the sleeper cancellation ;)
                 except ContextCancelled as ctxerr:
-                    print(
-                        'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
-                        f'{ctxerr}'
-                    )
+                    print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')

                     # canceller and caller peers should not
                     # have been remotely cancelled.
@@ -420,31 +410,16 @@ def test_peer_canceller(

                 # XXX SHOULD NEVER EVER GET HERE XXX
                 except BaseException as berr:
-                    raise
-
-                    # XXX if needed to debug failure
-                    # _err = berr
-                    # await tractor.pause()
-                    # await trio.sleep_forever()
-
-                    pytest.fail(
-                        'did not rx ctxc ?!?\n\n'
-
-                        f'{berr}\n'
-                    )
+                    err = berr
+                    pytest.fail('did not rx ctx-cancelled error?')

                 else:
-                    pytest.fail(
-                        'did not rx ctxc ?!?\n\n'
-
-                        f'{ctxs}\n'
-                    )
+                    pytest.fail('did not rx ctx-cancelled error?')

             except (
                 ContextCancelled,
                 RuntimeError,
-            ) as loc_err:
-                _loc_err = loc_err
+            ) as ctxerr:
+                _err = ctxerr

                 # NOTE: the main state to check on `Context` is:
                 # - `.cancelled_caught` (maps to nursery cs)
@@ -461,7 +436,7 @@ def test_peer_canceller(
                 # `ContextCancelled` inside `.open_context()`
                 # block
                 if error_during_ctxerr_handling:
-                    assert isinstance(loc_err, RuntimeError)
+                    assert isinstance(ctxerr, RuntimeError)

                     # NOTE: this root actor task should have
                     # called `Context.cancel()` on the
@@ -497,10 +472,9 @@ def test_peer_canceller(

                 # CASE: standard teardown inside in `.open_context()` block
                 else:
-                    assert isinstance(loc_err, ContextCancelled)
-                    assert loc_err.canceller == sleeper_ctx.canceller
+                    assert ctxerr.canceller == sleeper_ctx.canceller
                     assert (
-                        loc_err.canceller[0]
+                        ctxerr.canceller[0]
                         ==
                         sleeper_ctx.canceller[0]
                         ==
@@ -510,7 +484,7 @@ def test_peer_canceller(
                 # the sleeper's remote error is the error bubbled
                 # out of the context-stack above!
                 re = sleeper_ctx._remote_error
-                assert re is loc_err
+                assert re is ctxerr

                 for ctx in ctxs:
                     re: BaseException | None = ctx._remote_error
@@ -580,14 +554,3 @@ def test_peer_canceller(

         assert excinfo.value.type == ContextCancelled
         assert excinfo.value.canceller[0] == 'canceller'
-
-
-def test_client_tree_spawns_and_cancels_service_subactor():
-    ...
-    # TODO: test for the modden `mod wks open piker` bug!
-    # -> start actor-tree (server) that offers sub-actor spawns via
-    #    context API
-    # -> start another full actor-tree (client) which requests to the first to
-    #    spawn over its `@context` ep / api.
-    # -> client actor cancels the context and should exit gracefully
-    #    and the server's spawned child should cancel and terminate!
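`test_peer_canceller` above leans on `ContextCancelled.canceller` (defined in the exceptions hunks below) to pin down which actor requested a cancel. A hedged usage sketch mirroring the test's assertions:

import tractor


async def check_who_cancelled(sleeper_ctx: tractor.Context) -> None:
    # mirrors the assertions in `test_peer_canceller` above
    try:
        await sleeper_ctx.result()
    except tractor.ContextCancelled as ctxerr:
        # `.canceller` is the requesting actor's uid tuple
        # (name, uuid-str), or None if the runtime never set it.
        assert ctxerr.canceller == sleeper_ctx.canceller
        assert ctxerr.canceller[0] == 'canceller'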
@@ -33,15 +33,12 @@ import exceptiongroup as eg
 import trio

 from ._state import current_actor
-from .log import get_logger

 if TYPE_CHECKING:
     from ._context import Context
     from ._stream import MsgStream
     from .log import StackLevelAdapter

-log = get_logger('tractor')
-
 _this_mod = importlib.import_module(__name__)


@@ -115,36 +112,11 @@ class ContextCancelled(RemoteActorError):

     '''
     @property
-    def canceller(self) -> tuple[str, str]|None:
-        '''
-        Return the (maybe) `Actor.uid` for the requesting-author
-        of this ctxc.
-
-        Emit a warning msg when `.canceller` has not been set,
-        which usually idicates that a `None` msg-loop setinel was
-        sent before expected in the runtime. This can happen in
-        a few situations:
-
-        - (simulating) an IPC transport network outage
-        - a (malicious) pkt sent specifically to cancel an actor's
-          runtime non-gracefully without ensuring ongoing RPC tasks are
-          incrementally cancelled as is done with:
-          `Actor`
-          |_`.cancel()`
-          |_`.cancel_soon()`
-          |_`._cancel_task()`
-
-        '''
+    def canceller(self) -> tuple[str, str] | None:
         value = self.msgdata.get('canceller')
         if value:
             return tuple(value)

-        log.warning(
-            'IPC Context cancelled without a requesting actor?\n'
-            'Maybe the IPC transport ended abruptly?\n\n'
-            f'{self}'
-        )
-

 class TransportClosed(trio.ClosedResourceError):
     "Underlying channel transport was closed prior to use"
@@ -227,6 +199,7 @@ def pack_error(
 ):
     error_msg.update(exc.msgdata)

+
     pkt: dict = {'error': error_msg}
     if cid:
         pkt['cid'] = cid
@@ -237,10 +210,8 @@ def pack_error(
 def unpack_error(

     msg: dict[str, Any],
-
     chan=None,
     err_type=RemoteActorError,
-
     hide_tb: bool = True,

 ) -> None|Exception:
@@ -316,61 +287,37 @@ def _raise_from_no_key_in_msg(
     msg: dict,
     src_err: KeyError,
     log: StackLevelAdapter,  # caller specific `log` obj

     expect_key: str = 'yield',
     stream: MsgStream | None = None,

-    # allow "deeper" tbs when debugging B^o
-    hide_tb: bool = True,
-
 ) -> bool:
     '''
-    Raise an appopriate local error when a
-    `MsgStream` msg arrives which does not
-    contain the expected (at least under normal
-    operation) `'yield'` field.
-
-    `Context` and any embedded `MsgStream` termination,
-    as well as remote task errors are handled in order
-    of priority as:
-
-    - any 'error' msg is re-boxed and raised locally as
-      -> `RemoteActorError`|`ContextCancelled`
-
-    - a `MsgStream` 'stop' msg is constructed, assigned
-      and raised locally as -> `trio.EndOfChannel`
-
-    - All other mis-keyed msgss (like say a "final result"
-      'return' msg, normally delivered from `Context.result()`)
-      are re-boxed inside a `MessagingError` with an explicit
-      exc content describing the missing IPC-msg-key.
+    Raise an appopriate local error when a `MsgStream` msg arrives
+    which does not contain the expected (under normal operation)
+    `'yield'` field.

     '''
-    __tracebackhide__: bool = hide_tb
+    __tracebackhide__: bool = True

-    # an internal error should never get here
+    # internal error should never get here
     try:
         cid: str = msg['cid']
     except KeyError as src_err:
         raise MessagingError(
             f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
-            f'cid: {cid}\n\n'
+            f'cid: {cid}\n'
+            'received msg:\n'
             f'{pformat(msg)}\n'
         ) from src_err

     # TODO: test that shows stream raising an expected error!!!

-    # raise the error message in a boxed exception type!
     if msg.get('error'):
+        # raise the error message
         raise unpack_error(
             msg,
             ctx.chan,
-            hide_tb=hide_tb,
-
         ) from None

-    # `MsgStream` termination msg.
     elif (
         msg.get('stop')
         or (
@@ -383,26 +330,29 @@ def _raise_from_no_key_in_msg(
             f'cid: {cid}\n'
         )

+        # XXX: important to set so that a new ``.receive()``
+        # call (likely by another task using a broadcast receiver)
+        # doesn't accidentally pull the ``return`` message
+        # value out of the underlying feed mem chan!
+        stream._eoc: bool = True
+
         # TODO: if the a local task is already blocking on
         # a `Context.result()` and thus a `.receive()` on the
         # rx-chan, we close the chan and set state ensuring that
         # an eoc is raised!

+        # # when the send is closed we assume the stream has
+        # # terminated and signal this local iterator to stop
+        # await stream.aclose()
+
         # XXX: this causes ``ReceiveChannel.__anext__()`` to
         # raise a ``StopAsyncIteration`` **and** in our catch
         # block below it will trigger ``.aclose()``.
-        eoc = trio.EndOfChannel(
-            f'Context stream ended due to msg:\n\n'
-            f'{pformat(msg)}\n'
-        )
-        # XXX: important to set so that a new `.receive()`
-        # call (likely by another task using a broadcast receiver)
-        # doesn't accidentally pull the `return` message
-        # value out of the underlying feed mem chan which is
-        # destined for the `Context.result()` call during ctx-exit!
-        stream._eoc: Exception = eoc
-
-        raise eoc from src_err
+        raise trio.EndOfChannel(
+            f'Context stream ended due to msg:\n'
+            f'{pformat(msg)}'
+        ) from src_err

     if (
         stream
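Both sides of the `pack_error()`/`unpack_error()` hunks above agree on the envelope shape: an `{'error': ...}` dict plus an optional `'cid'`. A hedged sketch of the round trip; the payload field names are illustrative only (the real ones live in `pack_error()`):

# hedged sketch: the boxed-error envelope implied by the hunks above.
pkt: dict = {
    'error': {
        # illustrative payload only; see `pack_error()` for the
        # actual fields (plus any `exc.msgdata` merged in above)
        'tb_str': 'Traceback (most recent call last): ...',
    },
    'cid': '<uuid4-str>',  # present when tied to a specific context
}

# receiving side re-boxes and raises locally, roughly:
#   raise unpack_error(pkt, chan) from None
# yielding a `RemoteActorError` (or `ContextCancelled`) instance.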
@@ -138,19 +138,13 @@ async def open_root_actor(
     )
     assert registry_addrs

-    loglevel = (
-        loglevel
-        or log._default_loglevel
-    ).upper()
+    loglevel = (loglevel or log._default_loglevel).upper()

-    if (
-        debug_mode
-        and _spawn._spawn_method == 'trio'
-    ):
+    if debug_mode and _spawn._spawn_method == 'trio':
         _state._runtime_vars['_debug_mode'] = True

-        # expose internal debug module to every actor allowing for
-        # use of ``await tractor.pause()``
+        # expose internal debug module to every actor allowing
+        # for use of ``await tractor.breakpoint()``
         enable_modules.append('tractor.devx._debug')

     # if debug mode get's enabled *at least* use that level of
@@ -169,20 +163,7 @@ async def open_root_actor(
             "Debug mode is only supported for the `trio` backend!"
         )

-    assert loglevel
-    _log = log.get_console_log(loglevel)
-    assert _log
-
-    # TODO: factor this into `.devx._stackscope`!!
-    if debug_mode:
-        try:
-            logger.info('Enabling `stackscope` traces on SIGUSR1')
-            from .devx import enable_stack_on_sig
-            enable_stack_on_sig()
-        except ImportError:
-            logger.warning(
-                '`stackscope` not installed for use in debug mode!'
-            )
+    log.get_console_log(loglevel)

     # closed into below ping task-func
     ponged_addrs: list[tuple[str, int]] = []
@@ -48,12 +48,15 @@ import trio
 from trio import (
     CancelScope,
 )
+from trio.lowlevel import (
+    current_task,
+    Task,
+)
 from trio_typing import (
     Nursery,
     TaskStatus,
 )

-from .msg import NamespacePath
 from ._ipc import Channel
 from ._context import (
     mk_context,
@@ -142,9 +145,8 @@ async def _invoke(
     cs: CancelScope | None = None

     ctx = actor.get_context(
-        chan=chan,
-        cid=cid,
-        nsf=NamespacePath.from_ref(func),
+        chan,
+        cid,
         # We shouldn't ever need to pass this through right?
         # it's up to the soon-to-be called rpc task to
         # open the stream with this option.
@@ -274,8 +276,8 @@ async def _invoke(

                 # TODO: should would be nice to have our
                 # `TaskMngr` nursery here!
-                res: Any = await coro
-                ctx._result = res
+                # res: Any = await coro
+                res = await coro

                 # deliver final result to caller side.
                 await chan.send({
@@ -312,18 +314,11 @@ async def _invoke(
         # don't pop the local context until we know the
         # associated child isn't in debug any more
         await maybe_wait_for_debugger()
-        ctx: Context = actor._contexts.pop(
-            (chan.uid, cid)
-        )
-
-        res_str: str = (
-            'error: {ctx._local_error}'
-            if ctx._local_error
-            else f'result: {ctx._result}'
-        )
+        ctx: Context = actor._contexts.pop((chan.uid, cid))
         log.cancel(
-            f'IPC context terminated with final {res_str}\n'
-            f'|_{pformat(ctx)}\n'
+            f'Context task was terminated:\n'
+            f'func: {func}\n'
+            f'ctx: {pformat(ctx)}'
         )

         if ctx.cancelled_caught:
@@ -336,6 +331,7 @@ async def _invoke(
             ctx._maybe_raise_remote_err(re)

         # fname: str = func.__name__
+        task: Task = current_task()
         cs: CancelScope = ctx._scope
         if cs.cancel_called:
             our_uid: tuple = actor.uid
@@ -382,16 +378,16 @@ async def _invoke(
                 div_str +
                 f'<= canceller: {canceller}\n'
                 f'=> uid: {our_uid}\n'
-                f'  |_{ctx._task}()\n'
+                f'  |_ task: `{task.name}()`'
             )

             # TODO: does this ever get set any more or can
             # we remove it?
             if ctx._cancel_msg:
                 msg += (
-                    # '------ - ------\n'
-                    # 'IPC msg:\n'
-                    f'\n{ctx._cancel_msg}'
+                    '------ - ------\n'
+                    'IPC msg:\n'
+                    f'{ctx._cancel_msg}'
                 )

             # task-contex was either cancelled by request using
@@ -439,12 +435,7 @@ async def _invoke(
         task_status.started(ctx)
         result = await coro
         fname: str = func.__name__
-        log.runtime(
-            'RPC complete:\n'
-            f'task: {ctx._task}\n'
-            f'|_cid={ctx.cid}\n'
-            f'|_{fname}() -> {pformat(result)}\n'
-        )
+        log.runtime(f'{fname}() result: {result}')

         # NOTE: only send result if we know IPC isn't down
         if (
@@ -974,7 +965,7 @@ class Actor:
                 # and bail after timeout (2-generals on closure).
                 assert chan.msgstream

-                log.warning(
+                log.runtime(
                     f'Draining lingering msgs from stream {chan.msgstream}'
                 )

@@ -986,24 +977,13 @@ class Actor:
                     # making sure any RPC response to that call is
                     # delivered the local calling task.
                     # TODO: factor this into a helper?
-                    log.warning(
-                        'Draining msg from disconnected\n'
-                        f'peer: {chan.uid}]\n\n'
-                        f'{pformat(msg)}\n'
-                    )
+                    log.runtime(f'drained {msg} for {chan.uid}')
                     cid = msg.get('cid')
                     if cid:
                         # deliver response to local caller/waiter
-                        await self._push_result(
-                            chan,
-                            cid,
-                            msg,
-                        )
+                        await self._push_result(chan, cid, msg)

-                log.runtime(
-                    'Waiting on local actor nursery to exit..\n'
-                    f'|_{local_nursery}\n'
-                )
+                log.runtime('Waiting on actor nursery to exit..')
                 await local_nursery.exited.wait()

         if disconnected:
@@ -1187,7 +1167,6 @@ class Actor:
         self,
         chan: Channel,
         cid: str,
-        nsf: NamespacePath,

         msg_buffer_size: int | None = None,
         allow_overruns: bool = False,
@@ -1201,15 +1180,11 @@ class Actor:
         task-as-function invocation.

         '''
+        log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
         actor_uid = chan.uid
         assert actor_uid
         try:
             ctx = self._contexts[(actor_uid, cid)]
-            log.runtime(
-                f'Retreived cached IPC ctx for\n'
-                f'peer: {chan.uid}\n'
-                f'cid:{cid}\n'
-            )
             ctx._allow_overruns = allow_overruns

             # adjust buffer size if specified
@@ -1218,15 +1193,9 @@ class Actor:
                 state.max_buffer_size = msg_buffer_size

         except KeyError:
-            log.runtime(
-                f'Creating NEW IPC ctx for\n'
-                f'peer: {chan.uid}\n'
-                f'cid: {cid}\n'
-            )
             ctx = mk_context(
                 chan,
                 cid,
-                nsf=nsf,
                 msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
                 _allow_overruns=allow_overruns,
             )
@@ -1237,13 +1206,11 @@ class Actor:
     async def start_remote_task(
         self,
         chan: Channel,
-        nsf: NamespacePath,
+        ns: str,
+        func: str,
         kwargs: dict,

-        # IPC channel config
         msg_buffer_size: int | None = None,
         allow_overruns: bool = False,
-        load_nsf: bool = False,

     ) -> Context:
         '''
@@ -1258,43 +1225,20 @@ class Actor:
         cid = str(uuid.uuid4())
         assert chan.uid
         ctx = self.get_context(
-            chan=chan,
-            cid=cid,
-            nsf=nsf,
+            chan,
+            cid,
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
-        if (
-            'self' in nsf
-            or not load_nsf
-        ):
-            ns, _, func = nsf.partition(':')
-        else:
-            # TODO: pass nsf directly over wire!
-            # -[ ] but, how to do `self:<Actor.meth>`??
-            ns, func = nsf.to_tuple()
-
-        log.runtime(
-            'Sending cmd to\n'
-            f'peer: {chan.uid} => \n'
-            '\n'
-            f'=> {ns}.{func}({kwargs})\n'
-        )
+        log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
         await chan.send(
-            {'cmd': (
-                ns,
-                func,
-                kwargs,
-                self.uid,
-                cid,
-            )}
+            {'cmd': (ns, func, kwargs, self.uid, cid)}
         )

         # Wait on first response msg and validate; this should be
         # immediate.
-        first_msg: dict = await ctx._recv_chan.receive()
-        functype: str = first_msg.get('functype')
+        first_msg = await ctx._recv_chan.receive()
+        functype = first_msg.get('functype')

         if 'error' in first_msg:
             raise unpack_error(first_msg, chan)
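The `start_remote_task()` hunk above makes the RPC request envelope easy to read off: the incoming side drops the `NamespacePath` (`nsf`) indirection and sends the tuple inline. A hedged sketch of that envelope and its receiver-side unpacking (which the `process_messages()` hunks further below perform):

# hedged sketch: the RPC 'cmd' envelope from the hunk above.
cmd_msg = {
    'cmd': (
        'self',                     # ns: module path, or 'self' for Actor methods
        'cancel',                   # func: name to invoke in that namespace
        {},                         # kwargs forwarded to the remote task
        ('caller', '<uuid4-str>'),  # the sender's `Actor.uid`
        '<uuid4-str>',              # cid: context id for response routing
    ),
}

# receiver side (see `process_messages()` below):
ns, funcname, kwargs, actorid, cid = cmd_msg['cmd']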
@@ -1336,19 +1280,14 @@ class Actor:
             parent_data: dict[str, Any]
             parent_data = await chan.recv()
             log.runtime(
-                'Received state from parent:\n\n'
-                # TODO: eventually all these msgs as
-                # `msgspec.Struct` with a special mode that
-                # pformats them in multi-line mode, BUT only
-                # if "trace"/"util" mode is enabled?
-                f'{pformat(parent_data)}\n'
+                "Received state from parent:\n"
+                f"{parent_data}"
             )
             accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
             rvs = parent_data.pop('_runtime_vars')

             if rvs['_debug_mode']:
                 try:
-                    log.info('Enabling `stackscope` traces on SIGUSR1')
                     from .devx import enable_stack_on_sig
                     enable_stack_on_sig()
                 except ImportError:
@@ -1429,8 +1368,7 @@ class Actor:
             for listener in listeners
         ]
         log.runtime(
-            'Started TCP server(s)\n'
-            f'|_{sockets}\n'
+            f'Started tcp server(s) on {sockets}'
         )
         self._listeners.extend(listeners)

@@ -1542,20 +1480,8 @@ class Actor:
             # be cancelled was indeed spawned by a request from this channel
             ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
             scope: CancelScope = ctx._scope

         except KeyError:
-            # NOTE: during msging race conditions this will often
-            # emit, some examples:
-            # - callee returns a result before cancel-msg/ctxc-raised
-            # - callee self raises ctxc before caller send request,
-            # - callee errors prior to cancel req.
-            log.cancel(
-                'Cancel request invalid, RPC task already completed?\n'
-                f'<= canceller: {requesting_uid}\n'
-                f'  |_{chan}\n\n'
-
-                f'=> ctx id: {cid}\n'
-            )
+            log.cancel(f"{cid} has already completed/terminated?")
             return True

         log.cancel(
@@ -1997,7 +1923,7 @@ async def process_messages(
     log.runtime(
         'Entering IPC msg loop:\n'
         f'peer: {chan.uid}\n'
-        f'|_{chan}\n'
+        f'|_{chan}'
     )
     nursery_cancelled_before_task: bool = False
     msg: dict | None = None
@@ -2034,10 +1960,8 @@ async def process_messages(

                 log.transport(  # type: ignore
                     f'<= IPC msg from peer: {chan.uid}\n\n'
-
                     # TODO: conditionally avoid fmting depending
                     # on log level (for perf)?
-                    # => specifically `pformat()` sub-call..?
                     f'{pformat(msg)}\n'
                 )

@@ -2045,35 +1969,19 @@ async def process_messages(
                 if cid:
                     # deliver response to local caller/waiter
                     # via its per-remote-context memory channel.
-                    await actor._push_result(
-                        chan,
-                        cid,
-                        msg,
-                    )
+                    await actor._push_result(chan, cid, msg)

                     log.runtime(
-                        'Waiting on next IPC msg from\n'
-                        f'peer: {chan.uid}:\n'
-                        f'|_{chan}\n'
-
+                        f'Waiting on next IPC msg from {chan.uid}:\n'
                         # f'last msg: {msg}\n'
+                        f'|_{chan}'
                     )
                     continue

-                # process a 'cmd' request-msg upack
-                # TODO: impl with native `msgspec.Struct` support !!
-                # -[ ] implement with ``match:`` syntax?
-                # -[ ] discard un-authed msgs as per,
-                # <TODO put issue for typed msging structs>
+                # TODO: implement with ``match:`` syntax?
+                # process command request
                 try:
-                    (
-                        ns,
-                        funcname,
-                        kwargs,
-                        actorid,
-                        cid,
-                    ) = msg['cmd']
+                    ns, funcname, kwargs, actorid, cid = msg['cmd']

                 except KeyError:
                     # This is the non-rpc error case, that is, an
                     # error **not** raised inside a call to ``_invoke()``
@@ -2086,33 +1994,29 @@ async def process_messages(
                     raise exc

                 log.runtime(
-                    'Handling RPC cmd from\n'
-                    f'peer: {actorid}\n'
-                    '\n'
-                    f'=> {ns}.{funcname}({kwargs})\n'
-                )
+                    f"Processing request from {actorid}\n"
+                    f"{ns}.{funcname}({kwargs})")
                 if ns == 'self':
-                    uid: tuple = chan.uid
                     if funcname == 'cancel':
                         func: Callable = actor.cancel
-                        kwargs['requesting_uid'] = uid
+                        kwargs['requesting_uid'] = chan.uid

                         # don't start entire actor runtime cancellation
                         # if this actor is currently in debug mode!
-                        pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
+                        pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
                         if pdb_complete:
                             await pdb_complete.wait()

-                        # Either of `Actor.cancel()`/`.cancel_soon()`
-                        # was called, so terminate this IPC msg
-                        # loop, exit back out into `async_main()`,
-                        # and immediately start the core runtime
-                        # machinery shutdown!
+                        # we immediately start the runtime machinery
+                        # shutdown
                         with CancelScope(shield=True):
+                            # actor.cancel() was called so kill this
+                            # msg loop and break out into
+                            # ``async_main()``
                             log.cancel(
-                                f'Cancel request for `Actor` runtime\n'
-                                f'<= canceller: {uid}\n'
-                                # f'=> uid: {actor.uid}\n'
+                                "Actor runtime for was remotely cancelled "
+                                f"by {chan.uid}"
                             )
                             await _invoke(
                                 actor,
@@ -2139,10 +2043,9 @@ async def process_messages(
                         target_cid = kwargs['cid']
                         kwargs['requesting_uid'] = chan.uid
                         log.cancel(
-                            f'Rx task cancel request\n'
-                            f'<= canceller: {chan.uid}\n'
-                            f'=> uid: {actor.uid}\n'
-                            f'  |_cid: {target_cid}\n'
+                            f'Remote request to cancel task\n'
+                            f'remote actor: {chan.uid}\n'
+                            f'task: {target_cid}'
                         )
                         try:
                             await _invoke(
@@ -2202,18 +2105,17 @@ async def process_messages(
                 # in the lone case where a ``Context`` is not
                 # delivered, it's likely going to be a locally
                 # scoped exception from ``_invoke()`` itself.
-                if isinstance(err := ctx, Exception):
+                if isinstance(ctx, Exception):
                     log.warning(
-                        'Task for RPC failed?'
-                        f'|_ {func}()\n\n'
-
-                        f'{err}'
+                        f"Task for RPC func {func} failed with"
+                        f"{ctx}"
                     )
                     continue

                 else:
                     # mark that we have ongoing rpc tasks
                     actor._ongoing_rpc_tasks = trio.Event()
+                    log.runtime(f"RPC func is {func}")

                     # store cancel scope such that the rpc task can be
                     # cancelled gracefully if requested
@@ -2224,10 +2126,7 @@ async def process_messages(
                     )

                 log.runtime(
-                    'Waiting on next IPC msg from\n'
-                    f'peer: {chan.uid}\n'
-                    f'|_{chan}\n'
-                )
+                    f"Waiting on next msg for {chan} from {chan.uid}")

         # end of async for, channel disconnect vis
         # ``trio.EndOfChannel``
@@ -2244,12 +2143,9 @@ async def process_messages(
         # handshake for them (yet) and instead we simply bail out of
         # the message loop and expect the teardown sequence to clean
         # up.
-        # TODO: don't show this msg if it's an emphemeral
-        # discovery ep call?
         log.runtime(
-            f'channel closed abruptly with\n'
-            f'peer: {chan.uid}\n'
-            f'|_{chan.raddr}\n'
+            f'channel from {chan.uid} closed abruptly:\n'
+            f'-> {chan.raddr}\n'
         )

         # transport **was** disconnected
@@ -2291,11 +2187,9 @@ async def process_messages(
     finally:
         # msg debugging for when he machinery is brokey
         log.runtime(
-            'Exiting IPC msg loop with\n'
-            f'peer: {chan.uid}\n'
-            f'|_{chan}\n\n'
-            'final msg:\n'
-            f'{pformat(msg)}\n'
+            f'Exiting IPC msg loop with {chan.uid} '
+            f'final msg: {msg}\n'
+            f'|_{chan}'
         )

         # transport **was not** disconnected
@@ -400,7 +400,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
         else:
             log.exception(
                 f"Nursery for {current_actor().uid} "
-                "errored with:"
+                "errored with\n"

                 # TODO: same thing as in
                 # `._invoke()` to compute how to
@@ -1,19 +1,18 @@
 # tractor: structured concurrent "actors".
 # Copyright 2018-eternity Tyler Goodlet.

-# This program is free software: you can redistribute it and/or
-# modify it under the terms of the GNU Affero General Public License
-# as published by the Free Software Foundation, either version 3 of
-# the License, or (at your option) any later version.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.

-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Affero General Public License for more details.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.

-# You should have received a copy of the GNU Affero General Public
-# License along with this program. If not, see
-# <https://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.

 """
 Multi-core debugging for da peeps!
@@ -44,7 +43,6 @@ from types import FrameType
 import pdbp
 import tractor
 import trio
-from trio.lowlevel import current_task
 from trio_typing import (
     TaskStatus,
     # Task,
@@ -52,7 +50,6 @@ from trio_typing import (

 from ..log import get_logger
 from .._state import (
-    current_actor,
     is_root_process,
     debug_mode,
 )
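These two import hunks (and several call-site hunks below) just trade bare-name imports for fully-qualified calls; `trio.lowlevel.current_task()` and `tractor.current_actor()` resolve to the same callables either way. A minimal demo of the two spellings:

    import trio

    async def main() -> None:
        # fully-qualified form, as the right-hand side of these hunks uses:
        name: str = trio.lowlevel.current_task().name

        # bare-name form, as the left-hand side imports:
        from trio.lowlevel import current_task
        assert current_task().name == name

    trio.run(main)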
@@ -241,7 +238,7 @@ async def _acquire_debug_lock_from_root_task(
     to the ``pdb`` repl.

     '''
-    task_name: str = current_task().name
+    task_name: str = trio.lowlevel.current_task().name
     we_acquired: bool = False

     log.runtime(
@@ -326,7 +323,8 @@ async def lock_tty_for_child(
     highly reliable at releasing the mutex complete!

     '''
-    task_name: str = current_task().name
+    task_name = trio.lowlevel.current_task().name
+
     if tuple(subactor_uid) in Lock._blocked:
         log.warning(
             f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
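The first thing `lock_tty_for_child()` does is consult `Lock._blocked`, a collection of actor uids banned from taking the tty mutex. The guard in isolation (assuming a plain `set` for the blocked table, which the membership test above implies but does not prove):

    # sketch of the blocked-uid guard; `_blocked` mirrors `Lock._blocked`
    _blocked: set[tuple[str, str]] = set()

    def may_acquire(subactor_uid: tuple[str, str]) -> bool:
        if tuple(subactor_uid) in _blocked:
            print(f'Actor {subactor_uid} is blocked from acquiring debug lock')
            return False
        return True

    _blocked.add(('bad-actor', 'uuid4-here'))
    assert not may_acquire(('bad-actor', 'uuid4-here'))
    assert may_acquire(('good-actor', 'uuid4-here'))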
@@ -409,13 +407,11 @@ async def wait_for_parent_stdin_hijack(
             assert val == 'Locked'

             async with ctx.open_stream() as stream:
+                # unblock local caller

                 try:
-                    # unblock local caller
                     assert Lock.local_pdb_complete
                     task_status.started(cs)

-                    # wait for local task to exit and
-                    # release the REPL
                     await Lock.local_pdb_complete.wait()

                 finally:
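The shape of `wait_for_parent_stdin_hijack()` is the standard trio handshake: report readiness via `task_status.started()`, then park on an `Event` until the REPL user is done, releasing in the `finally:`. A self-contained sketch of that handshake (names are illustrative, not tractor's `Lock` machinery):

    import trio

    async def hold_lock_until_released(
        done: trio.Event,
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        with trio.CancelScope() as cs:
            # unblock the caller that `.start()`ed us..
            task_status.started(cs)
            try:
                # ..then park until the "REPL user" signals completion.
                await done.wait()
            finally:
                print('releasing lock-holder task')

    async def main() -> None:
        done = trio.Event()
        async with trio.open_nursery() as n:
            cs = await n.start(hold_lock_until_released, done)
            assert isinstance(cs, trio.CancelScope)
            done.set()  # simulate the local pdb session completing

    trio.run(main)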
@@ -472,7 +468,7 @@ def shield_sigint_handler(

     uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug

-    actor = current_actor()
+    actor = tractor.current_actor()
     # print(f'{actor.uid} in HANDLER with ')

     def do_cancel():
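`shield_sigint_handler` is swapped in as the process-wide SIGINT handler while a REPL is active, so ctrl-c can be swallowed or redirected rather than killing an actor mid-debug. The bare mechanism, a handler swap plus restore (the `undo_sigint`-style half returned by `mk_mpdb()` above), minus all the actor plumbing:

    import signal

    def shielding_handler(signum, frame) -> None:
        # swallow ctrl-c while "the REPL" is in use elsewhere
        print('SIGINT shielded; debugger in use')

    # install, keeping the prior handler so it can be restored
    prior = signal.signal(signal.SIGINT, shielding_handler)
    try:
        pass  # ..REPL session would run here..
    finally:
        signal.signal(signal.SIGINT, prior)  # restore previous behavior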
@@ -617,7 +613,7 @@ def _set_trace(
     shield: bool = False,
 ):
     __tracebackhide__: bool = True
-    actor: tractor.Actor = actor or current_actor()
+    actor: tractor.Actor = actor or tractor.current_actor()

     # start 2 levels up in user code
     frame: FrameType | None = sys._getframe()
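`_set_trace()` aims the REPL "2 levels up" by walking `.f_back` links from `sys._getframe()`, so the prompt lands in user code rather than in library internals. The frame-walking trick by itself:

    import sys
    from types import FrameType

    def two_frames_up() -> FrameType:
        # frame 0 is this function; each `.f_back` steps one caller up
        frame: FrameType | None = sys._getframe()
        for _ in range(2):
            if frame.f_back is None:
                break
            frame = frame.f_back
        return frame

    def caller() -> None:
        frame = two_frames_up()
        # prints the name of whoever called `caller()`,
        # here '<module>' since we invoke it at module level
        print(frame.f_code.co_name)

    caller()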
@@ -687,9 +683,9 @@ async def pause(

     '''
     # __tracebackhide__ = True
-    actor = current_actor()
+    actor = tractor.current_actor()
     pdb, undo_sigint = mk_mpdb()
-    task_name: str = trio.lowlevel.current_task().name
+    task_name = trio.lowlevel.current_task().name

     if (
         not Lock.local_pdb_complete
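`pause()` is the debug-mode entry point: it takes the process-tree tty lock before dropping into the `pdbp` REPL. A usage sketch, assuming the left side's API where `pause` is exported at package level and `debug_mode` is enabled on the root actor:

    import trio
    import tractor

    async def main() -> None:
        async with tractor.open_nursery(
            debug_mode=True,  # enable the lock + pdbp REPL machinery
        ):
            # drop into the debugger in this (root) actor
            await tractor.pause()

    if __name__ == '__main__':
        trio.run(main)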
@@ -840,7 +836,7 @@ async def pause(
 # runtime aware version which takes care of all .
 def pause_from_sync() -> None:
     print("ENTER SYNC PAUSE")
-    actor: tractor.Actor = current_actor(
+    actor: tractor.Actor = tractor.current_actor(
         err_on_no_runtime=False,
     )
     if actor:
@@ -975,10 +971,9 @@ async def acquire_debug_lock(
     '''
     Grab root's debug lock on entry, release on exit.

-    This helper is for actor's who don't actually need to acquired
-    the debugger but want to wait until the lock is free in the
-    process-tree root such that they don't clobber an ongoing pdb
-    REPL session in some peer or child!
+    This helper is for actor's who don't actually need
+    to acquired the debugger but want to wait until the
+    lock is free in the process-tree root.

     '''
     if not debug_mode():
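Per the (left side's expanded) docstring, `acquire_debug_lock()` exists for actors that only need to *wait* for the lock, e.g. before teardown, so they don't clobber a peer's live REPL. A call-site sketch matching the commented-out usage visible further down (the import path is assumed from this module's relative imports):

    import tractor
    # import path assumed; this module lives two levels below the package root
    from tractor.devx._debug import acquire_debug_lock

    async def wait_out_any_repl() -> None:
        this_uid: tuple[str, str] = tractor.current_actor().uid
        # blocks until no peer/child holds the root's tty lock;
        # a no-op when `debug_mode()` is off, per the guard above
        async with acquire_debug_lock(subactor_uid=this_uid):
            pass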
@@ -1018,71 +1013,43 @@ async def maybe_wait_for_debugger(
         # tearing down.
         sub_in_debug: tuple[str, str] | None = None

-        for istep in range(poll_steps):
+        for _ in range(poll_steps):

-            if sub_in_debug := Lock.global_actor_in_debug:
-                log.pdb(
-                    f'Lock in use by {sub_in_debug}'
-                )
-            # TODO: could this make things more deterministic?
-            # wait to see if a sub-actor task will be
-            # scheduled and grab the tty lock on the next
-            # tick?
-            # XXX => but it doesn't seem to work..
+            if Lock.global_actor_in_debug:
+                sub_in_debug = tuple(Lock.global_actor_in_debug)
+            log.debug('Root polling for debug')
+            with trio.CancelScope(shield=True):
+                await trio.sleep(poll_delay)
+                # TODO: could this make things more deterministic? wait
+                # to see if a sub-actor task will be scheduled and grab
+                # the tty lock on the next tick?
+                # XXX: doesn't seem to work
             # await trio.testing.wait_all_tasks_blocked(cushion=0)

-            debug_complete: trio.Event|None = Lock.no_remote_has_tty
+            debug_complete = Lock.no_remote_has_tty
             if (
                 debug_complete
-                and not debug_complete.is_set()
                 and sub_in_debug is not None
+                and not debug_complete.is_set()
             ):
                 log.pdb(
-                    'Root has errored but pdb is in use by child\n'
-                    'Waiting on tty lock to release..\n'
-                    f'uid: {sub_in_debug}\n'
+                    'Root has errored but pdb is in use by '
+                    f'child {sub_in_debug}\n'
+                    'Waiting on tty lock to release..'
                 )
-                await debug_complete.wait()
-                log.pdb(
-                    f'Child subactor released debug lock!\n'
-                    f'uid: {sub_in_debug}\n'
-                )
-                if debug_complete.is_set():
-                    break

-            # is no subactor locking debugger currently?
-            elif (
-                debug_complete is None
-                or sub_in_debug is None
-            ):
-                log.pdb(
-                    'Root acquired debug TTY LOCK from child\n'
-                    f'uid: {sub_in_debug}'
-                )
-                break
+                await debug_complete.wait()

-            else:
-                # TODO: don't need this right?
-                # await trio.lowlevel.checkpoint()
+                await trio.sleep(poll_delay)
+                continue

-                log.debug(
-                    'Root polling for debug:\n'
-                    f'poll step: {istep}\n'
-                    f'poll delya: {poll_delay}'
-                )
-                with trio.CancelScope(shield=True):
-                    await trio.sleep(poll_delay)
-                continue

         else:
-            log.pdb('Root acquired debug TTY LOCK')
+            log.debug(
+                'Root acquired TTY LOCK'
+            )

-    # else:
-    #     # TODO: non-root call for #320?
-    #     this_uid: tuple[str, str] = current_actor().uid
-    #     async with acquire_debug_lock(
-    #         subactor_uid=this_uid,
-    #     ):
-    #         pass

 # TODO: better naming and what additionals?
 # - [ ] optional runtime plugging?
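Both versions of this poll loop do the same dance: check who holds the tty lock, wait on the `no_remote_has_tty` event if a child has it, otherwise sleep (shielded, so teardown cancellation can't skip the wait) and retry. The control flow reduced to a runnable toy, with module globals standing in for `Lock` state:

    import trio

    # toy stand-ins for `Lock.global_actor_in_debug` / `Lock.no_remote_has_tty`
    holder: tuple[str, str] | None = ('sub-actor', 'uuid4-here')
    released: trio.Event = trio.Event()

    async def root_waits(
        poll_steps: int = 8,
        poll_delay: float = 0.1,
    ) -> None:
        for istep in range(poll_steps):
            if sub_in_debug := holder:
                print(f'Lock in use by {sub_in_debug}')

            if not released.is_set() and sub_in_debug is not None:
                print('Waiting on tty lock to release..')
                await released.wait()
                break

            elif sub_in_debug is None:
                break  # no child holds the lock

            else:
                # keep polling under a shielded sleep, as both versions do
                print(f'poll step: {istep}, poll delay: {poll_delay}')
                with trio.CancelScope(shield=True):
                    await trio.sleep(poll_delay)

    async def main() -> None:
        async with trio.open_nursery() as n:
            n.start_soon(root_waits)
            await trio.sleep(0.05)
            released.set()  # the child gives the tty lock back

    trio.run(main)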
@@ -58,11 +58,6 @@ class NamespacePath(str):
     '''
     _ref: object | type | None = None

-    # TODO: support providing the ns instance in
-    # order to support 'self.<meth>` style to make
-    # `Portal.run_from_ns()` work!
-    # _ns: ModuleType|type|None = None
-
     def load_ref(self) -> object | type:
         if self._ref is None:
             self._ref = resolve_name(self)
@@ -105,13 +100,5 @@ class NamespacePath(str):
         fqnp: tuple[str, str] = cls._mk_fqnp(ref)
         return cls(':'.join(fqnp))

-    def to_tuple(
-        self,
-
-        # TODO: could this work re `self:<meth>` case from above?
-        # load_ref: bool = True,
-
-    ) -> tuple[str, str]:
-        return self._mk_fqnp(
-            self.load_ref()
-        )
+    def to_tuple(self) -> tuple[str, str]:
+        return self._mk_fqnp(self.load_ref())
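`NamespacePath` itself is just a `str` subclass holding a `'module.path:object'` reference; `to_tuple()`, collapsed to a one-liner on the right, imports the target via `load_ref()` and splits the pair back out. Roughly how it's used (the import location is assumed):

    from tractor.msg import NamespacePath  # import location assumed

    def add(x: int, y: int) -> int:
        return x + y

    # build a str-subclass pointer from a live reference..
    nsp = NamespacePath.from_ref(add)
    print(nsp)  # e.g. '__main__:add'

    # ..then import/resolve it back and split out the (module, name) pair
    assert nsp.load_ref() is add
    module_path, obj_name = nsp.to_tuple()
    assert obj_name == 'add'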