Compare commits

10 commits: cef9ab7353 ... a55ea18c7d
| Author | SHA1 |
|---|---|
| Tyler Goodlet | a55ea18c7d |
| Tyler Goodlet | 797bb22981 |
| Tyler Goodlet | 2c74db9cb7 |
| Tyler Goodlet | 39feb627a8 |
| Tyler Goodlet | 348423ece7 |
| Tyler Goodlet | 5eb7c4c857 |
| Tyler Goodlet | 4d30e25591 |
| Tyler Goodlet | c01d2f8aea |
| Tyler Goodlet | 8e21bb046e |
| Tyler Goodlet | 66137030d9 |
```diff
@@ -74,7 +74,15 @@ def pytest_configure(config):
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
     orig = tractor.log._default_loglevel
-    level = tractor.log._default_loglevel = request.config.option.loglevel
+
+    level_from_cli = request.config.option.loglevel
+
+    # disable built-in capture when user passes the `--ll` value
+    # presuming they already know they want to see console logging
+    # and don't need it repeated by pytest.
+    if level_from_cli:
+        request.config.option.showcapture = 'no'
+
+    level = tractor.log._default_loglevel = level_from_cli
     yield level
     tractor.log._default_loglevel = orig
```
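The new fixture body above is the classic save/patch/restore pattern for a session-scoped pytest fixture. A minimal standalone sketch of the same idea; `mylib` and its `DEFAULT_LOGLEVEL` global are illustrative stand-ins, not tractor's actual names:

```python
# conftest.py -- sketch only, assuming a custom ``--ll`` flag
import pytest

import mylib  # hypothetical module holding a mutable default


def pytest_addoption(parser):
    # register the CLI flag the fixture reads back below
    parser.addoption('--ll', action='store', dest='loglevel', default=None)


@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
    orig = mylib.DEFAULT_LOGLEVEL

    level_from_cli = request.config.option.loglevel
    if level_from_cli:
        # user asked for live console logging; don't have pytest
        # re-print captured output on failures as well
        request.config.option.showcapture = 'no'

    level = mylib.DEFAULT_LOGLEVEL = level_from_cli
    yield level

    # always restore the original once the session ends
    mylib.DEFAULT_LOGLEVEL = orig
```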
```diff
@@ -23,9 +23,9 @@ async def sleep_forever():
     await trio.sleep_forever()


-async def do_nuthin():
+async def do_nuthin(sleep=0):
     # just nick the scheduler
-    await trio.sleep(0)
+    await trio.sleep(sleep)


 @pytest.mark.parametrize(
```
```diff
@@ -100,6 +100,7 @@ def test_multierror(arb_addr):
+@pytest.mark.parametrize('delay', (0, 0.5))
 @pytest.mark.parametrize(
     'num_subactors', range(25, 26),
     # 'num_subactors', range(2, 3),
 )
 def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
     """Verify we raise a ``trio.MultiError`` out of a nursery where
```
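Stacking `@pytest.mark.parametrize` decorators, as the new `delay` parameter above does, runs the test over the cartesian product of all parameter sets. A quick illustration:

```python
import pytest


@pytest.mark.parametrize('delay', (0, 0.5))
@pytest.mark.parametrize('num_subactors', range(25, 26))
def test_product(delay, num_subactors):
    # 2 delays x 1 subactor count -> 2 generated test cases
    assert delay in (0, 0.5)
    assert num_subactors == 25
```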
```diff
@@ -222,7 +223,7 @@ async def test_cancel_infinite_streamer(start_method):
     # daemon complete quickly delay while single task
     # actors error after brief delay
     (3, tractor.MultiError, AssertionError,
-     (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
+     (assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
     ],
     ids=[
         '1_run_in_actor_fails',
```
```diff
@@ -325,6 +326,7 @@ async def spawn_and_error(breadth, depth) -> None:
         )
         kwargs = {
             'name': f'{name}_errorer_{i}',
+            # 'delay': 0.01,
         }
         await nursery.run_in_actor(*args, **kwargs)
```
```diff
@@ -389,13 +391,23 @@ async def test_nested_multierrors(loglevel, start_method):
                 # on windows sometimes spawning is just too slow and
                 # we get back the (sent) cancel signal instead
                 if platform.system() == 'Windows':
-                    assert (subexc.type is trio.MultiError) or (
-                        subexc.type is tractor.RemoteActorError)
+                    assert subexc.type in (
+                        trio.MultiError,
+                        tractor.RemoteActorError,
+                    )
+
                 else:
-                    assert subexc.type is trio.MultiError
+                    assert subexc.type in (
+                        trio.MultiError,
+                        trio.Cancelled,
+                        # tractor.RemoteActorError,
+                    )
             else:
-                assert (subexc.type is tractor.RemoteActorError) or (
-                    subexc.type is trio.Cancelled)
+                assert subexc.type in (
+                    tractor.RemoteActorError,
+                    trio.Cancelled,
+                )
+
         else:
             pytest.fail(f'Got no error from nursery?')
```
```diff
@@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub(
                 'streamer',
                 enable_modules=[__name__],
             )
+            name = 'streamer'

             even_portal = await n.run_in_actor(
                 subs,
```
```diff
@@ -58,7 +58,7 @@ async def _invoke(
     Invoke local func and deliver result(s) over provided channel.

     '''
-    __tracebackhide__ = True
+    # __tracebackhide__ = True
     treat_as_gen = False

     # possible a traceback (not sure what typing is for this..)
```
```diff
@@ -69,6 +69,7 @@ async def _invoke(

     ctx = Context(chan, cid)
     context: bool = False
+    fname = func.__name__

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
```
```diff
@@ -164,6 +165,7 @@ async def _invoke(
                     await chan.send({'return': await coro, 'cid': cid})
                 except trio.Cancelled as err:
+                    tb = err.__traceback__
                     raise

             if cs.cancelled_caught:
```
```diff
@@ -171,7 +173,6 @@ async def _invoke(
                 # so they can be unwrapped and displayed on the caller
                 # side!

-                fname = func.__name__
                 if ctx._cancel_called:
                     msg = f'{fname} cancelled itself'
```
```diff
@@ -192,9 +193,33 @@ async def _invoke(
             await chan.send({'functype': 'asyncfunc', 'cid': cid})
             with cancel_scope as cs:
                 task_status.started(cs)
-                await chan.send({'return': await coro, 'cid': cid})
+                try:
+                    await chan.send({'return': await coro, 'cid': cid})
+                except trio.Cancelled as err:
+                    tb = err.__traceback__
+                    raise
+                # await chan.send({'return': await coro, 'cid': cid})

-    except (Exception, trio.MultiError) as err:
+            if cs.cancelled_caught:
+                # if cs.cancel_called:
+                if cs.cancel_called:
+                    msg = (
+                        f'{fname} was remotely cancelled by its caller '
+                        f'{ctx.chan.uid}'
+                    )
+                else:
+                    msg = f'{fname} cancelled itself'
+
+                raise ContextCancelled(
+                    msg,
+                    suberror_type=trio.Cancelled,
+                )
+
+    except (
+        Exception,
+        trio.MultiError,
+        # trio.Cancelled,
+    ) as err:

         if not is_multi_cancelled(err):
```
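The new `ContextCancelled` branch above keys off the difference between two `trio.CancelScope` flags: `.cancel_called` (a cancel was *requested* on the scope, here meaning the remote caller asked for it) versus `.cancelled_caught` (the scope actually absorbed a `Cancelled` exception). A minimal sketch of that distinction:

```python
import trio


async def main() -> None:
    with trio.CancelScope() as cs:
        cs.cancel()           # explicit request: cancel_called -> True
        await trio.sleep(1)   # checkpoint where Cancelled is delivered

    # the scope swallowed its own Cancelled at the checkpoint
    assert cs.cancel_called and cs.cancelled_caught

    with trio.move_on_after(0.01) as cs2:
        await trio.sleep(1)

    # timeout scopes call cancel() internally on deadline expiry,
    # so both flags end up set here too
    assert cs2.cancelled_caught


trio.run(main)
```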
```diff
@@ -248,7 +273,7 @@ async def _invoke(
         # If we're cancelled before the task returns then the
         # cancel scope will not have been inserted yet
         log.warning(
-            f"Task {func} likely errored or cancelled before it started")
+            f"Task {func} likely errored or cancelled before start")
     finally:
         if not actor._rpc_tasks:
             log.runtime("All RPC tasks have completed")
```
```diff
@@ -358,6 +383,14 @@ class Actor:
             Tuple[Any, Any, Any, Any, Any]] = None
         self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore # noqa

+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Same principle as ``trio.CancelScope.cancel_called``.
+
+        '''
+        return self._cancel_called
+
     async def wait_for_peer(
         self, uid: Tuple[str, str]
     ) -> Tuple[trio.Event, Channel]:
```
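The same `cancel_called` property is added to `Portal` and `ActorNursery` further down. The shared idea, sketched as a generic idempotent-cancel class (names here are illustrative, not tractor's):

```python
class Cancellable:
    '''Minimal sketch of the cancel-request bookkeeping pattern.'''

    def __init__(self) -> None:
        self._cancel_called: bool = False

    @property
    def cancel_called(self) -> bool:
        # mirrors ``trio.CancelScope.cancel_called``: records that a
        # cancel was *requested*, regardless of whether it completed
        return self._cancel_called

    async def cancel(self) -> None:
        if self._cancel_called:
            return  # repeat requests become cheap no-ops
        self._cancel_called = True
        ...  # actual teardown goes here
```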
```diff
@@ -543,7 +576,8 @@ class Actor:
         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        # if 'error' in msg:
+        if 'error' in msg:
+            recv_chan
         # ctx = getattr(recv_chan, '_ctx', None)
         # if ctx:
         #     ctx._error_from_remote_msg(msg)
```
```diff
@@ -618,6 +652,7 @@ class Actor:
         # worked out we'll likely want to use that!
         msg = None
         nursery_cancelled_before_task: bool = False
+        uid = chan.uid

         log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
         try:
```
```diff
@@ -641,7 +676,7 @@ class Actor:

                     log.runtime(
                         f"Msg loop signalled to terminate for"
-                        f" {chan} from {chan.uid}")
+                        f" {chan} from {uid}")

                     break
```
```diff
@@ -676,38 +711,46 @@ class Actor:
                     f"{ns}.{funcname}({kwargs})")
                 if ns == 'self':
                     func = getattr(self, funcname)
-                    if funcname == 'cancel':

-                        # don't start entire actor runtime cancellation if this
-                        # actor is in debug mode
+                    if funcname == 'cancel':
+                        # self.cancel() was called so kill this
+                        # msg loop and break out into
+                        # ``_async_main()``
+                        log.cancel(
+                            f"{self.uid} remote cancel msg from {uid}")
+
+                        # don't start entire actor runtime
+                        # cancellation if this actor is in debug
+                        # mode
                         pdb_complete = _debug._local_pdb_complete
                         if pdb_complete:
+                            log.cancel(
+                                f'{self.uid} is in debug, wait for unlock')
                             await pdb_complete.wait()

-                        # we immediately start the runtime machinery shutdown
+                        # we immediately start the runtime machinery
+                        # shutdown
                         with trio.CancelScope(shield=True):
-                            # self.cancel() was called so kill this msg loop
-                            # and break out into ``_async_main()``
-                            log.cancel(
-                                f"Actor {self.uid} was remotely cancelled; "
-                                "waiting on cancellation completion..")
-                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
+                            await _invoke(
+                                self, cid, chan, func, kwargs, is_rpc=False
+                            )
+                            # await self._cancel_complete.wait()

                         loop_cs.cancel()
-                        break
+                        continue

                     if funcname == '_cancel_task':
+                        task_cid = kwargs['cid']
+                        log.cancel(
+                            f'Actor {uid} requests cancel for {task_cid}')

-                        # we immediately start the runtime machinery shutdown
+                        # we immediately start the runtime machinery
+                        # shutdown
                         with trio.CancelScope(shield=True):
-                            # self.cancel() was called so kill this msg loop
-                            # and break out into ``_async_main()``
                             kwargs['chan'] = chan
-                            log.cancel(
-                                f"Actor {self.uid} was remotely cancelled; "
-                                "waiting on cancellation completion..")
-                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
+                            await _invoke(
+                                self, cid, chan, func, kwargs, is_rpc=False
+                            )
+                        continue
                     else:
                         # complain to client about restricted modules
```
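Both cancel branches above wrap the `_invoke()` call in a shielded scope so that tearing down the surrounding message loop (`loop_cs.cancel()`) cannot interrupt the cancellation handler itself. A toy demonstration of that shielding behaviour, using nothing tractor-specific:

```python
import trio


async def must_finish(done: trio.Event) -> None:
    # stands in for the shielded cancel-request handler above
    with trio.CancelScope(shield=True):
        await trio.sleep(0.1)  # survives the outer cancellation
        done.set()


async def main() -> None:
    done = trio.Event()
    async with trio.open_nursery() as n:
        n.start_soon(must_finish, done)
        await trio.sleep(0.01)
        n.cancel_scope.cancel()  # analogous to ``loop_cs.cancel()``

    assert done.is_set()  # the shielded handler still completed


trio.run(main)
```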
```diff
@@ -752,7 +795,8 @@ class Actor:
                 log.runtime(
                     f"Waiting on next msg for {chan} from {chan.uid}")

-        # end of async for, channel disconnect vis ``trio.EndOfChannel``
+        # end of async for, channel disconnect vis
+        # ``trio.EndOfChannel``
         log.runtime(
             f"{chan} for {chan.uid} disconnected, cancelling tasks"
         )
```
```diff
@@ -999,7 +1043,7 @@ class Actor:
                 raise

             finally:
-                log.info("Runtime nursery complete")
+                log.runtime("root runtime nursery complete")

                 # tear down all lifetime contexts if not in guest mode
                 # XXX: should this just be in the entrypoint?
```
```diff
@@ -1193,7 +1237,10 @@ class Actor:
         tasks = self._rpc_tasks
         if tasks:
             log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
-            for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
+            for (
+                (chan, cid),
+                (scope, func, is_complete),
+            ) in tasks.copy().items():
                 if only_chan is not None:
                     if only_chan != chan:
                         continue
```
```diff
@@ -1202,6 +1249,7 @@ class Actor:
                 if func != self._cancel_task:
                     await self._cancel_task(cid, chan)

+        if tasks:
             log.cancel(
                 f"Waiting for remaining rpc tasks to complete {tasks}")
             await self._ongoing_rpc_tasks.wait()
```
```diff
@@ -84,6 +84,15 @@ class Portal:
         ] = None
         self._streams: Set[ReceiveMsgStream] = set()
         self.actor = current_actor()
+        self._cancel_called: bool = False
+
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Same principle as ``trio.CancelScope.cancel_called``.
+
+        '''
+        return self._cancel_called

     async def _submit(
         self,
```
```diff
@@ -129,7 +138,7 @@ class Portal:
         resptype: str,
         first_msg: dict
     ) -> Any:
-        __tracebackhide__ = True
+        # __tracebackhide__ = True
         assert resptype == 'asyncfunc'  # single response

         msg = await recv_chan.receive()
```
```diff
@@ -145,7 +154,7 @@ class Portal:
         Return the result(s) from the remote actor's "main" task.

         """
-        __tracebackhide__ = True
+        # __tracebackhide__ = True
         # Check for non-rpc errors slapped on the
         # channel for which we always raise
         exc = self.channel._exc
```
```diff
@@ -197,9 +206,16 @@ class Portal:
         # we'll need to .aclose all those channels here
         await self._cancel_streams()

-    async def cancel_actor(self):
-        """Cancel the actor on the other end of this portal.
-        """
+    async def cancel_actor(self) -> None:
+        '''
+        Cancel the actor on the other end of this portal.
+
+        That means cancelling the "actor runtime" not just any one
+        task that's running there.
+
+        '''
+        self._cancel_called = True
+
         if not self.channel.connected():
             log.cancel("This portal is already closed can't cancel")
             return False
```
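With the reworked docstring, `Portal.cancel_actor()` is explicitly a whole-runtime cancel on the far end. A hedged usage sketch against tractor's public nursery API (the actor name and flow here are illustrative):

```python
import tractor
import trio


async def main() -> None:
    async with tractor.open_nursery() as n:
        # spawn a long-lived daemon-style subactor
        portal = await n.start_actor(
            'worker',
            enable_modules=[__name__],
        )
        # ... interact with the subactor via the portal ...

        # request a full runtime cancel on the far end, not just
        # the cancellation of any single remote task
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```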
```diff
@@ -207,8 +223,8 @@ class Portal:
         await self._cancel_streams()

         log.cancel(
-            f"Sending actor cancel request to {self.channel.uid} on "
-            f"{self.channel}")
+            f"Sending runtime cancel msg to {self.channel.uid} @ "
+            f"{self.channel.raddr}")
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with a proper shield
```
```diff
@@ -31,8 +31,12 @@ from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
-from ._exceptions import ActorFailure, RemoteActorError
-from ._debug import maybe_wait_for_debugger
+from ._exceptions import (
+    ActorFailure,
+    RemoteActorError,
+    ContextCancelled,
+)
+from ._debug import maybe_wait_for_debugger, breakpoint


 log = get_logger('tractor')
```
```diff
@@ -92,6 +96,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:


 async def result_from_portal(
+
     portal: Portal,
     actor: Actor,
```
```diff
@@ -99,7 +104,7 @@ async def result_from_portal(
     cancel_on_result: bool = False,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

-) -> None:
+) -> tuple[Optional[Any], Optional[BaseException]]:
     """
     Cancel actor gracefully once it's "main" portal's
     result arrives.
```
```diff
@@ -107,9 +112,11 @@ async def result_from_portal(
     Should only be called for actors spawned with `run_in_actor()`.

     """
-    __tracebackhide__ = True
+    # __tracebackhide__ = True

     uid = portal.channel.uid
-    remote_result = None
+    is_remote_result = None

     # cancel control is explicityl done by the caller
     with trio.CancelScope() as cs:
```
```diff
@@ -120,50 +127,42 @@ async def result_from_portal(
         # a MultiError and we still send out a cancel request
         # result = await exhaust_portal(portal, actor)
         try:
-            log.debug(f"Waiting on final result from {actor.uid}")
+            log.info(f"Waiting on final result from {actor.uid}")

             # XXX: streams should never be reaped here since they should
             # always be established and shutdown using a context manager api
             result = await portal.result()
-            log.debug(f"Returning final result: {result}")
+            is_remote_result = True
+            log.info(f"Returning final result: {result}")

-        except (Exception, trio.MultiError) as err:
-            # we reraise in the parent task via a ``trio.MultiError``
-            result = err
-            errors[actor.uid] = err
-            raise
+        except RemoteActorError as rerr:
+            # this includes real remote errors as well as
+            # `ContextCancelled`
+            is_remote_result = True
+            result = rerr

         except trio.Cancelled as err:
-            # lol, of course we need this too ;P
-            # TODO: merge with above?
-            log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
+            is_remote_result = False
             result = err
             # errors[actor.uid] = err
             # raise

-        if cancel_on_result:
-            if isinstance(result, Exception):
-                # errors[actor.uid] = result
-                log.warning(
-                    f"Cancelling single-task-run {uid} after error {result}"
-                )
-                # raise result
+    if cs.cancelled_caught:
+        log.warning(f"Cancelled `Portal.result()` waiter for {uid}")

-            else:
-                log.runtime(
-                    f"Cancelling {uid} gracefully "
-                    f"after one-time-task result {result}")
+    return result, is_remote_result

-            # an actor that was `.run_in_actor()` executes a single task
-            # and delivers the result, then we cancel it.
-            # TODO: likely in the future we should just implement this using
-            # the new `open_context()` IPC api, since it's the more general
-            # api and can represent this form.
-            # XXX: do we need this?
-            # await maybe_wait_for_debugger()
-            await portal.cancel_actor()
+    # except trio.Cancelled as err:
+    #     # lol, of course we need this too ;P
+    #     # TODO: merge with above?
+    #     log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
+    #     result = err
+    #     # errors[actor.uid] = err
+    #     raise

-    return result
+    # return result


 async def do_hard_kill(
```
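The reworked waiter no longer raises local errors itself; it hands back a `(result, is_remote_result)` pair so the caller can distinguish far-end failures (to be routed to the supervision strategy) from local or cancellation ones. The shape of that contract, sketched generically (the helper name is illustrative):

```python
from typing import Any, Optional, Tuple


async def wait_for_result(get) -> Tuple[Optional[Any], bool]:
    '''
    Return ``(result, is_remote)``: ``is_remote=True`` means the
    value (possibly an exception instance) came from the far end
    and should be routed to the supervisor, not raised locally.
    '''
    try:
        return await get(), True
    except BaseException as err:
        # in the diff above this also captures ``trio.Cancelled``;
        # the caller is expected to re-raise / route appropriately
        return err, False
```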
```diff
@@ -204,16 +203,18 @@ async def do_hard_kill(
 async def reap_proc(

     proc: trio.Process,
-    terminate_after: float = float('inf'),
+    uid: tuple[str, str],
+    terminate_after: Optional[float] = None,
     hard_kill_after: int = 0.1,

 ) -> None:
-    with trio.move_on_after(terminate_after) as cs:
+    with trio.move_on_after(terminate_after or float('inf')) as cs:
         # Wait for proc termination but **dont' yet** do
         # any out-of-ipc-land termination / process
         # killing. This is a "light" (cancellable) join,
         # the hard join is below after timeout
         await proc.wait()
+        log.info(f'Proc for {uid} terminated gracefully')

     if cs.cancelled_caught and terminate_after is not float('inf'):
         # Always "hard" join lingering sub procs since no
```
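The reworked `reap_proc()` turns the timeout into an optional soft-join window. A standalone sketch of the graceful-then-hard reap idiom with `trio.Process`; parameter names mirror the diff but the helper itself is illustrative, and it assumes recent trio (`trio.lowlevel.open_process`; older versions expose `trio.open_process`) plus a POSIX `sleep` binary:

```python
from typing import Optional

import trio


async def reap(
    proc: trio.Process,
    terminate_after: Optional[float] = None,
) -> None:
    # ``None`` means "wait forever"; a float bounds the soft join
    with trio.move_on_after(terminate_after or float('inf')) as cs:
        await proc.wait()  # cancellable, in-band wait

    if cs.cancelled_caught:
        # soft join timed out: escalate out-of-band
        proc.terminate()
        with trio.move_on_after(1):
            await proc.wait()
        if proc.returncode is None:
            proc.kill()  # last resort


async def main() -> None:
    proc = await trio.lowlevel.open_process(['sleep', '10'])
    await reap(proc, terminate_after=0.1)
    assert proc.returncode is not None


trio.run(main)
```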
```diff
@@ -242,6 +243,7 @@ async def new_proc(
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child

+    *,

     graceful_kill_timeout: int = 3,
```
```diff
@@ -351,6 +353,7 @@ async def new_proc(
             ) as cerr:

                 log.exception(f'Relaying unexpected {cerr} to nursery')
+                await breakpoint()

                 # sending IPC-msg level cancel requests is expected to be
                 # managed by the nursery.
```
```diff
@@ -367,16 +370,16 @@ async def new_proc(
                 # True,  # cancel_on_result
             )

         finally:
-            # 2 cases:
+            # Graceful reap attempt - 2 cases:
             # - actor nursery was cancelled in which case
             #   we want to try a soft reap of the actor via
             #   ipc cancellation and then failing that do a hard
             #   reap.
             # - this is normal termination and we must wait indefinitely
-            #   for ria and daemon actors
+            #   for ria to return and daemon actors to be cancelled
             reaping_cancelled: bool = False
             ria = portal in actor_nursery._cancel_after_result_on_exit
             result = None

             # this is the soft reap sequence. we can
             # either collect results:
```
```diff
@@ -393,70 +396,138 @@ async def new_proc(
             try:
                 log.cancel(f'Starting soft actor reap for {uid}')
                 cancel_scope = None
-                # async with trio.open_nursery() as nursery:
+                reap_timeout = None

                 if portal.channel.connected() and ria:

                     # we wait for result and cancel on completion
-                    await result_from_portal(
+                    result, is_remote = await result_from_portal(
                         portal,
                         subactor,
                         errors,
-                        True,  # cancel_on_result
+                        # True,  # cancel_on_result
                     )
-                    # # collect any expected ``.run_in_actor()`` results
-                    # cancel_scope = await nursery.start(
-                    #     result_from_portal,
-                    #     portal,
-                    #     subactor,
-                    #     errors,
-                    #     True,  # cancel_on_result
-                    # )
+                    if is_remote:
+                        if isinstance(result, RemoteActorError):
+                            # errors[actor.uid] = result
+                            if (
+                                portal.cancel_called and
+                                isinstance(result, ContextCancelled)
+                            ):
+                                log.cancel(f'{uid} received expected cancel')
+                                errors[uid] = result
+
+                                # fall through to below soft proc reap
+                                reap_timeout = 0.5
+
+                            else:
+                                log.warning(
+                                    f"Cancelling single-task-run {uid} after remote error {result}"
+                                )
+
+                                # likely a real remote error propagation
+                                # so pass up to nursery strat
+                                should_raise = await actor_nursery._handle_err(
+                                    result,
+                                    portal=portal,
+                                )
+
+                                # propagate up to spawn nursery to be
+                                # grouped into any multierror.
+                                # if should_raise:
+                                #     raise result
+
+                        else:
+                            log.runtime(
+                                f"Cancelling {uid} gracefully "
+                                f"after one-time-task result {result}")
+
+                            # an actor that was `.run_in_actor()` executes a single task
+                            # and delivers the result, then we cancel it.
+                            # TODO: likely in the future we should just implement this using
+                            # the new `open_context()` IPC api, since it's the more general
+                            # api and can represent this form.
+                            # XXX: do we need this?
+                            # await maybe_wait_for_debugger()
+                            await portal.cancel_actor()
+
+                    else:
+                        log.exception(
+                            f"Cancelling single-task-run {uid} after local error"
+                        )
+                        raise result

                 # soft & cancellable
-                await reap_proc(proc)
+                await reap_proc(proc, uid, terminate_after=reap_timeout)

                 # # if proc terminates before portal result
                 # if cancel_scope:
                 #     cancel_scope.cancel()

-            except (
-                RemoteActorError,
-            ) as err:
-                reaping_cancelled = err
-                log.exception(f'{uid} remote error')
-                await actor_nursery._handle_err(err, portal=portal)
+            # except (
+            #     ContextCancelled,
+            # ) as err:
+            #     if portal.cancel_called:
+            #         log.cancel('{uid} received expected cancel')
+
+            #     # soft & cancellable
+            #     await reap_proc(proc, uid, terminate_after=0.1)
+
+            # except (
+            #     RemoteActorError,
+            # ) as err:
+            #     reaping_cancelled = err
+            #     log.exception(f'{uid} remote error')
+            #     await actor_nursery._handle_err(err, portal=portal)

             except (
                 trio.Cancelled,
             ) as err:
                 reaping_cancelled = err
                 if actor_nursery.cancelled:
                     log.cancel(f'{uid} wait cancelled by nursery')
                 else:
                     log.exception(f'{uid} soft wait error?')

             except (
                 BaseException
             ) as err:
                 reaping_cancelled = err
                 log.exception(f'{uid} soft reap local error')
+                # NOTE: for now we pack the cancelleds and expect the actor
+                # nursery to re-raise them in a multierror but we could
+                # have also let them bubble up through the spawn nursery.
+
+                # in theory it's more correct to raise any
+                # `ContextCancelled` errors we get back from the
+                # `Portal.cancel_actor()` call and in that error
+                # have meta-data about whether we timeout out or
+                # actually got a cancel message back from the remote task.
+
+                # IF INSTEAD we raised *here* then this logic has to be
+                # handled inside the oca supervisor block and the spawn_n
+                # task cancelleds would have to be replaced with the remote
+                # task `ContextCancelled`s, *if* they ever arrive.
+                errors[uid] = err
+                # with trio.CancelScope(shield=True):
+                #     await breakpoint()
+
+                if actor_nursery.cancel_called:
+                    log.cancel(f'{uid} soft reap cancelled by nursery')
+                else:
+                    if not actor_nursery._spawn_n.cancel_scope.cancel_called:
+                        # this would be pretty weird and unexpected
+                        await breakpoint()
+
+                    # actor nursery wasn't cancelled before the spawn
+                    # nursery was which likely means that there was
+                    # an error in the actor nursery enter and the
+                    # spawn nursery cancellation "beat" the call to
+                    # .cancel()? that's a bug right?
+
+                    # saw this with settings bugs in the ordermode pane in
+                    # piker.
+                    log.exception(f'{uid} soft wait error?')
+                    raise RuntimeError(
+                        'Task spawn nursery cancelled before actor nursery?')

             finally:
                 if reaping_cancelled:
+                    assert actor_nursery.cancel_called
                     if actor_nursery.cancelled:
                         log.cancel(f'Nursery cancelled during soft wait for {uid}')

                 with trio.CancelScope(shield=True):
                     await maybe_wait_for_debugger()

+                # XXX: can't do this, it'll hang some tests.. no
+                # idea why yet.
+                # with trio.CancelScope(shield=True):
+                #     await actor_nursery._handle_err(
+                #         reaping_cancelled,
+                #         portal=portal
+                #     )
+                # XXX: we should probably just
+                # check for a `ContextCancelled` on portals
+                # here and fill them in over `trio.Cancelled` right?

                 # hard reap sequence with timeouts
                 if proc.poll() is None:
```
```diff
@@ -471,6 +542,7 @@ async def new_proc(

                     await reap_proc(
                         proc,
+                        uid,
                         # this is the same as previous timeout
                         # setting before rewriting this spawn
                         # section
```
```diff
@@ -488,6 +560,7 @@ async def new_proc(
                 with trio.CancelScope(shield=True):
                     await reap_proc(
                         proc,
+                        uid,
                         terminate_after=0.1,
                     )
```
```diff
@@ -501,15 +574,26 @@ async def new_proc(
             # subactor
             subactor, proc, portal = actor_nursery._children.pop(
                 subactor.uid)

             if not actor_nursery._children:
                 # all subactor children have completed
+                log.cancel(f"{uid} reports all children complete!")
+
                 actor_nursery._all_children_reaped.set()

+                spawn_n = actor_nursery._spawn_n
+                # with trio.CancelScope(shield=True):
+                #     await breakpoint()
+                if not spawn_n._closed:
+                    # the parent task that opened the actor nursery
+                    # hasn't yet closed it so we cancel that task now.
+                    spawn_n.cancel_scope.cancel()

             # not entirely sure why we need this.. but without it
             # the reaping cancelled error is never reported upwards
             # to the spawn nursery?
-            if reaping_cancelled:
-                raise reaping_cancelled
+            # if reaping_cancelled:
+            #     raise reaping_cancelled

     else:
         # `multiprocessing`
```
```diff
@@ -626,6 +710,7 @@ async def mp_new_proc(

+        # no shield is required here (vs. above on the trio backend)
+        # since debug mode is not supported on mp.
         with trio.CancelScope(shield=True):
             await actor_nursery._join_procs.wait()

     finally:
```
```diff
@@ -641,13 +726,23 @@ async def mp_new_proc(
         try:
-            # async with trio.open_nursery() as n:
-            # n.cancel_scope.shield = True
-            cancel_scope = await nursery.start(
-                result_from_portal,
+            print('soft mp reap')
+            # cancel_scope = await nursery.start(
+            result = await result_from_portal(
                 portal,
                 subactor,
-                errors
+                errors,
+                # True,
             )
-        except trio.Cancelled as err:

+        # except trio.Cancelled as err:
+        except BaseException as err:
+
+            log.exception('hard mp reap')
+            with trio.CancelScope(shield=True):
+                await actor_nursery._handle_err(err, portal=portal)
+            print('sent to nursery')

             cancel_exc = err

             # if the reaping task was cancelled we may have hit
```
```diff
@@ -657,23 +752,34 @@ async def mp_new_proc(
                 reaping_cancelled = True

                 if proc.is_alive():
+                    with trio.CancelScope(shield=True):
+                        print('hard reaping')
                         with trio.move_on_after(0.1) as cs:
                             cs.shield = True
                             await proc_waiter(proc)

                         if cs.cancelled_caught:
+                            print('pwning mp proc')
                             proc.terminate()
         finally:

-            if not reaping_cancelled and proc.is_alive():
-                await proc_waiter(proc)
+            # if not reaping_cancelled and proc.is_alive():
+            #     await proc_waiter(proc)

             # TODO: timeout block here?
             proc.join()

             log.debug(f"Joined {proc}")

             # pop child entry to indicate we are no longer managing subactor
             subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

             if not actor_nursery._children:
                 # all subactor children have completed
+                # log.cancel(f"{uid} reports all children complete!")
                 actor_nursery._all_children_reaped.set()

             # cancel result waiter that may have been spawned in
             # tandem if not done already
             if cancel_scope:
```
```diff
@@ -682,6 +788,7 @@ async def mp_new_proc(
                     f"{subactor.uid}")
                 cancel_scope.cancel()

-            elif reaping_cancelled:  # let the cancellation bubble up
+            if reaping_cancelled:  # let the cancellation bubble up
+                print('raising')
                 assert cancel_exc
                 raise cancel_exc
```
```diff
@@ -12,7 +12,7 @@ import trio
 from async_generator import asynccontextmanager

 from . import _debug
-from ._debug import maybe_wait_for_debugger
+from ._debug import maybe_wait_for_debugger, breakpoint
 from ._state import current_actor, is_main_process, is_root_process
 from .log import get_logger, get_loglevel
 from ._actor import Actor
```
```diff
@@ -48,10 +48,19 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
+        self._cancel_called: bool = False
         self._join_procs = trio.Event()
+        self._all_children_reaped = trio.Event()
         self.errors = errors

+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Same principle as ``trio.CancelScope.cancel_called``.
+
+        '''
+        return self._cancel_called
+
     async def start_actor(
         self,
         name: str,
```
```diff
@@ -177,17 +186,22 @@ class ActorNursery:

         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
-        """
-        self.cancelled = True

+        """
+        # entries may be poppsed by the spawning backend as
+        # actors cancel individually
+        childs = self._children.copy()
+
+        if self.cancel_called:
+            log.warning(
+                f'Nursery with children {len(childs)} already cancelled')
+            return
+
+        log.cancel(
+            f'Cancelling nursery in {self._actor.uid} with children\n'
+            f'{childs.keys()}'
+        )
+        self._cancel_called = True

         # wake up all spawn tasks to move on as those nursery
         # has ``__aexit__()``-ed
```
```diff
@@ -196,24 +210,45 @@ class ActorNursery:
         await maybe_wait_for_debugger()

+        # one-cancels-all strat
+        try:
+            async with trio.open_nursery() as cancel_sender:
+                for subactor, proc, portal in childs.values():
+                    if not portal.cancel_called and portal.channel.connected():
+                        cancel_sender.start_soon(portal.cancel_actor)
+
+        except trio.MultiError as err:
+            _err = err
+            log.exception(f'{self} errors during cancel')
+            # await breakpoint()
+            # # LOL, ok so multiprocessing requires this for some reason..
+            # with trio.CancelScope(shield=True):
+            #     await trio.lowlevel.checkpoint()
+
+        # cancel all spawner tasks
+        # self._spawn_n.cancel_scope.cancel()
+        self.cancelled = True

     async def _handle_err(
         self,
         err: BaseException,
         portal: Optional[Portal] = None,
+        is_ctx_error: bool = False,

-    ) -> None:
+    ) -> bool:
         # XXX: hypothetically an error could be
         # raised and then a cancel signal shows up
         # slightly after in which case the `else:`
         # block here might not complete?  For now,
         # shield both.
+        if is_ctx_error:
+            assert not portal
+            uid = self._actor.uid
+        else:
+            uid = portal.channel.uid
+
+        if err not in self.errors.values():
+            self.errors[uid] = err

         with trio.CancelScope(shield=True):
             etype = type(err)
```
```diff
@@ -227,13 +262,18 @@ class ActorNursery:
                     f"Nursery for {current_actor().uid} "
                     f"was cancelled with {etype}")
             else:
-                log.exception(
+                log.error(
                     f"Nursery for {current_actor().uid} "
-                    f"errored with {err}, ")
+                    f"errored from {uid} with\n{err}")

             # cancel all subactors
             await self.cancel()

+            return True
+
+        log.warning(f'Skipping duplicate error for {uid}')
+        return False


 @asynccontextmanager
 async def _open_and_supervise_one_cancels_all_nursery(
```
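`_handle_err()` now reports back whether the error was newly registered (triggering the one-cancels-all teardown) or a duplicate to be skipped. A compact sketch of that dedupe-and-signal pattern; the class and method names are illustrative:

```python
from typing import Dict, Tuple


class Supervisor:
    def __init__(self) -> None:
        self.errors: Dict[Tuple[str, str], BaseException] = {}

    def handle_err(
        self,
        uid: Tuple[str, str],
        err: BaseException,
    ) -> bool:
        # identity check: the same exception instance may be routed
        # here from multiple tasks but should only be recorded once
        if err not in self.errors.values():
            self.errors[uid] = err
            # ... trigger one-cancels-all teardown here ...
            return True

        return False


sup = Supervisor()
boom = RuntimeError('boom')
assert sup.handle_err(('worker', '1'), boom) is True
assert sup.handle_err(('worker', '1'), boom) is False  # duplicate
```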
```diff
@@ -250,6 +290,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
     # actors spawned in "daemon mode" (aka started using
     # ``ActorNursery.start_actor()``).
     src_err: Optional[BaseException] = None
+    nurse_err: Optional[BaseException] = None

     # errors from this daemon actor nursery bubble up to caller
     try:
```
```diff
@@ -298,14 +339,20 @@ async def _open_and_supervise_one_cancels_all_nursery(
                 # same as a user code failure.

             except BaseException as err:
+                print('ERROR')
+                # anursery._join_procs.set()
                 src_err = err

-                await anursery._handle_err(err)
+                # with trio.CancelScope(shield=True):
+                should_raise = await anursery._handle_err(err, is_ctx_error=True)

                 # XXX: raising here causes some cancellation
                 # / multierror tests to fail because of what appears to
                 # be double raise? we probably need to see how `trio`
                 # does this case..
-                raise
+                if should_raise:
+                    raise

-        except trio.MultiError as err:
+        # except trio.MultiError as err:
+        except BaseException as err:
             # nursery bubble up
             nurse_err = err
```
```diff
@@ -331,15 +378,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
         # collected in ``errors`` so cancel all actors, summarize
         # all errors and re-raise.

+        if src_err and src_err not in errors.values():
+            errors[actor.uid] = src_err
+
+        # await breakpoint()
         if errors:
+        # if nurse_err or src_err:
             if anursery._children:
-                with trio.CancelScope(shield=True):
-                    await anursery.cancel()
+                raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
+                # with trio.CancelScope(shield=True):
+                #     await anursery.cancel()

             # use `MultiError` as needed
             if len(errors) > 1:
                 raise trio.MultiError(tuple(errors.values()))
```
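The closing logic above raises a single collected child error bare but wraps multiple errors into one `trio.MultiError`, mirroring trio's own nursery semantics. A minimal sketch; note `trio.MultiError` is deprecated in favor of `BaseExceptionGroup` on newer trio, so this assumes a trio version contemporary with these commits:

```python
import trio

# per-child errors collected during teardown, keyed by actor uid;
# the uids and messages here are illustrative
errors = {
    ('worker', '1'): RuntimeError('remote task failed'),
    ('worker', '2'): ValueError('bad stream msg'),
}

try:
    # mirror the nursery exit block: one error raises bare, several
    # are summarized into a single ``trio.MultiError``
    if len(errors) > 1:
        raise trio.MultiError(tuple(errors.values()))
    raise next(iter(errors.values()))
except trio.MultiError as me:
    assert len(me.exceptions) == 2
```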