Compare commits

..

No commits in common. "9fc9b10b5370f9384955e62655bd731b2aa7c6d1" and "fdf0c43bfa1d94e0c6a6be9a090d1eaad3ae7408" have entirely different histories.

27 changed files with 552 additions and 1166 deletions

View File

@ -47,7 +47,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
def test_remote_error(reg_addr, args_err):
def test_remote_error(arb_addr, args_err):
"""Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
error type info and causes cancellation and reraising all the
@ -57,7 +57,7 @@ def test_remote_error(reg_addr, args_err):
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as nursery:
# on a remote type error caused by bad input args
@ -97,7 +97,7 @@ def test_remote_error(reg_addr, args_err):
assert exc.type == errtype
def test_multierror(reg_addr):
def test_multierror(arb_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
@ -105,7 +105,7 @@ def test_multierror(reg_addr):
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
@ -130,14 +130,14 @@ def test_multierror(reg_addr):
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as nursery:
for i in range(num_subactors):
@ -175,20 +175,15 @@ async def do_nothing():
@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(reg_addr, mechanism):
'''
Ensure a ``ActorNursery.start_actor()`` spawned subactor
def test_cancel_single_subactor(arb_addr, mechanism):
"""Ensure a ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
'''
"""
async def spawn_actor():
'''
Spawn an actor that blocks indefinitely then cancel via
either `ActorNursery.cancel()` or an exception raise.
'''
"""Spawn an actor that blocks indefinitely.
"""
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as nursery:
portal = await nursery.start_actor(

View File

@ -141,7 +141,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
reg_addr: tuple,
arb_addr
):
'''
Verify that a ``trio`` nursery created managed in a child actor

View File

@ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.
'''
# from contextlib import asynccontextmanager as acm
from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from typing import Optional
@ -13,11 +13,6 @@ from typing import Optional
import pytest
import trio
import tractor
from tractor import (
Actor,
Context,
current_actor,
)
from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
@ -198,6 +193,9 @@ def test_simple_context(
else:
assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
@ -210,15 +208,10 @@ def test_simple_context(
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
# cancel AFTER we open a stream
# to avoid a cancel raised inside
# `.open_stream()`
await ctx.cancel()
finally:
# after cancellation
@ -283,7 +276,7 @@ def test_caller_cancels(
assert (
tuple(err.canceller)
==
current_actor().uid
tractor.current_actor().uid
)
async def main():
@ -437,11 +430,9 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
):
'caller context closes without using stream'
async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n:
root: Actor = current_actor()
portal = await an.start_actor(
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
@ -449,10 +440,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
assert sent is None
await portal.run(assert_state, value=True)
assert sent is None
# call cancel explicitly
if use_ctx_cancel_method:
@ -463,21 +454,8 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async for msg in stream:
pass
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
else:
assert 0, "Should have context cancelled?"
@ -494,13 +472,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await ctx.result()
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
# NO-OP -> since already called above
await ctx.cancel()
# local scope should have absorbed the cancellation
assert ctx.cancelled_caught
assert ctx._remote_error is ctx._local_error
try:
async with ctx.open_stream() as stream:
async for msg in stream:
@ -579,25 +551,19 @@ async def cancel_self(
global _state
_state = True
# since we call this the below `.open_stream()` should always
# error!
await ctx.cancel()
# should inline raise immediately
try:
async with ctx.open_stream():
pass
# except tractor.ContextCancelled:
except RuntimeError:
except tractor.ContextCancelled:
# suppress for now so we can do checkpoint tests below
print('Got expected runtime error for stream-after-cancel')
pass
else:
raise RuntimeError('Context didnt cancel itself?!')
# check that``trio.Cancelled`` is now raised on any further
# checkpoints since the self cancel above will have cancelled
# the `Context._scope.cancel_scope: trio.CancelScope`
# check a real ``trio.Cancelled`` is raised on a checkpoint
try:
with trio.fail_after(0.1):
await trio.sleep_forever()
@ -608,7 +574,6 @@ async def cancel_self(
# should never get here
assert 0
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test
async def test_callee_cancels_before_started():
@ -636,7 +601,7 @@ async def test_callee_cancels_before_started():
ce.type == trio.Cancelled
# the traceback should be informative
assert 'itself' in ce.msgdata['tb_str']
assert 'cancelled itself' in ce.msgdata['tb_str']
# teardown the actor
await portal.cancel_actor()
@ -808,7 +773,7 @@ async def echo_back_sequence(
print(
'EXITING CALLEEE:\n'
f'{ctx.canceller}'
f'{ctx.cancel_called_remote}'
)
return 'yo'
@ -906,7 +871,7 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx:
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == current_actor().uid
assert tuple(res.canceller) == tractor.current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')

View File

@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors
def spawn(
start_method,
testdir,
reg_addr,
arb_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':

View File

@ -15,19 +15,19 @@ from conftest import tractor_test
@tractor_test
async def test_reg_then_unreg(reg_addr):
async def test_reg_then_unreg(arb_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_arbiter(*reg_addr) as aportal:
async with tractor.get_arbiter(*arb_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -53,27 +53,15 @@ async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(
other_actor: str,
reg_addr: tuple[str, int],
):
async def say_hello(other_actor):
await trio.sleep(1) # wait for other actor to spawn
async with tractor.find_actor(
other_actor,
registry_addrs=[reg_addr],
) as portal:
async with tractor.find_actor(other_actor) as portal:
assert portal is not None
return await portal.run(__name__, 'hi')
async def say_hello_use_wait(
other_actor: str,
reg_addr: tuple[str, int],
):
async with tractor.wait_for_actor(
other_actor,
registry_addr=reg_addr,
) as portal:
async def say_hello_use_wait(other_actor):
async with tractor.wait_for_actor(other_actor) as portal:
assert portal is not None
result = await portal.run(__name__, 'hi')
return result
@ -81,29 +69,21 @@ async def say_hello_use_wait(
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
async def test_trynamic_trio(
func,
start_method,
reg_addr,
):
'''
Root actor acting as the "director" and running one-shot-task-actors
for the directed subs.
'''
async def test_trynamic_trio(func, start_method, arb_addr):
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
func,
other_actor='gretchen',
reg_addr=reg_addr,
name='donny',
)
gretchen = await n.run_in_actor(
func,
other_actor='donny',
reg_addr=reg_addr,
name='gretchen',
)
print(await gretchen.result())
@ -151,7 +131,7 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
reg_addr: tuple,
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
@ -159,9 +139,9 @@ async def spawn_and_check_registry(
) -> None:
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
):
async with tractor.get_arbiter(*reg_addr) as portal:
async with tractor.get_arbiter(*arb_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -233,19 +213,17 @@ async def spawn_and_check_registry(
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
reg_addr,
arb_addr,
with_streaming,
):
'''
Verify that cancelling a nursery results in all subactors
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
'''
"""
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
spawn_and_check_registry,
reg_addr,
arb_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
@ -259,7 +237,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
reg_addr,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
@ -270,7 +248,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
trio.run(
partial(
spawn_and_check_registry,
reg_addr,
arb_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
@ -284,7 +262,7 @@ async def streamer(agen):
async def close_chans_before_nursery(
reg_addr: tuple,
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:
@ -297,9 +275,9 @@ async def close_chans_before_nursery(
entries_at_end = 1
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
):
async with tractor.get_arbiter(*reg_addr) as aportal:
async with tractor.get_arbiter(*arb_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
@ -351,7 +329,7 @@ async def close_chans_before_nursery(
def test_close_channel_explicit(
start_method,
use_signal,
reg_addr,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -361,7 +339,7 @@ def test_close_channel_explicit(
trio.run(
partial(
close_chans_before_nursery,
reg_addr,
arb_addr,
use_signal,
remote_arbiter=False,
),
@ -373,7 +351,7 @@ def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
reg_addr,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -383,7 +361,7 @@ def test_close_channel_explicit_remote_arbiter(
trio.run(
partial(
close_chans_before_nursery,
reg_addr,
arb_addr,
use_signal,
remote_arbiter=True,
),

View File

@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr):
def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr]
arbiter_addr=arb_addr
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
@ -94,7 +94,7 @@ async def asyncio_actor(
raise
def test_aio_simple_error(reg_addr):
def test_aio_simple_error(arb_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -103,7 +103,7 @@ def test_aio_simple_error(reg_addr):
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr]
arbiter_addr=arb_addr
) as n:
await n.run_in_actor(
asyncio_actor,
@ -120,7 +120,7 @@ def test_aio_simple_error(reg_addr):
assert err.type == AssertionError
def test_tractor_cancels_aio(reg_addr):
def test_tractor_cancels_aio(arb_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
@ -139,7 +139,7 @@ def test_tractor_cancels_aio(reg_addr):
trio.run(main)
def test_trio_cancels_aio(reg_addr):
def test_trio_cancels_aio(arb_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -194,7 +194,7 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
reg_addr,
arb_addr,
parent_cancels: bool,
):
'''
@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever()
return await ctx.result()
return await ctx.result()
if parent_cancels:
# bc the parent made the cancel request,
@ -258,7 +258,7 @@ async def aio_cancel():
await sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
async def main():
async with tractor.open_nursery() as n:
@ -395,7 +395,7 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(reg_addr, fan_out):
def test_basic_interloop_channel_stream(arb_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr):
def test_trio_error_cancels_intertask_chan(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr):
def test_trio_closes_early_and_channel_exits(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
trio.run(main)
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -520,7 +520,7 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
reg_addr,
arb_addr,
raise_error_mid_stream,
):

