Compare commits


No commits in common. "a55ea18c7d2654c414ae12e35611cf404d45a8b8" and "cef9ab73534220eac3703161e08a256aeedd7436" have entirely different histories.

7 changed files with 273 additions and 512 deletions

View File

@ -74,15 +74,7 @@ def pytest_configure(config):
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level_from_cli = request.config.option.loglevel
# disable built-in capture when user passes the `--ll` value
# presuming they already know they want to see console logging
# and don't need it repeated by pytest.
if level_from_cli:
request.config.option.showcapture = 'no'
level = tractor.log._default_loglevel = level_from_cli
level = tractor.log._default_loglevel = request.config.option.loglevel
yield level
tractor.log._default_loglevel = orig
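
The branch removed above keyed off the ``--ll`` CLI value; for context, a minimal sketch of how such a flag is typically wired up to ``request.config.option.loglevel`` in a conftest (the ``dest='loglevel'`` detail is an assumption, not taken from this diff):

import pytest

def pytest_addoption(parser):
    # assumed registration: ``dest='loglevel'`` is what exposes the flag's
    # value as ``config.option.loglevel`` to the fixture above
    parser.addoption('--ll', action='store', dest='loglevel', default=None)
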

View File

@ -23,9 +23,9 @@ async def sleep_forever():
await trio.sleep_forever()
async def do_nuthin(sleep=0):
async def do_nuthin():
# just nick the scheduler
await trio.sleep(sleep)
await trio.sleep(0)
@pytest.mark.parametrize(
@ -100,7 +100,6 @@ def test_multierror(arb_addr):
@pytest.mark.parametrize('delay', (0, 0.5))
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
# 'num_subactors', range(2, 3),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
@ -223,7 +222,7 @@ async def test_cancel_infinite_streamer(start_method):
# daemon actors complete quickly while single task
# actors error after a brief delay
(3, tractor.MultiError, AssertionError,
(assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
(assert_err, {'delay': 1}), (do_nuthin, {}, False)),
],
ids=[
'1_run_in_actor_fails',
@ -326,7 +325,6 @@ async def spawn_and_error(breadth, depth) -> None:
)
kwargs = {
'name': f'{name}_errorer_{i}',
# 'delay': 0.01,
}
await nursery.run_in_actor(*args, **kwargs)
@ -391,23 +389,13 @@ async def test_nested_multierrors(loglevel, start_method):
# on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead
if platform.system() == 'Windows':
assert subexc.type in (
trio.MultiError,
tractor.RemoteActorError,
)
assert (subexc.type is trio.MultiError) or (
subexc.type is tractor.RemoteActorError)
else:
assert subexc.type in (
trio.MultiError,
trio.Cancelled,
# tractor.RemoteActorError,
)
assert subexc.type is trio.MultiError
else:
assert subexc.type in (
tractor.RemoteActorError,
trio.Cancelled,
)
assert (subexc.type is tractor.RemoteActorError) or (
subexc.type is trio.Cancelled)
else:
pytest.fail('Got no error from nursery?')
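
The assertions above depend on how a trio nursery reports concurrent child failures; a minimal sketch of several errors surfacing as one ``trio.MultiError`` (older trio releases; newer ones group them into ``ExceptionGroup`` instead):

import trio

async def fail(i):
    raise ValueError(f'task {i} failed')

async def main():
    try:
        async with trio.open_nursery() as n:
            for i in range(3):
                n.start_soon(fail, i)
    except trio.MultiError as merr:
        # concurrently raised child errors arrive grouped together
        print(f'caught {len(merr.exceptions)} child errors')

trio.run(main)
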

View File

@ -180,7 +180,6 @@ def test_multi_actor_subs_arbiter_pub(
'streamer',
enable_modules=[__name__],
)
name = 'streamer'
even_portal = await n.run_in_actor(
subs,

View File

@ -58,7 +58,7 @@ async def _invoke(
Invoke local func and deliver result(s) over provided channel.
'''
# __tracebackhide__ = True
__tracebackhide__ = True
treat_as_gen = False
# possibly a traceback (not sure what typing is for this..)
@ -69,7 +69,6 @@ async def _invoke(
ctx = Context(chan, cid)
context: bool = False
fname = func.__name__
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
@ -165,7 +164,6 @@ async def _invoke(
await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err:
tb = err.__traceback__
raise
if cs.cancelled_caught:
@ -173,6 +171,7 @@ async def _invoke(
# so they can be unwrapped and displayed on the caller
# side!
fname = func.__name__
if ctx._cancel_called:
msg = f'{fname} cancelled itself'
@ -193,33 +192,9 @@ async def _invoke(
await chan.send({'functype': 'asyncfunc', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err:
tb = err.__traceback__
raise
# await chan.send({'return': await coro, 'cid': cid})
if cs.cancelled_caught:
# if cs.cancel_called:
if cs.cancel_called:
msg = (
f'{fname} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
else:
msg = f'{fname} cancelled itself'
raise ContextCancelled(
msg,
suberror_type=trio.Cancelled,
)
except (
Exception,
trio.MultiError,
# trio.Cancelled,
) as err:
except (Exception, trio.MultiError) as err:
if not is_multi_cancelled(err):
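
The switch above from ``cs.cancelled_caught`` to ``cs.cancel_called`` decides whether the remote caller cancelled the task or the task cancelled itself; a minimal illustration of the difference between the two ``trio.CancelScope`` flags:

import trio

async def main():
    with trio.CancelScope() as cs:
        cs.cancel()                 # request cancellation of this scope
        await trio.sleep_forever()  # checkpoint where the cancel is delivered
    # cancel_called: ``.cancel()`` was invoked on this scope
    # cancelled_caught: the scope actually absorbed a ``Cancelled`` exception
    assert cs.cancel_called and cs.cancelled_caught

trio.run(main)
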
@ -273,7 +248,7 @@ async def _invoke(
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
f"Task {func} likely errored or cancelled before start")
f"Task {func} likely errored or cancelled before it started")
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
@ -383,14 +358,6 @@ class Actor:
Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def wait_for_peer(
self, uid: Tuple[str, str]
) -> Tuple[trio.Event, Channel]:
@ -576,8 +543,7 @@ class Actor:
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
assert send_chan.cid == cid # type: ignore
if 'error' in msg:
recv_chan
# if 'error' in msg:
# ctx = getattr(recv_chan, '_ctx', None)
# if ctx:
# ctx._error_from_remote_msg(msg)
@ -652,7 +618,6 @@ class Actor:
# worked out we'll likely want to use that!
msg = None
nursery_cancelled_before_task: bool = False
uid = chan.uid
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
@ -676,7 +641,7 @@ class Actor:
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {uid}")
f" {chan} from {chan.uid}")
break
@ -711,46 +676,38 @@ class Actor:
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
if funcname == 'cancel':
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
log.cancel(
f"{self.uid} remote cancel msg from {uid}")
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
# don't start entire actor runtime cancellation if this
# actor is in debug mode
pdb_complete = _debug._local_pdb_complete
if pdb_complete:
log.cancel(
f'{self.uid} is in debug, wait for unlock')
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
# we immediately start the runtime machinery shutdown
with trio.CancelScope(shield=True):
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
# await self._cancel_complete.wait()
loop_cs.cancel()
continue
break
if funcname == '_cancel_task':
task_cid = kwargs['cid']
log.cancel(
f'Actor {uid} requests cancel for {task_cid}')
# we immediately start the runtime machinery
# shutdown
# we immediately start the runtime machinery shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
kwargs['chan'] = chan
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
continue
else:
# complain to client about restricted modules
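
Both cancel branches above run ``_invoke()`` inside ``trio.CancelScope(shield=True)`` so the cancellation RPC can finish even while the surrounding message loop is being torn down; the shielding pattern in isolation (stand-in names):

import trio

async def teardown():
    # stand-in for the shielded ``_invoke(...)`` call above
    await trio.sleep(0.05)
    print('teardown finished despite outer cancellation')

async def main():
    with trio.move_on_after(0.01):
        try:
            await trio.sleep_forever()
        finally:
            # shield=True lets this await complete even though the
            # enclosing scope has already been cancelled
            with trio.CancelScope(shield=True):
                await teardown()

trio.run(main)
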
@ -795,8 +752,7 @@ class Actor:
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
# end of async for, channel disconnect via
# ``trio.EndOfChannel``
# end of async for, channel disconnect via ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
@ -1043,7 +999,7 @@ class Actor:
raise
finally:
log.runtime("root runtime nursery complete")
log.info("Runtime nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
@ -1237,10 +1193,7 @@ class Actor:
tasks = self._rpc_tasks
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (
(chan, cid),
(scope, func, is_complete),
) in tasks.copy().items():
for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
if only_chan is not None:
if only_chan != chan:
continue
@ -1249,7 +1202,6 @@ class Actor:
if func != self._cancel_task:
await self._cancel_task(cid, chan)
if tasks:
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait()

View File

@ -84,15 +84,6 @@ class Portal:
] = None
self._streams: Set[ReceiveMsgStream] = set()
self.actor = current_actor()
self._cancel_called: bool = False
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def _submit(
self,
@ -138,7 +129,7 @@ class Portal:
resptype: str,
first_msg: dict
) -> Any:
# __tracebackhide__ = True
__tracebackhide__ = True
assert resptype == 'asyncfunc' # single response
msg = await recv_chan.receive()
@ -154,7 +145,7 @@ class Portal:
Return the result(s) from the remote actor's "main" task.
"""
# __tracebackhide__ = True
__tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
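
Several hunks here flip ``__tracebackhide__`` on; it is the standard pytest hint for hiding a frame from reported tracebacks, roughly as follows (hypothetical helper and test names):

def _framework_helper():
    # pytest omits frames whose local ``__tracebackhide__`` is truthy
    __tracebackhide__ = True
    raise AssertionError('raised from a frame pytest will hide')

def test_helper_failure():
    _framework_helper()  # the reported traceback points here, not into the helper
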
@ -206,16 +197,9 @@ class Portal:
# we'll need to .aclose all those channels here
await self._cancel_streams()
async def cancel_actor(self) -> None:
'''
Cancel the actor on the other end of this portal.
That means cancelling the "actor runtime" not just any one
task that's running there.
'''
self._cancel_called = True
async def cancel_actor(self):
"""Cancel the actor on the other end of this portal.
"""
if not self.channel.connected():
log.cancel("This portal is already closed can't cancel")
return False
@ -223,8 +207,8 @@ class Portal:
await self._cancel_streams()
log.cancel(
f"Sending runtime cancel msg to {self.channel.uid} @ "
f"{self.channel.raddr}")
f"Sending actor cancel request to {self.channel.uid} on "
f"{self.channel}")
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield

View File

@ -31,12 +31,8 @@ from .log import get_logger
from ._portal import Portal
from ._actor import Actor
from ._entry import _mp_main
from ._exceptions import (
ActorFailure,
RemoteActorError,
ContextCancelled,
)
from ._debug import maybe_wait_for_debugger, breakpoint
from ._exceptions import ActorFailure, RemoteActorError
from ._debug import maybe_wait_for_debugger
log = get_logger('tractor')
@ -96,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
async def result_from_portal(
portal: Portal,
actor: Actor,
@ -104,7 +99,7 @@ async def result_from_portal(
cancel_on_result: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> tuple[Optional[Any], Optional[BaseException]]:
) -> None:
"""
Cancel the actor gracefully once its "main" portal's
result arrives.
@ -112,11 +107,9 @@ async def result_from_portal(
Should only be called for actors spawned with `run_in_actor()`.
"""
# __tracebackhide__ = True
__tracebackhide__ = True
uid = portal.channel.uid
remote_result = None
is_remote_result = None
# cancel control is explicitly done by the caller
with trio.CancelScope() as cs:
@ -127,42 +120,50 @@ async def result_from_portal(
# a MultiError and we still send out a cancel request
# result = await exhaust_portal(portal, actor)
try:
log.info(f"Waiting on final result from {actor.uid}")
log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
result = await portal.result()
is_remote_result = True
log.info(f"Returning final result: {result}")
except RemoteActorError as rerr:
# this includes real remote errors as well as
# `ContextCancelled`
is_remote_result = True
result = rerr
log.debug(f"Returning final result: {result}")
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
is_remote_result = False
result = err
errors[actor.uid] = err
raise
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
result = err
# errors[actor.uid] = err
# raise
if cs.cancelled_caught:
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
if cancel_on_result:
if isinstance(result, Exception):
# errors[actor.uid] = result
log.warning(
f"Cancelling single-task-run {uid} after error {result}"
)
# raise result
return result, is_remote_result
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# except trio.Cancelled as err:
# # lol, of course we need this too ;P
# # TODO: merge with above?
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
# result = err
# # errors[actor.uid] = err
# raise
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
# return result
return result
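
For reference, ``result_from_portal()`` drives the ``run_in_actor()`` one-shot pattern: wait for the single result, then cancel the child. A rough sketch of the user-facing side, assuming the documented ``tractor`` API of this era:

import tractor
import trio

async def add(a, b):
    return a + b

async def main():
    async with tractor.open_nursery() as n:
        portal = await n.run_in_actor(add, a=1, b=2)
        # the nursery awaits this internally and then cancels the child
        # actor, which is exactly what ``result_from_portal()`` arranges
        assert await portal.result() == 3

trio.run(main)
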
async def do_hard_kill(
@ -203,18 +204,16 @@ async def do_hard_kill(
async def reap_proc(
proc: trio.Process,
uid: tuple[str, str],
terminate_after: Optional[float] = None,
terminate_after: float = float('inf'),
hard_kill_after: float = 0.1,
) -> None:
with trio.move_on_after(terminate_after or float('inf')) as cs:
with trio.move_on_after(terminate_after) as cs:
# Wait for proc termination but **don't yet** do
# any out-of-ipc-land termination / process
# killing. This is a "light" (cancellable) join,
# the hard join is below after timeout
await proc.wait()
log.info(f'Proc for {uid} terminated gracefully')
if cs.cancelled_caught and terminate_after is not float('inf'):
# Always "hard" join lingering sub procs since no
@ -243,7 +242,6 @@ async def new_proc(
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
graceful_kill_timeout: int = 3,
@ -353,7 +351,6 @@ async def new_proc(
) as cerr:
log.exception(f'Relaying unexpected {cerr} to nursery')
await breakpoint()
# sending IPC-msg level cancel requests is expected to be
# managed by the nursery.
@ -370,16 +367,16 @@ async def new_proc(
# True, # cancel_on_result
)
# Graceful reap attempt - 2 cases:
finally:
# 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria to return and daemon actors to be cancelled
# for ria and daemon actors
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
result = None
# this is the soft reap sequence. we can
# either collect results:
@ -396,138 +393,70 @@ async def new_proc(
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
reap_timeout = None
# async with trio.open_nursery() as nursery:
if portal.channel.connected() and ria:
result, is_remote = await result_from_portal(
# we wait for result and cancel on completion
await result_from_portal(
portal,
subactor,
errors,
True, # cancel_on_result
)
# # collect any expected ``.run_in_actor()`` results
# cancel_scope = await nursery.start(
# result_from_portal,
# portal,
# subactor,
# errors,
# True, # cancel_on_result
)
if is_remote:
if isinstance(result, RemoteActorError):
# errors[actor.uid] = result
if (
portal.cancel_called and
isinstance(result, ContextCancelled)
):
log.cancel(f'{uid} received expected cancel')
errors[uid] = result
# fall through to below soft proc reap
reap_timeout = 0.5
else:
log.warning(
f"Cancelling single-task-run {uid} after remote error {result}"
)
# likely a real remote error propagation
# so pass up to nursery strat
should_raise = await actor_nursery._handle_err(
result,
portal=portal,
)
# propagate up to spawn nursery to be
# grouped into any multierror.
# if should_raise:
# raise result
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
else:
log.exception(
f"Cancelling single-task-run {uid} after local error"
)
raise result
# )
# soft & cancellable
await reap_proc(proc, uid, terminate_after=reap_timeout)
await reap_proc(proc)
# except (
# ContextCancelled,
# ) as err:
# if portal.cancel_called:
# log.cancel('{uid} received expected cancel')
# # if proc terminates before portal result
# if cancel_scope:
# cancel_scope.cancel()
# # soft & cancellable
# await reap_proc(proc, uid, terminate_after=0.1)
# except (
# RemoteActorError,
# ) as err:
# reaping_cancelled = err
# log.exception(f'{uid} remote error')
# await actor_nursery._handle_err(err, portal=portal)
except (
RemoteActorError,
) as err:
reaping_cancelled = err
log.exception(f'{uid} remote error')
await actor_nursery._handle_err(err, portal=portal)
except (
trio.Cancelled,
) as err:
# NOTE: for now we pack the cancelleds and expect the actor
# nursery to re-raise them in a multierror but we could
# have also let them bubble up through the spawn nursery.
# in theory it's more correct to raise any
# `ContextCancelled` errors we get back from the
# `Portal.cancel_actor()` call and in that error
# have meta-data about whether we timed out or
# actually got a cancel message back from the remote task.
# IF INSTEAD we raised *here* then this logic has to be
# handled inside the oca supervisor block and the spawn_n
# task cancelleds would have to be replaced with the remote
# task `ContextCancelled`s, *if* they ever arrive.
errors[uid] = err
# with trio.CancelScope(shield=True):
# await breakpoint()
if actor_nursery.cancel_called:
log.cancel(f'{uid} soft reap cancelled by nursery')
reaping_cancelled = err
if actor_nursery.cancelled:
log.cancel(f'{uid} wait cancelled by nursery')
else:
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
# this would be pretty weird and unexpected
await breakpoint()
# actor nursery wasn't cancelled before the spawn
# nursery was which likely means that there was
# an error in the actor nursery enter and the
# spawn nursery cancellation "beat" the call to
# .cancel()? that's a bug right?
# saw this with settings bugs in the ordermode pane in
# piker.
log.exception(f'{uid} soft wait error?')
raise RuntimeError(
'Task spawn nursery cancelled before actor nursery?')
except (
BaseException
) as err:
reaping_cancelled = err
log.exception(f'{uid} soft reap local error')
finally:
if reaping_cancelled:
assert actor_nursery.cancel_called
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: we should probably just
# check for a `ContextCancelled` on portals
# here and fill them in over `trio.Cancelled` right?
# XXX: can't do this, it'll hang some tests.. no
# idea why yet.
# with trio.CancelScope(shield=True):
# await actor_nursery._handle_err(
# reaping_cancelled,
# portal=portal
# )
# hard reap sequence with timeouts
if proc.poll() is None:
@ -542,7 +471,6 @@ async def new_proc(
await reap_proc(
proc,
uid,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
@ -560,7 +488,6 @@ async def new_proc(
with trio.CancelScope(shield=True):
await reap_proc(
proc,
uid,
terminate_after=0.1,
)
@ -574,26 +501,15 @@ async def new_proc(
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
# all subactor children have completed
log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
spawn_n = actor_nursery._spawn_n
# with trio.CancelScope(shield=True):
# await breakpoint()
if not spawn_n._closed:
# the parent task that opened the actor nursery
# hasn't yet closed it so we cancel that task now.
spawn_n.cancel_scope.cancel()
# not entirely sure why we need this.. but without it
# the reaping cancelled error is never reported upwards
# to the spawn nursery?
# if reaping_cancelled:
# raise reaping_cancelled
if reaping_cancelled:
raise reaping_cancelled
else:
# `multiprocessing`
@ -710,7 +626,6 @@ async def mp_new_proc(
# no shield is required here (vs. above on the trio backend)
# since debug mode is not supported on mp.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
finally:
@ -726,23 +641,13 @@ async def mp_new_proc(
try:
# async with trio.open_nursery() as n:
# n.cancel_scope.shield = True
print('soft mp reap')
# cancel_scope = await nursery.start(
result = await result_from_portal(
cancel_scope = await nursery.start(
result_from_portal,
portal,
subactor,
errors,
# True,
errors
)
# except trio.Cancelled as err:
except BaseException as err:
log.exception('hard mp reap')
with trio.CancelScope(shield=True):
await actor_nursery._handle_err(err, portal=portal)
print('sent to nursery')
except trio.Cancelled as err:
cancel_exc = err
# if the reaping task was cancelled we may have hit
@ -752,34 +657,23 @@ async def mp_new_proc(
reaping_cancelled = True
if proc.is_alive():
with trio.CancelScope(shield=True):
print('hard reaping')
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
print('pwning mp proc')
proc.terminate()
finally:
# if not reaping_cancelled and proc.is_alive():
# await proc_waiter(proc)
if not reaping_cancelled and proc.is_alive():
await proc_waiter(proc)
# TODO: timeout block here?
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
if not actor_nursery._children:
# all subactor children have completed
# log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
@ -788,7 +682,6 @@ async def mp_new_proc(
f"{subactor.uid}")
cancel_scope.cancel()
if reaping_cancelled: # let the cancellation bubble up
print('raising')
elif reaping_cancelled: # let the cancellation bubble up
assert cancel_exc
raise cancel_exc

View File

@ -12,7 +12,7 @@ import trio
from async_generator import asynccontextmanager
from . import _debug
from ._debug import maybe_wait_for_debugger, breakpoint
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
@ -48,19 +48,10 @@ class ActorNursery:
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._cancel_called: bool = False
self._join_procs = trio.Event()
self._all_children_reaped = trio.Event()
self.errors = errors
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def start_actor(
self,
name: str,
@ -186,22 +177,17 @@ class ActorNursery:
If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far-end graceful ``trio`` cancellation.
"""
self.cancelled = True
# entries may be popped by the spawning backend as
# actors cancel individually
childs = self._children.copy()
if self.cancel_called:
log.warning(
f'Nursery with children {len(childs)} already cancelled')
return
log.cancel(
f'Cancelling nursery in {self._actor.uid} with children\n'
f'{childs.keys()}'
)
self._cancel_called = True
# wake up all spawn tasks to move on as the nursery
# has ``__aexit__()``-ed
@ -210,45 +196,24 @@ class ActorNursery:
await maybe_wait_for_debugger()
# one-cancels-all strat
try:
async with trio.open_nursery() as cancel_sender:
for subactor, proc, portal in childs.values():
if not portal.cancel_called and portal.channel.connected():
cancel_sender.start_soon(portal.cancel_actor)
except trio.MultiError as err:
_err = err
log.exception(f'{self} errors during cancel')
# await breakpoint()
# # LOL, ok so multiprocessing requires this for some reason..
# with trio.CancelScope(shield=True):
# await trio.lowlevel.checkpoint()
# cancel all spawner tasks
# self._spawn_n.cancel_scope.cancel()
self.cancelled = True
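
The cancel path above sends one ``portal.cancel_actor()`` request per child from a single trio nursery so all requests go out concurrently; the same shape with stand-in coroutines:

import trio

async def send_cancel(name):
    # stand-in for ``portal.cancel_actor()``
    await trio.sleep(0)
    print(f'cancel request sent to {name}')

async def cancel_all_children(children):
    # fan out concurrently; any failure surfaces from the nursery
    async with trio.open_nursery() as sender:
        for name in children:
            sender.start_soon(send_cancel, name)

trio.run(cancel_all_children, ['a', 'b', 'c'])
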
async def _handle_err(
self,
err: BaseException,
portal: Optional[Portal] = None,
is_ctx_error: bool = False,
) -> bool:
) -> None:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
if is_ctx_error:
assert not portal
uid = self._actor.uid
else:
uid = portal.channel.uid
if err not in self.errors.values():
self.errors[uid] = err
with trio.CancelScope(shield=True):
etype = type(err)
@ -262,18 +227,13 @@ class ActorNursery:
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.error(
log.exception(
f"Nursery for {current_actor().uid} "
f"errored from {uid} with\n{err}")
f"errored with {err}, ")
# cancel all subactors
await self.cancel()
return True
log.warning(f'Skipping duplicate error for {uid}')
return False
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
@ -290,7 +250,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
src_err: Optional[BaseException] = None
nurse_err: Optional[BaseException] = None
# errors from this daemon actor nursery bubble up to caller
try:
@ -339,20 +298,14 @@ async def _open_and_supervise_one_cancels_all_nursery(
# same as a user code failure.
except BaseException as err:
print('ERROR')
# anursery._join_procs.set()
src_err = err
# with trio.CancelScope(shield=True):
should_raise = await anursery._handle_err(err, is_ctx_error=True)
# XXX: raising here causes some cancellation
# / multierror tests to fail because of what appears to
# be double raise? we probably need to see how `trio`
# does this case..
if should_raise:
await anursery._handle_err(err)
raise
# except trio.MultiError as err:
except BaseException as err:
# nursery bubble up
nurse_err = err
@ -378,15 +331,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
# await breakpoint()
if src_err and src_err not in errors.values():
errors[actor.uid] = src_err
if errors:
# if nurse_err or src_err:
if anursery._children:
raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
# with trio.CancelScope(shield=True):
# await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))