Compare commits: fdf0c43bfa ... 9fc9b10b53 (25 commits)

Commits (SHA1):

  9fc9b10b53
  a86275996c
  b5431c0343
  cdee6f9354
  a2f1bcc23f
  4aa89bf391
  45e9cb4d09
  27c5ffe5a7
  914efd80eb
  2d2d1ca1c4
  74aa5aa9cd
  44e386dd99
  13fbcc723f
  315f0fc7eb
  fea111e882
  a1bf4db1e3
  bac9523ecf
  abe31e9e2c
  0222180c11
  7d5fda4485
  f5fcd8ca2e
  04217f319a
  8cb8390201
  5035617adf
  715348c5c2
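For orientation before the per-file hunks: this compare applies one sweeping
migration across the test suite and runtime. The `arb_addr` pytest fixture
becomes `reg_addr`, and the singular `arbiter_addr=` kwarg to
`tractor.open_nursery()` / `tractor.open_root_actor()` becomes the plural
list `registry_addrs=[...]`. A minimal sketch of the before/after call
shape, assuming `reg_addr` is the `(host, port)` tuple supplied by the test
harness (the actor name 'sub' is illustrative only):

    import tractor

    async def main(reg_addr: tuple[str, int]):
        # old spelling (removed throughout the hunks below):
        #   async with tractor.open_nursery(arbiter_addr=reg_addr) as an:
        #
        # new spelling: registrar addresses are passed as a list,
        # allowing more than one registry endpoint.
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as an:
            portal = await an.start_actor(
                'sub',
                enable_modules=[__name__],
            )
            await portal.cancel_actor()

Note that a few files further down (the RPC, spawning, log-level and
task-broadcast tests) only rename the fixture variable while still passing
the legacy `arbiter_addr=` kwarg.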
@@ -47,7 +47,7 @@ async def do_nuthin():
     ],
     ids=['no_args', 'unexpected_args'],
 )
-def test_remote_error(arb_addr, args_err):
+def test_remote_error(reg_addr, args_err):
     """Verify an error raised in a subactor that is propagated
     to the parent nursery, contains the underlying boxed builtin
     error type info and causes cancellation and reraising all the
@@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):
 
     async def main():
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ) as nursery:
 
             # on a remote type error caused by bad input args
@@ -97,7 +97,7 @@ def test_remote_error(arb_addr, args_err):
         assert exc.type == errtype
 
 
-def test_multierror(arb_addr):
+def test_multierror(reg_addr):
     '''
     Verify we raise a ``BaseExceptionGroup`` out of a nursery where
     more then one actor errors.
@@ -105,7 +105,7 @@ def test_multierror(arb_addr):
     '''
     async def main():
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ) as nursery:
 
             await nursery.run_in_actor(assert_err, name='errorer1')
@@ -130,14 +130,14 @@ def test_multierror(arb_addr):
 @pytest.mark.parametrize(
     'num_subactors', range(25, 26),
 )
-def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
+def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
     """Verify we raise a ``BaseExceptionGroup`` out of a nursery where
     more then one actor errors and also with a delay before failure
     to test failure during an ongoing spawning.
     """
     async def main():
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ) as nursery:
 
             for i in range(num_subactors):
@@ -175,15 +175,20 @@ async def do_nothing():
 
 
 @pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
-def test_cancel_single_subactor(arb_addr, mechanism):
-    """Ensure a ``ActorNursery.start_actor()`` spawned subactor
+def test_cancel_single_subactor(reg_addr, mechanism):
+    '''
+    Ensure a ``ActorNursery.start_actor()`` spawned subactor
     cancels when the nursery is cancelled.
-    """
+
+    '''
     async def spawn_actor():
-        """Spawn an actor that blocks indefinitely.
-        """
+        '''
+        Spawn an actor that blocks indefinitely then cancel via
+        either `ActorNursery.cancel()` or an exception raise.
+
+        '''
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ) as nursery:
 
             portal = await nursery.start_actor(
@@ -141,7 +141,7 @@ async def open_actor_local_nursery(
 )
 def test_actor_managed_trio_nursery_task_error_cancels_aio(
     asyncio_mode: bool,
-    arb_addr
+    reg_addr: tuple,
 ):
     '''
     Verify that a ``trio`` nursery created managed in a child actor
@@ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to
 sync-opening a ``tractor.Context`` beforehand.
 
 '''
-from contextlib import asynccontextmanager as acm
+# from contextlib import asynccontextmanager as acm
 from itertools import count
 import platform
 from typing import Optional
@@ -13,6 +13,11 @@ from typing import Optional
 import pytest
 import trio
 import tractor
+from tractor import (
+    Actor,
+    Context,
+    current_actor,
+)
 from tractor._exceptions import (
     StreamOverrun,
     ContextCancelled,
@@ -193,9 +198,6 @@ def test_simple_context(
             else:
                 assert await ctx.result() == 'yo'
 
-                if not error_parent:
-                    await ctx.cancel()
-
             if pointlessly_open_stream:
                 async with ctx.open_stream():
                     if error_parent:
@@ -208,10 +210,15 @@ def test_simple_context(
                         # 'stop' msg to the far end which needs
                         # to be ignored
                         pass
 
             else:
                 if error_parent:
                     raise error_parent
+
+                # cancel AFTER we open a stream
+                # to avoid a cancel raised inside
+                # `.open_stream()`
+                await ctx.cancel()
         finally:
 
             # after cancellation
@@ -276,7 +283,7 @@ def test_caller_cancels(
         assert (
             tuple(err.canceller)
             ==
-            tractor.current_actor().uid
+            current_actor().uid
         )
 
     async def main():
@@ -430,9 +437,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
 ):
     'caller context closes without using stream'
 
-    async with tractor.open_nursery() as n:
+    async with tractor.open_nursery() as an:
 
-        portal = await n.start_actor(
+        root: Actor = current_actor()
+
+        portal = await an.start_actor(
             'ctx_cancelled',
             enable_modules=[__name__],
         )
@@ -440,10 +449,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
         async with portal.open_context(
             expect_cancelled,
         ) as (ctx, sent):
-            await portal.run(assert_state, value=True)
-
             assert sent is None
 
+            await portal.run(assert_state, value=True)
+
             # call cancel explicitly
             if use_ctx_cancel_method:
 
@@ -454,8 +463,21 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
                         async for msg in stream:
                             pass
 
-                except tractor.ContextCancelled:
-                    raise  # XXX: must be propagated to __aexit__
+                except tractor.ContextCancelled as ctxc:
+                    # XXX: the cause is US since we call
+                    # `Context.cancel()` just above!
+                    assert (
+                        ctxc.canceller
+                        ==
+                        current_actor().uid
+                        ==
+                        root.uid
+                    )
+
+                    # XXX: must be propagated to __aexit__
+                    # and should be silently absorbed there
+                    # since we called `.cancel()` just above ;)
+                    raise
 
                 else:
                     assert 0, "Should have context cancelled?"
@@ -472,7 +494,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
                 await ctx.result()
                 assert 0, "Callee should have blocked!?"
             except trio.TooSlowError:
+                # NO-OP -> since already called above
                 await ctx.cancel()
 
+        # local scope should have absorbed the cancellation
+        assert ctx.cancelled_caught
+        assert ctx._remote_error is ctx._local_error
+
         try:
             async with ctx.open_stream() as stream:
                 async for msg in stream:
@@ -551,19 +579,25 @@ async def cancel_self(
     global _state
     _state = True
 
+    # since we call this the below `.open_stream()` should always
+    # error!
     await ctx.cancel()
 
     # should inline raise immediately
     try:
         async with ctx.open_stream():
             pass
-    except tractor.ContextCancelled:
+    # except tractor.ContextCancelled:
+    except RuntimeError:
         # suppress for now so we can do checkpoint tests below
-        pass
+        print('Got expected runtime error for stream-after-cancel')
 
-    else:
-        raise RuntimeError('Context didnt cancel itself?!')
-
-    # check a real ``trio.Cancelled`` is raised on a checkpoint
+    # check that ``trio.Cancelled`` is now raised on any further
+    # checkpoints since the self cancel above will have cancelled
+    # the `Context._scope.cancel_scope: trio.CancelScope`
     try:
         with trio.fail_after(0.1):
             await trio.sleep_forever()
@@ -574,6 +608,7 @@ async def cancel_self(
     # should never get here
     assert 0
 
+    raise RuntimeError('Context didnt cancel itself?!')
 
 @tractor_test
 async def test_callee_cancels_before_started():
@@ -601,7 +636,7 @@ async def test_callee_cancels_before_started():
             ce.type == trio.Cancelled
 
             # the traceback should be informative
-            assert 'cancelled itself' in ce.msgdata['tb_str']
+            assert 'itself' in ce.msgdata['tb_str']
 
         # teardown the actor
         await portal.cancel_actor()
@@ -773,7 +808,7 @@ async def echo_back_sequence(
 
     print(
         'EXITING CALLEEE:\n'
-        f'{ctx.cancel_called_remote}'
+        f'{ctx.canceller}'
     )
     return 'yo'
 
@@ -871,7 +906,7 @@ def test_maybe_allow_overruns_stream(
 
     if cancel_ctx:
         assert isinstance(res, ContextCancelled)
-        assert tuple(res.canceller) == tractor.current_actor().uid
+        assert tuple(res.canceller) == current_actor().uid
 
     else:
         print(f'RX ROOT SIDE RESULT {res}')
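The pattern repeated through the hunks above, distilled: when this side
requests the cancel, the `ContextCancelled` that surfaces carries our own
actor uid in `.canceller`. A compact helper restating the updated flow from
`test_caller_closes_ctx_after_callee_opens_stream` (the helper name is a
placeholder; `ctx` is the caller-side `Context` yielded by
`Portal.open_context()`):

    import tractor
    from tractor import current_actor

    async def cancel_then_stream(ctx):
        # request cancel, then confirm the boxed `.canceller`
        # is us when the ctxc surfaces from the stream.
        await ctx.cancel()
        try:
            async with ctx.open_stream() as stream:
                async for msg in stream:
                    pass
        except tractor.ContextCancelled as ctxc:
            # the cause is US since we called `.cancel()` just above
            assert ctxc.canceller == current_actor().uid
            # must propagate to `.open_context().__aexit__()` where,
            # per the new semantics, it is silently absorbed
            raise

Depending on timing the boxed error may instead be returned from
`ctx.result()` rather than raised; both behaviors are exercised above.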
@@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors
 def spawn(
     start_method,
     testdir,
-    arb_addr,
+    reg_addr,
 ) -> 'pexpect.spawn':
 
     if start_method != 'trio':
@@ -15,19 +15,19 @@ from conftest import tractor_test
 
 
 @tractor_test
-async def test_reg_then_unreg(arb_addr):
+async def test_reg_then_unreg(reg_addr):
     actor = tractor.current_actor()
     assert actor.is_arbiter
     assert len(actor._registry) == 1  # only self is registered
 
     async with tractor.open_nursery(
-        arbiter_addr=arb_addr,
+        registry_addrs=[reg_addr],
     ) as n:
 
         portal = await n.start_actor('actor', enable_modules=[__name__])
         uid = portal.channel.uid
 
-        async with tractor.get_arbiter(*arb_addr) as aportal:
+        async with tractor.get_arbiter(*reg_addr) as aportal:
             # this local actor should be the arbiter
             assert actor is aportal.actor
 
@@ -53,15 +53,27 @@ async def hi():
     return the_line.format(tractor.current_actor().name)
 
 
-async def say_hello(other_actor):
+async def say_hello(
+    other_actor: str,
+    reg_addr: tuple[str, int],
+):
     await trio.sleep(1)  # wait for other actor to spawn
-    async with tractor.find_actor(other_actor) as portal:
+    async with tractor.find_actor(
+        other_actor,
+        registry_addrs=[reg_addr],
+    ) as portal:
         assert portal is not None
         return await portal.run(__name__, 'hi')
 
 
-async def say_hello_use_wait(other_actor):
-    async with tractor.wait_for_actor(other_actor) as portal:
+async def say_hello_use_wait(
+    other_actor: str,
+    reg_addr: tuple[str, int],
+):
+    async with tractor.wait_for_actor(
+        other_actor,
+        registry_addr=reg_addr,
+    ) as portal:
         assert portal is not None
         result = await portal.run(__name__, 'hi')
         return result
@@ -69,21 +81,29 @@ async def say_hello_use_wait(other_actor):
 
 @tractor_test
 @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
-async def test_trynamic_trio(func, start_method, arb_addr):
-    """Main tractor entry point, the "master" process (for now
-    acts as the "director").
-    """
+async def test_trynamic_trio(
+    func,
+    start_method,
+    reg_addr,
+):
+    '''
+    Root actor acting as the "director" and running one-shot-task-actors
+    for the directed subs.
+
+    '''
     async with tractor.open_nursery() as n:
         print("Alright... Action!")
 
         donny = await n.run_in_actor(
             func,
             other_actor='gretchen',
+            reg_addr=reg_addr,
             name='donny',
         )
         gretchen = await n.run_in_actor(
             func,
             other_actor='donny',
+            reg_addr=reg_addr,
             name='gretchen',
         )
         print(await gretchen.result())
@@ -131,7 +151,7 @@ async def unpack_reg(actor_or_portal):
 
 
 async def spawn_and_check_registry(
-    arb_addr: tuple,
+    reg_addr: tuple,
     use_signal: bool,
     remote_arbiter: bool = False,
     with_streaming: bool = False,
@@ -139,9 +159,9 @@ async def spawn_and_check_registry(
 ) -> None:
 
     async with tractor.open_root_actor(
-        arbiter_addr=arb_addr,
+        registry_addrs=[reg_addr],
     ):
-        async with tractor.get_arbiter(*arb_addr) as portal:
+        async with tractor.get_arbiter(*reg_addr) as portal:
             # runtime needs to be up to call this
             actor = tractor.current_actor()
 
@@ -213,17 +233,19 @@ async def spawn_and_check_registry(
 def test_subactors_unregister_on_cancel(
     start_method,
     use_signal,
-    arb_addr,
+    reg_addr,
     with_streaming,
 ):
-    """Verify that cancelling a nursery results in all subactors
+    '''
+    Verify that cancelling a nursery results in all subactors
     deregistering themselves with the arbiter.
-    """
+
+    '''
     with pytest.raises(KeyboardInterrupt):
         trio.run(
             partial(
                 spawn_and_check_registry,
-                arb_addr,
+                reg_addr,
                 use_signal,
                 remote_arbiter=False,
                 with_streaming=with_streaming,
@@ -237,7 +259,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
     daemon,
     start_method,
     use_signal,
-    arb_addr,
+    reg_addr,
     with_streaming,
 ):
     """Verify that cancelling a nursery results in all subactors
@@ -248,7 +270,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
         trio.run(
             partial(
                 spawn_and_check_registry,
-                arb_addr,
+                reg_addr,
                 use_signal,
                 remote_arbiter=True,
                 with_streaming=with_streaming,
@@ -262,7 +284,7 @@ async def streamer(agen):
 
 
 async def close_chans_before_nursery(
-    arb_addr: tuple,
+    reg_addr: tuple,
     use_signal: bool,
     remote_arbiter: bool = False,
 ) -> None:
@@ -275,9 +297,9 @@ async def close_chans_before_nursery(
         entries_at_end = 1
 
     async with tractor.open_root_actor(
-        arbiter_addr=arb_addr,
+        registry_addrs=[reg_addr],
     ):
-        async with tractor.get_arbiter(*arb_addr) as aportal:
+        async with tractor.get_arbiter(*reg_addr) as aportal:
             try:
                 get_reg = partial(unpack_reg, aportal)
 
@@ -329,7 +351,7 @@ async def close_chans_before_nursery(
 def test_close_channel_explicit(
     start_method,
     use_signal,
-    arb_addr,
+    reg_addr,
 ):
     """Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
@@ -339,7 +361,7 @@ def test_close_channel_explicit(
     trio.run(
         partial(
             close_chans_before_nursery,
-            arb_addr,
+            reg_addr,
             use_signal,
             remote_arbiter=False,
         ),
@@ -351,7 +373,7 @@ def test_close_channel_explicit_remote_arbiter(
     daemon,
     start_method,
     use_signal,
-    arb_addr,
+    reg_addr,
 ):
     """Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
@@ -361,7 +383,7 @@ def test_close_channel_explicit_remote_arbiter(
     trio.run(
         partial(
             close_chans_before_nursery,
-            arb_addr,
+            reg_addr,
             use_signal,
             remote_arbiter=True,
         ),
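Note the (current) asymmetry the discovery hunks above introduce:
`find_actor()` takes a plural `registry_addrs: list` while
`wait_for_actor()` takes a singular `registry_addr`. A sketch of both
updated call shapes, reusing the module-level `hi()` RPC target from the
hunk above (the helper names here are illustrative):

    import tractor

    async def query(other_actor: str, reg_addr: tuple[str, int]) -> bool:
        # non-blocking lookup; the portal is None if the peer
        # is not (yet) registered
        async with tractor.find_actor(
            other_actor,
            registry_addrs=[reg_addr],
        ) as portal:
            return portal is not None

    async def wait_then_greet(other_actor: str, reg_addr: tuple[str, int]):
        # blocks until the peer has registered itself
        async with tractor.wait_for_actor(
            other_actor,
            registry_addr=reg_addr,
        ) as portal:
            return await portal.run(__name__, 'hi')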
@@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
     await tractor.to_asyncio.run_task(sleep_forever)
 
 
-def test_trio_cancels_aio_on_actor_side(arb_addr):
+def test_trio_cancels_aio_on_actor_side(reg_addr):
     '''
     Spawn an infected actor that is cancelled by the ``trio`` side
     task using std cancel scope apis.
@@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr):
     '''
     async def main():
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr
+            registry_addrs=[reg_addr]
         ) as n:
             await n.run_in_actor(
                 trio_cancels_single_aio_task,
@@ -94,7 +94,7 @@ async def asyncio_actor(
         raise
 
 
-def test_aio_simple_error(arb_addr):
+def test_aio_simple_error(reg_addr):
     '''
     Verify a simple remote asyncio error propagates back through trio
     to the parent actor.
@@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr):
     '''
     async def main():
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr
+            registry_addrs=[reg_addr]
         ) as n:
             await n.run_in_actor(
                 asyncio_actor,
@@ -120,7 +120,7 @@ def test_aio_simple_error(arb_addr):
     assert err.type == AssertionError
 
 
-def test_tractor_cancels_aio(arb_addr):
+def test_tractor_cancels_aio(reg_addr):
     '''
     Verify we can cancel a spawned asyncio task gracefully.
 
@@ -139,7 +139,7 @@ def test_tractor_cancels_aio(arb_addr):
     trio.run(main)
 
 
-def test_trio_cancels_aio(arb_addr):
+def test_trio_cancels_aio(reg_addr):
     '''
     Much like the above test with ``tractor.Portal.cancel_actor()``
     except we just use a standard ``trio`` cancellation api.
@@ -194,7 +194,7 @@ async def trio_ctx(
     ids='parent_actor_cancels_child={}'.format
 )
 def test_context_spawns_aio_task_that_errors(
-    arb_addr,
+    reg_addr,
     parent_cancels: bool,
 ):
     '''
@@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
 
                 await trio.sleep_forever()
 
-                return await ctx.result()
+            return await ctx.result()
 
     if parent_cancels:
         # bc the parent made the cancel request,
@@ -258,7 +258,7 @@ async def aio_cancel():
     await sleep_forever()
 
 
-def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
+def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
 
     async def main():
         async with tractor.open_nursery() as n:
@@ -395,7 +395,7 @@ async def stream_from_aio(
     'fan_out', [False, True],
     ids='fan_out_w_chan_subscribe={}'.format
 )
-def test_basic_interloop_channel_stream(arb_addr, fan_out):
+def test_basic_interloop_channel_stream(reg_addr, fan_out):
     async def main():
         async with tractor.open_nursery() as n:
             portal = await n.run_in_actor(
@@ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out):
 
 
 # TODO: parametrize the above test and avoid the duplication here?
-def test_trio_error_cancels_intertask_chan(arb_addr):
+def test_trio_error_cancels_intertask_chan(reg_addr):
     async def main():
         async with tractor.open_nursery() as n:
             portal = await n.run_in_actor(
@@ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
         assert exc.type == Exception
 
 
-def test_trio_closes_early_and_channel_exits(arb_addr):
+def test_trio_closes_early_and_channel_exits(reg_addr):
     async def main():
         async with tractor.open_nursery() as n:
             portal = await n.run_in_actor(
@@ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr):
     trio.run(main)
 
 
-def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
+def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
     async def main():
         async with tractor.open_nursery() as n:
             portal = await n.run_in_actor(
@@ -520,7 +520,7 @@ async def trio_to_aio_echo_server(
     ids='raise_error={}'.format,
 )
 def test_echoserver_detailed_mechanics(
-    arb_addr,
+    reg_addr,
     raise_error_mid_stream,
 ):
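A quick sketch of the "infected asyncio" pattern these tests drive, updated
to the new kwarg. The `infect_asyncio=True` flag and the wrapper function
below are assumptions for illustration (the flag is tractor's documented
way to run a child actor with an asyncio loop, but it does not appear in
these hunks); only `tractor.to_asyncio.run_task()` is taken from the diff:

    import asyncio
    import tractor

    async def sleep_forever():
        # asyncio-side coroutine (mirrors the test module's helper)
        await asyncio.sleep(float('inf'))

    async def aio_sleeper():
        # hand the asyncio coroutine to the asyncio side of the
        # infected actor from its trio side
        await tractor.to_asyncio.run_task(sleep_forever)

    async def main(reg_addr: tuple[str, int]):
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as n:
            await n.run_in_actor(
                aio_sleeper,
                infect_asyncio=True,
            )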
@@ -15,6 +15,26 @@ from tractor import (  # typing
     ContextCancelled,
 )
 
+# XXX TODO cases:
+# - [ ] peer cancelled itself - so other peers should
+#   get errors reflecting that the peer was itself the .canceller?
+
+# - [x] WE cancelled the peer and thus should not see any raised
+#   `ContextCancelled` as it should be reaped silently?
+#   => pretty sure `test_context_stream_semantics::test_caller_cancels()`
+#   already covers this case?
+
+# - [x] INTER-PEER: some arbitrary remote peer cancels via
+#   Portal.cancel_actor().
+#   => all other connected peers should get that cancel requesting peer's
+#   uid in the ctx-cancelled error msg raised in all open ctxs
+#   with that peer.
+
+# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
+#   (also) spawned a failing task which was unhandled and
+#   propagated up to the immediate parent - the peer to the actor
+#   that also spawned a remote task task in that same peer-parent.
+
 
 # def test_self_cancel():
 #     '''
@@ -29,14 +49,30 @@ from tractor import (  # typing
 @tractor.context
 async def sleep_forever(
     ctx: Context,
+    expect_ctxc: bool = False,
 ) -> None:
     '''
     Sync the context, open a stream then just sleep.
 
+    Allow checking for (context) cancellation locally.
+
     '''
-    await ctx.started()
-    async with ctx.open_stream():
-        await trio.sleep_forever()
+    try:
+        await ctx.started()
+        async with ctx.open_stream():
+            await trio.sleep_forever()
+
+    except BaseException as berr:
+
+        # TODO: it'd sure be nice to be able to inject our own
+        # `ContextCancelled` here instead of of `trio.Cancelled`
+        # so that our runtime can expect it and this "user code"
+        # would be able to tell the diff between a generic trio
+        # cancel and a tractor runtime-IPC cancel.
+        if expect_ctxc:
+            assert isinstance(berr, trio.Cancelled)
+
+        raise
 
 
 @tractor.context
@@ -145,6 +181,7 @@ async def stream_ints(
     async with ctx.open_stream() as stream:
         for i in itertools.count():
             await stream.send(i)
+            await trio.sleep(0.01)
 
 
 @tractor.context
@@ -161,73 +198,81 @@ async def stream_from_peer(
             peer_ctx.open_stream() as stream,
         ):
             await ctx.started()
-            # XXX TODO: big set of questions for this
+            # XXX QUESTIONS & TODO: for further details around this
+            # in the longer run..
             # https://github.com/goodboy/tractor/issues/368
             # - should we raise `ContextCancelled` or `Cancelled` (rn
-            #   it does that) here?!
-            # - test the `ContextCancelled` OUTSIDE the
-            #   `.open_context()` call?
-            try:
-                async for msg in stream:
-                    print(msg)
-
-            except trio.Cancelled:
-                assert not ctx.cancel_called
-                assert not ctx.cancelled_caught
-
-                assert not peer_ctx.cancel_called
-                assert not peer_ctx.cancelled_caught
-
-                assert 'root' in ctx.cancel_called_remote
-
-                raise  # XXX MUST NEVER MASK IT!!
-
-            with trio.CancelScope(shield=True):
-                await tractor.pause()
-            # pass
-            # pytest.fail(
-            raise RuntimeError(
-                'peer never triggered local `[Context]Cancelled`?!?'
-            )
+            #   it does latter) and should/could it be implemented
+            #   as a general injection override for `trio` such
+            #   that ANY next checkpoint would raise the "cancel
+            #   error type" of choice?
+            # - should the `ContextCancelled` bubble from
+            #   all `Context` and `MsgStream` apis wherein it
+            #   prolly makes the most sense to make it
+            #   a `trio.Cancelled` subtype?
+            # - what about IPC-transport specific errors, should
+            #   they bubble from the async for and trigger
+            #   other special cases?
+            # NOTE: current ctl flow:
+            # - stream raises `trio.EndOfChannel` and
+            #   exits the loop
+            # - `.open_context()` will raise the ctxcanc
+            #   received from the sleeper.
+            async for msg in stream:
+                assert msg is not None
+                print(msg)
 
+    # NOTE: cancellation of the (sleeper) peer should always
+    # cause a `ContextCancelled` raise in this streaming
+    # actor.
     except ContextCancelled as ctxerr:
-        assert ctxerr.canceller == 'canceller'
-        assert ctxerr._remote_error is ctxerr
+        err = ctxerr
+        assert peer_ctx._remote_error is ctxerr
+        assert peer_ctx.canceller == ctxerr.canceller
+
+        # caller peer should not be the cancel requester
+        assert not ctx.cancel_called
+        # XXX can never be true since `._invoke` only
+        # sets this AFTER the nursery block this task
+        # was started in, exits.
+        assert not ctx.cancelled_caught
+
+        # we never requested cancellation
+        assert not peer_ctx.cancel_called
+        # the `.open_context()` exit definitely caught
+        # a cancellation in the internal `Context._scope` since
+        # likely the runtime called `_deliver_msg()` after
+        # receiving the remote error from the streaming task.
+        assert peer_ctx.cancelled_caught
+
+        # TODO / NOTE `.canceller` won't have been set yet
+        # here because that machinery is inside
+        # `.open_context().__aexit__()` BUT, if we had
+        # a way to know immediately (from the last
+        # checkpoint) that cancellation was due to
+        # a remote, we COULD assert this here..see,
+        # https://github.com/goodboy/tractor/issues/368
+
+        # root/parent actor task should NEVER HAVE cancelled us!
+        assert not ctx.canceller
+        assert 'canceller' in peer_ctx.canceller
 
-        # CASE 1: we were cancelled by our parent, the root actor.
-        # TODO: there are other cases depending on how the root
-        # actor and it's caller side task are written:
-        # - if the root does not req us to cancel then an
-        #   IPC-transport related error should bubble from the async
-        #   for loop and thus cause local cancellation both here
-        #   and in the root (since in that case this task cancels the
-        #   context with the root, not the other way around)
-        assert ctx.cancel_called_remote[0] == 'root'
         raise
+        # TODO: IN THEORY we could have other cases depending on
+        # who cancels first, the root actor or the canceller peer?.
+        #
+        # 1- when the peer request is first then the `.canceller`
+        #    field should obvi be set to the 'canceller' uid,
+        #
+        # 2-if the root DOES req cancel then we should see the same
+        #    `trio.Cancelled` implicitly raised
+        # assert ctx.canceller[0] == 'root'
+        # assert peer_ctx.canceller[0] == 'sleeper'
 
+    # except BaseException as err:
+    raise RuntimeError(
+        'peer never triggered local `ContextCancelled`?'
+    )
+    # raise
 
-# cases:
-# - some arbitrary remote peer cancels via Portal.cancel_actor().
-#   => all other connected peers should get that cancel requesting peer's
-#   uid in the ctx-cancelled error msg.
-
-# - peer spawned a sub-actor which (also) spawned a failing task
-#   which was unhandled and propagated up to the immediate
-#   parent, the peer to the actor that also spawned a remote task
-#   task in that same peer-parent.
-
-# - peer cancelled itself - so other peers should
-#   get errors reflecting that the peer was itself the .canceller?
-
-# - WE cancelled the peer and thus should not see any raised
-#   `ContextCancelled` as it should be reaped silently?
-#   => pretty sure `test_context_stream_semantics::test_caller_cancels()`
-#   already covers this case?
 
 @pytest.mark.parametrize(
     'error_during_ctxerr_handling',
@@ -251,8 +296,8 @@ def test_peer_canceller(
     line and be less indented.
 
     .actor0> ()-> .actor1>
-    a inter-actor task context opened (by `async with `Portal.open_context()`)
-    from actor0 *into* actor1.
+    a inter-actor task context opened (by `async with
+    `Portal.open_context()`) from actor0 *into* actor1.
 
     .actor0> ()<=> .actor1>
     a inter-actor task context opened (as above)
@@ -287,11 +332,12 @@ def test_peer_canceller(
     5. .canceller> ()-> .sleeper>
         - calls `Portal.cancel_actor()`
 
     '''
-
     async def main():
-        async with tractor.open_nursery() as an:
+        async with tractor.open_nursery(
+            # NOTE: to halt the peer tasks on ctxc, uncomment this.
+            # debug_mode=True
+        ) as an:
             canceller: Portal = await an.start_actor(
                 'canceller',
                 enable_modules=[__name__],
@@ -305,10 +351,13 @@ def test_peer_canceller(
                 enable_modules=[__name__],
             )
 
+            root = tractor.current_actor()
+
             try:
                 async with (
                     sleeper.open_context(
                         sleep_forever,
+                        expect_ctxc=True,
                     ) as (sleeper_ctx, sent),
 
                     just_caller.open_context(
@@ -335,16 +384,15 @@ def test_peer_canceller(
                         'Context.result() did not raise ctx-cancelled?'
                     )
 
-                # TODO: not sure why this isn't catching
-                # but maybe we need an `ExceptionGroup` and
-                # the whole except *errs: thinger in 3.11?
+                # should always raise since this root task does
+                # not request the sleeper cancellation ;)
                 except ContextCancelled as ctxerr:
                     print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
 
                     # canceller and caller peers should not
                     # have been remotely cancelled.
-                    assert canceller_ctx.cancel_called_remote is None
-                    assert caller_ctx.cancel_called_remote is None
+                    assert canceller_ctx.canceller is None
+                    assert caller_ctx.canceller is None
 
                     assert ctxerr.canceller[0] == 'canceller'
 
@@ -355,16 +403,14 @@ def test_peer_canceller(
                     # block it should be.
                     assert not sleeper_ctx.cancelled_caught
 
-                    # TODO: a test which ensures this error is
-                    # bubbled and caught (NOT MASKED) by the
-                    # runtime!!!
                     if error_during_ctxerr_handling:
                         raise RuntimeError('Simulated error during teardown')
 
                     raise
 
-                # SHOULD NEVER GET HERE!
-                except BaseException:
+                # XXX SHOULD NEVER EVER GET HERE XXX
+                except BaseException as berr:
+                    err = berr
+                    pytest.fail('did not rx ctx-cancelled error?')
+                else:
                     pytest.fail('did not rx ctx-cancelled error?')
 
@@ -375,6 +421,20 @@ def test_peer_canceller(
         )as ctxerr:
             _err = ctxerr
 
+            # NOTE: the main state to check on `Context` is:
+            # - `.cancelled_caught` (maps to nursery cs)
+            # - `.cancel_called` (bool of whether this side
+            #   requested)
+            # - `.canceller` (uid of cancel-causing actor-task)
+            # - `._remote_error` (any `RemoteActorError`
+            #   instance from other side of context)
+            # TODO: are we really planning to use this tho?
+            # - `._cancel_msg` (any msg that caused the
+            #   cancel)
+
             # CASE: error raised during handling of
             # `ContextCancelled` inside `.open_context()`
             # block
             if error_during_ctxerr_handling:
                 assert isinstance(ctxerr, RuntimeError)
 
@@ -384,20 +444,42 @@ def test_peer_canceller(
                 for ctx in ctxs:
                     assert ctx.cancel_called
 
-                    # each context should have received
-                    # a silently absorbed context cancellation
-                    # from its peer actor's task.
-                    assert ctx.chan.uid == ctx.cancel_called_remote
-
                     # this root actor task should have
-                    # cancelled all opened contexts except
-                    # the sleeper which is cancelled by its
-                    # peer "canceller"
-                    if ctx is not sleeper_ctx:
-                        assert ctx._remote_error.canceller[0] == 'root'
+                    # cancelled all opened contexts except the
+                    # sleeper which is obvi by the "canceller"
+                    # peer.
+                    re = ctx._remote_error
+                    if (
+                        ctx is sleeper_ctx
+                        or ctx is caller_ctx
+                    ):
+                        assert (
+                            re.canceller
+                            ==
+                            ctx.canceller
+                            ==
+                            canceller.channel.uid
+                        )
+
+                    else:
+                        assert (
+                            re.canceller
+                            ==
+                            ctx.canceller
+                            ==
+                            root.uid
+                        )
 
             # CASE: standard teardown inside in `.open_context()` block
             else:
-                assert ctxerr.canceller[0] == 'canceller'
-                assert ctxerr.canceller == sleeper_ctx.canceller
+                assert (
+                    ctxerr.canceller[0]
+                    ==
+                    sleeper_ctx.canceller[0]
+                    ==
+                    'canceller'
+                )
 
                 # the sleeper's remote error is the error bubbled
                 # out of the context-stack above!
@@ -405,18 +487,43 @@ def test_peer_canceller(
                 assert re is ctxerr
 
                 for ctx in ctxs:
+                    re: BaseException | None = ctx._remote_error
+                    assert re
+
+                    # root doesn't cancel sleeper since it's
+                    # cancelled by its peer.
                     if ctx is sleeper_ctx:
                         assert not ctx.cancel_called
+                        # since sleeper_ctx.result() IS called
+                        # above we should have (silently)
+                        # absorbed the corresponding
+                        # `ContextCancelled` for it and thus
+                        # the logic inside `.cancelled_caught`
+                        # should trigger!
                         assert ctx.cancelled_caught
+
+                    elif ctx is caller_ctx:
+                        # since its context was remotely
+                        # cancelled, we never needed to
+                        # call `Context.cancel()` bc it was
+                        # done by the peer and also we never
+                        assert ctx.cancel_called
+
+                        # TODO: figure out the details of
+                        # this..
+                        # if you look the `._local_error` here
+                        # is a multi of ctxc + 2 Cancelleds?
+                        # assert not ctx.cancelled_caught
+
                     else:
                         assert ctx.cancel_called
                         assert not ctx.cancelled_caught
 
-                    # each context should have received
+                    # TODO: do we even need this flag?
+                    # -> each context should have received
                     # a silently absorbed context cancellation
                     # from its peer actor's task.
-                    assert ctx.chan.uid == ctx.cancel_called_remote
+                    # in its remote nursery scope.
+                    # assert ctx.chan.uid == ctx.canceller
 
                     # NOTE: when an inter-peer cancellation
                     # occurred, we DO NOT expect this
@@ -434,9 +541,7 @@ def test_peer_canceller(
             # including the case where ctx-cancel handling
             # itself errors.
             assert sleeper_ctx.cancelled_caught
-            assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
 
             # await tractor.pause()
             raise  # always to ensure teardown
 
     if error_during_ctxerr_handling:
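Condensing the self-cancel leg of the topology above into a standalone
sketch: requesting the cancel from the caller side means the resulting
`ContextCancelled` is reaped silently by `.open_context()`'s exit. This
assumes the code lives in the same module as the `sleep_forever` ctx fn
from the hunks above (so `enable_modules=[__name__]` exposes it):

    import trio
    import tractor

    async def main():
        async with tractor.open_nursery() as an:
            sleeper = await an.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )
            async with sleeper.open_context(
                sleep_forever,       # ctx fn defined above
                expect_ctxc=True,    # new kwarg added above
            ) as (ctx, sent):
                # we request it, so the eventual ctxc is absorbed
                # silently in `.open_context().__aexit__()`
                await ctx.cancel()

            await sleeper.cancel_actor()

    # trio.run(main)

The full `test_peer_canceller` wiring additionally runs a third peer task
that issues `Portal.cancel_actor()` against the sleeper, in which case the
root's `sleeper_ctx.result()` raises a `ContextCancelled` whose
`.canceller[0] == 'canceller'` rather than being absorbed.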
@@ -55,7 +55,7 @@ async def context_stream(
 
 
 async def stream_from_single_subactor(
-    arb_addr,
+    reg_addr,
     start_method,
     stream_func,
 ):
@@ -64,7 +64,7 @@ async def stream_from_single_subactor(
     # only one per host address, spawns an actor if None
 
     async with tractor.open_nursery(
-        arbiter_addr=arb_addr,
+        registry_addrs=[reg_addr],
         start_method=start_method,
     ) as nursery:
 
@@ -115,13 +115,13 @@ async def stream_from_single_subactor(
 @pytest.mark.parametrize(
     'stream_func', [async_gen_stream, context_stream]
 )
-def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
+def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
     """Verify streaming from a spawned async generator.
     """
     trio.run(
         partial(
             stream_from_single_subactor,
-            arb_addr,
+            reg_addr,
             start_method,
             stream_func=stream_func,
         ),
@@ -225,14 +225,14 @@ async def a_quadruple_example():
     return result_stream
 
 
-async def cancel_after(wait, arb_addr):
-    async with tractor.open_root_actor(arbiter_addr=arb_addr):
+async def cancel_after(wait, reg_addr):
+    async with tractor.open_root_actor(registry_addrs=[reg_addr]):
         with trio.move_on_after(wait):
             return await a_quadruple_example()
 
 
 @pytest.fixture(scope='module')
-def time_quad_ex(arb_addr, ci_env, spawn_backend):
+def time_quad_ex(reg_addr, ci_env, spawn_backend):
     if spawn_backend == 'mp':
         """no idea but the mp *nix runs are flaking out here often...
         """
@@ -240,7 +240,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
 
     timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
     start = time.time()
-    results = trio.run(cancel_after, timeout, arb_addr)
+    results = trio.run(cancel_after, timeout, reg_addr)
     diff = time.time() - start
     assert results
     return results, diff
@@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
     list(map(lambda i: i/10, range(3, 9)))
 )
 def test_not_fast_enough_quad(
-    arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
+    reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
 ):
     """Verify we can cancel midway through the quad example and all actors
     cancel gracefully.
     """
     results, diff = time_quad_ex
     delay = max(diff - cancel_delay, 0)
-    results = trio.run(cancel_after, delay, arb_addr)
+    results = trio.run(cancel_after, delay, reg_addr)
     system = platform.system()
     if system in ('Windows', 'Darwin') and results is not None:
         # In CI envoirments it seems later runs are quicker then the first
@@ -280,7 +280,7 @@ def test_not_fast_enough_quad(
 
 @tractor_test
 async def test_respawn_consumer_task(
-    arb_addr,
+    reg_addr,
     spawn_backend,
     loglevel,
 ):
@@ -24,7 +24,7 @@ async def test_no_runtime():
 
 
 @tractor_test
-async def test_self_is_registered(arb_addr):
+async def test_self_is_registered(reg_addr):
     "Verify waiting on the arbiter to register itself using the standard api."
     actor = tractor.current_actor()
     assert actor.is_arbiter
@@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr):
 
 
 @tractor_test
-async def test_self_is_registered_localportal(arb_addr):
+async def test_self_is_registered_localportal(reg_addr):
     "Verify waiting on the arbiter to register itself using a local portal."
     actor = tractor.current_actor()
     assert actor.is_arbiter
-    async with tractor.get_arbiter(*arb_addr) as portal:
+    async with tractor.get_arbiter(*reg_addr) as portal:
         assert isinstance(portal, tractor._portal.LocalPortal)
 
         with trio.fail_after(0.2):
             sockaddr = await portal.run_from_ns(
                 'self', 'wait_for_actor', name='root')
-            assert sockaddr[0] == arb_addr
+            assert sockaddr[0] == reg_addr
 
 
-def test_local_actor_async_func(arb_addr):
+def test_local_actor_async_func(reg_addr):
     """Verify a simple async function in-process.
     """
     nums = []
@@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr):
     async def print_loop():
 
         async with tractor.open_root_actor(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ):
             # arbiter is started in-proc if dne
             assert tractor.current_actor().is_arbiter
@@ -28,9 +28,9 @@ def test_abort_on_sigint(daemon):
 
 
 @tractor_test
-async def test_cancel_remote_arbiter(daemon, arb_addr):
+async def test_cancel_remote_arbiter(daemon, reg_addr):
     assert not tractor.current_actor().is_arbiter
-    async with tractor.get_arbiter(*arb_addr) as portal:
+    async with tractor.get_arbiter(*reg_addr) as portal:
         await portal.cancel_actor()
 
     time.sleep(0.1)
@@ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
 
     # no arbiter socket should exist
     with pytest.raises(OSError):
-        async with tractor.get_arbiter(*arb_addr) as portal:
+        async with tractor.get_arbiter(*reg_addr) as portal:
             pass
 
 
-def test_register_duplicate_name(daemon, arb_addr):
+def test_register_duplicate_name(daemon, reg_addr):
 
     async def main():
 
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
         ) as n:
 
             assert not tractor.current_actor().is_arbiter
@@ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror):
 )
 def test_multi_actor_subs_arbiter_pub(
     loglevel,
-    arb_addr,
+    reg_addr,
     pub_actor,
 ):
     """Try out the neato @pub decorator system.
@@ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub(
     async def main():
 
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
             enable_modules=[__name__],
         ) as n:
 
@@ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub(
 
 def test_single_subactor_pub_multitask_subs(
     loglevel,
-    arb_addr,
+    reg_addr,
 ):
     async def main():
 
         async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            registry_addrs=[reg_addr],
             enable_modules=[__name__],
         ) as n:
@@ -34,7 +34,6 @@ def test_resource_only_entered_once(key_on):
     global _resource
     _resource = 0
 
-    kwargs = {}
     key = None
     if key_on == 'key_value':
         key = 'some_common_key'
@@ -139,7 +138,7 @@ def test_open_local_sub_to_stream():
     N local tasks using ``trionics.maybe_open_context():``.
 
     '''
-    timeout = 3 if platform.system() != "Windows" else 10
+    timeout: float = 3.6 if platform.system() != "Windows" else 10
 
     async def main():
@@ -45,7 +45,7 @@ async def short_sleep():
     ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
          'fail_on_syntax'],
 )
-def test_rpc_errors(arb_addr, to_call, testdir):
+def test_rpc_errors(reg_addr, to_call, testdir):
     """Test errors when making various RPC requests to an actor
     that either doesn't have the requested module exposed or doesn't define
     the named function.
@@ -77,7 +77,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
 
         # spawn a subactor which calls us back
        async with tractor.open_nursery(
-            arbiter_addr=arb_addr,
+            arbiter_addr=reg_addr,
             enable_modules=exposed_mods.copy(),
         ) as n:
@@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
 async def spawn(
     is_arbiter: bool,
     data: dict,
-    arb_addr: tuple[str, int],
+    reg_addr: tuple[str, int],
 ):
     namespaces = [__name__]
 
     await trio.sleep(0.1)
 
     async with tractor.open_root_actor(
-        arbiter_addr=arb_addr,
+        arbiter_addr=reg_addr,
     ):
 
         actor = tractor.current_actor()
@@ -41,7 +41,7 @@ async def spawn(
                 is_arbiter=False,
                 name='sub-actor',
                 data=data,
-                arb_addr=arb_addr,
+                reg_addr=reg_addr,
                 enable_modules=namespaces,
             )
 
@@ -55,12 +55,12 @@ async def spawn(
         return 10
 
 
-def test_local_arbiter_subactor_global_state(arb_addr):
+def test_local_arbiter_subactor_global_state(reg_addr):
     result = trio.run(
         spawn,
         True,
         data_to_pass_down,
-        arb_addr,
+        reg_addr,
     )
     assert result == 10
 
@@ -140,7 +140,7 @@ async def check_loglevel(level):
 def test_loglevel_propagated_to_subactor(
     start_method,
     capfd,
-    arb_addr,
+    reg_addr,
 ):
     if start_method == 'mp_forkserver':
         pytest.skip(
@@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor(
         async with tractor.open_nursery(
             name='arbiter',
             start_method=start_method,
-            arbiter_addr=arb_addr,
+            arbiter_addr=reg_addr,
 
         ) as tn:
             await tn.run_in_actor(
@@ -66,13 +66,13 @@ async def ensure_sequence(
 async def open_sequence_streamer(
 
     sequence: list[int],
-    arb_addr: tuple[str, int],
+    reg_addr: tuple[str, int],
     start_method: str,
 
 ) -> tractor.MsgStream:
 
     async with tractor.open_nursery(
-        arbiter_addr=arb_addr,
+        arbiter_addr=reg_addr,
         start_method=start_method,
     ) as tn:
 
@@ -93,7 +93,7 @@ async def open_sequence_streamer(
 
 
 def test_stream_fan_out_to_local_subscriptions(
-    arb_addr,
+    reg_addr,
     start_method,
 ):
 
@@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions(
 
     async with open_sequence_streamer(
         sequence,
-        arb_addr,
+        reg_addr,
         start_method,
     ) as stream:
 
@@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions(
     ]
 )
 def test_consumer_and_parent_maybe_lag(
-    arb_addr,
+    reg_addr,
     start_method,
     task_delays,
 ):
@@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag(
 
         async with open_sequence_streamer(
             sequence,
-            arb_addr,
+            reg_addr,
             start_method,
         ) as stream:
 
@@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag(
 
 
 def test_faster_task_to_recv_is_cancelled_by_slower(
-    arb_addr,
+    reg_addr,
     start_method,
 ):
     '''
@@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
 
     async with open_sequence_streamer(
         sequence,
-        arb_addr,
+        reg_addr,
         start_method,
 
     ) as stream:
@@ -302,7 +302,7 @@ def test_subscribe_errors_after_close():
 
 
 def test_ensure_slow_consumers_lag_out(
-    arb_addr,
+    reg_addr,
     start_method,
 ):
     '''This is a pure local task test; no tractor
@@ -18,8 +18,6 @@
 This is the "bootloader" for actors started using the native trio backend.
 
 """
-import sys
-import trio
 import argparse
 
 from ast import literal_eval
@@ -37,8 +35,6 @@ def parse_ipaddr(arg):
     return (str(host), int(port))
 
 
-from ._entry import _trio_main
-
 if __name__ == "__main__":
 
     parser = argparse.ArgumentParser()
@@ -44,9 +44,11 @@ import warnings
 import trio
 
 from ._exceptions import (
+    # _raise_from_no_key_in_msg,
     unpack_error,
     pack_error,
     ContextCancelled,
+    # MessagingError,
     StreamOverrun,
 )
 from .log import get_logger
@@ -56,28 +58,36 @@ from ._state import current_actor
 
 if TYPE_CHECKING:
     from ._portal import Portal
+    from ._runtime import Actor
 
 
 log = get_logger(__name__)
 
 
+# TODO: make this a msgspec.Struct!
 @dataclass
 class Context:
     '''
-    An inter-actor, ``trio``-task communication context.
+    An inter-actor, SC transitive, `trio.Task` communication context.
 
-    NB: This class should never be instatiated directly, it is delivered
-    by either,
-     - runtime machinery to a remotely started task or,
-     - by entering ``Portal.open_context()``.
+    NB: This class should **never be instatiated directly**, it is allocated
+    by the runtime in 2 ways:
+     - by entering ``Portal.open_context()`` which is the primary
+       public API for any "caller" task or,
+     - by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
+       to a remotely scheduled "callee" function.
 
-    and is always constructed using ``mkt_context()``.
+    AND is always constructed using the below ``mk_context()``.
 
     Allows maintaining task or protocol specific state between
-    2 communicating, parallel executing actor tasks. A unique context is
-    allocated on each side of any task RPC-linked msg dialog, for
-    every request to a remote actor from a portal. On the "callee"
-    side a context is always allocated inside ``._runtime._invoke()``.
+    2 cancel-scope-linked, communicating and parallel executing
+    `trio.Task`s. Contexts are allocated on each side of any task
+    RPC-linked msg dialog, i.e. for every request to a remote
+    actor from a `Portal`. On the "callee" side a context is
+    always allocated inside ``._runtime._invoke()``.
 
+    # TODO: more detailed writeup on cancellation, error and
+    # streaming semantics..
+
     A context can be cancelled and (possibly eventually restarted) from
     either side of the underlying IPC channel, it can also open task
@@ -108,12 +118,31 @@ class Context:
     # which is exactly the primitive that allows for
     # cross-actor-task-supervision and thus SC.
     _scope: trio.CancelScope | None = None
+
+    # on a clean exit there should be a final value
+    # delivered from the far end "callee" task, so
+    # this value is only set on one side.
     _result: Any | int = None
+
+    # if the local "caller" task errors this
+    # value is always set to the error that was
+    # captured in the `Portal.open_context().__aexit__()`
+    # teardown.
+    _local_error: BaseException | None = None
+
+    # if the either side gets an error from the other
+    # this value is set to that error unpacked from an
+    # IPC msg.
     _remote_error: BaseException | None = None
 
     # cancellation state
+    # only set if the local task called `.cancel()`
     _cancel_called: bool = False  # did WE cancel the far end?
-    _cancelled_remote: tuple[str, str] | None = None
+
+    # TODO: do we even need this? we can assume that if we're
+    # cancelled that the other side is as well, so maybe we should
+    # instead just have a `.canceller` pulled from the
+    # `ContextCancelled`?
+    _canceller: tuple[str, str] | None = None
 
     # NOTE: we try to ensure assignment of a "cancel msg" since
     # there's always going to be an "underlying reason" that any
@@ -145,23 +174,47 @@ class Context:
         return self._cancel_called
 
     @property
-    def cancel_called_remote(self) -> tuple[str, str] | None:
+    def canceller(self) -> tuple[str, str] | None:
         '''
-        ``Actor.uid`` of the remote actor who's task was cancelled
-        causing this side of the context to also be cancelled.
+        ``Actor.uid: tuple[str, str]`` of the (remote)
+        actor-process who's task was cancelled thus causing this
+        (side of the) context to also be cancelled.
 
         '''
-        remote_uid = self._cancelled_remote
-        if remote_uid:
-            return tuple(remote_uid)
+        return self._canceller
 
     @property
     def cancelled_caught(self) -> bool:
-        return self._scope.cancelled_caught
+        return (
+            # the local scope was cancelled either by
+            # remote error or self-request
+            self._scope.cancelled_caught
+
+            # the local scope was never cancelled
+            # and instead likely we received a remote side
+            # cancellation that was raised inside `.result()`
+            or (
+                (se := self._local_error)
+                and
+                isinstance(se, ContextCancelled)
+                and (
+                    se.canceller == self.canceller
+                    or
+                    se is self._remote_error
+                )
+            )
+        )
+
+    @property
+    def side(self) -> str:
+        '''
+        Return string indicating which task this instance is wrapping.
+
+        '''
+        return 'caller' if self._portal else 'callee'
 
     # init and streaming state
     _started_called: bool = False
     _started_received: bool = False
     _stream_opened: bool = False
 
     # overrun handling machinery
@@ -196,7 +249,7 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})
 
-    async def _maybe_cancel_and_set_remote_error(
+    def _maybe_cancel_and_set_remote_error(
         self,
         error: BaseException,
 
@@ -269,16 +322,19 @@ class Context:
         # that error as the reason.
         self._remote_error: BaseException = error
 
-        # always record the remote actor's uid since its cancellation
-        # state is directly linked to ours (the local one).
-        self._cancelled_remote = self.chan.uid
-
         if (
             isinstance(error, ContextCancelled)
         ):
+            # always record the cancelling actor's uid since its cancellation
+            # state is linked and we want to know which process was
+            # the cause / requester of the cancellation.
+            self._canceller = error.canceller
+
             log.cancel(
-                'Remote task-context sucessfully cancelled for '
-                f'{self.chan.uid}:{self.cid}'
+                'Remote task-context was cancelled for '
+                f'actor: {self.chan.uid}\n'
+                f'task: {self.cid}\n'
+                f'canceller: {error.canceller}\n'
             )
 
             if self._cancel_called:
@@ -289,22 +345,37 @@ class Context:
                 # and we **don't need to raise it** in local cancel
                 # scope since it will potentially override a real error.
                 return
 
         else:
             log.error(
-                f'Remote context error for {self.chan.uid}:{self.cid}:\n'
+                f'Remote context error,\n'
+                f'remote actor: {self.chan.uid}\n'
+                f'task: {self.cid}\n'
                 f'{error}'
             )
+            self._canceller = self.chan.uid
 
         # TODO: tempted to **not** do this by-reraising in a
         # nursery and instead cancel a surrounding scope, detect
         # the cancellation, then lookup the error that was set?
-        if self._scope:
+        # YES! this is way better and simpler!
+        cs: trio.CancelScope = self._scope
+        if (
+            cs
+            and not cs.cancel_called
+            and not cs.cancelled_caught
+        ):
 
+            # TODO: we can for sure drop this right?
             # from trio.testing import wait_all_tasks_blocked
             # await wait_all_tasks_blocked()
-            # self._cancelled_remote = self.chan.uid
 
             # TODO: it'd sure be handy to inject our own
             # `trio.Cancelled` subtype here ;)
             # https://github.com/goodboy/tractor/issues/368
             self._scope.cancel()
 
-            # this REPL usage actually works here BD
+            # NOTE: this REPL usage actually works here dawg! Bo
             # from .devx._debug import pause
             # await pause()
@@ -320,13 +391,19 @@ class Context:
         Timeout quickly in an attempt to sidestep 2-generals...
 
         '''
-        side: str = 'caller' if self._portal else 'callee'
+        side: str = self.side
         log.cancel(
             f'Cancelling {side} side of context to {self.chan.uid}'
         )
 
         self._cancel_called: bool = True
 
-        # caller side who entered `Portal.open_context()`
+        # NOTE: on the call side we never manually call
+        # `._scope.cancel()` since we expect the eventual
+        # `ContextCancelled` from the other side to trigger this
+        # when the runtime finally receives it during teardown
+        # (normally in `.result()` called from
+        # `Portal.open_context().__aexit__()`)
         if side == 'caller':
             if not self._portal:
                 raise RuntimeError(
@@ -349,7 +426,6 @@ class Context:
                     '_cancel_task',
                     cid=cid,
                 )
-                # print("EXITING CANCEL CALL")
 
                 if cs.cancelled_caught:
                     # XXX: there's no way to know if the remote task was indeed
@@ -368,6 +444,9 @@ class Context:
                     )
 
         # callee side remote task
+        # NOTE: on this side we ALWAYS cancel the local scope since
+        # the caller expects a `ContextCancelled` to be sent from
+        # `._runtime._invoke()` back to the other side.
         else:
             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an
@@ -403,7 +482,7 @@ class Context:
        ``trio``'s cancellation system.
 
         '''
-        actor = current_actor()
+        actor: Actor = current_actor()
 
         # here we create a mem chan that corresponds to the
         # far end caller / callee.
@@ -413,12 +492,34 @@ class Context:
         # killed
 
         if self._cancel_called:
-            task = trio.lowlevel.current_task().name
-            raise ContextCancelled(
-                f'Context around {actor.uid[0]}:{task} was already cancelled!'
+
+            # XXX NOTE: ALWAYS RAISE any remote error here even if
+            # it's an expected `ContextCancelled` due to a local
+            # task having called `.cancel()`!
+            #
+            # WHY: we expect the error to always bubble up to the
+            # surrounding `Portal.open_context()` call and be
+            # absorbed there (silently) and we DO NOT want to
+            # actually try to stream - a cancel msg was already
+            # sent to the other side!
+            if self._remote_error:
+                raise self._remote_error
+
+            # XXX NOTE: if no `ContextCancelled` has been responded
+            # back from the other side (yet), we raise a different
+            # runtime error indicating that this task's usage of
+            # `Context.cancel()` and then `.open_stream()` is WRONG!
+            task: str = trio.lowlevel.current_task().name
+            raise RuntimeError(
+                'Stream opened after `Context.cancel()` called..?\n'
+                f'task: {actor.uid[0]}:{task}\n'
+                f'{self}'
             )
 
-        if not self._portal and not self._started_called:
+        if (
+            not self._portal
+            and not self._started_called
+        ):
             raise RuntimeError(
                 'Context.started()` must be called before opening a stream'
             )
@@ -434,7 +535,7 @@ class Context:
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
-        ctx._allow_overruns = allow_overruns
+        ctx._allow_overruns: bool = allow_overruns
         assert ctx is self
 
         # XXX: If the underlying channel feeder receive mem chan has
@@ -444,27 +545,32 @@ class Context:
 
         if ctx._recv_chan._closed:
             raise trio.ClosedResourceError(
-                'The underlying channel for this stream was already closed!?')
+                'The underlying channel for this stream was already closed!?'
+            )
 
         async with MsgStream(
             ctx=self,
             rx_chan=ctx._recv_chan,
         ) as stream:
 
+            # NOTE: we track all existing streams per portal for
+            # the purposes of attempting graceful closes on runtime
+            # cancel requests.
             if self._portal:
                 self._portal._streams.add(stream)
 
             try:
-                self._stream_opened = True
+                self._stream_opened: bool = True
 
                 # XXX: do we need this?
                 # ensure we aren't cancelled before yielding the stream
                 # await trio.lowlevel.checkpoint()
                 yield stream
 
-                # NOTE: Make the stream "one-shot use". On exit, signal
-                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
-                # far end.
+                # NOTE: Make the stream "one-shot use". On exit,
+                # signal
+                # ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end.
|
||||
await stream.aclose()
|
||||
|
||||
finally:
|
||||
|
@ -495,14 +601,22 @@ class Context:
|
|||
# whenever ``CancelScope.cancel()`` was called) and
|
||||
# instead silently reap the expected cancellation
|
||||
# "error"-msg.
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
isinstance(err, ContextCancelled)
|
||||
and (
|
||||
self._cancel_called
|
||||
or self.chan._cancel_called
|
||||
or tuple(err.canceller) == current_actor().uid
|
||||
or self.canceller == our_uid
|
||||
or tuple(err.canceller) == our_uid
|
||||
)
|
||||
):
|
||||
# NOTE: we set the local scope error to any "self
|
||||
# cancellation" error-response thus "absorbing"
|
||||
# the error silently B)
|
||||
if self._local_error is None:
|
||||
self._local_error = err
|
||||
|
||||
return err
|
||||
|
||||
# NOTE: currently we are masking underlying runtime errors
|
||||
|
@ -515,7 +629,7 @@ class Context:
|
|||
# runtime frames from the tb explicitly?
|
||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||
# https://stackoverflow.com/a/24752607
|
||||
__tracebackhide__: bool = True
|
||||
# __tracebackhide__: bool = True
|
||||
raise err from None
|
||||
|
||||
async def result(self) -> Any | Exception:
|
||||
|
@ -544,7 +658,6 @@ class Context:
|
|||
of the remote cancellation.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
assert self._portal, "Context.result() can not be called from callee!"
|
||||
assert self._recv_chan
|
||||
|
||||
|
@ -607,13 +720,15 @@ class Context:
|
|||
"Received internal error at portal?"
|
||||
)
|
||||
|
||||
err = unpack_error(
|
||||
if err:= unpack_error(
|
||||
msg,
|
||||
self._portal.channel
|
||||
) # from msgerr
|
||||
): # from msgerr
|
||||
self._maybe_cancel_and_set_remote_error(err)
|
||||
self._maybe_raise_remote_err(err)
|
||||
|
||||
err = self._maybe_raise_remote_err(err)
|
||||
self._remote_error = err
|
||||
else:
|
||||
raise
|
||||
|
||||
if re := self._remote_error:
|
||||
return self._maybe_raise_remote_err(re)
|
||||
|
@ -724,13 +839,17 @@ class Context:
|
|||
f"Delivering {msg} from {uid} to caller {cid}"
|
||||
)
|
||||
|
||||
error = msg.get('error')
|
||||
if error := unpack_error(
|
||||
msg,
|
||||
self.chan,
|
||||
if (
|
||||
msg.get('error') # check for field
|
||||
and (
|
||||
error := unpack_error(
|
||||
msg,
|
||||
self.chan,
|
||||
)
|
||||
)
|
||||
):
|
||||
self._cancel_msg = msg
|
||||
await self._maybe_cancel_and_set_remote_error(error)
|
||||
self._maybe_cancel_and_set_remote_error(error)
|
||||
|
||||
if (
|
||||
self._in_overrun
|
||||
|
@ -765,7 +884,7 @@ class Context:
|
|||
|
||||
# XXX: always push an error even if the local
|
||||
# receiver is in overrun state.
|
||||
# await self._maybe_cancel_and_set_remote_error(msg)
|
||||
# self._maybe_cancel_and_set_remote_error(msg)
|
||||
|
||||
local_uid = current_actor().uid
|
||||
lines = [
|
||||
|
|
|
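For orientation, a rough caller-side sketch of the self-cancel flow the `Context` changes above implement: a local `ctx.cancel()` produces a `ContextCancelled` which `Portal.open_context()` absorbs on exit instead of re-raising. This is not part of the changeset; the actor/function names (`sleeper`) are made up.

import trio
import tractor

@tractor.context
async def sleeper(ctx: tractor.Context) -> None:
    await ctx.started()
    await trio.sleep_forever()  # parked until cancelled by the caller

async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleeper',
            enable_modules=[__name__],
        )
        async with portal.open_context(sleeper) as (ctx, first):
            # self-cancel: the resulting `ContextCancelled` is
            # absorbed by `Portal.open_context()` on exit.
            await ctx.cancel()
        await portal.cancel_actor()

if __name__ == '__main__':
    trio.run(main)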
@ -14,15 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
'''
Our classy exception set.

"""
'''
from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from typing import (
    Any,
    Type,
    TYPE_CHECKING,
)
import traceback

@ -31,6 +34,11 @@ import trio

from ._state import current_actor

if TYPE_CHECKING:
    from ._context import Context
    from ._stream import MsgStream
    from .log import StackLevelAdapter

_this_mod = importlib.import_module(__name__)

@ -38,12 +46,17 @@ class ActorFailure(Exception):
    "General actor failure"

# TODO: rename to just `RemoteError`?
class RemoteActorError(Exception):
    '''
    Remote actor exception bundled locally
    A box(ing) type which bundles a remote actor `BaseException` for
    (near identical, and only if possible,) local object/instance
    re-construction in the local process memory domain.

    Normally each instance is expected to be constructed from
    a special "error" IPC msg sent by some remote actor-runtime.

    '''
    # TODO: local reconstruction of remote exception deats
    def __init__(
        self,
        message: str,

@ -53,13 +66,36 @@ class RemoteActorError(Exception):
    ) -> None:
        super().__init__(message)

        self.type = suberror_type
        self.msgdata = msgdata
        # TODO: maybe a better name?
        # - .errtype
        # - .retype
        # - .boxed_errtype
        # - .boxed_type
        # - .remote_type
        # also pertains to our long long outstanding issue XD
        # https://github.com/goodboy/tractor/issues/5
        self.type: str = suberror_type
        self.msgdata: dict[str, Any] = msgdata

    @property
    def src_actor_uid(self) -> tuple[str, str] | None:
        return self.msgdata.get('src_actor_uid')

    def __repr__(self) -> str:
        if remote_tb := self.msgdata.get('tb_str'):
            pformat(remote_tb)
            return (
                f'{type(self).__name__}(\n'
                f'msgdata={pformat(self.msgdata)}\n'
                ')'
            )

        return super().__repr__()

    # TODO: local reconstruction of remote exception deats
    # def unbox(self) -> BaseException:
    #     ...

class InternalActorError(RemoteActorError):
    '''

@ -98,8 +134,19 @@ class NoRuntime(RuntimeError):
    "The root actor has not been initialized yet"

class StreamOverrun(trio.TooSlowError):
    "This stream was overrun by sender"
class StreamOverrun(
    RemoteActorError,
    trio.TooSlowError,
):
    '''
    This stream was overrun by sender

    '''
    @property
    def sender(self) -> tuple[str, str] | None:
        value = self.msgdata.get('sender')
        if value:
            return tuple(value)

class AsyncioCancelled(Exception):

@ -110,6 +157,9 @@ class AsyncioCancelled(Exception):

    '''

class MessagingError(Exception):
    'Some kind of unexpected SC messaging dialog issue'

def pack_error(
    exc: BaseException,

@ -136,7 +186,15 @@ def pack_error(
        'src_actor_uid': current_actor().uid,
    }

    if isinstance(exc, ContextCancelled):
    # TODO: ?just wholesale proxy `.msgdata: dict`?
    # XXX WARNING, when i swapped these ctx-semantics
    # tests started hanging..???!!!???
    # if msgdata := exc.getattr('msgdata', {}):
    #     error_msg.update(msgdata)
    if (
        isinstance(exc, ContextCancelled)
        or isinstance(exc, StreamOverrun)
    ):
        error_msg.update(exc.msgdata)

    return {'error': error_msg}

@ -146,7 +204,8 @@ def unpack_error(

    msg: dict[str, Any],
    chan=None,
    err_type=RemoteActorError
    err_type=RemoteActorError,
    hide_tb: bool = True,

) -> None | Exception:
    '''

@ -157,7 +216,7 @@ def unpack_error(
    which is the responsibility of the caller.

    '''
    __tracebackhide__: bool = True
    __tracebackhide__: bool = hide_tb

    error_dict: dict[str, dict] | None
    if (

@ -214,3 +273,93 @@ def is_multi_cancelled(exc: BaseException) -> bool:
        ) is not None

    return False

def _raise_from_no_key_in_msg(
    ctx: Context,
    msg: dict,
    src_err: KeyError,
    log: StackLevelAdapter,  # caller specific `log` obj
    expect_key: str = 'yield',
    stream: MsgStream | None = None,

) -> bool:
    '''
    Raise an appropriate local error when a `MsgStream` msg arrives
    which does not contain the expected (under normal operation)
    `'yield'` field.

    '''
    __tracebackhide__: bool = True

    # internal error should never get here
    try:
        cid: str = msg['cid']
    except KeyError as src_err:
        raise MessagingError(
            f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
            'received msg:\n'
            f'{pformat(msg)}\n'
        ) from src_err

    # TODO: test that shows stream raising an expected error!!!
    if msg.get('error'):
        # raise the error message
        raise unpack_error(
            msg,
            ctx.chan,
        ) from None

    elif (
        msg.get('stop')
        or (
            stream
            and stream._eoc
        )
    ):
        log.debug(
            f'Context[{cid}] stream was stopped by remote side\n'
            f'cid: {cid}\n'
        )

        # XXX: important to set so that a new ``.receive()``
        # call (likely by another task using a broadcast receiver)
        # doesn't accidentally pull the ``return`` message
        # value out of the underlying feed mem chan!
        stream._eoc: bool = True

        # TODO: if a local task is already blocking on
        # a `Context.result()` and thus a `.receive()` on the
        # rx-chan, we close the chan and set state ensuring that
        # an eoc is raised!

        # # when the send is closed we assume the stream has
        # # terminated and signal this local iterator to stop
        # await stream.aclose()

        # XXX: this causes ``ReceiveChannel.__anext__()`` to
        # raise a ``StopAsyncIteration`` **and** in our catch
        # block below it will trigger ``.aclose()``.
        raise trio.EndOfChannel(
            f'Context stream ended due to msg:\n'
            f'{pformat(msg)}'
        ) from src_err

    if (
        stream
        and stream._closed
    ):
        raise trio.ClosedResourceError('This stream was closed')

    # always re-raise the source error if no translation error case
    # is activated above.
    _type: str = 'Stream' if stream else 'Context'
    raise MessagingError(
        f'{_type} was expecting a `{expect_key}` message'
        ' BUT received a non-`error` msg:\n'
        f'cid: {cid}\n'
        f'{pformat(msg)}'
    ) from src_err
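As a reference for the boxing scheme above, a rough sketch of the 'error' msg shape `pack_error()` emits and how `unpack_error()` re-boxes it locally. Field names are taken from this diff; the traceback and uid values are placeholders.

import builtins

# hypothetical wire msg as shipped by a failing remote actor:
error_msg = {
    'error': {
        'tb_str': 'Traceback (most recent call last):\n  ...\nValueError: boom\n',
        'type_str': 'ValueError',
        'src_actor_uid': ('worker', '<uuid>'),
    },
}

# `unpack_error()` re-boxes this locally, roughly equivalent to:
err_dict = error_msg['error']
boxed = RemoteActorError(
    f"remote actor raised:\n{err_dict['tb_str']}",
    suberror_type=getattr(builtins, err_dict['type_str']),
    **err_dict,
)
assert boxed.type is ValueError               # boxed builtin error type
assert boxed.src_actor_uid == ('worker', '<uuid>')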
@ -294,9 +294,11 @@ class Channel:
        self._agen = self._aiter_recv()
        self._exc: Optional[Exception] = None  # set if far end actor errors
        self._closed: bool = False
        # flag set on ``Portal.cancel_actor()`` indicating
        # remote (peer) cancellation of the far end actor runtime.
        self._cancel_called: bool = False  # set on ``Portal.cancel_actor()``

        # flag set by ``Portal.cancel_actor()`` indicating remote
        # (possibly peer) cancellation of the far end actor
        # runtime.
        self._cancel_called: bool = False

    @classmethod
    def from_stream(

@ -327,8 +329,11 @@ class Channel:
    def __repr__(self) -> str:
        if self.msgstream:
            return repr(
                self.msgstream.stream.socket._sock).replace(  # type: ignore
                "socket.socket", "Channel")
                self.msgstream.stream.socket._sock
            ).replace(  # type: ignore
                "socket.socket",
                "Channel",
            )
        return object.__repr__(self)

    @property
@ -33,7 +33,6 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings

import trio

@ -45,12 +44,17 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
    _raise_from_no_key_in_msg,
    unpack_error,
    NoResult,
    ContextCancelled,
)
from ._context import Context
from ._streaming import MsgStream
from ._context import (
    Context,
)
from ._streaming import (
    MsgStream,
)

log = get_logger(__name__)

@ -64,15 +68,10 @@ def _unwrap_msg(
    __tracebackhide__ = True
    try:
        return msg['return']
    except KeyError:
    except KeyError as ke:
        # internal error should never get here
        assert msg.get('cid'), "Received internal error at portal?"
        raise unpack_error(msg, channel) from None

# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
    'Some kind of unexpected SC messaging dialog issue'
        raise unpack_error(msg, channel) from ke

class Portal:

@ -219,14 +218,18 @@ class Portal:

        try:
            # send cancel cmd - might not get response
            # XXX: sure would be nice to make this work with a proper shield
            # XXX: sure would be nice to make this work with
            # a proper shield
            with trio.move_on_after(
                timeout
                or self.cancel_timeout
            ) as cs:
                cs.shield = True

                await self.run_from_ns('self', 'cancel')
                await self.run_from_ns(
                    'self',
                    'cancel',
                )
                return True

            if cs.cancelled_caught:

@ -461,25 +464,18 @@ class Portal:
        try:
            # the "first" value here is delivered by the callee's
            # ``Context.started()`` call.
            first = msg['started']
            first: Any = msg['started']
            ctx._started_called: bool = True

        except KeyError:
            if not (cid := msg.get('cid')):
                raise MessagingError(
                    'Received internal error at context?\n'
                    'No call-id (cid) in startup msg?'
                )
        except KeyError as src_error:

            if msg.get('error'):
                # NOTE: mask the key error with the remote one
                raise unpack_error(msg, self.channel) from None
            else:
                raise MessagingError(
                    f'Context for {cid} was expecting a `started` message'
                    ' but received a non-error msg:\n'
                    f'{pformat(msg)}'
                )
            _raise_from_no_key_in_msg(
                ctx=ctx,
                msg=msg,
                src_err=src_error,
                log=log,
                expect_key='started',
            )

        ctx._portal: Portal = self
        uid: tuple = self.channel.uid

@ -516,57 +512,102 @@ class Portal:
            # started in the ctx nursery.
            ctx._scope.cancel()

        # XXX: (maybe) shield/mask context-cancellations that were
        # initiated by any of the context's 2 tasks. There are
        # subsequently 2 operating cases for a "graceful cancel"
        # of a `Context`:
        #
        # 1.*this* side's task called `Context.cancel()`, in
        # which case we mask the `ContextCancelled` from bubbling
        # to the opener (much like how `trio.Nursery` swallows
        # any `trio.Cancelled` bubbled by a call to
        # `Nursery.cancel_scope.cancel()`)
        # XXX NOTE XXX: maybe shield against
        # self-context-cancellation (which raises a local
        # `ContextCancelled`) when requested (via
        # `Context.cancel()`) by the same task (tree) which entered
        # THIS `.open_context()`.
        #
        # 2.*the other* side's (callee/spawned) task cancelled due
        # to a self or peer cancellation request in which case we
        # DO let the error bubble to the opener.
        # NOTE: There are 2 operating cases for a "graceful cancel"
        # of a `Context`. In both cases any `ContextCancelled`
        # raised in this scope-block came from a transport msg
        # relayed from some remote-actor-task which our runtime set
        # as a `Context._remote_error`
        #
        # the CASES:
        #
        # - if that context IS THE SAME ONE that called
        #   `Context.cancel()`, we want to absorb the error
        #   silently and let this `.open_context()` block exit
        #   without raising.
        #
        # - if it is from some OTHER context (we did NOT call
        #   `.cancel()`), we want to re-RAISE IT whilst also
        #   setting our own ctx's "reason for cancel" to be that
        #   other context's cancellation condition; we set our
        #   `.canceller: tuple[str, str]` to be same value as
        #   caught here in a `ContextCancelled.canceller`.
        #
        # Again, there are 2 cases:
        #
        # 1-some other context opened in this `.open_context()`
        #   block cancelled due to a self or peer cancellation
        #   request in which case we DO let the error bubble to the
        #   opener.
        #
        # 2-THIS "caller" task somewhere invoked `Context.cancel()`
        #   and received a `ContextCancelled` from the "callee"
        #   task, in which case we mask the `ContextCancelled` from
        #   bubbling to this "caller" (much like how `trio.Nursery`
        #   swallows any `trio.Cancelled` bubbled by a call to
        #   `Nursery.cancel_scope.cancel()`)
        except ContextCancelled as ctxc:
            scope_err = ctxc

            # CASE 1: this context was never cancelled
            # via a local task's call to `Context.cancel()`.
            if not ctx._cancel_called:
                # XXX: this should NEVER happen!
                # from ._debug import breakpoint
                # await breakpoint()
                raise

            # CASE 2: context was cancelled by local task calling
            # `.cancel()`, we don't raise and the exit block should
            # exit silently.
            else:
            if (
                ctx._cancel_called
                and (
                    ctxc is ctx._remote_error
                    or
                    ctxc.canceller is self.canceller
                )
            ):
                log.debug(
                    f'Context {ctx} cancelled gracefully with:\n'
                    f'{ctxc}'
                )
            # CASE 1: this context was never cancelled via a local
            # task (tree) having called `Context.cancel()`, raise
            # the error since it was caused by someone else!
            else:
                raise

        # the above `._scope` can be cancelled due to:
        # 1. an explicit self cancel via `Context.cancel()` or
        #    `Actor.cancel()`,
        # 2. any "callee"-side remote error, possibly also a cancellation
        #    request by some peer,
        # 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
        except (
            # - a standard error in the caller/yieldee
            # CASE 3: standard local error in this caller/yieldee
            Exception,

            # - a runtime teardown exception-group and/or
            # cancellation request from a caller task.
            BaseExceptionGroup,
            trio.Cancelled,
            # CASES 1 & 2: normally manifested as
            # a `Context._scope_nursery` raised
            # exception-group of,
            # 1.-`trio.Cancelled`s, since
            #   `._scope.cancel()` will have been called and any
            #   `ContextCancelled` absorbed and thus NOT RAISED in
            #   any `Context._maybe_raise_remote_err()`,
            # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
            #   from any error raised in the "callee" side with
            #   a group only raised if there was any more than one
            #   task started here in the "caller" in the
            #   `yield`-ed to task.
            BaseExceptionGroup,  # since overrun handler tasks may have been spawned
            trio.Cancelled,  # NOTE: NOT from inside the ctx._scope
            KeyboardInterrupt,

        ) as err:
            scope_err = err

            # XXX: request cancel of this context on any error.
            # NOTE: `Context.cancel()` is conversely NOT called in
            # the `ContextCancelled` "cancellation requested" case
            # above.
            # XXX: ALWAYS request the context to CANCEL ON any ERROR.
            # NOTE: `Context.cancel()` is conversely NEVER CALLED in
            # the `ContextCancelled` "self cancellation absorbed" case
            # handled in the block above!
            log.cancel(
                'Context cancelled for task due to\n'
                f'{err}\n'

@ -585,7 +626,7 @@ class Portal:

            raise  # duh

        # no scope error case
        # no local scope error, the "clean exit with a result" case.
        else:
            if ctx.chan.connected():
                log.info(

@ -599,15 +640,27 @@ class Portal:
                # `Context._maybe_raise_remote_err()`) IFF
                # a `Context._remote_error` was set by the runtime
                # via a call to
                # `Context._maybe_cancel_and_set_remote_error()`
                # which IS SET any time the far end fails and
                # causes "caller side" cancellation via
                # a `ContextCancelled` here.
                result = await ctx.result()
                log.runtime(
                    f'Context {fn_name} returned value from callee:\n'
                    f'`{result}`'
                )
                # `Context._maybe_cancel_and_set_remote_error()`.
                # As per `Context._deliver_msg()`, that error IS
                # ALWAYS SET any time "callee" side fails and causes "caller
                # side" cancellation via a `ContextCancelled` here.
                # result = await ctx.result()
                try:
                    result = await ctx.result()
                    log.runtime(
                        f'Context {fn_name} returned value from callee:\n'
                        f'`{result}`'
                    )
                except BaseException as berr:
                    # on normal teardown, if we get some error
                    # raised in `Context.result()` we still want to
                    # save that error on the ctx's state to
                    # determine things like `.cancelled_caught` for
                    # cases where there was remote cancellation but
                    # this task didn't know until final teardown
                    # / value collection.
                    scope_err = berr
                    raise

        finally:
            # though it should be impossible for any tasks

@ -657,12 +710,14 @@ class Portal:
                with trio.CancelScope(shield=True):
                    await ctx._recv_chan.aclose()

            # XXX: since we always (maybe) re-raise (and thus also
            # mask runtime machinery related
            # multi-`trio.Cancelled`s) any scope error which was
            # the underlying cause of this context's exit, add
            # different log msgs for each of the (2) cases.
            # XXX: we always raise remote errors locally and
            # generally speaking mask runtime-machinery related
            # multi-`trio.Cancelled`s. As such, any `scope_error`
            # which was the underlying cause of this context's exit
            # should be stored as the `Context._local_error` and
            # used in determining `Context.cancelled_caught: bool`.
            if scope_err is not None:
                ctx._local_error: BaseException = scope_err
                etype: Type[BaseException] = type(scope_err)

                # CASE 2

@ -691,7 +746,7 @@ class Portal:
            # child has already cleared it and clobbered IPC.

            # FINALLY, remove the context from runtime tracking and
            # exit Bo
            # exit!
            self.actor._contexts.pop(
                (self.channel.uid, ctx.cid),
                None,
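A hedged sketch of the peer-cancel case the comments above describe: when `ContextCancelled.canceller` names some actor other than ourselves, `Portal.open_context()` re-raises instead of absorbing. The helper below is illustrative only, not from this changeset.

import tractor

async def open_and_watch(portal, target_fn):
    try:
        async with portal.open_context(target_fn) as (ctx, first):
            await ctx.result()
    except tractor.ContextCancelled as ctxc:
        # `.canceller` (added in this changeset) names the requesting
        # actor, letting app code tell self-cancel from peer-cancel.
        assert ctxc.canceller != tractor.current_actor().uid
        raise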
@ -25,7 +25,6 @@ import logging
import signal
import sys
import os
import typing
import warnings
@ -15,7 +15,10 @@
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Actor primitives and helpers
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.

"""
from __future__ import annotations

@ -41,8 +44,14 @@ import warnings

from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio  # type: ignore
from trio_typing import TaskStatus
import trio
from trio import (
    CancelScope,
)
from trio_typing import (
    Nursery,
    TaskStatus,
)

from ._ipc import Channel
from ._context import (

@ -86,21 +95,22 @@ async def _invoke(
    ] = trio.TASK_STATUS_IGNORED,
):
    '''
    Invoke local func and deliver result(s) over provided channel.
    Schedule a `trio` task-as-func and deliver result(s) over
    connected IPC channel.

    This is the core "RPC task" starting machinery.
    This is the core "RPC" `trio.Task` scheduling machinery used to start every
    remotely invoked function, normally in `Actor._service_n: Nursery`.

    '''
    __tracebackhide__ = True
    treat_as_gen: bool = False
    failed_resp: bool = False

    # possibly a traceback (not sure what typing is for this..)
    tb = None

    cancel_scope = trio.CancelScope()
    cancel_scope = CancelScope()
    # activated cancel scope ref
    cs: trio.CancelScope | None = None
    cs: CancelScope | None = None

    ctx = actor.get_context(
        chan,

@ -112,6 +122,7 @@ async def _invoke(
    )
    context: bool = False

    # TODO: deprecate this style..
    if getattr(func, '_tractor_stream_function', False):
        # handle decorated ``@tractor.stream`` async functions
        sig = inspect.signature(func)

@ -153,6 +164,7 @@ async def _invoke(
    except TypeError:
        raise

    # TODO: can we unify this with the `context=True` impl below?
    if inspect.isasyncgen(coro):
        await chan.send({'functype': 'asyncgen', 'cid': cid})
        # XXX: massive gotcha! If the containing scope

@ -183,6 +195,7 @@ async def _invoke(
            await chan.send({'stop': True, 'cid': cid})

    # one way @stream func that gets treated like an async gen
    # TODO: can we unify this with the `context=True` impl below?
    elif treat_as_gen:
        await chan.send({'functype': 'asyncgen', 'cid': cid})
        # XXX: the async-func may spawn further tasks which push

@ -199,6 +212,20 @@ async def _invoke(
                # far end async gen to tear down
                await chan.send({'stop': True, 'cid': cid})

    # our most general case: a remote SC-transitive,
    # IPC-linked, cross-actor-task "context"
    # ------ - ------
    # TODO: every other "func type" should be implemented from
    # a special case of this impl eventually!
    # -[ ] streaming funcs should instead of being async-for
    #   handled directly here wrapped in
    #   a async-with-open_stream() closure that does the
    #   normal thing you'd expect a far end streaming context
    #   to (if written by the app-dev).
    # -[ ] one off async funcs can literally just be called
    #   here and awaited directly, possibly just with a small
    #   wrapper that calls `Context.started()` and then does
    #   the `await coro()`?
    elif context:
        # context func with support for bi-dir streaming
        await chan.send({'functype': 'context', 'cid': cid})

@ -209,21 +236,30 @@ async def _invoke(
                ctx._scope = nurse.cancel_scope
                task_status.started(ctx)
                res = await coro
                await chan.send({'return': res, 'cid': cid})
                await chan.send({
                    'return': res,
                    'cid': cid
                })

        # XXX: do we ever trigger this block any more?
        except (
            BaseExceptionGroup,
            trio.Cancelled,
        ):
            # if a context error was set then likely
            # thei multierror was raised due to that
            if ctx._remote_error is not None:
                raise ctx._remote_error
        ) as scope_error:

            # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
            # so they can be unwrapped and displayed on the caller
            # side?
            # always set this (callee) side's exception as the
            # local error on the context
            ctx._local_error: BaseException = scope_error

            # if a remote error was set then likely the
            # exception group was raised due to that,
            # so we instead raise that error immediately!
            if re := ctx._remote_error:
                ctx._maybe_raise_remote_err(re)

            # maybe TODO: pack in
            # ``trio.Cancelled.__traceback__`` here so they can
            # be unwrapped and displayed on the caller side?
            raise

        finally:

@ -234,11 +270,11 @@ async def _invoke(
            # don't pop the local context until we know the
            # associated child isn't in debug any more
            await _debug.maybe_wait_for_debugger()
            ctx = actor._contexts.pop((chan.uid, cid))
            if ctx:
                log.runtime(
                    f'Context entrypoint {func} was terminated:\n{ctx}'
                )
            ctx: Context = actor._contexts.pop((chan.uid, cid))
            log.runtime(
                f'Context entrypoint {func} was terminated:\n'
                f'{ctx}'
            )

        if ctx.cancelled_caught:

@ -246,44 +282,46 @@ async def _invoke(
            # before raising any context cancelled case
            # so that real remote errors don't get masked as
            # ``ContextCancelled``s.
            re = ctx._remote_error
            if re:
            if re := ctx._remote_error:
                ctx._maybe_raise_remote_err(re)

            fname = func.__name__
            cs: trio.CancelScope = ctx._scope
            fname: str = func.__name__
            cs: CancelScope = ctx._scope
            if cs.cancel_called:
                canceller = ctx._cancelled_remote
                # await _debug.breakpoint()
                our_uid: tuple = actor.uid
                canceller: tuple = ctx.canceller
                msg: str = (
                    f'`{fname}()`@{our_uid} cancelled by '
                )

                # NOTE / TODO: if we end up having
                # ``Actor._cancel_task()`` call
                # ``Context.cancel()`` directly, we're going to
                # need to change this logic branch since it will
                # always enter..
                # need to change this logic branch since it
                # will always enter..
                if ctx._cancel_called:
                    msg = f'`{fname}()`@{actor.uid} cancelled itself'

                else:
                    msg = (
                        f'`{fname}()`@{actor.uid} '
                        'was remotely cancelled by '
                    )
                    # TODO: test for this!!!!!
                    canceller: tuple = our_uid
                    msg += 'itself '

                # if the channel which spawned the ctx is the
                # one that cancelled it then we report that, vs.
                # it being some other random actor that for ex.
                # some actor who calls `Portal.cancel_actor()`
                # and by side-effect cancels this ctx.
                if canceller == ctx.chan.uid:
                    msg += f'its caller {canceller}'
                elif canceller == ctx.chan.uid:
                    msg += f'its caller {canceller} '

                else:
                    msg += f'remote actor {canceller}'

                # TODO: does this ever get set any more or can
                # we remove it?
                if ctx._cancel_msg:
                    msg += f' with msg:\n{ctx._cancel_msg}'
                    msg += (
                        ' with msg:\n'
                        f'{ctx._cancel_msg}'
                    )

                # task-context was either cancelled by request using
                # ``Portal.cancel_actor()`` or ``Context.cancel()``

@ -296,37 +334,76 @@ async def _invoke(
                    canceller=canceller,
                )

    # regular async function/method
    # XXX: possibly just a scheduled `Actor._cancel_task()`
    # from a remote request to cancel some `Context`.
    # ------ - ------
    # TODO: ideally we unify this with the above `context=True`
    # block such that for any remote invocation ftype, we
    # always invoke the far end RPC task scheduling the same
    # way: using the linked IPC context machinery.
    else:
        # regular async function
        try:
            await chan.send({'functype': 'asyncfunc', 'cid': cid})
        except trio.BrokenResourceError:
            await chan.send({
                'functype': 'asyncfunc',
                'cid': cid
            })
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            BrokenPipeError,
        ) as ipc_err:
            failed_resp = True
            if is_rpc:
                raise
            else:
                # TODO: should this be an `.exception()` call?
                log.warning(
                    f'Failed to respond to non-rpc request: {func}'
                    f'Failed to respond to non-rpc request: {func}\n'
                    f'{ipc_err}'
                )

        with cancel_scope as cs:
            ctx._scope = cs
            ctx._scope: CancelScope = cs
            task_status.started(ctx)
            result = await coro
            fname = func.__name__
            fname: str = func.__name__
            log.runtime(f'{fname}() result: {result}')
            if not failed_resp:
                # only send result if we know IPC isn't down
                await chan.send(
                    {'return': result,
                     'cid': cid}
                )

            # NOTE: only send result if we know IPC isn't down
            if (
                not failed_resp
                and chan.connected()
            ):
                try:
                    await chan.send(
                        {'return': result,
                         'cid': cid}
                    )
                except (
                    BrokenPipeError,
                    trio.BrokenResourceError,
                ):
                    log.warning(
                        'Failed to return result:\n'
                        f'{func}@{actor.uid}\n'
                        f'remote chan: {chan.uid}'
                    )

    except (
        Exception,
        BaseExceptionGroup,
    ) as err:

        # always hide this frame from debug REPL if the crash
        # originated from an rpc task and we DID NOT fail
        # due to an IPC transport error!
        if (
            is_rpc
            and chan.connected()
        ):
            __tracebackhide__: bool = True

        if not is_multi_cancelled(err):

            # TODO: maybe we'll want different "levels" of debugging

@ -360,24 +437,31 @@ async def _invoke(
            log.exception("Actor crashed:")

        # always ship errors back to caller
        err_msg = pack_error(err, tb=tb)
        err_msg: dict[str, dict] = pack_error(
            err,
            tb=tb,
        )
        err_msg['cid'] = cid

        try:
            await chan.send(err_msg)
        if is_rpc:
            try:
                await chan.send(err_msg)

        # TODO: tests for this scenario:
        # - RPC caller closes connection before getting a response
        # should **not** crash this actor..
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            BrokenPipeError,
        ):
            # if we can't propagate the error that's a big boo boo
            log.exception(
                f"Failed to ship error to caller @ {chan.uid} !?"
            )
            # TODO: tests for this scenario:
            # - RPC caller closes connection before getting a response
            # should **not** crash this actor..
            except (
                trio.ClosedResourceError,
                trio.BrokenResourceError,
                BrokenPipeError,
            ) as ipc_err:

                # if we can't propagate the error that's a big boo boo
                log.exception(
                    f"Failed to ship error to caller @ {chan.uid} !?\n"
                    f'{ipc_err}'

                )

        # error is probably from above coro running code *not from the
        # underlying rpc invocation* since a scope was never allocated

@ -403,7 +487,11 @@ async def _invoke(
            log.warning(
                f"Task {func} likely errored or cancelled before start")
        else:
            log.cancel(f'{func.__name__}({kwargs}) failed?')
            log.cancel(
                'Failed to de-alloc internal task!?\n'
                f'cid: {cid}\n'
                f'{func.__name__}({kwargs})'
            )

    finally:
        if not actor._rpc_tasks:

@ -420,7 +508,7 @@ async def try_ship_error_to_parent(
    err: Union[Exception, BaseExceptionGroup],

) -> None:
    with trio.CancelScope(shield=True):
    with CancelScope(shield=True):
        try:
            # internal error so ship to parent without cid
            await channel.send(pack_error(err))

@ -467,13 +555,13 @@ class Actor:
    msg_buffer_size: int = 2**6

    # nursery placeholders filled in by `async_main()` after fork
    _root_n: trio.Nursery | None = None
    _service_n: trio.Nursery | None = None
    _server_n: trio.Nursery | None = None
    _root_n: Nursery | None = None
    _service_n: Nursery | None = None
    _server_n: Nursery | None = None

    # Information about `__main__` from parent
    _parent_main_data: dict[str, str]
    _parent_chan_cs: trio.CancelScope | None = None
    _parent_chan_cs: CancelScope | None = None

    # syncs for setup/teardown sequences
    _server_down: trio.Event | None = None

@ -1005,7 +1093,7 @@ class Actor:

    async def _serve_forever(
        self,
        handler_nursery: trio.Nursery,
        handler_nursery: Nursery,
        *,
        # (host, port) to bind for channel server
        accept_host: tuple[str, int] | None = None,

@ -1073,12 +1161,17 @@ class Actor:
        - return control to the parent channel message loop

        '''
        log.cancel(f"{self.uid} is trying to cancel")
        log.cancel(
            f'{self.uid} requested to cancel by:\n'
            f'{requesting_uid}'
        )

        # TODO: what happens here when we self-cancel tho?
        self._cancel_called_by_remote: tuple = requesting_uid
        self._cancel_called = True

        # cancel all ongoing rpc tasks
        with trio.CancelScope(shield=True):
        with CancelScope(shield=True):

            # kill any debugger request task to avoid deadlock
            # with the root actor in this tree

@ -1088,7 +1181,9 @@ class Actor:
                dbcs.cancel()

            # kill all ongoing tasks
            await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
            await self.cancel_rpc_tasks(
                requesting_uid=requesting_uid,
            )

            # stop channel server
            self.cancel_server()

@ -1118,8 +1213,8 @@ class Actor:
        self,
        cid: str,
        chan: Channel,

        requesting_uid: tuple[str, str] | None = None,

    ) -> bool:
        '''
        Cancel a local task by call-id / channel.

@ -1136,7 +1231,7 @@ class Actor:
            # this ctx based lookup ensures the requested task to
            # be cancelled was indeed spawned by a request from this channel
            ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
            scope = ctx._scope
            scope: CancelScope = ctx._scope
        except KeyError:
            log.cancel(f"{cid} has already completed/terminated?")
            return True

@ -1146,10 +1241,10 @@ class Actor:
            f"peer: {chan.uid}\n")

        if (
            ctx._cancelled_remote is None
            ctx._canceller is None
            and requesting_uid
        ):
            ctx._cancelled_remote: tuple = requesting_uid
            ctx._canceller: tuple = requesting_uid

        # don't allow cancelling this function mid-execution
        # (is this necessary?)

@ -1159,6 +1254,7 @@ class Actor:
        # TODO: shouldn't we eventually be calling ``Context.cancel()``
        # directly here instead (since that method can handle both
        # side's calls into it?
        # await ctx.cancel()
        scope.cancel()

        # wait for _invoke to mark the task complete

@ -1186,9 +1282,12 @@ class Actor:
        registered for each.

        '''
        tasks = self._rpc_tasks
        tasks: dict = self._rpc_tasks
        if tasks:
            log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
            log.cancel(
                f'Cancelling all {len(tasks)} rpc tasks:\n'
                f'{tasks}'
            )
            for (
                (chan, cid),
                (ctx, func, is_complete),

@ -1206,7 +1305,9 @@ class Actor:
                )

            log.cancel(
                f"Waiting for remaining rpc tasks to complete {tasks}")
                'Waiting for remaining rpc tasks to complete:\n'
                f'{tasks}'
            )
            await self._ongoing_rpc_tasks.wait()

    def cancel_server(self) -> None:

@ -1436,7 +1537,7 @@ async def async_main(
            # block it might be actually possible to debug THIS
            # machinery in the same way as user task code?
            # if actor.name == 'brokerd.ib':
            #     with trio.CancelScope(shield=True):
            #     with CancelScope(shield=True):
            #         await _debug.breakpoint()

            actor.lifetime_stack.close()

@ -1472,7 +1573,7 @@ async def async_main(
        ):
            log.runtime(
                f"Waiting for remaining peers {actor._peers} to clear")
            with trio.CancelScope(shield=True):
            with CancelScope(shield=True):
                await actor._no_more_peers.wait()
        log.runtime("All peer channels are complete")

@ -1483,7 +1584,7 @@ async def process_messages(
    actor: Actor,
    chan: Channel,
    shield: bool = False,
    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
    task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,

) -> bool:
    '''

@ -1501,7 +1602,7 @@ async def process_messages(

    log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
    try:
        with trio.CancelScope(shield=shield) as loop_cs:
        with CancelScope(shield=shield) as loop_cs:
            # this internal scope allows for keeping this message
            # loop running despite the current task having been
            # cancelled (eg. `open_portal()` may call this method from

@ -1563,18 +1664,18 @@ async def process_messages(

                if ns == 'self':
                    if funcname == 'cancel':
                        func = actor.cancel
                        func: Callable = actor.cancel
                        kwargs['requesting_uid'] = chan.uid

                        # don't start entire actor runtime cancellation
                        # if this actor is currently in debug mode!
                        pdb_complete = _debug.Lock.local_pdb_complete
                        pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
                        if pdb_complete:
                            await pdb_complete.wait()

                        # we immediately start the runtime machinery
                        # shutdown
                        with trio.CancelScope(shield=True):
                        with CancelScope(shield=True):
                            # actor.cancel() was called so kill this
                            # msg loop and break out into
                            # ``async_main()``

@ -1602,7 +1703,7 @@ async def process_messages(

                        # we immediately start the runtime machinery
                        # shutdown
                        # with trio.CancelScope(shield=True):
                        # with CancelScope(shield=True):
                        kwargs['chan'] = chan
                        target_cid = kwargs['cid']
                        kwargs['requesting_uid'] = chan.uid

@ -1627,7 +1728,7 @@ async def process_messages(
                    else:
                        # normally registry methods, eg.
                        # ``.register_actor()`` etc.
                        func = getattr(actor, funcname)
                        func: Callable = getattr(actor, funcname)

                else:
                    # complain to client about restricted modules

@ -1717,9 +1818,10 @@ async def process_messages(
        Exception,
        BaseExceptionGroup,
    ) as err:

        if nursery_cancelled_before_task:
            sn = actor._service_n
            assert sn and sn.cancel_scope.cancel_called
            sn: Nursery = actor._service_n
            assert sn and sn.cancel_scope.cancel_called  # sanity
            log.cancel(
                f'Service nursery cancelled before it handled {funcname}'
            )
@ -204,6 +204,21 @@ async def do_hard_kill(
    # terminate_after: int = 99999,

) -> None:
    '''
    Un-gracefully terminate an OS level `trio.Process` after timeout.

    Used in 2 main cases:

    - "unknown remote runtime state": a hanging/stalled actor that
      isn't responding after sending a (graceful) runtime cancel
      request via an IPC msg.
    - "cancelled during spawn": a process whose actor runtime was
      cancelled before full startup completed (such that
      cancel-request-handling machinery was never fully
      initialized) and thus a "cancel request msg" is never going
      to be handled.

    '''
    # NOTE: this timeout used to do nothing since we were shielding
    # the ``.wait()`` inside ``new_proc()`` which will pretty much
    # never release until the process exits, now it acts as

@ -219,6 +234,9 @@ async def do_hard_kill(
    # and wait for it to exit. If cancelled, kills the process and
    # waits for it to finish exiting before propagating the
    # cancellation.
    #
    # This code was originally triggered by ``proc.__aexit__()``
    # but now must be called manually.
    with trio.CancelScope(shield=True):
        if proc.stdin is not None:
            await proc.stdin.aclose()

@ -234,10 +252,14 @@ async def do_hard_kill(
        with trio.CancelScope(shield=True):
            await proc.wait()

    # XXX NOTE XXX: zombie squad dispatch:
    # (should ideally never, but) If we do get here it means
    # graceful termination of a process failed and we need to
    # resort to OS level signalling to interrupt and cancel the
    # (presumably stalled or hung) actor. Since we never allow
    # zombies (as a feature) we ask the OS to send in the
    # removal squad as the last resort.
    if cs.cancelled_caught:
        # XXX: should pretty much never get here unless we have
        # to move the bits from ``proc.__aexit__()`` out and
        # into here.
        log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
        proc.kill()

@ -252,10 +274,13 @@ async def soft_wait(
    portal: Portal,

) -> None:
    # Wait for proc termination but **dont' yet** call
    # ``trio.Process.__aexit__()`` (it tears down stdio
    # which will kill any waiting remote pdb trace).
    # This is a "soft" (cancellable) join/reap.
    '''
    Wait for proc termination but **don't yet** tear down
    std-streams (since it will clobber any ongoing pdb REPL
    session). This is our "soft" (and thus itself cancellable)
    join/reap on an actor-runtime-in-process.

    '''
    uid = portal.channel.uid
    try:
        log.cancel(f'Soft waiting on actor:\n{uid}')

@ -278,7 +303,13 @@ async def soft_wait(
            await wait_func(proc)
            n.cancel_scope.cancel()

        # start a task to wait on the termination of the
        # process by itself waiting on a (caller provided) wait
        # function which should unblock when the target process
        # has terminated.
        n.start_soon(cancel_on_proc_deth)

        # send the actor-runtime a cancel request.
        await portal.cancel_actor()

        if proc.poll() is None:  # type: ignore
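A condensed sketch of the two-phase reap strategy that `soft_wait()` and `do_hard_kill()` implement above. The timeout value and helper names are illustrative, not from this changeset.

import trio

async def reap(proc, portal, hard_kill_after: float = 2.7) -> None:
    # phase 1 ("soft"): ask the actor runtime to cancel itself while
    # racing a task that unblocks as soon as the process exits.
    async with trio.open_nursery() as n:
        async def cancel_on_proc_exit():
            await proc.wait()           # unblocks on process exit
            n.cancel_scope.cancel()     # stop awaiting the portal

        n.start_soon(cancel_on_proc_exit)
        await portal.cancel_actor()     # graceful runtime cancel request

    # phase 2 ("hard"): timeout-bounded reap for hung/stalled actors.
    with trio.move_on_after(hard_kill_after) as cs:
        await proc.wait()
    if cs.cancelled_caught:
        proc.kill()                     # zombie-squad dispatch, last resort
        with trio.CancelScope(shield=True):
            await proc.wait()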
@ -34,7 +34,7 @@ import warnings
import trio

from ._exceptions import (
    unpack_error,
    _raise_from_no_key_in_msg,
)
from .log import get_logger
from .trionics import (

@ -54,61 +54,6 @@ log = get_logger(__name__)
#   messages?  class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?

def _raise_from_no_yield_msg(
    stream: MsgStream,
    msg: dict,
    src_err: KeyError,

) -> bool:
    '''
    Raise an appopriate local error when a `MsgStream` msg arrives
    which does not contain the expected (under normal operation)
    `'yield'` field.

    '''
    # internal error should never get here
    assert msg.get('cid'), ("Received internal error at portal?")

    # TODO: handle 2 cases with 3.10+ match syntax
    # - 'stop'
    # - 'error'
    # possibly just handle msg['stop'] here!

    if stream._closed:
        raise trio.ClosedResourceError('This stream was closed')

    if msg.get('stop') or stream._eoc:
        log.debug(f"{stream} was stopped at remote end")

        # XXX: important to set so that a new ``.receive()``
        # call (likely by another task using a broadcast receiver)
        # doesn't accidentally pull the ``return`` message
        # value out of the underlying feed mem chan!
        stream._eoc = True

        # # when the send is closed we assume the stream has
        # # terminated and signal this local iterator to stop
        # await stream.aclose()

        # XXX: this causes ``ReceiveChannel.__anext__()`` to
        # raise a ``StopAsyncIteration`` **and** in our catch
        # block below it will trigger ``.aclose()``.
        raise trio.EndOfChannel from src_err

    # TODO: test that shows stream raising an expected error!!!
    elif msg.get('error'):
        # raise the error message
        raise unpack_error(msg, stream._ctx.chan)

    # always re-raise the source error if no translation error
    # case is activated above.
    raise src_err
    # raise RuntimeError(
    #     'Unknown non-yield stream msg?\n'
    #     f'{msg}'
    # )

class MsgStream(trio.abc.Channel):
    '''
    A bidirectional message stream for receiving logically sequenced

@ -148,10 +93,13 @@ class MsgStream(trio.abc.Channel):
        try:
            return msg['yield']
        except KeyError as kerr:
            _raise_from_no_yield_msg(
                stream=self,
            _raise_from_no_key_in_msg(
                ctx=self._ctx,
                msg=msg,
                src_err=kerr,
                log=log,
                expect_key='yield',
                stream=self,
            )

    async def receive(self):

@ -161,6 +109,16 @@ class MsgStream(trio.abc.Channel):
        determined by the underlying protocol).

        '''
        # NOTE: `trio.ReceiveChannel` implements
        # EOC handling as follows (aka uses it
        # to gracefully exit async for loops):
        #
        # async def __anext__(self) -> ReceiveType:
        #     try:
        #         return await self.receive()
        #     except trio.EndOfChannel:
        #         raise StopAsyncIteration

        # see ``.aclose()`` for notes on the old behaviour prior to
        # introducing this
        if self._eoc:

@ -174,10 +132,13 @@ class MsgStream(trio.abc.Channel):
            return msg['yield']

        except KeyError as kerr:
            _raise_from_no_yield_msg(
                stream=self,
            _raise_from_no_key_in_msg(
                ctx=self._ctx,
                msg=msg,
                src_err=kerr,
                log=log,
                expect_key='yield',
                stream=self,
            )

        except (
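How the `_raise_from_no_key_in_msg()` swap above surfaces to app code: a remote 'stop' msg becomes `trio.EndOfChannel`, which `trio.abc.Channel.__anext__()` converts to `StopAsyncIteration`, ending the `async for` cleanly. A usage sketch (assuming some streaming target func `stream_fn`):

async def consume(portal, stream_fn) -> None:
    async with portal.open_context(stream_fn) as (ctx, first):
        async with ctx.open_stream() as stream:
            # each 'yield' msg is unpacked by `MsgStream.receive()`;
            # a 'stop' msg raises `trio.EndOfChannel` which ends
            # this loop via `StopAsyncIteration`.
            async for item in stream:
                print('rx-ed:', item)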
@ -48,12 +48,15 @@ LOG_FORMAT = (

DATE_FORMAT = '%b %d %H:%M:%S'

LEVELS = {
LEVELS: dict[str, int] = {
    'TRANSPORT': 5,
    'RUNTIME': 15,
    'CANCEL': 16,
    'PDB': 500,
}
# _custom_levels: set[str] = {
#     lvlname.lower for lvlname in LEVELS.keys()
# }

STD_PALETTE = {
    'CRITICAL': 'red',

@ -102,7 +105,11 @@ class StackLevelAdapter(logging.LoggerAdapter):
        Cancellation logging, mostly for runtime reporting.

        '''
        return self.log(16, msg)
        return self.log(
            level=16,
            msg=msg,
            # stacklevel=4,
        )

    def pdb(
        self,

@ -114,14 +121,37 @@ class StackLevelAdapter(logging.LoggerAdapter):
        '''
        return self.log(500, msg)

    def log(self, level, msg, *args, **kwargs):
        """
    def log(
        self,
        level,
        msg,
        *args,
        **kwargs,
    ):
        '''
        Delegate a log call to the underlying logger, after adding
        contextual information from this adapter instance.
        """

        '''
        if self.isEnabledFor(level):
            stacklevel: int = 3
            if (
                level in LEVELS.values()
                # or level in _custom_levels
            ):
                stacklevel: int = 4

            # msg, kwargs = self.process(msg, kwargs)
            self._log(level, msg, args, **kwargs)
            self._log(
                level=level,
                msg=msg,
                args=args,
                # NOTE: not sure how this worked before but, it
                # seems with our custom level methods defined above
                # we do indeed (now) require another stack level??
                stacklevel=stacklevel,
                **kwargs,
            )

    # LOL, the stdlib doesn't allow passing through ``stacklevel``..
    def _log(

@ -134,12 +164,15 @@ class StackLevelAdapter(logging.LoggerAdapter):
        stack_info=False,

        # XXX: bit we added to show fileinfo from actual caller.
        # this level then ``.log()`` then finally the caller's level..
        stacklevel=3,
        # - this level
        # - then ``.log()``
        # - then finally the caller's level..
        stacklevel=4,
    ):
        """
        '''
        Low-level log implementation, proxied to allow nested logger adapters.
        """

        '''
        return self.logger._log(
            level,
            msg,
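The `stacklevel` bump above exists because stdlib logging resolves the reported file/line by walking that many frames up, and each wrapper layer (the custom level method, then `.log()`) adds one frame. A standalone stdlib demo of the same effect:

import logging

logging.basicConfig(format='%(filename)s:%(lineno)d %(message)s')
log = logging.getLogger('demo')

def my_cancel(msg: str) -> None:
    # without stacklevel=2 the record would point *here*,
    # inside the wrapper, rather than at the caller.
    log.warning(msg, stacklevel=2)

my_cancel('report at the call site, not inside the wrapper')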
@ -19,22 +19,13 @@ Sugary patterns for trio + tractor designs.

'''
from ._mngrs import (
    gather_contexts,
    maybe_open_context,
    maybe_open_nursery,
    gather_contexts as gather_contexts,
    maybe_open_context as maybe_open_context,
    maybe_open_nursery as maybe_open_nursery,
)
from ._broadcast import (
    broadcast_receiver,
    BroadcastReceiver,
    Lagged,
    AsyncReceiver as AsyncReceiver,
    broadcast_receiver as broadcast_receiver,
    BroadcastReceiver as BroadcastReceiver,
    Lagged as Lagged,
)

__all__ = [
    'gather_contexts',
    'broadcast_receiver',
    'BroadcastReceiver',
    'Lagged',
    'maybe_open_context',
    'maybe_open_nursery',
]
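The `name as name` import style above is the explicit re-export idiom: type checkers running in no-implicit-reexport mode (mypy's `--no-implicit-reexport`, pyright's default for libraries) only treat a module-level import as public API when it is aliased to itself, which is what lets the runtime-only `__all__` list be dropped. Minimal form:

# explicit re-export: the self-alias marks the name as public API
from ._mngrs import gather_contexts as gather_contexts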
@ -225,6 +225,7 @@ async def maybe_open_context(

    # yielded output
    yielded: Any = None
    lock_registered: bool = False

    # Lock resource acquisition around task racing / ``trio``'s
    # scheduler protocol.

@ -232,6 +233,7 @@ async def maybe_open_context(
    # to allow re-entrant use cases where one `maybe_open_context()`
    # wrapped factory may want to call into another.
    lock = _Cache.locks.setdefault(fid, trio.Lock())
    lock_registered: bool = True
    await lock.acquire()

    # XXX: one singleton nursery per actor and we want to

@ -291,4 +293,9 @@ async def maybe_open_context(
        _, no_more_users = entry
        no_more_users.set()

        _Cache.locks.pop(fid)
        if lock_registered:
            maybe_lock = _Cache.locks.pop(fid, None)
            if maybe_lock is None:
                log.error(
                    f'Resource lock for {fid} ALREADY POPPED?'
                )
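A self-contained sketch of the lock bookkeeping fix above: only a task that actually registered the cache lock pops it, and an already-missing key is reported instead of raising `KeyError`. Names mirror the diff but the snippet is illustrative.

import trio

locks: dict[str, trio.Lock] = {}

async def with_cached_resource(fid: str) -> None:
    lock_registered: bool = True
    lock = locks.setdefault(fid, trio.Lock())
    await lock.acquire()
    try:
        await trio.sleep(0)  # open or reuse the cached resource here
    finally:
        lock.release()
        if lock_registered and locks.pop(fid, None) is None:
            print(f'Resource lock for {fid} ALREADY POPPED?')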