View File

@ -15,26 +15,6 @@ from tractor import ( # typing
ContextCancelled,
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg raised in all open ctxs
# with that peer.
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
# (also) spawned a failing task which was unhandled and
# propagated up to the immediate parent - the peer to the actor
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
@ -49,30 +29,14 @@ from tractor import ( # typing
@tractor.context
async def sleep_forever(
ctx: Context,
expect_ctxc: bool = False,
) -> None:
'''
Sync the context, open a stream then just sleep.
Allow checking for (context) cancellation locally.
'''
try:
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
except BaseException as berr:
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_ctxc:
assert isinstance(berr, trio.Cancelled)
raise
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@tractor.context
@ -181,7 +145,6 @@ async def stream_ints(
async with ctx.open_stream() as stream:
for i in itertools.count():
await stream.send(i)
await trio.sleep(0.01)
@tractor.context
@ -198,81 +161,73 @@ async def stream_from_peer(
peer_ctx.open_stream() as stream,
):
await ctx.started()
# XXX QUESTIONS & TODO: for further details around this
# in the longer run..
# https://github.com/goodboy/tractor/issues/368
# XXX TODO: big set of questions for this
# - should we raise `ContextCancelled` or `Cancelled` (rn
# it does latter) and should/could it be implemented
# as a general injection override for `trio` such
# that ANY next checkpoint would raise the "cancel
# error type" of choice?
# - should the `ContextCancelled` bubble from
# all `Context` and `MsgStream` apis wherein it
# prolly makes the most sense to make it
# a `trio.Cancelled` subtype?
# - what about IPC-transport specific errors, should
# they bubble from the async for and trigger
# other special cases?
# NOTE: current ctl flow:
# - stream raises `trio.EndOfChannel` and
# exits the loop
# - `.open_context()` will raise the ctxcanc
# received from the sleeper.
async for msg in stream:
assert msg is not None
print(msg)
# it does that) here?!
# - test the `ContextCancelled` OUTSIDE the
# `.open_context()` call?
try:
async for msg in stream:
print(msg)
except trio.Cancelled:
assert not ctx.cancel_called
assert not ctx.cancelled_caught
assert not peer_ctx.cancel_called
assert not peer_ctx.cancelled_caught
assert 'root' in ctx.cancel_called_remote
raise # XXX MUST NEVER MASK IT!!
with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
err = ctxerr
assert peer_ctx._remote_error is ctxerr
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
assert not ctx.cancelled_caught
# we never requested cancellation
assert not peer_ctx.cancel_called
# the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task.
assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
# root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
assert ctxerr.canceller == 'canceller'
assert ctxerr._remote_error is ctxerr
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise
# TODO: IN THEORY we could have other cases depending on
# who cancels first, the root actor or the canceller peer?.
#
# 1- when the peer request is first then the `.canceller`
# field should obvi be set to the 'canceller' uid,
#
# 2-if the root DOES req cancel then we should see the same
# `trio.Cancelled` implicitly raised
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
# except BaseException as err:
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize(
'error_during_ctxerr_handling',
@ -296,8 +251,8 @@ def test_peer_canceller(
line and be less indented.
.actor0> ()-> .actor1>
a inter-actor task context opened (by `async with
`Portal.open_context()`) from actor0 *into* actor1.
a inter-actor task context opened (by `async with `Portal.open_context()`)
from actor0 *into* actor1.
.actor0> ()<=> .actor1>
a inter-actor task context opened (as above)
@ -332,12 +287,11 @@ def test_peer_canceller(
5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()`
'''
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
) as an:
async with tractor.open_nursery() as an:
canceller: Portal = await an.start_actor(
'canceller',
enable_modules=[__name__],
@ -351,13 +305,10 @@ def test_peer_canceller(
enable_modules=[__name__],
)
root = tractor.current_actor()
try:
async with (
sleeper.open_context(
sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent),
just_caller.open_context(
@ -384,15 +335,16 @@ def test_peer_canceller(
'Context.result() did not raise ctx-cancelled?'
)
# should always raise since this root task does
# not request the sleeper cancellation ;)
# TODO: not sure why this isn't catching
# but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
assert canceller_ctx.canceller is None
assert caller_ctx.canceller is None
assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.cancel_called_remote is None
assert ctxerr.canceller[0] == 'canceller'
@ -403,14 +355,16 @@ def test_peer_canceller(
# block it should be.
assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise
# XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr:
err = berr
# SHOULD NEVER GET HERE!
except BaseException:
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail('did not rx ctx-cancelled error?')
@ -421,20 +375,6 @@ def test_peer_canceller(
)as ctxerr:
_err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side
# requested)
# - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError`
# instance from other side of context)
# TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the
# cancel)
# CASE: error raised during handling of
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
@ -444,42 +384,20 @@ def test_peer_canceller(
for ctx in ctxs:
assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have
# cancelled all opened contexts except the
# sleeper which is obvi by the "canceller"
# peer.
re = ctx._remote_error
if (
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
# cancelled all opened contexts except
# the sleeper which is cancelled by its
# peer "canceller"
if ctx is not sleeper_ctx:
assert ctx._remote_error.canceller[0] == 'root'
else:
assert (
re.canceller
==
ctx.canceller
==
root.uid
)
# CASE: standard teardown inside in `.open_context()` block
else:
assert ctxerr.canceller == sleeper_ctx.canceller
assert (
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
'canceller'
)
assert ctxerr.canceller[0] == 'canceller'
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
@ -487,43 +405,18 @@ def test_peer_canceller(
assert re is ctxerr
for ctx in ctxs:
re: BaseException | None = ctx._remote_error
assert re
# root doesn't cancel sleeper since it's
# cancelled by its peer.
if ctx is sleeper_ctx:
assert not ctx.cancel_called
# since sleeper_ctx.result() IS called
# above we should have (silently)
# absorbed the corresponding
# `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught`
# should trigger!
assert ctx.cancelled_caught
elif ctx is caller_ctx:
# since its context was remotely
# cancelled, we never needed to
# call `Context.cancel()` bc it was
# done by the peer and also we never
assert ctx.cancel_called
# TODO: figure out the details of
# this..
# if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught
else:
assert ctx.cancel_called
assert not ctx.cancelled_caught
# TODO: do we even need this flag?
# -> each context should have received
# each context should have received
# a silently absorbed context cancellation
# in its remote nursery scope.
# assert ctx.chan.uid == ctx.canceller
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this
@ -541,7 +434,9 @@ def test_peer_canceller(
# including the case where ctx-cancel handling
# itself errors.
assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown
if error_during_ctxerr_handling:

View File

@ -55,7 +55,7 @@ async def context_stream(
async def stream_from_single_subactor(
reg_addr,
arb_addr,
start_method,
stream_func,
):
@ -64,7 +64,7 @@ async def stream_from_single_subactor(
# only one per host address, spawns an actor if None
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
start_method=start_method,
) as nursery:
@ -115,13 +115,13 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
)
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
trio.run(
partial(
stream_from_single_subactor,
reg_addr,
arb_addr,
start_method,
stream_func=stream_func,
),
@ -225,14 +225,14 @@ async def a_quadruple_example():
return result_stream
async def cancel_after(wait, reg_addr):
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
async def cancel_after(wait, arb_addr):
async with tractor.open_root_actor(arbiter_addr=arb_addr):
with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module')
def time_quad_ex(reg_addr, ci_env, spawn_backend):
def time_quad_ex(arb_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
@ -240,7 +240,7 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time()
results = trio.run(cancel_after, timeout, reg_addr)
results = trio.run(cancel_after, timeout, arb_addr)
diff = time.time() - start
assert results
return results, diff
@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, reg_addr)
results = trio.run(cancel_after, delay, arb_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
# In CI envoirments it seems later runs are quicker then the first
@ -280,7 +280,7 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
reg_addr,
arb_addr,
spawn_backend,
loglevel,
):

View File

@ -24,7 +24,7 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(reg_addr):
async def test_self_is_registered(arb_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
@ -34,20 +34,20 @@ async def test_self_is_registered(reg_addr):
@tractor_test
async def test_self_is_registered_localportal(reg_addr):
async def test_self_is_registered_localportal(arb_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal:
async with tractor.get_arbiter(*arb_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == reg_addr
assert sockaddr[0] == arb_addr
def test_local_actor_async_func(reg_addr):
def test_local_actor_async_func(arb_addr):
"""Verify a simple async function in-process.
"""
nums = []
@ -55,7 +55,7 @@ def test_local_actor_async_func(reg_addr):
async def print_loop():
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter

View File

@ -28,9 +28,9 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr):
async def test_cancel_remote_arbiter(daemon, arb_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal:
async with tractor.get_arbiter(*arb_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_arbiter(*reg_addr) as portal:
async with tractor.get_arbiter(*arb_addr) as portal:
pass
def test_register_duplicate_name(daemon, reg_addr):
def test_register_duplicate_name(daemon, arb_addr):
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
) as n:
assert not tractor.current_actor().is_arbiter

View File

@ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror):
)
def test_multi_actor_subs_arbiter_pub(
loglevel,
reg_addr,
arb_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
@ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub(
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
enable_modules=[__name__],
) as n:
@ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub(
def test_single_subactor_pub_multitask_subs(
loglevel,
reg_addr,
arb_addr,
):
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
arbiter_addr=arb_addr,
enable_modules=[__name__],
) as n:

View File

@ -34,6 +34,7 @@ def test_resource_only_entered_once(key_on):
global _resource
_resource = 0
kwargs = {}
key = None
if key_on == 'key_value':
key = 'some_common_key'
@ -138,7 +139,7 @@ def test_open_local_sub_to_stream():
N local tasks using ``trionics.maybe_open_context():``.
'''
timeout: float = 3.6 if platform.system() != "Windows" else 10
timeout = 3 if platform.system() != "Windows" else 10
async def main():

View File

@ -45,7 +45,7 @@ async def short_sleep():
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'fail_on_syntax'],
)
def test_rpc_errors(reg_addr, to_call, testdir):
def test_rpc_errors(arb_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
@ -77,7 +77,7 @@ def test_rpc_errors(reg_addr, to_call, testdir):
# spawn a subactor which calls us back
async with tractor.open_nursery(
arbiter_addr=reg_addr,
arbiter_addr=arb_addr,
enable_modules=exposed_mods.copy(),
) as n:

View File

@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
is_arbiter: bool,
data: dict,
reg_addr: tuple[str, int],
arb_addr: tuple[str, int],
):
namespaces = [__name__]
await trio.sleep(0.1)
async with tractor.open_root_actor(
arbiter_addr=reg_addr,
arbiter_addr=arb_addr,
):
actor = tractor.current_actor()
@ -41,7 +41,7 @@ async def spawn(
is_arbiter=False,
name='sub-actor',
data=data,
reg_addr=reg_addr,
arb_addr=arb_addr,
enable_modules=namespaces,
)
@ -55,12 +55,12 @@ async def spawn(
return 10
def test_local_arbiter_subactor_global_state(reg_addr):
def test_local_arbiter_subactor_global_state(arb_addr):
result = trio.run(
spawn,
True,
data_to_pass_down,
reg_addr,
arb_addr,
)
assert result == 10
@ -140,7 +140,7 @@ async def check_loglevel(level):
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
reg_addr,
arb_addr,
):
if start_method == 'mp_forkserver':
pytest.skip(
@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor(
async with tractor.open_nursery(
name='arbiter',
start_method=start_method,
arbiter_addr=reg_addr,
arbiter_addr=arb_addr,
) as tn:
await tn.run_in_actor(

View File

@ -66,13 +66,13 @@ async def ensure_sequence(
async def open_sequence_streamer(
sequence: list[int],
reg_addr: tuple[str, int],
arb_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
arbiter_addr=reg_addr,
arbiter_addr=arb_addr,
start_method=start_method,
) as tn:
@ -93,7 +93,7 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions(
reg_addr,
arb_addr,
start_method,
):
@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions(
async with open_sequence_streamer(
sequence,
reg_addr,
arb_addr,
start_method,
) as stream:
@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions(
]
)
def test_consumer_and_parent_maybe_lag(
reg_addr,
arb_addr,
start_method,
task_delays,
):
@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag(
async with open_sequence_streamer(
sequence,
reg_addr,
arb_addr,
start_method,
) as stream:
@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag(
def test_faster_task_to_recv_is_cancelled_by_slower(
reg_addr,
arb_addr,
start_method,
):
'''
@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
async with open_sequence_streamer(
sequence,
reg_addr,
arb_addr,
start_method,
) as stream:
@ -302,7 +302,7 @@ def test_subscribe_errors_after_close():
def test_ensure_slow_consumers_lag_out(
reg_addr,
arb_addr,
start_method,
):
'''This is a pure local task test; no tractor

View File

@ -18,6 +18,8 @@
This is the "bootloader" for actors started using the native trio backend.
"""
import sys
import trio
import argparse
from ast import literal_eval
@ -35,6 +37,8 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()

View File

@ -44,11 +44,9 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
StreamOverrun,
)
from .log import get_logger
@ -58,36 +56,28 @@ from ._state import current_actor
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
log = get_logger(__name__)
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
'''
An inter-actor, SC transitive, `trio.Task` communication context.
An inter-actor, ``trio``-task communication context.
NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or,
- by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function.
NB: This class should never be instatiated directly, it is delivered
by either,
- runtime machinery to a remotely started task or,
- by entering ``Portal.open_context()``.
AND is always constructed using the below ``mk_context()``.
and is always constructed using ``mkt_context()``.
Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._runtime._invoke()``.
# TODO: more detailed writeup on cancellation, error and
# streaming semantics..
2 communicating, parallel executing actor tasks. A unique context is
allocated on each side of any task RPC-linked msg dialog, for
every request to a remote actor from a portal. On the "callee"
side a context is always allocated inside ``._runtime._invoke()``.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task
@ -118,31 +108,12 @@ class Context:
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# this value is only set on one side.
_result: Any | int = None
# if the local "caller" task errors this
# value is always set to the error that was
# captured in the `Portal.open_context().__aexit__()`
# teardown.
_local_error: BaseException | None = None
# if the either side gets an error from the other
# this value is set to that error unpacked from an
# IPC msg.
_remote_error: BaseException | None = None
# only set if the local task called `.cancel()`
# cancellation state
_cancel_called: bool = False # did WE cancel the far end?
# TODO: do we even need this? we can assume that if we're
# cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the
# `ContextCancelled`?
_canceller: tuple[str, str] | None = None
_cancelled_remote: tuple[str, str] | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any
@ -174,47 +145,23 @@ class Context:
return self._cancel_called
@property
def canceller(self) -> tuple[str, str] | None:
def cancel_called_remote(self) -> tuple[str, str] | None:
'''
``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled.
``Actor.uid`` of the remote actor who's task was cancelled
causing this side of the context to also be cancelled.
'''
return self._canceller
remote_uid = self._cancelled_remote
if remote_uid:
return tuple(remote_uid)
@property
def cancelled_caught(self) -> bool:
return (
# the local scope was cancelled either by
# remote error or self-request
self._scope.cancelled_caught
# the local scope was never cancelled
# and instead likely we received a remote side
# cancellation that was raised inside `.result()`
or (
(se := self._local_error)
and
isinstance(se, ContextCancelled)
and (
se.canceller == self.canceller
or
se is self._remote_error
)
)
)
@property
def side(self) -> str:
'''
Return string indicating which task this instance is wrapping.
'''
return 'caller' if self._portal else 'callee'
return self._scope.cancelled_caught
# init and streaming state
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# overrun handling machinery
@ -249,7 +196,7 @@ class Context:
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
def _maybe_cancel_and_set_remote_error(
async def _maybe_cancel_and_set_remote_error(
self,
error: BaseException,
@ -322,19 +269,16 @@ class Context:
# that error as the reason.
self._remote_error: BaseException = error
# always record the remote actor's uid since its cancellation
# state is directly linked to ours (the local one).
self._cancelled_remote = self.chan.uid
if (
isinstance(error, ContextCancelled)
):
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
self._canceller = error.canceller
log.cancel(
'Remote task-context was cancelled for '
f'actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'canceller: {error.canceller}\n'
'Remote task-context sucessfully cancelled for '
f'{self.chan.uid}:{self.cid}'
)
if self._cancel_called:
@ -345,37 +289,22 @@ class Context:
# and we **don't need to raise it** in local cancel
# scope since it will potentially override a real error.
return
else:
log.error(
f'Remote context error,\n'
f'remote actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{error}'
)
self._canceller = self.chan.uid
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
# YES! this is way better and simpler!
cs: trio.CancelScope = self._scope
if (
cs
and not cs.cancel_called
and not cs.cancelled_caught
):
# TODO: we can for sure drop this right?
if self._scope:
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
# self._cancelled_remote = self.chan.uid
self._scope.cancel()
# NOTE: this REPL usage actually works here dawg! Bo
# this REPL usage actually works here BD
# from .devx._debug import pause
# await pause()
@ -391,19 +320,13 @@ class Context:
Timeout quickly in an attempt to sidestep 2-generals...
'''
side: str = self.side
side: str = 'caller' if self._portal else 'callee'
log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}'
)
self._cancel_called: bool = True
# caller side who entered `Portal.open_context()`
# NOTE: on the call side we never manually call
# `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown
# (normally in `.result()` called from
# `Portal.open_context().__aexit__()`)
if side == 'caller':
if not self._portal:
raise RuntimeError(
@ -426,6 +349,7 @@ class Context:
'_cancel_task',
cid=cid,
)
# print("EXITING CANCEL CALL")
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@ -444,9 +368,6 @@ class Context:
)
# callee side remote task
# NOTE: on this side we ALWAYS cancel the local scope since
# the caller expects a `ContextCancelled` to be sent from
# `._runtime._invoke()` back to the other side.
else:
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
@ -482,7 +403,7 @@ class Context:
``trio``'s cancellation system.
'''
actor: Actor = current_actor()
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
@ -492,34 +413,12 @@ class Context:
# killed
if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
if self._remote_error:
raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different
# runtime error indicating that this task's usage of
# `Context.cancel()` and then `.open_stream()` is WRONG!
task: str = trio.lowlevel.current_task().name
raise RuntimeError(
'Stream opened after `Context.cancel()` called..?\n'
f'task: {actor.uid[0]}:{task}\n'
f'{self}'
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
if (
not self._portal
and not self._started_called
):
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
@ -535,7 +434,7 @@ class Context:
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns: bool = allow_overruns
ctx._allow_overruns = allow_overruns
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
@ -545,32 +444,27 @@ class Context:
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?'
)
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
# NOTE: we track all existing streams per portal for
# the purposes of attempting graceful closes on runtime
# cancel requests.
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened: bool = True
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit,
# signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end.
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
finally:
@ -601,22 +495,14 @@ class Context:
# whenever ``CancelScope.cancel()`` was called) and
# instead silently reap the expected cancellation
# "error"-msg.
our_uid: tuple[str, str] = current_actor().uid
if (
isinstance(err, ContextCancelled)
and (
self._cancel_called
or self.chan._cancel_called
or self.canceller == our_uid
or tuple(err.canceller) == our_uid
or tuple(err.canceller) == current_actor().uid
)
):
# NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing"
# the error silently B)
if self._local_error is None:
self._local_error = err
return err
# NOTE: currently we are masking underlying runtime errors
@ -629,7 +515,7 @@ class Context:
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
# __tracebackhide__: bool = True
__tracebackhide__: bool = True
raise err from None
async def result(self) -> Any | Exception:
@ -658,6 +544,7 @@ class Context:
of the remote cancellation.
'''
__tracebackhide__: bool = True
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
@ -720,15 +607,13 @@ class Context:
"Received internal error at portal?"
)
if err:= unpack_error(
err = unpack_error(
msg,
self._portal.channel
): # from msgerr
self._maybe_cancel_and_set_remote_error(err)
self._maybe_raise_remote_err(err)
) # from msgerr
else:
raise
err = self._maybe_raise_remote_err(err)
self._remote_error = err
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
@ -839,17 +724,13 @@ class Context:
f"Delivering {msg} from {uid} to caller {cid}"
)
if (
msg.get('error') # check for field
and (
error := unpack_error(
msg,
self.chan,
)
)
error = msg.get('error')
if error := unpack_error(
msg,
self.chan,
):
self._cancel_msg = msg
self._maybe_cancel_and_set_remote_error(error)
await self._maybe_cancel_and_set_remote_error(error)
if (
self._in_overrun
@ -884,7 +765,7 @@ class Context:
# XXX: always push an error even if the local
# receiver is in overrun state.
# self._maybe_cancel_and_set_remote_error(msg)
# await self._maybe_cancel_and_set_remote_error(msg)
local_uid = current_actor().uid
lines = [

View File

@ -14,18 +14,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
"""
Our classy exception set.
'''
from __future__ import annotations
"""
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
)
import traceback
@ -34,11 +31,6 @@ import trio
from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__)
@ -46,17 +38,12 @@ class ActorFailure(Exception):
"General actor failure"
# TODO: rename to just `RemoteError`?
class RemoteActorError(Exception):
'''
A box(ing) type which bundles a remote actor `BaseException` for
(near identical, and only if possible,) local object/instance
re-construction in the local process memory domain.
Normally each instance is expected to be constructed from
a special "error" IPC msg sent by some remote actor-runtime.
Remote actor exception bundled locally
'''
# TODO: local recontruction of remote exception deats
def __init__(
self,
message: str,
@ -66,36 +53,13 @@ class RemoteActorError(Exception):
) -> None:
super().__init__(message)
# TODO: maybe a better name?
# - .errtype
# - .retype
# - .boxed_errtype
# - .boxed_type
# - .remote_type
# also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5
self.type: str = suberror_type
self.msgdata: dict[str, Any] = msgdata
self.type = suberror_type
self.msgdata = msgdata
@property
def src_actor_uid(self) -> tuple[str, str] | None:
return self.msgdata.get('src_actor_uid')
def __repr__(self) -> str:
if remote_tb := self.msgdata.get('tb_str'):
pformat(remote_tb)
return (
f'{type(self).__name__}(\n'
f'msgdata={pformat(self.msgdata)}\n'
')'
)
return super().__repr__()
# TODO: local recontruction of remote exception deats
# def unbox(self) -> BaseException:
# ...
class InternalActorError(RemoteActorError):
'''
@ -134,19 +98,8 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class AsyncioCancelled(Exception):
@ -157,9 +110,6 @@ class AsyncioCancelled(Exception):
'''
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
def pack_error(
exc: BaseException,
@ -186,15 +136,7 @@ def pack_error(
'src_actor_uid': current_actor().uid,
}
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
if isinstance(exc, ContextCancelled):
error_msg.update(exc.msgdata)
return {'error': error_msg}
@ -204,8 +146,7 @@ def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError,
hide_tb: bool = True,
err_type=RemoteActorError
) -> None | Exception:
'''
@ -216,7 +157,7 @@ def unpack_error(
which is the responsibilitiy of the caller.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__: bool = True
error_dict: dict[str, dict] | None
if (
@ -273,93 +214,3 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if the a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
f'Context stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err

View File

@ -294,11 +294,9 @@ class Channel:
self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote
# (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# remote (peer) cancellation of the far end actor runtime.
self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
@classmethod
def from_stream(
@ -329,11 +327,8 @@ class Channel:
def __repr__(self) -> str:
if self.msgstream:
return repr(
self.msgstream.stream.socket._sock
).replace( # type: ignore
"socket.socket",
"Channel",
)
self.msgstream.stream.socket._sock).replace( # type: ignore
"socket.socket", "Channel")
return object.__repr__(self)
@property

View File

@ -33,6 +33,7 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
@ -44,17 +45,12 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
)
from ._context import (
Context,
)
from ._streaming import (
MsgStream,
)
from ._context import Context
from ._streaming import MsgStream
log = get_logger(__name__)
@ -68,10 +64,15 @@ def _unwrap_msg(
__tracebackhide__ = True
try:
return msg['return']
except KeyError as ke:
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel) from ke
raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal:
@ -218,18 +219,14 @@ class Portal:
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with
# a proper shield
# XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(
timeout
or self.cancel_timeout
) as cs:
cs.shield = True
await self.run_from_ns(
'self',
'cancel',
)
await self.run_from_ns('self', 'cancel')
return True
if cs.cancelled_caught:
@ -464,18 +461,25 @@ class Portal:
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first: Any = msg['started']
first = msg['started']
ctx._started_called: bool = True
except KeyError as src_error:
except KeyError:
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid
@ -512,102 +516,57 @@ class Portal:
# started in the ctx nursery.
ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against
# self-context-cancellation (which raises a local
# `ContextCancelled`) when requested (via
# `Context.cancel()`) by the same task (tree) which entered
# THIS `.open_context()`.
#
# NOTE: There are 2 operating cases for a "graceful cancel"
# of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as a `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# XXX: (maybe) shield/mask context-cancellations that were
# initiated by any of the context's 2 tasks. There are
# subsequently 2 operating cases for a "graceful cancel"
# of a `Context`:
#
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
except ContextCancelled as ctxc:
scope_err = ctxc
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
# XXX: this should NEVER happen!
# from ._debug import breakpoint
# await breakpoint()
raise
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
if (
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
)
):
else:
log.debug(
f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this caller/yieldee
# - a standard error in the caller/yieldee
Exception,
# CASES 1 & 2: normally manifested as
# a `Context._scope_nursery` raised
# exception-group of,
# 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called and any
# `ContextCancelled` absorbed and thus NOT RAISED in
# any `Context._maybe_raise_remote_err()`,
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error raised in the "callee" side with
# a group only raised if there was any more then one
# task started here in the "caller" in the
# `yield`-ed to task.
BaseExceptionGroup, # since overrun handler tasks may have been spawned
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
# - a runtime teardown exception-group and/or
# cancellation request from a caller task.
BaseExceptionGroup,
trio.Cancelled,
KeyboardInterrupt,
) as err:
scope_err = err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above!
# XXX: request cancel of this context on any error.
# NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "cancellation requested" case
# above.
log.cancel(
'Context cancelled for task due to\n'
f'{err}\n'
@ -626,7 +585,7 @@ class Portal:
raise # duh
# no local scope error, the "clean exit with a result" case.
# no scope error case
else:
if ctx.chan.connected():
log.info(
@ -640,27 +599,15 @@ class Portal:
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
# result = await ctx.result()
try:
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
raise
# `Context._maybe_cancel_and_set_remote_error()`
# which IS SET any time the far end fails and
# causes "caller side" cancellation via
# a `ContextCancelled` here.
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
finally:
# though it should be impossible for any tasks
@ -710,14 +657,12 @@ class Portal:
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: we always raise remote errors locally and
# generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error`
# which was the underlying cause of this context's exit
# should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
# XXX: since we always (maybe) re-raise (and thus also
# mask runtime machinery related
# multi-`trio.Cancelled`s) any scope error which was
# the underlying cause of this context's exit, add
# different log msgs for each of the (2) cases.
if scope_err is not None:
ctx._local_error: BaseException = scope_err
etype: Type[BaseException] = type(scope_err)
# CASE 2
@ -746,7 +691,7 @@ class Portal:
# child has already cleared it and clobbered IPC.
# FINALLY, remove the context from runtime tracking and
# exit!
# exit Bo
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,

View File

@ -25,6 +25,7 @@ import logging
import signal
import sys
import os
import typing
import warnings

View File

@ -15,10 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.
Actor primitives and helpers
"""
from __future__ import annotations
@ -44,14 +41,8 @@ import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio
from trio import (
CancelScope,
)
from trio_typing import (
Nursery,
TaskStatus,
)
import trio # type: ignore
from trio_typing import TaskStatus
from ._ipc import Channel
from ._context import (
@ -95,22 +86,21 @@ async def _invoke(
] = trio.TASK_STATUS_IGNORED,
):
'''
Schedule a `trio` task-as-func and deliver result(s) over
connected IPC channel.
Invoke local func and deliver result(s) over provided channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
This is the core "RPC task" starting machinery.
'''
__tracebackhide__ = True
treat_as_gen: bool = False
failed_resp: bool = False
# possibly a traceback (not sure what typing is for this..)
tb = None
cancel_scope = CancelScope()
cancel_scope = trio.CancelScope()
# activated cancel scope ref
cs: CancelScope | None = None
cs: trio.CancelScope | None = None
ctx = actor.get_context(
chan,
@ -122,7 +112,6 @@ async def _invoke(
)
context: bool = False
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
@ -164,7 +153,6 @@ async def _invoke(
except TypeError:
raise
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
@ -195,7 +183,6 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid})
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
@ -212,20 +199,6 @@ async def _invoke(
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
@ -236,30 +209,21 @@ async def _invoke(
ctx._scope = nurse.cancel_scope
task_status.started(ctx)
res = await coro
await chan.send({
'return': res,
'cid': cid
})
await chan.send({'return': res, 'cid': cid})
# XXX: do we ever trigger this block any more?
except (
BaseExceptionGroup,
trio.Cancelled,
) as scope_error:
):
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._remote_error is not None:
raise ctx._remote_error
# always set this (callee) side's exception as the
# local error on the context
ctx._local_error: BaseException = scope_error
# if a remote error was set then likely the
# exception group was raised due to that, so
# and we instead raise that error immediately!
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
# maybe TODO: pack in
# ``trio.Cancelled.__traceback__`` here so they can
# be unwrapped and displayed on the caller side?
# maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
# side?
raise
finally:
@ -270,11 +234,11 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((chan.uid, cid))
log.runtime(
f'Context entrypoint {func} was terminated:\n'
f'{ctx}'
)
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(
f'Context entrypoint {func} was terminated:\n{ctx}'
)
if ctx.cancelled_caught:
@ -282,46 +246,44 @@ async def _invoke(
# before raising any context cancelled case
# so that real remote errors don't get masked as
# ``ContextCancelled``s.
if re := ctx._remote_error:
re = ctx._remote_error
if re:
ctx._maybe_raise_remote_err(re)
fname: str = func.__name__
cs: CancelScope = ctx._scope
fname = func.__name__
cs: trio.CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{our_uid} cancelled by '
)
canceller = ctx._cancelled_remote
# await _debug.breakpoint()
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
# ``Context.cancel()`` directly, we're going to
# need to change this logic branch since it
# will always enter..
# need to change this logic branch since it will
# always enter..
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
msg = f'`{fname}()`@{actor.uid} cancelled itself'
else:
msg = (
f'`{fname}()`@{actor.uid} '
'was remotely cancelled by '
)
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor that for ex.
# some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
elif canceller == ctx.chan.uid:
msg += f'its caller {canceller} '
if canceller == ctx.chan.uid:
msg += f'its caller {canceller}'
else:
msg += f'remote actor {canceller}'
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
' with msg:\n'
f'{ctx._cancel_msg}'
)
msg += f' with msg:\n{ctx._cancel_msg}'
# task-contex was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()``
@ -334,76 +296,37 @@ async def _invoke(
canceller=canceller,
)
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
else:
# regular async function
try:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
await chan.send({'functype': 'asyncfunc', 'cid': cid})
except trio.BrokenResourceError:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
f'Failed to respond to non-rpc request: {func}'
)
with cancel_scope as cs:
ctx._scope: CancelScope = cs
ctx._scope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
fname = func.__name__
log.runtime(f'{fname}() result: {result}')
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
{'return': result,
'cid': cid}
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
if not failed_resp:
# only send result if we know IPC isn't down
await chan.send(
{'return': result,
'cid': cid}
)
except (
Exception,
BaseExceptionGroup,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail
# due to an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = True
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
@ -437,31 +360,24 @@ async def _invoke(
log.exception("Actor crashed:")
# always ship errors back to caller
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
)
err_msg = pack_error(err, tb=tb)
err_msg['cid'] = cid
if is_rpc:
try:
await chan.send(err_msg)
try:
await chan.send(err_msg)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?\n"
f'{ipc_err}'
)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?"
)
# error is probably from above coro running code *not from the
# underlyingn rpc invocation* since a scope was never allocated
@ -487,11 +403,7 @@ async def _invoke(
log.warning(
f"Task {func} likely errored or cancelled before start")
else:
log.cancel(
'Failed to de-alloc internal task!?\n'
f'cid: {cid}\n'
f'{func.__name__}({kwargs})'
)
log.cancel(f'{func.__name__}({kwargs}) failed?')
finally:
if not actor._rpc_tasks:
@ -508,7 +420,7 @@ async def try_ship_error_to_parent(
err: Union[Exception, BaseExceptionGroup],
) -> None:
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
@ -555,13 +467,13 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery | None = None
_service_n: Nursery | None = None
_server_n: Nursery | None = None
_root_n: trio.Nursery | None = None
_service_n: trio.Nursery | None = None
_server_n: trio.Nursery | None = None
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope | None = None
_parent_chan_cs: trio.CancelScope | None = None
# syncs for setup/teardown sequences
_server_down: trio.Event | None = None
@ -1093,7 +1005,7 @@ class Actor:
async def _serve_forever(
self,
handler_nursery: Nursery,
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
@ -1161,17 +1073,12 @@ class Actor:
- return control the parent channel message loop
'''
log.cancel(
f'{self.uid} requested to cancel by:\n'
f'{requesting_uid}'
)
# TODO: what happens here when we self-cancel tho?
log.cancel(f"{self.uid} is trying to cancel")
self._cancel_called_by_remote: tuple = requesting_uid
self._cancel_called = True
# cancel all ongoing rpc tasks
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
@ -1181,9 +1088,7 @@ class Actor:
dbcs.cancel()
# kill all ongoing tasks
await self.cancel_rpc_tasks(
requesting_uid=requesting_uid,
)
await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
# stop channel server
self.cancel_server()
@ -1213,8 +1118,8 @@ class Actor:
self,
cid: str,
chan: Channel,
requesting_uid: tuple[str, str] | None = None,
requesting_uid: tuple[str, str] | None = None,
) -> bool:
'''
Cancel a local task by call-id / channel.
@ -1231,7 +1136,7 @@ class Actor:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: CancelScope = ctx._scope
scope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
return True
@ -1241,10 +1146,10 @@ class Actor:
f"peer: {chan.uid}\n")
if (
ctx._canceller is None
ctx._cancelled_remote is None
and requesting_uid
):
ctx._canceller: tuple = requesting_uid
ctx._cancelled_remote: tuple = requesting_uid
# don't allow cancelling this function mid-execution
# (is this necessary?)
@ -1254,7 +1159,6 @@ class Actor:
# TODO: shouldn't we eventually be calling ``Context.cancel()``
# directly here instead (since that method can handle both
# side's calls into it?
# await ctx.cancel()
scope.cancel()
# wait for _invoke to mark the task complete
@ -1282,12 +1186,9 @@ class Actor:
registered for each.
'''
tasks: dict = self._rpc_tasks
tasks = self._rpc_tasks
if tasks:
log.cancel(
f'Cancelling all {len(tasks)} rpc tasks:\n'
f'{tasks}'
)
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (
(chan, cid),
(ctx, func, is_complete),
@ -1305,9 +1206,7 @@ class Actor:
)
log.cancel(
'Waiting for remaining rpc tasks to complete:\n'
f'{tasks}'
)
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None:
@ -1537,7 +1436,7 @@ async def async_main(
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with CancelScope(shield=True):
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
actor.lifetime_stack.close()
@ -1573,7 +1472,7 @@ async def async_main(
):
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
@ -1584,7 +1483,7 @@ async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
@ -1602,7 +1501,7 @@ async def process_messages(
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with CancelScope(shield=shield) as loop_cs:
with trio.CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
@ -1664,18 +1563,18 @@ async def process_messages(
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
func = actor.cancel
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
@ -1703,7 +1602,7 @@ async def process_messages(
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
# with trio.CancelScope(shield=True):
kwargs['chan'] = chan
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
@ -1728,7 +1627,7 @@ async def process_messages(
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
func = getattr(actor, funcname)
else:
# complain to client about restricted modules
@ -1818,10 +1717,9 @@ async def process_messages(
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)

View File

@ -204,21 +204,6 @@ async def do_hard_kill(
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
Used in 2 main cases:
- "unknown remote runtime state": a hanging/stalled actor that
isn't responding after sending a (graceful) runtime cancel
request via an IPC msg.
- "cancelled during spawn": a process who's actor runtime was
cancelled before full startup completed (such that
cancel-request-handling machinery was never fully
initialized) and thus a "cancel request msg" is never going
to be handled.
'''
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
@ -234,9 +219,6 @@ async def do_hard_kill(
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
#
# This code was originally triggred by ``proc.__aexit__()``
# but now must be called manually.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
@ -252,14 +234,10 @@ async def do_hard_kill(
with trio.CancelScope(shield=True):
await proc.wait()
# XXX NOTE XXX: zombie squad dispatch:
# (should ideally never, but) If we do get here it means
# graceful termination of a process failed and we need to
# resort to OS level signalling to interrupt and cancel the
# (presumably stalled or hung) actor. Since we never allow
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
proc.kill()
@ -274,13 +252,10 @@ async def soft_wait(
portal: Portal,
) -> None:
'''
Wait for proc termination but **dont' yet** teardown
std-streams (since it will clobber any ongoing pdb REPL
session). This is our "soft" (and thus itself cancellable)
join/reap on an actor-runtime-in-process.
'''
# Wait for proc termination but **dont' yet** call
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).
# This is a "soft" (cancellable) join/reap.
uid = portal.channel.uid
try:
log.cancel(f'Soft waiting on actor:\n{uid}')
@ -303,13 +278,7 @@ async def soft_wait(
await wait_func(proc)
n.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor()
if proc.poll() is None: # type: ignore

View File

@ -34,7 +34,7 @@ import warnings
import trio
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
)
from .log import get_logger
from .trionics import (
@ -54,6 +54,61 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc:
log.debug(f"{stream} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error
# case is activated above.
raise src_err
# raise RuntimeError(
# 'Unknown non-yield stream msg?\n'
# f'{msg}'
# )
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@ -93,13 +148,10 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
@ -109,16 +161,6 @@ class MsgStream(trio.abc.Channel):
determined by the underlying protocol).
'''
# NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it
# to gracefully exit async for loops):
#
# async def __anext__(self) -> ReceiveType:
# try:
# return await self.receive()
# except trio.EndOfChannel:
# raise StopAsyncIteration
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
@ -132,13 +174,10 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
except (

View File

@ -48,15 +48,12 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS: dict[str, int] = {
LEVELS = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = {
'CRITICAL': 'red',
@ -105,11 +102,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
Cancellation logging, mostly for runtime reporting.
'''
return self.log(
level=16,
msg=msg,
# stacklevel=4,
)
return self.log(16, msg)
def pdb(
self,
@ -121,37 +114,14 @@ class StackLevelAdapter(logging.LoggerAdapter):
'''
return self.log(500, msg)
def log(
self,
level,
msg,
*args,
**kwargs,
):
'''
def log(self, level, msg, *args, **kwargs):
"""
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
'''
"""
if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
# msg, kwargs = self.process(msg, kwargs)
self._log(
level=level,
msg=msg,
args=args,
# NOTE: not sure how this worked before but, it
# seems with our custom level methods defined above
# we do indeed (now) require another stack level??
stacklevel=stacklevel,
**kwargs,
)
self._log(level, msg, args, **kwargs)
# LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log(
@ -164,15 +134,12 @@ class StackLevelAdapter(logging.LoggerAdapter):
stack_info=False,
# XXX: bit we added to show fileinfo from actual caller.
# - this level
# - then ``.log()``
# - then finally the caller's level..
stacklevel=4,
# this level then ``.log()`` then finally the caller's level..
stacklevel=3,
):
'''
"""
Low-level log implementation, proxied to allow nested logger adapters.
'''
"""
return self.logger._log(
level,
msg,

View File

@ -19,13 +19,22 @@ Sugary patterns for trio + tractor designs.
'''
from ._mngrs import (
gather_contexts as gather_contexts,
maybe_open_context as maybe_open_context,
maybe_open_nursery as maybe_open_nursery,
gather_contexts,
maybe_open_context,
maybe_open_nursery,
)
from ._broadcast import (
AsyncReceiver as AsyncReceiver,
broadcast_receiver as broadcast_receiver,
BroadcastReceiver as BroadcastReceiver,
Lagged as Lagged,
broadcast_receiver,
BroadcastReceiver,
Lagged,
)
__all__ = [
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
'maybe_open_context',
'maybe_open_nursery',
]

View File

@ -225,7 +225,6 @@ async def maybe_open_context(
# yielded output
yielded: Any = None
lock_registered: bool = False
# Lock resource acquisition around task racing / ``trio``'s
# scheduler protocol.
@ -233,7 +232,6 @@ async def maybe_open_context(
# to allow re-entrant use cases where one `maybe_open_context()`
# wrapped factor may want to call into another.
lock = _Cache.locks.setdefault(fid, trio.Lock())
lock_registered: bool = True
await lock.acquire()
# XXX: one singleton nursery per actor and we want to
@ -293,9 +291,4 @@ async def maybe_open_context(
_, no_more_users = entry
no_more_users.set()
if lock_registered:
maybe_lock = _Cache.locks.pop(fid, None)
if maybe_lock is None:
log.error(
f'Resource lock for {fid} ALREADY POPPED?'
)
_Cache.locks.pop(fid)