Compare commits


No commits in common. "a55ea18c7d2654c414ae12e35611cf404d45a8b8" and "cef9ab73534220eac3703161e08a256aeedd7436" have entirely different histories.

7 changed files with 273 additions and 512 deletions

View File

@@ -74,15 +74,7 @@ def pytest_configure(config):
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
     orig = tractor.log._default_loglevel
-    level_from_cli = request.config.option.loglevel
-
-    # disable built-in capture when user passes the `--ll` value
-    # presuming they already know they want to see console logging
-    # and don't need it repeated by pytest.
-    if level_from_cli:
-        request.config.option.showcapture = 'no'
-        level = tractor.log._default_loglevel = level_from_cli
+    level = tractor.log._default_loglevel = request.config.option.loglevel
     yield level
     tractor.log._default_loglevel = orig
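For context, the left-hand side of this hunk is the variant that wires a CLI log level into the session fixture and disables pytest's output capture. A minimal, illustrative sketch of that pattern (the `--ll` flag name comes from the removed comment; the option wiring below is an assumption, not tractor's actual conftest):

import pytest
import tractor


def pytest_addoption(parser):
    # hypothetical registration for the ``--ll`` flag mentioned above
    parser.addoption(
        '--ll', action='store', dest='loglevel', default=None,
        help='log level passed through to the actor runtime',
    )


@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
    orig = tractor.log._default_loglevel
    level = request.config.option.loglevel
    if level:
        # the user explicitly asked for console logging, so don't let
        # pytest repeat the captured output in its failure reports
        request.config.option.showcapture = 'no'
        tractor.log._default_loglevel = level
    yield level
    tractor.log._default_loglevel = orig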

View File

@@ -23,9 +23,9 @@ async def sleep_forever():
     await trio.sleep_forever()


-async def do_nuthin(sleep=0):
+async def do_nuthin():
     # just nick the scheduler
-    await trio.sleep(sleep)
+    await trio.sleep(0)


 @pytest.mark.parametrize(
@@ -100,7 +100,6 @@ def test_multierror(arb_addr):
 @pytest.mark.parametrize('delay', (0, 0.5))
 @pytest.mark.parametrize(
     'num_subactors', range(25, 26),
-    # 'num_subactors', range(2, 3),
 )
 def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
     """Verify we raise a ``trio.MultiError`` out of a nursery where
@@ -223,7 +222,7 @@ async def test_cancel_infinite_streamer(start_method):
         # daemon complete quickly delay while single task
         # actors error after brief delay
         (3, tractor.MultiError, AssertionError,
-         (assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
+         (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
     ],
     ids=[
         '1_run_in_actor_fails',
@@ -326,7 +325,6 @@ async def spawn_and_error(breadth, depth) -> None:
             )
             kwargs = {
                 'name': f'{name}_errorer_{i}',
-                # 'delay': 0.01,
             }
             await nursery.run_in_actor(*args, **kwargs)
@@ -391,23 +389,13 @@ async def test_nested_multierrors(loglevel, start_method):
                 # on windows sometimes spawning is just too slow and
                 # we get back the (sent) cancel signal instead
                 if platform.system() == 'Windows':
-                    assert subexc.type in (
-                        trio.MultiError,
-                        tractor.RemoteActorError,
-                    )
+                    assert (subexc.type is trio.MultiError) or (
+                        subexc.type is tractor.RemoteActorError)
                 else:
-                    assert subexc.type in (
-                        trio.MultiError,
-                        trio.Cancelled,
-                        # tractor.RemoteActorError,
-                    )
+                    assert subexc.type is trio.MultiError
             else:
-                assert subexc.type in (
-                    tractor.RemoteActorError,
-                    trio.Cancelled,
-                )
+                assert (subexc.type is tractor.RemoteActorError) or (
+                    subexc.type is trio.Cancelled)
         else:
             pytest.fail(f'Got no error from nursery?')
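These assertions unpack the sub-exceptions grouped by the nursery. A standalone sketch of how `trio.MultiError` (pre-0.22 trio, as used throughout this diff) carries one entry per failed child task; the helper names are made up for illustration:

import trio


async def _boom(msg: str) -> None:
    raise RuntimeError(msg)


async def _main() -> None:
    async with trio.open_nursery() as n:
        n.start_soon(_boom, 'first')
        n.start_soon(_boom, 'second')


try:
    trio.run(_main)
except trio.MultiError as me:
    # each failed child contributes one sub-exception
    assert all(isinstance(sub, RuntimeError) for sub in me.exceptions)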

View File

@@ -180,7 +180,6 @@ def test_multi_actor_subs_arbiter_pub(
                 'streamer',
                 enable_modules=[__name__],
             )
-            name = 'streamer'
             even_portal = await n.run_in_actor(
                 subs,

View File

@@ -58,7 +58,7 @@ async def _invoke(
     Invoke local func and deliver result(s) over provided channel.

     '''
-    # __tracebackhide__ = True
+    __tracebackhide__ = True
     treat_as_gen = False

     # possible a traceback (not sure what typing is for this..)
@@ -69,7 +69,6 @@ async def _invoke(
     ctx = Context(chan, cid)
     context: bool = False
-    fname = func.__name__

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
@@ -165,7 +164,6 @@ async def _invoke(
                 await chan.send({'return': await coro, 'cid': cid})
             except trio.Cancelled as err:
                 tb = err.__traceback__
-                raise

             if cs.cancelled_caught:
@@ -173,6 +171,7 @@ async def _invoke(
                 # so they can be unwrapped and displayed on the caller
                 # side!
+                fname = func.__name__
                 if ctx._cancel_called:
                     msg = f'{fname} cancelled itself'
@@ -193,33 +192,9 @@ async def _invoke(
             await chan.send({'functype': 'asyncfunc', 'cid': cid})
             with cancel_scope as cs:
                 task_status.started(cs)
-                try:
-                    await chan.send({'return': await coro, 'cid': cid})
-                except trio.Cancelled as err:
-                    tb = err.__traceback__
-                    raise
-                # await chan.send({'return': await coro, 'cid': cid})
-
-            if cs.cancelled_caught:
-                # if cs.cancel_called:
-                if cs.cancel_called:
-                    msg = (
-                        f'{fname} was remotely cancelled by its caller '
-                        f'{ctx.chan.uid}'
-                    )
-                else:
-                    msg = f'{fname} cancelled itself'
-
-                raise ContextCancelled(
-                    msg,
-                    suberror_type=trio.Cancelled,
-                )
+                await chan.send({'return': await coro, 'cid': cid})

-    except (
-        Exception,
-        trio.MultiError,
-        # trio.Cancelled,
-    ) as err:
+    except (Exception, trio.MultiError) as err:

         if not is_multi_cancelled(err):
@@ -273,7 +248,7 @@ async def _invoke(
             # If we're cancelled before the task returns then the
             # cancel scope will not have been inserted yet
             log.warning(
-                f"Task {func} likely errored or cancelled before start")
+                f"Task {func} likely errored or cancelled before it started")
     finally:
         if not actor._rpc_tasks:
             log.runtime("All RPC tasks have completed")
@@ -383,14 +358,6 @@ class Actor:
             Tuple[Any, Any, Any, Any, Any]] = None

         self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore # noqa

-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called
-
     async def wait_for_peer(
         self, uid: Tuple[str, str]
     ) -> Tuple[trio.Event, Channel]:
@@ -576,8 +543,7 @@ class Actor:
         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        if 'error' in msg:
-            recv_chan
+        # if 'error' in msg:
         #     ctx = getattr(recv_chan, '_ctx', None)
         #     if ctx:
         #         ctx._error_from_remote_msg(msg)
@@ -652,7 +618,6 @@ class Actor:
         # worked out we'll likely want to use that!
         msg = None
         nursery_cancelled_before_task: bool = False
-        uid = chan.uid

         log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
         try:
@@ -676,7 +641,7 @@ class Actor:
                     log.runtime(
                         f"Msg loop signalled to terminate for"
-                        f" {chan} from {uid}")
+                        f" {chan} from {chan.uid}")

                     break
@@ -711,46 +676,38 @@
                         f"{ns}.{funcname}({kwargs})")
                 if ns == 'self':
                     func = getattr(self, funcname)

                     if funcname == 'cancel':
-                        # self.cancel() was called so kill this
-                        # msg loop and break out into
-                        # ``_async_main()``
-                        log.cancel(
-                            f"{self.uid} remote cancel msg from {uid}")
-
-                        # don't start entire actor runtime
-                        # cancellation if this actor is in debug
-                        # mode
+                        # don't start entire actor runtime cancellation if this
+                        # actor is in debug mode
                         pdb_complete = _debug._local_pdb_complete
                         if pdb_complete:
-                            log.cancel(
-                                f'{self.uid} is in debug, wait for unlock')
                             await pdb_complete.wait()

-                        # we immediately start the runtime machinery
-                        # shutdown
+                        # we immediately start the runtime machinery shutdown
                         with trio.CancelScope(shield=True):
-                            await _invoke(
-                                self, cid, chan, func, kwargs, is_rpc=False
-                            )
+                            # self.cancel() was called so kill this msg loop
+                            # and break out into ``_async_main()``
+                            log.cancel(
+                                f"Actor {self.uid} was remotely cancelled; "
+                                "waiting on cancellation completion..")
+                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
+                            # await self._cancel_complete.wait()

                         loop_cs.cancel()
-                        continue
+                        break

                     if funcname == '_cancel_task':
-                        task_cid = kwargs['cid']
-                        log.cancel(
-                            f'Actor {uid} requests cancel for {task_cid}')
-
-                        # we immediately start the runtime machinery
-                        # shutdown
+                        # we immediately start the runtime machinery shutdown
                         with trio.CancelScope(shield=True):
+                            # self.cancel() was called so kill this msg loop
+                            # and break out into ``_async_main()``
                             kwargs['chan'] = chan
-                            await _invoke(
-                                self, cid, chan, func, kwargs, is_rpc=False
-                            )
+                            log.cancel(
+                                f"Actor {self.uid} was remotely cancelled; "
+                                "waiting on cancellation completion..")
+                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
                             continue
                 else:
                     # complain to client about restricted modules
@@ -795,8 +752,7 @@ class Actor:
             log.runtime(
                 f"Waiting on next msg for {chan} from {chan.uid}")

-        # end of async for, channel disconnect vis
-        # ``trio.EndOfChannel``
+        # end of async for, channel disconnect vis ``trio.EndOfChannel``
         log.runtime(
             f"{chan} for {chan.uid} disconnected, cancelling tasks"
         )
@@ -1043,7 +999,7 @@ class Actor:
                 raise
             finally:
-                log.runtime("root runtime nursery complete")
+                log.info("Runtime nursery complete")

                 # tear down all lifetime contexts if not in guest mode
                 # XXX: should this just be in the entrypoint?
@@ -1237,10 +1193,7 @@ class Actor:
         tasks = self._rpc_tasks
         if tasks:
             log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
-            for (
-                (chan, cid),
-                (scope, func, is_complete),
-            ) in tasks.copy().items():
+            for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
                 if only_chan is not None:
                     if only_chan != chan:
                         continue
@@ -1249,9 +1202,8 @@ class Actor:
                 if func != self._cancel_task:
                     await self._cancel_task(cid, chan)

-        if tasks:
-            log.cancel(
-                f"Waiting for remaining rpc tasks to complete {tasks}")
+        log.cancel(
+            f"Waiting for remaining rpc tasks to complete {tasks}")
         await self._ongoing_rpc_tasks.wait()

     def cancel_server(self) -> None:
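Both versions of the remote-`cancel` branch above run the teardown RPC inside `trio.CancelScope(shield=True)` before cancelling the message loop's own scope, so the shutdown work can't be interrupted by the very cancellation it triggers. A minimal sketch of that shielding pattern in plain trio (not tractor's API):

import trio


async def _serve(cleanup_done: trio.Event) -> None:
    try:
        await trio.sleep_forever()
    finally:
        # even though this task is being cancelled, the shielded scope
        # lets the cleanup await complete before the cancel propagates
        with trio.CancelScope(shield=True):
            await trio.sleep(0.1)  # stand-in for sending a cancel ack
            cleanup_done.set()


async def _main() -> None:
    done = trio.Event()
    async with trio.open_nursery() as n:
        n.start_soon(_serve, done)
        await trio.sleep(0.1)
        n.cancel_scope.cancel()
    assert done.is_set()


trio.run(_main)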

View File

@@ -84,15 +84,6 @@ class Portal:
         ] = None
         self._streams: Set[ReceiveMsgStream] = set()
         self.actor = current_actor()
-        self._cancel_called: bool = False
-
-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called

     async def _submit(
         self,
@@ -138,7 +129,7 @@
         resptype: str,
         first_msg: dict
     ) -> Any:
-        # __tracebackhide__ = True
+        __tracebackhide__ = True
         assert resptype == 'asyncfunc'  # single response
         msg = await recv_chan.receive()
@@ -154,7 +145,7 @@
         Return the result(s) from the remote actor's "main" task.

         """
-        # __tracebackhide__ = True
+        __tracebackhide__ = True
         # Check for non-rpc errors slapped on the
         # channel for which we always raise
         exc = self.channel._exc
@@ -206,16 +197,9 @@
         # we'll need to .aclose all those channels here
         await self._cancel_streams()

-    async def cancel_actor(self) -> None:
-        '''
-        Cancel the actor on the other end of this portal.
-
-        That means cancelling the "actor runtime" not just any one
-        task that's running there.
-
-        '''
-        self._cancel_called = True
+    async def cancel_actor(self):
+        """Cancel the actor on the other end of this portal.
+        """
         if not self.channel.connected():
             log.cancel("This portal is already closed can't cancel")
             return False
@@ -223,8 +207,8 @@
         await self._cancel_streams()

         log.cancel(
-            f"Sending runtime cancel msg to {self.channel.uid} @ "
-            f"{self.channel.raddr}")
+            f"Sending actor cancel request to {self.channel.uid} on "
+            f"{self.channel}")
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with a proper shield
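The hunks above re-enable `__tracebackhide__ = True` in several portal methods; that is a pytest convention: frames whose locals set this flag are dropped from the traceback rendered for a failing test. A tiny illustrative example (not from this diff):

def _internal_helper() -> None:
    __tracebackhide__ = True
    raise AssertionError('reported at the call site, helper frame hidden')


def test_example() -> None:
    # pytest's failure traceback will omit the helper's own frame
    _internal_helper()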

View File

@@ -31,12 +31,8 @@ from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
-from ._exceptions import (
-    ActorFailure,
-    RemoteActorError,
-    ContextCancelled,
-)
-from ._debug import maybe_wait_for_debugger, breakpoint
+from ._exceptions import ActorFailure, RemoteActorError
+from ._debug import maybe_wait_for_debugger

 log = get_logger('tractor')
@@ -96,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:

 async def result_from_portal(
     portal: Portal,
     actor: Actor,
@@ -104,7 +99,7 @@ async def result_from_portal(
     cancel_on_result: bool = False,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

-) -> tuple[Optional[Any], Optional[BaseException]]:
+) -> None:
     """
     Cancel actor gracefully once it's "main" portal's
     result arrives.
@@ -112,11 +107,9 @@ async def result_from_portal(
     Should only be called for actors spawned with `run_in_actor()`.

     """
-    # __tracebackhide__ = True
+    __tracebackhide__ = True
     uid = portal.channel.uid
-    remote_result = None
-    is_remote_result = None

     # cancel control is explicityl done by the caller
     with trio.CancelScope() as cs:
@@ -127,42 +120,50 @@
# a MultiError and we still send out a cancel request # a MultiError and we still send out a cancel request
# result = await exhaust_portal(portal, actor) # result = await exhaust_portal(portal, actor)
try: try:
log.info(f"Waiting on final result from {actor.uid}") log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
result = await portal.result() result = await portal.result()
is_remote_result = True log.debug(f"Returning final result: {result}")
log.info(f"Returning final result: {result}")
except RemoteActorError as rerr:
# this includes real remote errors as well as
# `ContextCancelled`
is_remote_result = True
result = rerr
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError`` # we reraise in the parent task via a ``trio.MultiError``
is_remote_result = False result = err
errors[actor.uid] = err
raise
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
result = err result = err
# errors[actor.uid] = err # errors[actor.uid] = err
# raise # raise
if cs.cancelled_caught: if cancel_on_result:
log.warning(f"Cancelled `Portal.result()` waiter for {uid}") if isinstance(result, Exception):
# errors[actor.uid] = result
log.warning(
f"Cancelling single-task-run {uid} after error {result}"
)
# raise result
return result, is_remote_result else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# except trio.Cancelled as err: # an actor that was `.run_in_actor()` executes a single task
# # lol, of course we need this too ;P # and delivers the result, then we cancel it.
# # TODO: merge with above? # TODO: likely in the future we should just implement this using
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}") # the new `open_context()` IPC api, since it's the more general
# result = err # api and can represent this form.
# # errors[actor.uid] = err # XXX: do we need this?
# raise # await maybe_wait_for_debugger()
await portal.cancel_actor()
return result
# return result
async def do_hard_kill( async def do_hard_kill(
@@ -203,18 +204,16 @@ async def do_hard_kill(

 async def reap_proc(
     proc: trio.Process,
-    uid: tuple[str, str],
-    terminate_after: Optional[float] = None,
+    terminate_after: float = float('inf'),
     hard_kill_after: int = 0.1,

 ) -> None:
-    with trio.move_on_after(terminate_after or float('inf')) as cs:
+    with trio.move_on_after(terminate_after) as cs:
         # Wait for proc termination but **dont' yet** do
         # any out-of-ipc-land termination / process
         # killing. This is a "light" (cancellable) join,
         # the hard join is below after timeout
         await proc.wait()
-        log.info(f'Proc for {uid} terminated gracefully')

     if cs.cancelled_caught and terminate_after is not float('inf'):
         # Always "hard" join lingering sub procs since no
@@ -243,7 +242,6 @@ async def new_proc(
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
-
     *,
     graceful_kill_timeout: int = 3,
@@ -353,7 +351,6 @@ async def new_proc(
             ) as cerr:

                 log.exception(f'Relaying unexpected {cerr} to nursery')
-                await breakpoint()

                 # sending IPC-msg level cancel requests is expected to be
                 # managed by the nursery.
@@ -370,230 +367,149 @@
# True, # cancel_on_result # True, # cancel_on_result
) )
# Graceful reap attempt - 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria to return and daemon actors to be cancelled
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
result = None
# this is the soft reap sequence. we can
# either collect results:
# - ria actors get them them via ``Portal.result()``
# - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel(). in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cacelled
# by ``ActorNursery.cancel()``.
# OR, we're cancelled while collecting results, which
# case we need to try another soft cancel and reap attempt.
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
reap_timeout = None
if portal.channel.connected() and ria:
result, is_remote = await result_from_portal(
portal,
subactor,
errors,
# True, # cancel_on_result
)
if is_remote:
if isinstance(result, RemoteActorError):
# errors[actor.uid] = result
if (
portal.cancel_called and
isinstance(result, ContextCancelled)
):
log.cancel(f'{uid} received expected cancel')
errors[uid] = result
# fall through to below soft proc reap
reap_timeout = 0.5
else:
log.warning(
f"Cancelling single-task-run {uid} after remote error {result}"
)
# likely a real remote error propagation
# so pass up to nursery strat
should_raise = await actor_nursery._handle_err(
result,
portal=portal,
)
# propagate up to spawn nursery to be
# grouped into any multierror.
# if should_raise:
# raise result
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
else:
log.exception(
f"Cancelling single-task-run {uid} after local error"
)
raise result
# soft & cancellable
await reap_proc(proc, uid, terminate_after=reap_timeout)
# except (
# ContextCancelled,
# ) as err:
# if portal.cancel_called:
# log.cancel('{uid} received expected cancel')
# # soft & cancellable
# await reap_proc(proc, uid, terminate_after=0.1)
# except (
# RemoteActorError,
# ) as err:
# reaping_cancelled = err
# log.exception(f'{uid} remote error')
# await actor_nursery._handle_err(err, portal=portal)
except (
trio.Cancelled,
) as err:
# NOTE: for now we pack the cancelleds and expect the actor
# nursery to re-raise them in a multierror but we could
# have also let them bubble up through the spawn nursery.
# in theory it's more correct to raise any
# `ContextCancelled` errors we get back from the
# `Portal.cancel_actor()` call and in that error
# have meta-data about whether we timeout out or
# actually got a cancel message back from the remote task.
# IF INSTEAD we raised *here* then this logic has to be
# handled inside the oca supervisor block and the spawn_n
# task cancelleds would have to be replaced with the remote
# task `ContextCancelled`s, *if* they ever arrive.
errors[uid] = err
# with trio.CancelScope(shield=True):
# await breakpoint()
if actor_nursery.cancel_called:
log.cancel(f'{uid} soft reap cancelled by nursery')
else:
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
# this would be pretty weird and unexpected
await breakpoint()
# actor nursery wasn't cancelled before the spawn
# nursery was which likely means that there was
# an error in the actor nursery enter and the
# spawn nursery cancellation "beat" the call to
# .cancel()? that's a bug right?
# saw this with settings bugs in the ordermode pane in
# piker.
log.exception(f'{uid} soft wait error?')
raise RuntimeError(
'Task spawn nursery cancelled before actor nursery?')
finally: finally:
if reaping_cancelled:
assert actor_nursery.cancel_called
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: we should probably just
# check for a `ContextCancelled` on portals
# here and fill them in over `trio.Cancelled` right?
# hard reap sequence with timeouts
if proc.poll() is None:
log.cancel(f'Attempting hard reap for {uid}')
with trio.CancelScope(shield=True):
# hard reap sequence
# ``Portal.cancel_actor()`` is expected to have
# been called by the supervising nursery so we
# do **not** call it here.
await reap_proc(
proc,
uid,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
terminate_after=3,
)
# if somehow the hard reap didn't collect the child then
# we send in the big gunz.
while proc.poll() is None:
log.critical(
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
f'{proc}'
)
with trio.CancelScope(shield=True):
await reap_proc(
proc,
uid,
terminate_after=0.1,
)
log.info(f"Joined {proc}")
# 2 cases: # 2 cases:
# - the actor terminated gracefully # - actor nursery was cancelled in which case
# - we're cancelled and likely need to re-raise # we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria and daemon actors
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
# pop child entry to indicate we no longer managing this # this is the soft reap sequence. we can
# subactor # either collect results:
subactor, proc, portal = actor_nursery._children.pop( # - ria actors get them them via ``Portal.result()``
subactor.uid) # - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel(). in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cacelled
# by ``ActorNursery.cancel()``.
if not actor_nursery._children: # OR, we're cancelled while collecting results, which
# all subactor children have completed # case we need to try another soft cancel and reap attempt.
log.cancel(f"{uid} reports all children complete!") try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
# async with trio.open_nursery() as nursery:
actor_nursery._all_children_reaped.set() if portal.channel.connected() and ria:
spawn_n = actor_nursery._spawn_n # we wait for result and cancel on completion
# with trio.CancelScope(shield=True): await result_from_portal(
# await breakpoint() portal,
if not spawn_n._closed: subactor,
# the parent task that opened the actor nursery errors,
# hasn't yet closed it so we cancel that task now. True, # cancel_on_result
spawn_n.cancel_scope.cancel() )
# # collect any expected ``.run_in_actor()`` results
# cancel_scope = await nursery.start(
# result_from_portal,
# portal,
# subactor,
# errors,
# True, # cancel_on_result
# )
# not entirely sure why we need this.. but without it # soft & cancellable
# the reaping cancelled error is never reported upwards await reap_proc(proc)
# to the spawn nursery?
# if reaping_cancelled: # # if proc terminates before portal result
# raise reaping_cancelled # if cancel_scope:
# cancel_scope.cancel()
except (
RemoteActorError,
) as err:
reaping_cancelled = err
log.exception(f'{uid} remote error')
await actor_nursery._handle_err(err, portal=portal)
except (
trio.Cancelled,
) as err:
reaping_cancelled = err
if actor_nursery.cancelled:
log.cancel(f'{uid} wait cancelled by nursery')
else:
log.exception(f'{uid} soft wait error?')
except (
BaseException
) as err:
reaping_cancelled = err
log.exception(f'{uid} soft reap local error')
finally:
if reaping_cancelled:
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: can't do this, it'll hang some tests.. no
# idea why yet.
# with trio.CancelScope(shield=True):
# await actor_nursery._handle_err(
# reaping_cancelled,
# portal=portal
# )
# hard reap sequence with timeouts
if proc.poll() is None:
log.cancel(f'Attempting hard reap for {uid}')
with trio.CancelScope(shield=True):
# hard reap sequence
# ``Portal.cancel_actor()`` is expected to have
# been called by the supervising nursery so we
# do **not** call it here.
await reap_proc(
proc,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
terminate_after=3,
)
# if somehow the hard reap didn't collect the child then
# we send in the big gunz.
while proc.poll() is None:
log.critical(
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
f'{proc}'
)
with trio.CancelScope(shield=True):
await reap_proc(
proc,
terminate_after=0.1,
)
log.info(f"Joined {proc}")
# 2 cases:
# - the actor terminated gracefully
# - we're cancelled and likely need to re-raise
# pop child entry to indicate we no longer managing this
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
# not entirely sure why we need this.. but without it
# the reaping cancelled error is never reported upwards
# to the spawn nursery?
if reaping_cancelled:
raise reaping_cancelled
else: else:
# `multiprocessing` # `multiprocessing`
@@ -710,8 +626,7 @@ async def mp_new_proc(
             # no shield is required here (vs. above on the trio backend)
             # since debug mode is not supported on mp.
-            with trio.CancelScope(shield=True):
-                await actor_nursery._join_procs.wait()
+            await actor_nursery._join_procs.wait()

         finally:
             # XXX: in the case we were cancelled before the sub-proc
@@ -726,23 +641,13 @@ async def mp_new_proc(
             try:
                 # async with trio.open_nursery() as n:
                 #     n.cancel_scope.shield = True
-                print('soft mp reap')
-                # cancel_scope = await nursery.start(
-                result = await result_from_portal(
+                cancel_scope = await nursery.start(
+                    result_from_portal,
                     portal,
                     subactor,
-                    errors,
-                    # True,
+                    errors
                 )

-            # except trio.Cancelled as err:
-            except BaseException as err:
-                log.exception('hard mp reap')
-                with trio.CancelScope(shield=True):
-                    await actor_nursery._handle_err(err, portal=portal)
-                print('sent to nursery')
+            except trio.Cancelled as err:
                 cancel_exc = err

             # if the reaping task was cancelled we may have hit
@@ -752,43 +657,31 @@ async def mp_new_proc(
             reaping_cancelled = True

             if proc.is_alive():
-                with trio.CancelScope(shield=True):
-                    print('hard reaping')
-                    with trio.move_on_after(0.1) as cs:
-                        cs.shield = True
-                        await proc_waiter(proc)
+                with trio.move_on_after(0.1) as cs:
+                    cs.shield = True
+                    await proc_waiter(proc)

                 if cs.cancelled_caught:
-                    print('pwning mp proc')
                     proc.terminate()

-        finally:
-            # if not reaping_cancelled and proc.is_alive():
-            #     await proc_waiter(proc)
+        if not reaping_cancelled and proc.is_alive():
+            await proc_waiter(proc)

         # TODO: timeout block here?
         proc.join()
         log.debug(f"Joined {proc}")

-        # pop child entry to indicate we are no longer managing subactor
-        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
-
-        if not actor_nursery._children:
-            # all subactor children have completed
-            # log.cancel(f"{uid} reports all children complete!")
-            actor_nursery._all_children_reaped.set()
-
-            # cancel result waiter that may have been spawned in
-            # tandem if not done already
-            if cancel_scope:
-                log.warning(
-                    "Cancelling existing result waiter task for "
-                    f"{subactor.uid}")
-                cancel_scope.cancel()
-
-        if reaping_cancelled:  # let the cancellation bubble up
-            print('raising')
-            assert cancel_exc
-            raise cancel_exc
+        # pop child entry to indicate we are no longer managing subactor
+        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+
+        # cancel result waiter that may have been spawned in
+        # tandem if not done already
+        if cancel_scope:
+            log.warning(
+                "Cancelling existing result waiter task for "
+                f"{subactor.uid}")
+            cancel_scope.cancel()
+
+        elif reaping_cancelled:  # let the cancellation bubble up
+            assert cancel_exc
+            raise cancel_exc

View File

@@ -12,7 +12,7 @@ import trio
 from async_generator import asynccontextmanager

 from . import _debug
-from ._debug import maybe_wait_for_debugger, breakpoint
+from ._debug import maybe_wait_for_debugger
 from ._state import current_actor, is_main_process, is_root_process
 from .log import get_logger, get_loglevel
 from ._actor import Actor
@@ -48,19 +48,10 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
-        self._cancel_called: bool = False
         self._join_procs = trio.Event()
         self._all_children_reaped = trio.Event()
         self.errors = errors

-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called
-
     async def start_actor(
         self,
         name: str,
@@ -186,22 +177,17 @@
         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
         """
+        self.cancelled = True

         # entries may be poppsed by the spawning backend as
         # actors cancel individually
         childs = self._children.copy()

-        if self.cancel_called:
-            log.warning(
-                f'Nursery with children {len(childs)} already cancelled')
-            return
-
         log.cancel(
             f'Cancelling nursery in {self._actor.uid} with children\n'
             f'{childs.keys()}'
         )
-        self._cancel_called = True

         # wake up all spawn tasks to move on as those nursery
         # has ``__aexit__()``-ed
@@ -210,69 +196,43 @@
         await maybe_wait_for_debugger()

         # one-cancels-all strat
-        try:
-            async with trio.open_nursery() as cancel_sender:
-                for subactor, proc, portal in childs.values():
-                    if not portal.cancel_called and portal.channel.connected():
-                        cancel_sender.start_soon(portal.cancel_actor)
-
-        except trio.MultiError as err:
-            _err = err
-            log.exception(f'{self} errors during cancel')
-            # await breakpoint()
-            # # LOL, ok so multiprocessing requires this for some reason..
-            # with trio.CancelScope(shield=True):
-            #     await trio.lowlevel.checkpoint()
+        async with trio.open_nursery() as cancel_sender:
+            for subactor, proc, portal in childs.values():
+                cancel_sender.start_soon(portal.cancel_actor)

         # cancel all spawner tasks
         # self._spawn_n.cancel_scope.cancel()
-        self.cancelled = True

     async def _handle_err(
         self,
         err: BaseException,
         portal: Optional[Portal] = None,
-        is_ctx_error: bool = False,

-    ) -> bool:
+    ) -> None:
         # XXX: hypothetically an error could be
         # raised and then a cancel signal shows up
         # slightly after in which case the `else:`
         # block here might not complete?  For now,
         # shield both.
-        if is_ctx_error:
-            assert not portal
-            uid = self._actor.uid
-        else:
-            uid = portal.channel.uid
-
-        if err not in self.errors.values():
-            self.errors[uid] = err
-
-            with trio.CancelScope(shield=True):
-                etype = type(err)
-
-                if etype in (
-                    trio.Cancelled,
-                    KeyboardInterrupt
-                ) or (
-                    is_multi_cancelled(err)
-                ):
-                    log.cancel(
-                        f"Nursery for {current_actor().uid} "
-                        f"was cancelled with {etype}")
-                else:
-                    log.error(
-                        f"Nursery for {current_actor().uid} "
-                        f"errored from {uid} with\n{err}")
-
-                # cancel all subactors
-                await self.cancel()
-
-            return True
-
-        log.warning(f'Skipping duplicate error for {uid}')
-        return False
+        with trio.CancelScope(shield=True):
+            etype = type(err)
+
+            if etype in (
+                trio.Cancelled,
+                KeyboardInterrupt
+            ) or (
+                is_multi_cancelled(err)
+            ):
+                log.cancel(
+                    f"Nursery for {current_actor().uid} "
+                    f"was cancelled with {etype}")
+            else:
+                log.exception(
+                    f"Nursery for {current_actor().uid} "
+                    f"errored with {err}, ")
+
+            # cancel all subactors
+            await self.cancel()
@asynccontextmanager @asynccontextmanager
@@ -290,7 +250,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
         # actors spawned in "daemon mode" (aka started using
         # ``ActorNursery.start_actor()``).
         src_err: Optional[BaseException] = None
-        nurse_err: Optional[BaseException] = None

         # errors from this daemon actor nursery bubble up to caller
         try:
@@ -339,20 +298,14 @@
             # same as a user code failure.
             except BaseException as err:
-                print('ERROR')
                 # anursery._join_procs.set()
                 src_err = err

                 # with trio.CancelScope(shield=True):
-                should_raise = await anursery._handle_err(err, is_ctx_error=True)
-
-                # XXX: raising here causes some cancellation
-                # / multierror tests to fail because of what appears to
-                # be double raise? we probably need to see how `trio`
-                # does this case..
-                if should_raise:
-                    raise
+                await anursery._handle_err(err)
+                raise

-        # except trio.MultiError as err:
         except BaseException as err:
             # nursery bubble up
             nurse_err = err
@@ -378,15 +331,15 @@
         # collected in ``errors`` so cancel all actors, summarize
         # all errors and re-raise.
-        # await breakpoint()
+        if src_err and src_err not in errors.values():
+            errors[actor.uid] = src_err

         if errors:
-            # if nurse_err or src_err:
             if anursery._children:
                 raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
                 # with trio.CancelScope(shield=True):
                 #     await anursery.cancel()

             # use `MultiError` as needed
             if len(errors) > 1:
                 raise trio.MultiError(tuple(errors.values()))
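Both sides end the nursery exit path the same way: per-actor errors are collected into a dict and re-raised, wrapped in a `trio.MultiError` only when more than one is present. A small sketch of that final step (assuming the pre-0.22 `trio.MultiError` API used throughout this diff):

from typing import Dict, Tuple

import trio


def reraise_collected(errors: Dict[Tuple[str, str], BaseException]) -> None:
    # nothing failed
    if not errors:
        return
    # several failures: group them so none is lost
    if len(errors) > 1:
        raise trio.MultiError(tuple(errors.values()))
    # single failure: re-raise it bare
    raise next(iter(errors.values()))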