Compare commits

...

25 Commits

Author SHA1 Message Date
Tyler Goodlet 9fc9b10b53 Add `StreamOverrun.sender: tuple` for better handling
Since it's generally useful to know who caused an overrun (say because
you want your system to then adjust the writer side to slow down), we
might as well pack an extra `.sender: tuple[str, str]` actor-uid field
which can be relayed through `RemoteActorError` boxing. Add an extra
case for the exc-type to `unpack_error()` to match B)
2025-03-14 14:14:54 -04:00
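
A minimal sketch of the `.sender` idea above (hedged: the attribute
layout and constructor shown here are inferred from the commit message,
not copied from the actual diff; tractor's real exc type derives from
`trio.TooSlowError`):

    import trio

    class StreamOverrun(trio.TooSlowError):
        '''
        A stream was overrun by its sender; `.sender` records the
        offending actor's uid so the receiver side can decide which
        writer to slow down.

        '''
        def __init__(
            self,
            message: str,
            sender: tuple[str, str] | None = None,  # (name, uuid) actor uid
        ) -> None:
            super().__init__(message)
            self.sender = sender

    # receiver side: who do we throttle?
    try:
        raise StreamOverrun('too fast!', sender=('writer', '<uuid>'))
    except StreamOverrun as err:
        assert err.sender == ('writer', '<uuid>')
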
Tyler Goodlet a86275996c Offer `unpack_error(hid_tb: bool)` for `pdbp` REPL config 2025-03-14 14:14:54 -04:00
Tyler Goodlet b5431c0343 Never mask original `KeyError` in portal-error unwrapper, for now? 2025-03-14 14:14:54 -04:00
Tyler Goodlet cdee6f9354 Try allowing multi-pops of `_Cache.locks` for now? 2025-03-14 14:14:53 -04:00
Tyler Goodlet a2f1bcc23f Use `import <blah> as blah` over `__all__` in `.trionics` 2025-03-14 14:14:53 -04:00
Tyler Goodlet 4aa89bf391 Bump timeout on resource cache test a bitty bit. 2025-03-14 14:14:53 -04:00
Tyler Goodlet 45e9cb4d09 `_root`: drop unused `typing` import 2025-03-14 14:14:53 -04:00
Tyler Goodlet 27c5ffe5a7 Move missing-key-in-msg raiser to `._exceptions`
Since we use basically the exact same set of logic in
`Portal.open_context()` when expecting the first `'started'` msg, factor
and generalize `._streaming._raise_from_no_yield_msg()` into a new
`._exceptions._raise_from_no_key_in_msg()` (as per the lingering todo),
which obviously requires a more generalized / optional signature
including a caller-specific `log` obj. Obviously call the new func from
all the other modules X)
2025-03-14 14:14:50 -04:00
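
Conceptually the generalization is just parametrizing the expected msg
key and the logger; a hypothetical shape (signature and error type are
guesses from the message above, not the actual diff):

    import logging

    def _raise_from_no_key_in_msg(
        msg: dict,
        key: str,  # eg. 'yield' for streams, 'started' for contexts
        log: logging.Logger = logging.getLogger(__name__),
    ) -> None:
        '''
        Raise when an expected field is missing from an IPC msg,
        logging through the caller-specific `log` obj.

        '''
        if key not in msg:
            log.warning(f'Missing {key!r} field in msg:\n{msg}')
            raise KeyError(
                f'No {key!r} in msg: {msg}'
            )
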
Tyler Goodlet 914efd80eb Fmt repr as multi-line style call 2025-03-14 14:14:11 -04:00
Tyler Goodlet 2d2d1ca1c4 Drop unused walrus assign of `re` 2025-03-14 14:14:11 -04:00
Tyler Goodlet 74aa5aa9cd `StackLevelAdapter._log(stacklevel: int)` for custom levels..
Apparently (and I don't know if this was always broken [I feel like
no?] or if it's a recent change to stdlib's `logging` stuff) we need to
increment the `stacklevel` input by one for our custom level methods
now? Without this you're going to see the path to the method's
call-stack frame on every emission instead of the caller's. I first
noticed this when debugging the workspace layer spawning in
`modden.bigd` and then verified it in other dependent projects..

I guess we should add some tests for this as well XD
2025-03-14 14:14:11 -04:00
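
A standalone repro of the stdlib behaviour (not tractor's adapter): a
custom level method adds one call frame between the user's call site
and `Logger._log()`, so `stacklevel` must be bumped by one for
`%(pathname)s`/`%(lineno)d` to resolve to the caller (the `runtime`
level/method names here are hypothetical):

    import logging

    logging.basicConfig(format='%(pathname)s:%(lineno)d: %(message)s')
    log = logging.getLogger('demo')
    log.setLevel(1)

    RUNTIME: int = 5  # hypothetical custom level

    def runtime(self, msg: str, stacklevel: int = 1) -> None:
        # our wrapper adds one frame, so bump `stacklevel` to
        # attribute the record to *our* caller's frame.
        self.log(RUNTIME, msg, stacklevel=stacklevel + 1)

    logging.Logger.runtime = runtime

    log.runtime('attributed to this line, not to `runtime()`')
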
Tyler Goodlet 44e386dd99 ._child: remove some unused imports.. 2025-03-14 13:56:25 -04:00
Tyler Goodlet 13fbcc723f Guard against IPC failures in `._runtime._invoke()`
Took me longer than I wanted to figure out the source of
a failed response to a remote cancellation (in this case in `modden`
where a client was cancelling a workspace layer.. but disconnected
before receiving the ack msg) that was triggering an IPC error when
sending the error msg for the cancellation of an `Actor._cancel_task()`:
since this (non-rpc) `._invoke()` task was trying to send to a now
disconnected canceller it was resulting in a `BrokenPipeError` (or
similar) error.

Now we catch such IPC errors and only (re)raise them when,
1. the transport `Channel` is for sure up (because otherwise what's the
   point of trying to send an error on the thing that caused it..)
2. it's definitely for handling an RPC task

Similarly, if the entire main `._invoke()` `try:` block excepts,
- we only hide the call-stack frame from the debugger (with
  `__tracebackhide__: bool`) if it's an RPC task that has a connected
  channel, since we always want to see the frame when debugging
  internal task or IPC failures.
- we don't bother trying to send errors to the context caller (actor)
  when it's a non-RPC request, since failures in actor-runtime-internal
  tasks shouldn't really ever be reported remotely, only maybe raised
  locally.

Also some other tidying,
- this properly corrects for the self-cancel case where an RPC context
  is cancelled due to a local (runtime) task calling a method like
  `Actor.cancel_soon()`. We now set our own `.uid` as the
  `ContextCancelled.canceller` value so that other-end tasks know that
  the cancellation was due to a self-cancellation by the actor itself.
  We still need to properly test for this though!
- add a more detailed module doc-str.
- more explicit imports for `trio` core types throughout.
2025-03-14 13:56:23 -04:00
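
A distilled sketch of that guard (hedged: assumes a
`Channel.connected()` predicate and an `is_rpc` flag along the lines
described above; not the literal diff):

    import trio

    async def try_send_error(
        chan,              # IPC transport `Channel`
        err_msg: dict,
        is_rpc: bool,      # was `._invoke()` serving a real RPC task?
    ) -> None:
        try:
            await chan.send(err_msg)
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            BrokenPipeError,
        ) as ipc_err:
            # only re-raise when the transport is definitely up (no
            # point sending an error over the thing that caused it..)
            # AND we're handling an RPC task.
            if chan.connected() and is_rpc:
                raise ipc_err
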
Tyler Goodlet 315f0fc7eb More thorough hard kill doc strings 2025-03-14 13:48:35 -04:00
Tyler Goodlet fea111e882 Tons of inter-peer test cleanup
Drop all the nested `@acm` blocks and defunct comments from the initial
validations. Add some todos for cases that are still unclear, such as
whether the caller / streamer should have `.cancelled_caught == True` in
its teardown.
2025-03-14 13:44:09 -04:00
Tyler Goodlet a1bf4db1e3 Get inter-peer suite passing with all `Context` state checks!
Definitely needs some cleaning and refinement but this gets us to stage
1 of being pretty frickin correct I'd say 💃
2025-03-14 13:44:09 -04:00
Tyler Goodlet bac9523ecf Adjust test details where `Context.cancel()` is called
We can now make asserts on `.cancelled_caught` and `_remote_error` vs.
`_local_error`. Expect a runtime error when `Context.open_stream()` is
called AFTER `.cancel()` and the remote `ContextCancelled` hasn't
arrived (yet). Adjust to the `'itself'` string in the self-cancel case.
2025-03-14 13:44:09 -04:00
Tyler Goodlet abe31e9e2c Fix `Context.result()` call to be in runtime scope 2025-03-14 13:44:09 -04:00
Tyler Goodlet 0222180c11 Tweak `Channel._cancel_called` comment 2025-03-14 13:44:09 -04:00
Tyler Goodlet 7d5fda4485 Be ultra-correct in `Portal.open_context()`
This took way too long to get right but hopefully it will give us
grok-able and correct context exit semantics going forward B)

The main fixes were:
- always shielding the `MsgStream.aclose()` call on teardown to avoid
  bubbling a `Cancelled`.
- properly absorbing any `ContextCancelled` in cases due to "self
  cancellation" using the new `Context.canceller` in the logic.
- capturing any error raised by the `Context.result()` call in the
  "normal exit, result received" case and setting it as the
  `Context._local_error` so that self-cancels can be easily measured via
  `Context.cancelled_caught` in same way as remote-error caused
  cancellations.
- extremely detailed comments around all of the cancellation-error cases
  to avoid ever getting confused about the control flow in the future XD
2025-03-14 13:44:08 -04:00
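
The shield-on-close fix, in isolation, looks roughly like the below
(a sketch assuming a `MsgStream`-like object whose `.aclose()` may
checkpoint; not the literal diff):

    import trio

    async def teardown_stream(stream) -> None:
        # shield the close so a pending `trio.Cancelled` aimed at
        # this task can't interrupt the graceful stream shutdown
        # and bubble out of `.open_context().__aexit__()`.
        with trio.CancelScope(shield=True):
            await stream.aclose()
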
Tyler Goodlet f5fcd8ca2e Be mega-pedantic with `ContextCancelled` semantics
As part of extremely detailed inter-peer-actor testing, add much more
granular `Context` cancellation state tracking via the following (new)
fields:
- `.canceller: tuple[str, str]` the uid of the actor responsible for
  the cancellation condition - always set by
  `Context._maybe_cancel_and_set_remote_error()` and replaces
  `._cancelled_remote` and `.cancel_called_remote`. If set, this value
  should normally always match a value from some `ContextCancelled`
  raised or caught by one side of the context.
- `._local_error` which is always set to the locally raised (and caller
  or callee task's scope-internal) error which caused any
  eventual cancellation/error condition and thus any closure of the
  context's per-task-side-`trio.Nursery`.
- `.cancelled_caught: bool` is now always `True` whenever the local task
  catches (or "silently absorbs") a `ContextCancelled` (a `ctxc`) that
  indeed originated from one of the context's linked tasks or any other
  context which raised its own `ctxc` in the current `.open_context()` scope.
  => whenever there is a case that no `ContextCancelled` was raised
  **in** the `.open_context().__aexit__()` (eg. `ctx.result()` called
after a call to `ctx.cancel()`), we still consider the context as
  having "caught a cancellation" since the `ctxc` was indeed silently
  handled by the cancel requester; all other error cases are already
  represented by mirroring the state of the `._scope: trio.CancelScope`
  => IOW there should be **no case** where an error is **not raised** in
  the context's scope and `.cancelled_caught: bool == False`, i.e. no
  case where `._scope.cancelled_caught == False and ._local_error is not
  None`!
- always raise any `ctxc` from `.open_stream()` if `._cancel_called ==
  True` - if the cancellation request has not already resulted in
  a `._remote_error: ContextCancelled` we raise a `RuntimeError` to
  indicate improper usage to the guilty side's task code.
- make `._maybe_raise_remote_err()` a sync func and don't raise
  any `ctxc` which is matched against a `.canceller` determined to
  be the current actor, aka a "self cancel", and always set the
  `._local_error` to any such `ctxc`.
- `.side: str`, factored out of `.cancel()` and unused as of now since
  it might be better re-written as a similar `.is_opener() -> bool`?
- drop unused `._started_received: bool`..
- TONS and TONS of detailed comments/docs to attempt to explain all the
  possible cancellation/exit cases and how they should exhibit as either
  silent closes or raises from the `Context` API!

Adjust the `._runtime._invoke()` code to match:
- use `ctx._maybe_raise_remote_err()` in `._invoke()`.
- adjust to new `.canceller` property.
- more type hints.
- better `log.cancel()` msging around self-cancels vs. peer-cancels.
- always set the `._local_error: BaseException` for the "callee" task
  just like `Portal.open_context()` now will do B)

Prior we were raising any `Context._remote_error` directly and doing
(more or less) the same `ContextCancelled` "absorbing" logic (well
kinda) in the block; instead we now delegate to the method.
2025-03-14 13:42:55 -04:00
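
The self-cancel "absorb" logic, distilled (hedged sketch; the full
method in the diff further below also records `._local_error` and
handles more cases):

    from tractor import current_actor
    from tractor._exceptions import ContextCancelled

    def maybe_raise_remote_err(ctx, err: Exception) -> Exception | None:
        # a ctxc which *we* caused, either by calling `ctx.cancel()`
        # or by being the named canceller, is "absorbed" silently
        # (returned, not raised).
        if (
            isinstance(err, ContextCancelled)
            and (
                ctx.cancel_called
                or tuple(err.canceller) == current_actor().uid
            )
        ):
            return err
        raise err
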
Tyler Goodlet 04217f319a Raise a `MessagingError` from the src error on msging edge cases 2025-03-14 13:42:15 -04:00
Tyler Goodlet 8cb8390201 Move `MessagingError` into `._exceptions` set 2025-03-14 13:42:15 -04:00
Tyler Goodlet 5035617adf Dump `.msgdata` in `RemoteActorError.__repr__()` 2025-03-14 13:42:15 -04:00
Tyler Goodlet 715348c5c2 Port all tests to new `reg_addr` fixture name 2025-03-14 13:42:15 -04:00
27 changed files with 1166 additions and 552 deletions

View File

@ -47,7 +47,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
def test_remote_error(arb_addr, args_err):
def test_remote_error(reg_addr, args_err):
"""Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
error type info and causes cancellation and reraising all the
@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
# on a remote type error caused by bad input args
@ -97,7 +97,7 @@ def test_remote_error(arb_addr, args_err):
assert exc.type == errtype
def test_multierror(arb_addr):
def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
@ -105,7 +105,7 @@ def test_multierror(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
@ -130,14 +130,14 @@ def test_multierror(arb_addr):
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
for i in range(num_subactors):
@ -175,15 +175,20 @@ async def do_nothing():
@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(arb_addr, mechanism):
"""Ensure a ``ActorNursery.start_actor()`` spawned subactor
def test_cancel_single_subactor(reg_addr, mechanism):
'''
Ensure a ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
"""
'''
async def spawn_actor():
"""Spawn an actor that blocks indefinitely.
"""
'''
Spawn an actor that blocks indefinitely then cancel via
either `ActorNursery.cancel()` or an exception raise.
'''
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:
portal = await nursery.start_actor(

View File

@ -141,7 +141,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
reg_addr: tuple,
):
'''
Verify that a ``trio`` nursery created managed in a child actor

View File

@ -5,7 +5,7 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.
'''
from contextlib import asynccontextmanager as acm
# from contextlib import asynccontextmanager as acm
from itertools import count
import platform
from typing import Optional
@ -13,6 +13,11 @@ from typing import Optional
import pytest
import trio
import tractor
from tractor import (
Actor,
Context,
current_actor,
)
from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
@ -193,9 +198,6 @@ def test_simple_context(
else:
assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
@ -208,10 +210,15 @@ def test_simple_context(
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
# cancel AFTER we open a stream
# to avoid a cancel raised inside
# `.open_stream()`
await ctx.cancel()
finally:
# after cancellation
@ -276,7 +283,7 @@ def test_caller_cancels(
assert (
tuple(err.canceller)
==
tractor.current_actor().uid
current_actor().uid
)
async def main():
@ -430,9 +437,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
):
'caller context closes without using stream'
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
portal = await n.start_actor(
root: Actor = current_actor()
portal = await an.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
@ -440,10 +449,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None
await portal.run(assert_state, value=True)
# call cancel explicitly
if use_ctx_cancel_method:
@ -454,8 +463,21 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async for msg in stream:
pass
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
else:
assert 0, "Should have context cancelled?"
@ -472,7 +494,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await ctx.result()
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
# NO-OP -> since already called above
await ctx.cancel()
# local scope should have absorbed the cancellation
assert ctx.cancelled_caught
assert ctx._remote_error is ctx._local_error
try:
async with ctx.open_stream() as stream:
async for msg in stream:
@ -551,19 +579,25 @@ async def cancel_self(
global _state
_state = True
# since we call this the below `.open_stream()` should always
# error!
await ctx.cancel()
# should inline raise immediately
try:
async with ctx.open_stream():
pass
except tractor.ContextCancelled:
# except tractor.ContextCancelled:
except RuntimeError:
# suppress for now so we can do checkpoint tests below
pass
print('Got expected runtime error for stream-after-cancel')
else:
raise RuntimeError('Context didnt cancel itself?!')
# check a real ``trio.Cancelled`` is raised on a checkpoint
# check that``trio.Cancelled`` is now raised on any further
# checkpoints since the self cancel above will have cancelled
# the `Context._scope.cancel_scope: trio.CancelScope`
try:
with trio.fail_after(0.1):
await trio.sleep_forever()
@ -574,6 +608,7 @@ async def cancel_self(
# should never get here
assert 0
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test
async def test_callee_cancels_before_started():
@ -601,7 +636,7 @@ async def test_callee_cancels_before_started():
ce.type == trio.Cancelled
# the traceback should be informative
assert 'cancelled itself' in ce.msgdata['tb_str']
assert 'itself' in ce.msgdata['tb_str']
# teardown the actor
await portal.cancel_actor()
@ -773,7 +808,7 @@ async def echo_back_sequence(
print(
'EXITING CALLEEE:\n'
f'{ctx.cancel_called_remote}'
f'{ctx.canceller}'
)
return 'yo'
@ -871,7 +906,7 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx:
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == tractor.current_actor().uid
assert tuple(res.canceller) == current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')

View File

@ -78,7 +78,7 @@ has_nested_actors = pytest.mark.has_nested_actors
def spawn(
start_method,
testdir,
arb_addr,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':

View File

@ -15,19 +15,19 @@ from conftest import tractor_test
@tractor_test
async def test_reg_then_unreg(arb_addr):
async def test_reg_then_unreg(reg_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -53,15 +53,27 @@ async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
async def say_hello(
other_actor: str,
reg_addr: tuple[str, int],
):
await trio.sleep(1) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
async with tractor.find_actor(
other_actor,
registry_addrs=[reg_addr],
) as portal:
assert portal is not None
return await portal.run(__name__, 'hi')
async def say_hello_use_wait(other_actor):
async with tractor.wait_for_actor(other_actor) as portal:
async def say_hello_use_wait(
other_actor: str,
reg_addr: tuple[str, int],
):
async with tractor.wait_for_actor(
other_actor,
registry_addr=reg_addr,
) as portal:
assert portal is not None
result = await portal.run(__name__, 'hi')
return result
@ -69,21 +81,29 @@ async def say_hello_use_wait(other_actor):
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
async def test_trynamic_trio(func, start_method, arb_addr):
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async def test_trynamic_trio(
func,
start_method,
reg_addr,
):
'''
Root actor acting as the "director" and running one-shot-task-actors
for the directed subs.
'''
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
func,
other_actor='gretchen',
reg_addr=reg_addr,
name='donny',
)
gretchen = await n.run_in_actor(
func,
other_actor='donny',
reg_addr=reg_addr,
name='gretchen',
)
print(await gretchen.result())
@ -131,7 +151,7 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
@ -139,9 +159,9 @@ async def spawn_and_check_registry(
) -> None:
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -213,17 +233,19 @@ async def spawn_and_check_registry(
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
with_streaming=with_streaming,
@ -237,7 +259,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
@ -248,7 +270,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
trio.run(
partial(
spawn_and_check_registry,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
with_streaming=with_streaming,
@ -262,7 +284,7 @@ async def streamer(agen):
async def close_chans_before_nursery(
arb_addr: tuple,
reg_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
) -> None:
@ -275,9 +297,9 @@ async def close_chans_before_nursery(
entries_at_end = 1
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
async with tractor.get_arbiter(*arb_addr) as aportal:
async with tractor.get_arbiter(*reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
@ -329,7 +351,7 @@ async def close_chans_before_nursery(
def test_close_channel_explicit(
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -339,7 +361,7 @@ def test_close_channel_explicit(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=False,
),
@ -351,7 +373,7 @@ def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
arb_addr,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
@ -361,7 +383,7 @@ def test_close_channel_explicit_remote_arbiter(
trio.run(
partial(
close_chans_before_nursery,
arb_addr,
reg_addr,
use_signal,
remote_arbiter=True,
),

View File

@ -47,7 +47,7 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(sleep_forever)
def test_trio_cancels_aio_on_actor_side(arb_addr):
def test_trio_cancels_aio_on_actor_side(reg_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
@ -55,7 +55,7 @@ def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
@ -94,7 +94,7 @@ async def asyncio_actor(
raise
def test_aio_simple_error(arb_addr):
def test_aio_simple_error(reg_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
@ -103,7 +103,7 @@ def test_aio_simple_error(arb_addr):
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
registry_addrs=[reg_addr]
) as n:
await n.run_in_actor(
asyncio_actor,
@ -120,7 +120,7 @@ def test_aio_simple_error(arb_addr):
assert err.type == AssertionError
def test_tractor_cancels_aio(arb_addr):
def test_tractor_cancels_aio(reg_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
@ -139,7 +139,7 @@ def test_tractor_cancels_aio(arb_addr):
trio.run(main)
def test_trio_cancels_aio(arb_addr):
def test_trio_cancels_aio(reg_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
@ -194,7 +194,7 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format
)
def test_context_spawns_aio_task_that_errors(
arb_addr,
reg_addr,
parent_cancels: bool,
):
'''
@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever()
return await ctx.result()
return await ctx.result()
if parent_cancels:
# bc the parent made the cancel request,
@ -258,7 +258,7 @@ async def aio_cancel():
await sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
async def main():
async with tractor.open_nursery() as n:
@ -395,7 +395,7 @@ async def stream_from_aio(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(arb_addr, fan_out):
def test_basic_interloop_channel_stream(reg_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -409,7 +409,7 @@ def test_basic_interloop_channel_stream(arb_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(arb_addr):
def test_trio_error_cancels_intertask_chan(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -428,7 +428,7 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
def test_trio_closes_early_and_channel_exits(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -443,7 +443,7 @@ def test_trio_closes_early_and_channel_exits(arb_addr):
trio.run(main)
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
@ -520,7 +520,7 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
arb_addr,
reg_addr,
raise_error_mid_stream,
):

View File

@ -15,6 +15,26 @@ from tractor import ( # typing
ContextCancelled,
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg raised in all open ctxs
# with that peer.
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
# (also) spawned a failing task which was unhandled and
# propagated up to the immediate parent - the peer to the actor
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
@ -29,14 +49,30 @@ from tractor import ( # typing
@tractor.context
async def sleep_forever(
ctx: Context,
expect_ctxc: bool = False,
) -> None:
'''
Sync the context, open a stream then just sleep.
Allow checking for (context) cancellation locally.
'''
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
try:
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
except BaseException as berr:
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_ctxc:
assert isinstance(berr, trio.Cancelled)
raise
@tractor.context
@ -145,6 +181,7 @@ async def stream_ints(
async with ctx.open_stream() as stream:
for i in itertools.count():
await stream.send(i)
await trio.sleep(0.01)
@tractor.context
@ -161,73 +198,81 @@ async def stream_from_peer(
peer_ctx.open_stream() as stream,
):
await ctx.started()
# XXX TODO: big set of questions for this
# XXX QUESTIONS & TODO: for further details around this
# in the longer run..
# https://github.com/goodboy/tractor/issues/368
# - should we raise `ContextCancelled` or `Cancelled` (rn
# it does that) here?!
# - test the `ContextCancelled` OUTSIDE the
# `.open_context()` call?
try:
async for msg in stream:
print(msg)
except trio.Cancelled:
assert not ctx.cancel_called
assert not ctx.cancelled_caught
assert not peer_ctx.cancel_called
assert not peer_ctx.cancelled_caught
assert 'root' in ctx.cancel_called_remote
raise # XXX MUST NEVER MASK IT!!
with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# it does latter) and should/could it be implemented
# as a general injection override for `trio` such
# that ANY next checkpoint would raise the "cancel
# error type" of choice?
# - should the `ContextCancelled` bubble from
# all `Context` and `MsgStream` apis wherein it
# prolly makes the most sense to make it
# a `trio.Cancelled` subtype?
# - what about IPC-transport specific errors, should
# they bubble from the async for and trigger
# other special cases?
# NOTE: current ctl flow:
# - stream raises `trio.EndOfChannel` and
# exits the loop
# - `.open_context()` will raise the ctxcanc
# received from the sleeper.
async for msg in stream:
assert msg is not None
print(msg)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
assert ctxerr.canceller == 'canceller'
assert ctxerr._remote_error is ctxerr
err = ctxerr
assert peer_ctx._remote_error is ctxerr
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
assert not ctx.cancelled_caught
# we never requested cancellation
assert not peer_ctx.cancel_called
# the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task.
assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
# root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise
# TODO: IN THEORY we could have other cases depending on
# who cancels first, the root actor or the canceller peer?.
#
# 1- when the peer request is first then the `.canceller`
# field should obvi be set to the 'canceller' uid,
#
# 2-if the root DOES req cancel then we should see the same
# `trio.Cancelled` implicitly raised
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
# except BaseException as err:
raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize(
'error_during_ctxerr_handling',
@ -251,8 +296,8 @@ def test_peer_canceller(
line and be less indented.
.actor0> ()-> .actor1>
a inter-actor task context opened (by `async with `Portal.open_context()`)
from actor0 *into* actor1.
a inter-actor task context opened (by `async with
`Portal.open_context()`) from actor0 *into* actor1.
.actor0> ()<=> .actor1>
a inter-actor task context opened (as above)
@ -287,11 +332,12 @@ def test_peer_canceller(
5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()`
'''
async def main():
async with tractor.open_nursery() as an:
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
) as an:
canceller: Portal = await an.start_actor(
'canceller',
enable_modules=[__name__],
@ -305,10 +351,13 @@ def test_peer_canceller(
enable_modules=[__name__],
)
root = tractor.current_actor()
try:
async with (
sleeper.open_context(
sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent),
just_caller.open_context(
@ -335,16 +384,15 @@ def test_peer_canceller(
'Context.result() did not raise ctx-cancelled?'
)
# TODO: not sure why this isn't catching
# but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
# should always raise since this root task does
# not request the sleeper cancellation ;)
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.cancel_called_remote is None
assert canceller_ctx.canceller is None
assert caller_ctx.canceller is None
assert ctxerr.canceller[0] == 'canceller'
@ -355,16 +403,14 @@ def test_peer_canceller(
# block it should be.
assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise
# SHOULD NEVER GET HERE!
except BaseException:
# XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr:
err = berr
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail('did not rx ctx-cancelled error?')
@ -375,6 +421,20 @@ def test_peer_canceller(
)as ctxerr:
_err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side
# requested)
# - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError`
# instance from other side of context)
# TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the
# cancel)
# CASE: error raised during handling of
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
@ -384,20 +444,42 @@ def test_peer_canceller(
for ctx in ctxs:
assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have
# cancelled all opened contexts except
# the sleeper which is cancelled by its
# peer "canceller"
if ctx is not sleeper_ctx:
assert ctx._remote_error.canceller[0] == 'root'
# cancelled all opened contexts except the
# sleeper which is obvi by the "canceller"
# peer.
re = ctx._remote_error
if (
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
else:
assert (
re.canceller
==
ctx.canceller
==
root.uid
)
# CASE: standard teardown inside in `.open_context()` block
else:
assert ctxerr.canceller[0] == 'canceller'
assert ctxerr.canceller == sleeper_ctx.canceller
assert (
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
'canceller'
)
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
@ -405,18 +487,43 @@ def test_peer_canceller(
assert re is ctxerr
for ctx in ctxs:
re: BaseException | None = ctx._remote_error
assert re
# root doesn't cancel sleeper since it's
# cancelled by its peer.
if ctx is sleeper_ctx:
assert not ctx.cancel_called
# since sleeper_ctx.result() IS called
# above we should have (silently)
# absorbed the corresponding
# `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught`
# should trigger!
assert ctx.cancelled_caught
elif ctx is caller_ctx:
# since its context was remotely
# cancelled, we never needed to
# call `Context.cancel()` bc it was
# done by the peer and also we never
assert ctx.cancel_called
# TODO: figure out the details of
# this..
# if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught
else:
assert ctx.cancel_called
assert not ctx.cancelled_caught
# each context should have received
# TODO: do we even need this flag?
# -> each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# in its remote nursery scope.
# assert ctx.chan.uid == ctx.canceller
# NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this
@ -434,9 +541,7 @@ def test_peer_canceller(
# including the case where ctx-cancel handling
# itself errors.
assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown
if error_during_ctxerr_handling:

View File

@ -55,7 +55,7 @@ async def context_stream(
async def stream_from_single_subactor(
arb_addr,
reg_addr,
start_method,
stream_func,
):
@ -64,7 +64,7 @@ async def stream_from_single_subactor(
# only one per host address, spawns an actor if None
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
start_method=start_method,
) as nursery:
@ -115,13 +115,13 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
)
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
trio.run(
partial(
stream_from_single_subactor,
arb_addr,
reg_addr,
start_method,
stream_func=stream_func,
),
@ -225,14 +225,14 @@ async def a_quadruple_example():
return result_stream
async def cancel_after(wait, arb_addr):
async with tractor.open_root_actor(arbiter_addr=arb_addr):
async def cancel_after(wait, reg_addr):
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module')
def time_quad_ex(arb_addr, ci_env, spawn_backend):
def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often...
"""
@ -240,7 +240,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time()
results = trio.run(cancel_after, timeout, arb_addr)
results = trio.run(cancel_after, timeout, reg_addr)
diff = time.time() - start
assert results
return results, diff
@ -260,14 +260,14 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
arb_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, arb_addr)
results = trio.run(cancel_after, delay, reg_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
# In CI envoirments it seems later runs are quicker then the first
@ -280,7 +280,7 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
arb_addr,
reg_addr,
spawn_backend,
loglevel,
):

View File

@ -24,7 +24,7 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(arb_addr):
async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
@ -34,20 +34,20 @@ async def test_self_is_registered(arb_addr):
@tractor_test
async def test_self_is_registered_localportal(arb_addr):
async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == arb_addr
assert sockaddr[0] == reg_addr
def test_local_actor_async_func(arb_addr):
def test_local_actor_async_func(reg_addr):
"""Verify a simple async function in-process.
"""
nums = []
@ -55,7 +55,7 @@ def test_local_actor_async_func(arb_addr):
async def print_loop():
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter

View File

@ -28,9 +28,9 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, arb_addr):
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -39,16 +39,16 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_arbiter(*arb_addr) as portal:
async with tractor.get_arbiter(*reg_addr) as portal:
pass
def test_register_duplicate_name(daemon, arb_addr):
def test_register_duplicate_name(daemon, reg_addr):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as n:
assert not tractor.current_actor().is_arbiter

View File

@ -160,7 +160,7 @@ async def test_required_args(callwith_expecterror):
)
def test_multi_actor_subs_arbiter_pub(
loglevel,
arb_addr,
reg_addr,
pub_actor,
):
"""Try out the neato @pub decorator system.
@ -170,7 +170,7 @@ def test_multi_actor_subs_arbiter_pub(
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:
@ -255,12 +255,12 @@ def test_multi_actor_subs_arbiter_pub(
def test_single_subactor_pub_multitask_subs(
loglevel,
arb_addr,
reg_addr,
):
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
enable_modules=[__name__],
) as n:

View File

@ -34,7 +34,6 @@ def test_resource_only_entered_once(key_on):
global _resource
_resource = 0
kwargs = {}
key = None
if key_on == 'key_value':
key = 'some_common_key'
@ -139,7 +138,7 @@ def test_open_local_sub_to_stream():
N local tasks using ``trionics.maybe_open_context():``.
'''
timeout = 3 if platform.system() != "Windows" else 10
timeout: float = 3.6 if platform.system() != "Windows" else 10
async def main():

View File

@ -45,7 +45,7 @@ async def short_sleep():
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'fail_on_syntax'],
)
def test_rpc_errors(arb_addr, to_call, testdir):
def test_rpc_errors(reg_addr, to_call, testdir):
"""Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define
the named function.
@ -77,7 +77,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# spawn a subactor which calls us back
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
enable_modules=exposed_mods.copy(),
) as n:

View File

@ -16,14 +16,14 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
is_arbiter: bool,
data: dict,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
namespaces = [__name__]
await trio.sleep(0.1)
async with tractor.open_root_actor(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
@ -41,7 +41,7 @@ async def spawn(
is_arbiter=False,
name='sub-actor',
data=data,
arb_addr=arb_addr,
reg_addr=reg_addr,
enable_modules=namespaces,
)
@ -55,12 +55,12 @@ async def spawn(
return 10
def test_local_arbiter_subactor_global_state(arb_addr):
def test_local_arbiter_subactor_global_state(reg_addr):
result = trio.run(
spawn,
True,
data_to_pass_down,
arb_addr,
reg_addr,
)
assert result == 10
@ -140,7 +140,7 @@ async def check_loglevel(level):
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
arb_addr,
reg_addr,
):
if start_method == 'mp_forkserver':
pytest.skip(
@ -152,7 +152,7 @@ def test_loglevel_propagated_to_subactor(
async with tractor.open_nursery(
name='arbiter',
start_method=start_method,
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
) as tn:
await tn.run_in_actor(

View File

@ -66,13 +66,13 @@ async def ensure_sequence(
async def open_sequence_streamer(
sequence: list[int],
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:
async with tractor.open_nursery(
arbiter_addr=arb_addr,
arbiter_addr=reg_addr,
start_method=start_method,
) as tn:
@ -93,7 +93,7 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions(
arb_addr,
reg_addr,
start_method,
):
@ -103,7 +103,7 @@ def test_stream_fan_out_to_local_subscriptions(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -138,7 +138,7 @@ def test_stream_fan_out_to_local_subscriptions(
]
)
def test_consumer_and_parent_maybe_lag(
arb_addr,
reg_addr,
start_method,
task_delays,
):
@ -150,7 +150,7 @@ def test_consumer_and_parent_maybe_lag(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -211,7 +211,7 @@ def test_consumer_and_parent_maybe_lag(
def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
reg_addr,
start_method,
):
'''
@ -225,7 +225,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
async with open_sequence_streamer(
sequence,
arb_addr,
reg_addr,
start_method,
) as stream:
@ -302,7 +302,7 @@ def test_subscribe_errors_after_close():
def test_ensure_slow_consumers_lag_out(
arb_addr,
reg_addr,
start_method,
):
'''This is a pure local task test; no tractor

View File

@ -18,8 +18,6 @@
This is the "bootloader" for actors started using the native trio backend.
"""
import sys
import trio
import argparse
from ast import literal_eval
@ -37,8 +35,6 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()

View File

@ -44,9 +44,11 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
StreamOverrun,
)
from .log import get_logger
@ -56,28 +58,36 @@ from ._state import current_actor
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
log = get_logger(__name__)
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
'''
An inter-actor, ``trio``-task communication context.
An inter-actor, SC transitive, `trio.Task` communication context.
NB: This class should never be instatiated directly, it is delivered
by either,
- runtime machinery to a remotely started task or,
- by entering ``Portal.open_context()``.
NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or,
- by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function.
and is always constructed using ``mkt_context()``.
AND is always constructed using the below ``mk_context()``.
Allows maintaining task or protocol specific state between
2 communicating, parallel executing actor tasks. A unique context is
allocated on each side of any task RPC-linked msg dialog, for
every request to a remote actor from a portal. On the "callee"
side a context is always allocated inside ``._runtime._invoke()``.
2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._runtime._invoke()``.
# TODO: more detailed writeup on cancellation, error and
# streaming semantics..
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task
@ -108,12 +118,31 @@ class Context:
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# this value is only set on one side.
_result: Any | int = None
# if the local "caller" task errors this
# value is always set to the error that was
# captured in the `Portal.open_context().__aexit__()`
# teardown.
_local_error: BaseException | None = None
# if the either side gets an error from the other
# this value is set to that error unpacked from an
# IPC msg.
_remote_error: BaseException | None = None
# cancellation state
# only set if the local task called `.cancel()`
_cancel_called: bool = False # did WE cancel the far end?
_cancelled_remote: tuple[str, str] | None = None
# TODO: do we even need this? we can assume that if we're
# cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the
# `ContextCancelled`?
_canceller: tuple[str, str] | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any
@ -145,23 +174,47 @@ class Context:
return self._cancel_called
@property
def cancel_called_remote(self) -> tuple[str, str] | None:
def canceller(self) -> tuple[str, str] | None:
'''
``Actor.uid`` of the remote actor who's task was cancelled
causing this side of the context to also be cancelled.
``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled.
'''
remote_uid = self._cancelled_remote
if remote_uid:
return tuple(remote_uid)
return self._canceller
@property
def cancelled_caught(self) -> bool:
return self._scope.cancelled_caught
return (
# the local scope was cancelled either by
# remote error or self-request
self._scope.cancelled_caught
# the local scope was never cancelled
# and instead likely we received a remote side
# cancellation that was raised inside `.result()`
or (
(se := self._local_error)
and
isinstance(se, ContextCancelled)
and (
se.canceller == self.canceller
or
se is self._remote_error
)
)
)
@property
def side(self) -> str:
'''
Return string indicating which task this instance is wrapping.
'''
return 'caller' if self._portal else 'callee'
# init and streaming state
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# overrun handling machinery
@ -196,7 +249,7 @@ class Context:
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
async def _maybe_cancel_and_set_remote_error(
def _maybe_cancel_and_set_remote_error(
self,
error: BaseException,
@ -269,16 +322,19 @@ class Context:
# that error as the reason.
self._remote_error: BaseException = error
# always record the remote actor's uid since its cancellation
# state is directly linked to ours (the local one).
self._cancelled_remote = self.chan.uid
if (
isinstance(error, ContextCancelled)
):
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
self._canceller = error.canceller
log.cancel(
'Remote task-context sucessfully cancelled for '
f'{self.chan.uid}:{self.cid}'
'Remote task-context was cancelled for '
f'actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'canceller: {error.canceller}\n'
)
if self._cancel_called:
@ -289,22 +345,37 @@ class Context:
# and we **don't need to raise it** in local cancel
# scope since it will potentially override a real error.
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'Remote context error,\n'
f'remote actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'{error}'
)
self._canceller = self.chan.uid
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
# YES! this is way better and simpler!
if self._scope:
cs: trio.CancelScope = self._scope
if (
cs
and not cs.cancel_called
and not cs.cancelled_caught
):
# TODO: we can for sure drop this right?
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
# self._cancelled_remote = self.chan.uid
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
self._scope.cancel()
# this REPL usage actually works here BD
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
@ -320,13 +391,19 @@ class Context:
Timeout quickly in an attempt to sidestep 2-generals...
'''
side: str = 'caller' if self._portal else 'callee'
side: str = self.side
log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}'
)
self._cancel_called: bool = True
# caller side who entered `Portal.open_context()`
# NOTE: on the call side we never manually call
# `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown
# (normally in `.result()` called from
# `Portal.open_context().__aexit__()`)
if side == 'caller':
if not self._portal:
raise RuntimeError(
@ -349,7 +426,6 @@ class Context:
'_cancel_task',
cid=cid,
)
# print("EXITING CANCEL CALL")
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@ -368,6 +444,9 @@ class Context:
)
# callee side remote task
# NOTE: on this side we ALWAYS cancel the local scope since
# the caller expects a `ContextCancelled` to be sent from
# `._runtime._invoke()` back to the other side.
else:
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
@ -403,7 +482,7 @@ class Context:
``trio``'s cancellation system.
'''
actor = current_actor()
actor: Actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
@ -413,12 +492,34 @@ class Context:
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
if self._remote_error:
raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different
# runtime error indicating that this task's usage of
# `Context.cancel()` and then `.open_stream()` is WRONG!
task: str = trio.lowlevel.current_task().name
raise RuntimeError(
'Stream opened after `Context.cancel()` called..?\n'
f'task: {actor.uid[0]}:{task}\n'
f'{self}'
)
if not self._portal and not self._started_called:
if (
not self._portal
and not self._started_called
):
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
@ -434,7 +535,7 @@ class Context:
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns = allow_overruns
ctx._allow_overruns: bool = allow_overruns
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
@ -444,27 +545,32 @@ class Context:
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
'The underlying channel for this stream was already closed!?'
)
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
# NOTE: we track all existing streams per portal for
# the purposes of attempting graceful closes on runtime
# cancel requests.
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened = True
self._stream_opened: bool = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
# NOTE: Make the stream "one-shot use". On exit,
# signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end.
await stream.aclose()
finally:
@ -495,14 +601,22 @@ class Context:
# whenever ``CancelScope.cancel()`` was called) and
# instead silently reap the expected cancellation
# "error"-msg.
our_uid: tuple[str, str] = current_actor().uid
if (
isinstance(err, ContextCancelled)
and (
self._cancel_called
or self.chan._cancel_called
or tuple(err.canceller) == current_actor().uid
or self.canceller == our_uid
or tuple(err.canceller) == our_uid
)
):
# NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing"
# the error silently B)
if self._local_error is None:
self._local_error = err
return err
# NOTE: currently we are masking underlying runtime errors
@ -515,7 +629,7 @@ class Context:
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
# __tracebackhide__: bool = True
raise err from None
async def result(self) -> Any | Exception:
@ -544,7 +658,6 @@ class Context:
of the remote cancellation.
'''
__tracebackhide__: bool = True
assert self._portal, "Context.result() cannot be called from callee!"
assert self._recv_chan
@ -607,13 +720,15 @@ class Context:
"Received internal error at portal?"
)
err = unpack_error(
if err := unpack_error(
msg,
self._portal.channel
) # from msgerr
): # from msgerr
self._maybe_cancel_and_set_remote_error(err)
self._maybe_raise_remote_err(err)
err = self._maybe_raise_remote_err(err)
self._remote_error = err
else:
raise
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
@ -724,13 +839,17 @@ class Context:
f"Delivering {msg} from {uid} to caller {cid}"
)
error = msg.get('error')
if error := unpack_error(
msg,
self.chan,
if (
msg.get('error') # check for field
and (
error := unpack_error(
msg,
self.chan,
)
)
):
self._cancel_msg = msg
await self._maybe_cancel_and_set_remote_error(error)
self._maybe_cancel_and_set_remote_error(error)
if (
self._in_overrun
@ -765,7 +884,7 @@ class Context:
# XXX: always push an error even if the local
# receiver is in overrun state.
# await self._maybe_cancel_and_set_remote_error(msg)
# self._maybe_cancel_and_set_remote_error(msg)
local_uid = current_actor().uid
lines = [


@ -14,15 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Our classy exception set.
"""
'''
from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
)
import traceback
@ -31,6 +34,11 @@ import trio
from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__)
@ -38,12 +46,17 @@ class ActorFailure(Exception):
"General actor failure"
# TODO: rename to just `RemoteError`?
class RemoteActorError(Exception):
'''
Remote actor exception bundled locally
A box(ing) type which bundles a remote actor `BaseException` for
(near identical, and only if possible,) local object/instance
re-construction in the local process memory domain.
Normally each instance is expected to be constructed from
a special "error" IPC msg sent by some remote actor-runtime.
'''
# TODO: local reconstruction of remote exception deats
def __init__(
self,
message: str,
@ -53,13 +66,36 @@ class RemoteActorError(Exception):
) -> None:
super().__init__(message)
self.type = suberror_type
self.msgdata = msgdata
# TODO: maybe a better name?
# - .errtype
# - .retype
# - .boxed_errtype
# - .boxed_type
# - .remote_type
# also pertains to our long long outstanding issue XD
# https://github.com/goodboy/tractor/issues/5
self.type: str = suberror_type
self.msgdata: dict[str, Any] = msgdata
@property
def src_actor_uid(self) -> tuple[str, str] | None:
return self.msgdata.get('src_actor_uid')
def __repr__(self) -> str:
if remote_tb := self.msgdata.get('tb_str'):
pformat(remote_tb)
return (
f'{type(self).__name__}(\n'
f'msgdata={pformat(self.msgdata)}\n'
')'
)
return super().__repr__()
# TODO: local reconstruction of remote exception deats
# def unbox(self) -> BaseException:
# ...
class InternalActorError(RemoteActorError):
'''
@ -98,8 +134,19 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
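# NOTE: a hedged usage sketch (not from the diff): wherever the
# relayed `StreamOverrun` surfaces locally, the fast writer's
# uid can now be recovered; `throttle()` is a hypothetical
# app-level callback.
#
# try:
#     await ctx.result()
# except StreamOverrun as overrun:
#     if (sender := overrun.sender):
#         throttle(sender)  # slow that writer down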
class AsyncioCancelled(Exception):
@ -110,6 +157,9 @@ class AsyncioCancelled(Exception):
'''
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
def pack_error(
exc: BaseException,
@ -136,7 +186,15 @@ def pack_error(
'src_actor_uid': current_actor().uid,
}
if isinstance(exc, ContextCancelled):
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
):
error_msg.update(exc.msgdata)
return {'error': error_msg}
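For reviewers, the rough wire shape `pack_error()` now produces, inferred only from fields read elsewhere in this changeset (`tb_str` in `RemoteActorError.__repr__()`, `src_actor_uid` above, `canceller`/`sender` via the merged `.msgdata`); any other key is an assumption:
{
    'error': {
        'tb_str': '<formatted remote traceback>',
        'src_actor_uid': ('<name>', '<uuid>'),
        # plus `exc.msgdata` merged in for the special cases,
        # eg. 'canceller' (ContextCancelled) / 'sender' (StreamOverrun)
    },
    'cid': '<call-id, attached by callers such as `_invoke()`>',
}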
@ -146,7 +204,8 @@ def unpack_error(
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError
err_type=RemoteActorError,
hide_tb: bool = True,
) -> None | Exception:
'''
@ -157,7 +216,7 @@ def unpack_error(
which is the responsibility of the caller.
'''
__tracebackhide__: bool = True
__tracebackhide__: bool = hide_tb
error_dict: dict[str, dict] | None
if (
@ -214,3 +273,93 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appropriate local error when a msg arrives on an IPC
`Context` or `MsgStream` which does not contain the expected
(under normal operation) `expect_key` field; normally `'yield'`
or `'started'`.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# TODO: if a local task is already blocking on
# a `Context.result()` and thus a `.receive()` on the
# rx-chan, we close the chan and set state ensuring that
# an eoc is raised!
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
f'Context stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
f'{pformat(msg)}'
) from src_err
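For context, the call-site shape this new helper expects, mirroring its two usages further down in this changeset (`expect_key='yield'` in `MsgStream.receive()`, `'started'` in `Portal.open_context()`):
try:
    return msg['yield']
except KeyError as src_err:
    _raise_from_no_key_in_msg(
        ctx=self._ctx,
        msg=msg,
        src_err=src_err,
        log=log,
        expect_key='yield',
        stream=self,
    )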


@ -294,9 +294,11 @@ class Channel:
self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# remote (peer) cancellation of the far end actor runtime.
self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
# flag set by ``Portal.cancel_actor()`` indicating remote
# (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False
@classmethod
def from_stream(
@ -327,8 +329,11 @@ class Channel:
def __repr__(self) -> str:
if self.msgstream:
return repr(
self.msgstream.stream.socket._sock).replace( # type: ignore
"socket.socket", "Channel")
self.msgstream.stream.socket._sock
).replace( # type: ignore
"socket.socket",
"Channel",
)
return object.__repr__(self)
@property


@ -33,7 +33,6 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
@ -45,12 +44,17 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
)
from ._context import Context
from ._streaming import MsgStream
from ._context import (
Context,
)
from ._streaming import (
MsgStream,
)
log = get_logger(__name__)
@ -64,15 +68,10 @@ def _unwrap_msg(
__tracebackhide__ = True
try:
return msg['return']
except KeyError:
except KeyError as ke:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
raise unpack_error(msg, channel) from ke
class Portal:
@ -219,14 +218,18 @@ class Portal:
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield
# XXX: sure would be nice to make this work with
# a proper shield
with trio.move_on_after(
timeout
or self.cancel_timeout
) as cs:
cs.shield = True
await self.run_from_ns('self', 'cancel')
await self.run_from_ns(
'self',
'cancel',
)
return True
if cs.cancelled_caught:
@ -461,25 +464,18 @@ class Portal:
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError:
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
except KeyError as src_error:
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid
@ -516,57 +512,102 @@ class Portal:
# started in the ctx nursery.
ctx._scope.cancel()
# XXX: (maybe) shield/mask context-cancellations that were
# initiated by any of the context's 2 tasks. There are
# subsequently 2 operating cases for a "graceful cancel"
# of a `Context`:
#
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
# XXX NOTE XXX: maybe shield against
# self-context-cancellation (which raises a local
# `ContextCancelled`) when requested (via
# `Context.cancel()`) by the same task (tree) which entered
# THIS `.open_context()`.
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
# NOTE: There are 2 operating cases for a "graceful cancel"
# of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as a `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc:
scope_err = ctxc
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
# XXX: this should NEVER happen!
# from ._debug import breakpoint
# await breakpoint()
raise
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
else:
if (
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
)
):
log.debug(
f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# - a standard error in the caller/yieldee
# CASE 3: standard local error in this caller/yieldee
Exception,
# - a runtime teardown exception-group and/or
# cancellation request from a caller task.
BaseExceptionGroup,
trio.Cancelled,
# CASES 1 & 2: normally manifested as
# a `Context._scope_nursery` raised
# exception-group of,
# 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called and any
# `ContextCancelled` absorbed and thus NOT RAISED in
# any `Context._maybe_raise_remote_err()`,
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error raised in the "callee" side with
# a group only raised if there was more than one
# task started here in the "caller" in the
# `yield`-ed to task.
BaseExceptionGroup, # since overrun handler tasks may have been spawned
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as err:
scope_err = err
# XXX: request cancel of this context on any error.
# NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "cancellation requested" case
# above.
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above!
log.cancel(
'Context cancelled for task due to\n'
f'{err}\n'
@ -585,7 +626,7 @@ class Portal:
raise # duh
# no scope error case
# no local scope error, the "clean exit with a result" case.
else:
if ctx.chan.connected():
log.info(
@ -599,15 +640,27 @@ class Portal:
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`
# which IS SET any time the far end fails and
# causes "caller side" cancellation via
# a `ContextCancelled` here.
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
# result = await ctx.result()
try:
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
raise
finally:
# though it should be impossible for any tasks
@ -657,12 +710,14 @@ class Portal:
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: since we always (maybe) re-raise (and thus also
# mask runtime machinery related
# multi-`trio.Cancelled`s) any scope error which was
# the underlying cause of this context's exit, add
# different log msgs for each of the (2) cases.
# XXX: we always raise remote errors locally and
# generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error`
# which was the underlying cause of this context's exit
# should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
if scope_err is not None:
ctx._local_error: BaseException = scope_err
etype: Type[BaseException] = type(scope_err)
# CASE 2
@ -691,7 +746,7 @@ class Portal:
# child has already cleared it and clobbered IPC.
# FINALLY, remove the context from runtime tracking and
# exit Bo
# exit!
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,
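Condensing the two documented cancellation cases into a hedged caller-side sketch (`ep` again stands in for some `@tractor.context` endpoint):
async with portal.open_context(ep) as (ctx, first):
    await ctx.cancel()
# CASE 2: the `ContextCancelled` provoked by our own
# `.cancel()` call above is absorbed silently on exit.
# CASE 1: a cancel requested by any *other* actor, or any
# real remote error, re-raises out of the `async with`.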


@ -25,7 +25,6 @@ import logging
import signal
import sys
import os
import typing
import warnings


@ -15,7 +15,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor primitives and helpers
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.
"""
from __future__ import annotations
@ -41,8 +44,14 @@ import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
import trio
from trio import (
CancelScope,
)
from trio_typing import (
Nursery,
TaskStatus,
)
from ._ipc import Channel
from ._context import (
@ -86,21 +95,22 @@ async def _invoke(
] = trio.TASK_STATUS_IGNORED,
):
'''
Invoke local func and deliver result(s) over provided channel.
Schedule a `trio` task-as-func and deliver result(s) over
connected IPC channel.
This is the core "RPC task" starting machinery.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
__tracebackhide__ = True
treat_as_gen: bool = False
failed_resp: bool = False
# possibly a traceback (not sure what typing is for this..)
tb = None
cancel_scope = trio.CancelScope()
cancel_scope = CancelScope()
# activated cancel scope ref
cs: trio.CancelScope | None = None
cs: CancelScope | None = None
ctx = actor.get_context(
chan,
@ -112,6 +122,7 @@ async def _invoke(
)
context: bool = False
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
@ -153,6 +164,7 @@ async def _invoke(
except TypeError:
raise
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
@ -183,6 +195,7 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid})
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
@ -199,6 +212,20 @@ async def _invoke(
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
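# NOTE: for reviewers, the wire-msg shapes `_invoke()` emits per
# "functype", all grounded in the surrounding hunks (the 'yield'
# shape is implied by the `msg['yield']` reads in `._streaming`
# further down):
#
#   {'functype': 'asyncgen'|'context'|'asyncfunc', 'cid': cid}
#   {'yield': <value>, 'cid': cid}   # per streamed value
#   {'stop': True, 'cid': cid}       # stream termination
#   {'return': <res>, 'cid': cid}    # final result
#   {'error': {...}, 'cid': cid}     # boxed exc via `pack_error()`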
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
@ -209,21 +236,30 @@ async def _invoke(
ctx._scope = nurse.cancel_scope
task_status.started(ctx)
res = await coro
await chan.send({'return': res, 'cid': cid})
await chan.send({
'return': res,
'cid': cid
})
# XXX: do we ever trigger this block any more?
except (
BaseExceptionGroup,
trio.Cancelled,
):
# if a context error was set then likely
# the multierror was raised due to that
if ctx._remote_error is not None:
raise ctx._remote_error
) as scope_error:
# maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
# side?
# always set this (callee) side's exception as the
# local error on the context
ctx._local_error: BaseException = scope_error
# if a remote error was set then likely the
# exception group was raised due to that, so
# we instead raise that error immediately!
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
# maybe TODO: pack in
# ``trio.Cancelled.__traceback__`` here so they can
# be unwrapped and displayed on the caller side?
raise
finally:
@ -234,11 +270,11 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(
f'Context entrypoint {func} was terminated:\n{ctx}'
)
ctx: Context = actor._contexts.pop((chan.uid, cid))
log.runtime(
f'Context entrypoint {func} was terminated:\n'
f'{ctx}'
)
if ctx.cancelled_caught:
@ -246,44 +282,46 @@ async def _invoke(
# before raising any context cancelled case
# so that real remote errors don't get masked as
# ``ContextCancelled``s.
re = ctx._remote_error
if re:
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
fname = func.__name__
cs: trio.CancelScope = ctx._scope
fname: str = func.__name__
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller = ctx._cancelled_remote
# await _debug.breakpoint()
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{our_uid} cancelled by '
)
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
# ``Context.cancel()`` directly, we're going to
# need to change this logic branch since it will
# always enter..
# need to change this logic branch since it
# will always enter..
if ctx._cancel_called:
msg = f'`{fname}()`@{actor.uid} cancelled itself'
else:
msg = (
f'`{fname}()`@{actor.uid} '
'was remotely cancelled by '
)
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor that, for ex.,
# calls `Portal.cancel_actor()` and by side-effect
# cancels this ctx.
if canceller == ctx.chan.uid:
msg += f'its caller {canceller}'
elif canceller == ctx.chan.uid:
msg += f'its caller {canceller} '
else:
msg += f'remote actor {canceller}'
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += f' with msg:\n{ctx._cancel_msg}'
msg += (
' with msg:\n'
f'{ctx._cancel_msg}'
)
# task-context was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()``
@ -296,37 +334,76 @@ async def _invoke(
canceller=canceller,
)
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
else:
# regular async function
try:
await chan.send({'functype': 'asyncfunc', 'cid': cid})
except trio.BrokenResourceError:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}'
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
)
with cancel_scope as cs:
ctx._scope = cs
ctx._scope: CancelScope = cs
task_status.started(ctx)
result = await coro
fname = func.__name__
fname: str = func.__name__
log.runtime(f'{fname}() result: {result}')
if not failed_resp:
# only send result if we know IPC isn't down
await chan.send(
{'return': result,
'cid': cid}
)
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
{'return': result,
'cid': cid}
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
except (
Exception,
BaseExceptionGroup,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail
# due to an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = True
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
@ -360,24 +437,31 @@ async def _invoke(
log.exception("Actor crashed:")
# always ship errors back to caller
err_msg = pack_error(err, tb=tb)
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
if is_rpc:
try:
await chan.send(err_msg)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?"
)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?\n"
f'{ipc_err}'
)
# error is probably from above coro running code *not from the
# underlying rpc invocation* since a scope was never allocated
@ -403,7 +487,11 @@ async def _invoke(
log.warning(
f"Task {func} likely errored or cancelled before start")
else:
log.cancel(f'{func.__name__}({kwargs}) failed?')
log.cancel(
'Failed to de-alloc internal task!?\n'
f'cid: {cid}\n'
f'{func.__name__}({kwargs})'
)
finally:
if not actor._rpc_tasks:
@ -420,7 +508,7 @@ async def try_ship_error_to_parent(
err: Union[Exception, BaseExceptionGroup],
) -> None:
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
@ -467,13 +555,13 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_root_n: trio.Nursery | None = None
_service_n: trio.Nursery | None = None
_server_n: trio.Nursery | None = None
_root_n: Nursery | None = None
_service_n: Nursery | None = None
_server_n: Nursery | None = None
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: trio.CancelScope | None = None
_parent_chan_cs: CancelScope | None = None
# syncs for setup/teardown sequences
_server_down: trio.Event | None = None
@ -1005,7 +1093,7 @@ class Actor:
async def _serve_forever(
self,
handler_nursery: trio.Nursery,
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
@ -1073,12 +1161,17 @@ class Actor:
- return control the parent channel message loop
'''
log.cancel(f"{self.uid} is trying to cancel")
log.cancel(
f'{self.uid} requested to cancel by:\n'
f'{requesting_uid}'
)
# TODO: what happens here when we self-cancel tho?
self._cancel_called_by_remote: tuple = requesting_uid
self._cancel_called = True
# cancel all ongoing rpc tasks
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
@ -1088,7 +1181,9 @@ class Actor:
dbcs.cancel()
# kill all ongoing tasks
await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
await self.cancel_rpc_tasks(
requesting_uid=requesting_uid,
)
# stop channel server
self.cancel_server()
@ -1118,8 +1213,8 @@ class Actor:
self,
cid: str,
chan: Channel,
requesting_uid: tuple[str, str] | None = None,
) -> bool:
'''
Cancel a local task by call-id / channel.
@ -1136,7 +1231,7 @@ class Actor:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope = ctx._scope
scope: CancelScope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
return True
@ -1146,10 +1241,10 @@ class Actor:
f"peer: {chan.uid}\n")
if (
ctx._cancelled_remote is None
ctx._canceller is None
and requesting_uid
):
ctx._cancelled_remote: tuple = requesting_uid
ctx._canceller: tuple = requesting_uid
# don't allow cancelling this function mid-execution
# (is this necessary?)
@ -1159,6 +1254,7 @@ class Actor:
# TODO: shouldn't we eventually be calling ``Context.cancel()``
# directly here instead (since that method can handle both
# side's calls into it?
# await ctx.cancel()
scope.cancel()
# wait for _invoke to mark the task complete
@ -1186,9 +1282,12 @@ class Actor:
registered for each.
'''
tasks = self._rpc_tasks
tasks: dict = self._rpc_tasks
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
log.cancel(
f'Cancelling all {len(tasks)} rpc tasks:\n'
f'{tasks}'
)
for (
(chan, cid),
(ctx, func, is_complete),
@ -1206,7 +1305,9 @@ class Actor:
)
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
'Waiting for remaining rpc tasks to complete:\n'
f'{tasks}'
)
await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None:
@ -1436,7 +1537,7 @@ async def async_main(
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# with CancelScope(shield=True):
# await _debug.breakpoint()
actor.lifetime_stack.close()
@ -1472,7 +1573,7 @@ async def async_main(
):
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
@ -1483,7 +1584,7 @@ async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
@ -1501,7 +1602,7 @@ async def process_messages(
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with trio.CancelScope(shield=shield) as loop_cs:
with CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
@ -1563,18 +1664,18 @@ async def process_messages(
if ns == 'self':
if funcname == 'cancel':
func = actor.cancel
func: Callable = actor.cancel
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete = _debug.Lock.local_pdb_complete
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
@ -1602,7 +1703,7 @@ async def process_messages(
# we immediately start the runtime machinery
# shutdown
# with trio.CancelScope(shield=True):
# with CancelScope(shield=True):
kwargs['chan'] = chan
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
@ -1627,7 +1728,7 @@ async def process_messages(
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func = getattr(actor, funcname)
func: Callable = getattr(actor, funcname)
else:
# complain to client about restricted modules
@ -1717,9 +1818,10 @@ async def process_messages(
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)


@ -204,6 +204,21 @@ async def do_hard_kill(
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
Used in 2 main cases:
- "unknown remote runtime state": a hanging/stalled actor that
isn't responding after sending a (graceful) runtime cancel
request via an IPC msg.
- "cancelled during spawn": a process who's actor runtime was
cancelled before full startup completed (such that
cancel-request-handling machinery was never fully
initialized) and thus a "cancel request msg" is never going
to be handled.
'''
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
@ -219,6 +234,9 @@ async def do_hard_kill(
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
#
# This code was originally triggered by ``proc.__aexit__()``
# but now must be called manually.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
@ -234,10 +252,14 @@ async def do_hard_kill(
with trio.CancelScope(shield=True):
await proc.wait()
# XXX NOTE XXX: zombie squad dispatch:
# (should ideally never happen, but) if we do get here it means
# graceful termination of a process failed and we need to
# resort to OS level signalling to interrupt and cancel the
# (presumably stalled or hung) actor. Since we never allow
# zombies (as a feature) we ask the OS to send in the
# removal squad as the last resort.
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
proc.kill()
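Reduced to a skeleton (shielded stdio/teardown details elided), the hard-kill flow reads roughly as follows; `terminate_after` is the commented-out knob in the signature above:
with trio.move_on_after(terminate_after) as cs:
    ...  # close stdio + (cancellably) `await proc.wait()`
if cs.cancelled_caught:
    # zombie squad dispatch: graceful reap timed out, ask the
    # OS to interrupt the (presumably hung) process.
    log.critical(f'#ZOMBIE_LORD_IS_HERE: {proc}')
    proc.kill()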
@ -252,10 +274,13 @@ async def soft_wait(
portal: Portal,
) -> None:
# Wait for proc termination but **don't yet** call
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).
# This is a "soft" (cancellable) join/reap.
'''
Wait for proc termination but **don't yet** tear down
std-streams (since it will clobber any ongoing pdb REPL
session). This is our "soft" (and thus itself cancellable)
join/reap on an actor-runtime-in-process.
'''
uid = portal.channel.uid
try:
log.cancel(f'Soft waiting on actor:\n{uid}')
@ -278,7 +303,13 @@ async def soft_wait(
await wait_func(proc)
n.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor()
if proc.poll() is None: # type: ignore

View File

@ -34,7 +34,7 @@ import warnings
import trio
from ._exceptions import (
unpack_error,
_raise_from_no_key_in_msg,
)
from .log import get_logger
from .trionics import (
@ -54,61 +54,6 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appropriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc:
log.debug(f"{stream} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error
# case is activated above.
raise src_err
# raise RuntimeError(
# 'Unknown non-yield stream msg?\n'
# f'{msg}'
# )
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@ -148,10 +93,13 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
@ -161,6 +109,16 @@ class MsgStream(trio.abc.Channel):
determined by the underlying protocol).
'''
# NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it
# to gracefully exit async for loops):
#
# async def __anext__(self) -> ReceiveType:
# try:
# return await self.receive()
# except trio.EndOfChannel:
# raise StopAsyncIteration
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
@ -174,10 +132,13 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
except (


@ -48,12 +48,15 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = {
LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = {
'CRITICAL': 'red',
@ -102,7 +105,11 @@ class StackLevelAdapter(logging.LoggerAdapter):
Cancellation logging, mostly for runtime reporting.
'''
return self.log(16, msg)
return self.log(
level=16,
msg=msg,
# stacklevel=4,
)
def pdb(
self,
@ -114,14 +121,37 @@ class StackLevelAdapter(logging.LoggerAdapter):
'''
return self.log(500, msg)
def log(self, level, msg, *args, **kwargs):
"""
def log(
self,
level,
msg,
*args,
**kwargs,
):
'''
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
"""
'''
if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
# msg, kwargs = self.process(msg, kwargs)
self._log(level, msg, args, **kwargs)
self._log(
level=level,
msg=msg,
args=args,
# NOTE: not sure how this worked before but, it
# seems with our custom level methods defined above
# we do indeed (now) require another stack level??
stacklevel=stacklevel,
**kwargs,
)
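# NOTE: making the off-by-one concrete: `stacklevel` is how many
# frames stdlib `logging` walks up to find the caller to report,
#   1 -> StackLevelAdapter._log()
#   2 -> StackLevelAdapter.log()
#   3 -> the custom level method, eg. `.cancel()`
#   4 -> the actual caller's frame <- what we want shown
# so the custom methods need 4 where a direct `.log()` call only
# needs 3.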
# LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log(
@ -134,12 +164,15 @@ class StackLevelAdapter(logging.LoggerAdapter):
stack_info=False,
# XXX: bit we added to show fileinfo from actual caller.
# this level then ``.log()`` then finally the caller's level..
stacklevel=3,
# - this level
# - then ``.log()``
# - then finally the caller's level..
stacklevel=4,
):
"""
'''
Low-level log implementation, proxied to allow nested logger adapters.
"""
'''
return self.logger._log(
level,
msg,


@ -19,22 +19,13 @@ Sugary patterns for trio + tractor designs.
'''
from ._mngrs import (
gather_contexts,
maybe_open_context,
maybe_open_nursery,
gather_contexts as gather_contexts,
maybe_open_context as maybe_open_context,
maybe_open_nursery as maybe_open_nursery,
)
from ._broadcast import (
broadcast_receiver,
BroadcastReceiver,
Lagged,
AsyncReceiver as AsyncReceiver,
broadcast_receiver as broadcast_receiver,
BroadcastReceiver as BroadcastReceiver,
Lagged as Lagged,
)
__all__ = [
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
'maybe_open_context',
'maybe_open_nursery',
]
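The `import <name> as <name>` spelling is the standard explicit re-export idiom (per PEP 484's stub semantics, also enforced by `mypy --no-implicit-reexport`), which is what lets the `__all__` list above be dropped:
# the redundant-looking `as` marks the name as a public
# re-export to type checkers:
from ._mngrs import gather_contexts as gather_contexts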


@ -225,6 +225,7 @@ async def maybe_open_context(
# yielded output
yielded: Any = None
lock_registered: bool = False
# Lock resource acquisition around task racing / ``trio``'s
# scheduler protocol.
@ -232,6 +233,7 @@ async def maybe_open_context(
# to allow re-entrant use cases where one `maybe_open_context()`
# wrapped factor may want to call into another.
lock = _Cache.locks.setdefault(fid, trio.Lock())
lock_registered: bool = True
await lock.acquire()
# XXX: one singleton nursery per actor and we want to
@ -291,4 +293,9 @@ async def maybe_open_context(
_, no_more_users = entry
no_more_users.set()
_Cache.locks.pop(fid)
if lock_registered:
maybe_lock = _Cache.locks.pop(fid, None)
if maybe_lock is None:
log.error(
f'Resource lock for {fid} ALREADY POPPED?'
)
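Finally, a hedged usage sketch of `maybe_open_context()` itself; the `(cache_hit, value)` yield shape and kwarg names follow my reading of this module's API, and `open_feed()` plus its kwargs are hypothetical:
from contextlib import asynccontextmanager as acm

from tractor.trionics import maybe_open_context


@acm
async def open_feed(sym: str):
    yield f'feed-for-{sym}'  # stand-in for an expensive resource


async def main():
    async with maybe_open_context(
        acm_func=open_feed,
        kwargs={'sym': 'btcusdt'},
    ) as (cache_hit, feed):
        if cache_hit:
            print('reusing already-cached feed!')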