Compare commits
No commits in common. "a55ea18c7d2654c414ae12e35611cf404d45a8b8" and "cef9ab73534220eac3703161e08a256aeedd7436" have entirely different histories.
a55ea18c7d ... cef9ab7353
@@ -74,15 +74,7 @@ def pytest_configure(config):
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
     orig = tractor.log._default_loglevel
-    level_from_cli = request.config.option.loglevel
-    # disable built-in capture when user passes the `--ll` value
-    # presuming they already know they want to see console logging
-    # and don't need it repeated by pytest.
-    if level_from_cli:
-        request.config.option.showcapture = 'no'
-
-    level = tractor.log._default_loglevel = level_from_cli
+    level = tractor.log._default_loglevel = request.config.option.loglevel
     yield level
     tractor.log._default_loglevel = orig
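For context: the removed branch disabled pytest's captured-output replay when a CLI log level was passed; what survives is the bare save/override/restore pattern for session-scoped state. A minimal standalone sketch of that pattern (the module global is a stand-in for `tractor.log._default_loglevel`, and the `loglevel` option is assumed to be registered via `pytest_addoption`):

    import pytest

    _default_loglevel = None  # stand-in for tractor.log._default_loglevel

    @pytest.fixture(scope='session', autouse=True)
    def loglevel(request):
        global _default_loglevel
        orig = _default_loglevel  # save
        _default_loglevel = request.config.option.loglevel  # override
        yield _default_loglevel
        _default_loglevel = orig  # restore on session teardown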
@@ -23,9 +23,9 @@ async def sleep_forever():
     await trio.sleep_forever()


-async def do_nuthin(sleep=0):
+async def do_nuthin():
     # just nick the scheduler
-    await trio.sleep(sleep)
+    await trio.sleep(0)


 @pytest.mark.parametrize(
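For context: `await trio.sleep(0)` is trio's documented idiom for executing a checkpoint, i.e. yielding to the scheduler (and allowing cancellation delivery) without actually sleeping; the parametrized-`sleep` variant on the left merely generalized it. The explicit equivalent, as a sketch:

    import trio

    async def do_nuthin():
        # a full checkpoint: reschedules other tasks + checks for cancellation
        await trio.lowlevel.checkpoint()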
@@ -100,7 +100,6 @@ def test_multierror(arb_addr):
 @pytest.mark.parametrize('delay', (0, 0.5))
 @pytest.mark.parametrize(
     'num_subactors', range(25, 26),
-    # 'num_subactors', range(2, 3),
 )
 def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
     """Verify we raise a ``trio.MultiError`` out of a nursery where
@@ -223,7 +222,7 @@ async def test_cancel_infinite_streamer(start_method):
     # daemon complete quickly delay while single task
     # actors error after brief delay
     (3, tractor.MultiError, AssertionError,
-     (assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
+     (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
     ],
     ids=[
         '1_run_in_actor_fails',
@@ -326,7 +325,6 @@ async def spawn_and_error(breadth, depth) -> None:
             )
             kwargs = {
                 'name': f'{name}_errorer_{i}',
-                # 'delay': 0.01,
             }
             await nursery.run_in_actor(*args, **kwargs)

@@ -391,23 +389,13 @@ async def test_nested_multierrors(loglevel, start_method):
                     # on windows sometimes spawning is just too slow and
                     # we get back the (sent) cancel signal instead
                     if platform.system() == 'Windows':
-                        assert subexc.type in (
-                            trio.MultiError,
-                            tractor.RemoteActorError,
-                        )
+                        assert (subexc.type is trio.MultiError) or (
+                            subexc.type is tractor.RemoteActorError)
                     else:
-                        assert subexc.type in (
-                            trio.MultiError,
-                            trio.Cancelled,
-                            # tractor.RemoteActorError,
-                        )
+                        assert subexc.type is trio.MultiError
                 else:
-                    assert subexc.type in (
-                        tractor.RemoteActorError,
-                        trio.Cancelled,
-                    )
+                    assert (subexc.type is tractor.RemoteActorError) or (
+                        subexc.type is trio.Cancelled)
             else:
                 pytest.fail(f'Got no error from nursery?')
@@ -180,7 +180,6 @@ def test_multi_actor_subs_arbiter_pub(
                 'streamer',
                 enable_modules=[__name__],
             )
-            name = 'streamer'

             even_portal = await n.run_in_actor(
                 subs,
@@ -58,7 +58,7 @@ async def _invoke(
     Invoke local func and deliver result(s) over provided channel.

     '''
-    # __tracebackhide__ = True
+    __tracebackhide__ = True
     treat_as_gen = False

     # possible a traceback (not sure what typing is for this..)
@@ -69,7 +69,6 @@ async def _invoke(

     ctx = Context(chan, cid)
     context: bool = False
-    fname = func.__name__

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
@@ -165,7 +164,6 @@ async def _invoke(
                     await chan.send({'return': await coro, 'cid': cid})
                 except trio.Cancelled as err:
                     tb = err.__traceback__
-                    raise

             if cs.cancelled_caught:

@@ -173,6 +171,7 @@ async def _invoke(
                 # so they can be unwrapped and displayed on the caller
                 # side!

+                fname = func.__name__
                 if ctx._cancel_called:
                     msg = f'{fname} cancelled itself'

@@ -193,33 +192,9 @@ async def _invoke(
             await chan.send({'functype': 'asyncfunc', 'cid': cid})
             with cancel_scope as cs:
                 task_status.started(cs)
-                try:
-                    await chan.send({'return': await coro, 'cid': cid})
-                except trio.Cancelled as err:
-                    tb = err.__traceback__
-                    raise
-                # await chan.send({'return': await coro, 'cid': cid})
+                await chan.send({'return': await coro, 'cid': cid})

-            if cs.cancelled_caught:
-                # if cs.cancel_called:
-                if cs.cancel_called:
-                    msg = (
-                        f'{fname} was remotely cancelled by its caller '
-                        f'{ctx.chan.uid}'
-                    )
-                else:
-                    msg = f'{fname} cancelled itself'
-
-                raise ContextCancelled(
-                    msg,
-                    suberror_type=trio.Cancelled,
-                )
-
-    except (
-        Exception,
-        trio.MultiError,
-        # trio.Cancelled,
-    ) as err:
+    except (Exception, trio.MultiError) as err:

         if not is_multi_cancelled(err):
@@ -273,7 +248,7 @@ async def _invoke(
             # If we're cancelled before the task returns then the
             # cancel scope will not have been inserted yet
             log.warning(
-                f"Task {func} likely errored or cancelled before start")
+                f"Task {func} likely errored or cancelled before it started")
     finally:
         if not actor._rpc_tasks:
             log.runtime("All RPC tasks have completed")
@@ -383,14 +358,6 @@ class Actor:
             Tuple[Any, Any, Any, Any, Any]] = None
         self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore # noqa

-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called
-
     async def wait_for_peer(
         self, uid: Tuple[str, str]
     ) -> Tuple[trio.Event, Channel]:
@@ -576,8 +543,7 @@ class Actor:
         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        if 'error' in msg:
-            recv_chan
+        # if 'error' in msg:
         #     ctx = getattr(recv_chan, '_ctx', None)
         #     if ctx:
         #         ctx._error_from_remote_msg(msg)
@@ -652,7 +618,6 @@ class Actor:
         # worked out we'll likely want to use that!
         msg = None
         nursery_cancelled_before_task: bool = False
-        uid = chan.uid

         log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
         try:
@@ -676,7 +641,7 @@ class Actor:

                     log.runtime(
                         f"Msg loop signalled to terminate for"
-                        f" {chan} from {uid}")
+                        f" {chan} from {chan.uid}")

                     break

@@ -711,46 +676,38 @@ class Actor:
                         f"{ns}.{funcname}({kwargs})")
                 if ns == 'self':
                     func = getattr(self, funcname)

                     if funcname == 'cancel':
-                        # self.cancel() was called so kill this
-                        # msg loop and break out into
-                        # ``_async_main()``
-
-                        log.cancel(
-                            f"{self.uid} remote cancel msg from {uid}")
-
-                        # don't start entire actor runtime
-                        # cancellation if this actor is in debug
-                        # mode
+                        # don't start entire actor runtime cancellation if this
+                        # actor is in debug mode
                         pdb_complete = _debug._local_pdb_complete
                         if pdb_complete:
-                            log.cancel(
-                                f'{self.uid} is in debug, wait for unlock')
                             await pdb_complete.wait()

-                        # we immediately start the runtime machinery
-                        # shutdown
+                        # we immediately start the runtime machinery shutdown
                         with trio.CancelScope(shield=True):
-                            await _invoke(
-                                self, cid, chan, func, kwargs, is_rpc=False
-                            )
+                            # self.cancel() was called so kill this msg loop
+                            # and break out into ``_async_main()``
+                            log.cancel(
+                                f"Actor {self.uid} was remotely cancelled; "
+                                "waiting on cancellation completion..")
+                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
+                            # await self._cancel_complete.wait()

                         loop_cs.cancel()
-                        continue
+                        break

                     if funcname == '_cancel_task':
-                        task_cid = kwargs['cid']
-                        log.cancel(
-                            f'Actor {uid} requests cancel for {task_cid}')
-
-                        # we immediately start the runtime machinery
-                        # shutdown
+                        # we immediately start the runtime machinery shutdown
                         with trio.CancelScope(shield=True):
+                            # self.cancel() was called so kill this msg loop
+                            # and break out into ``_async_main()``
                             kwargs['chan'] = chan
-                            await _invoke(
-                                self, cid, chan, func, kwargs, is_rpc=False
-                            )
+                            log.cancel(
+                                f"Actor {self.uid} was remotely cancelled; "
+                                "waiting on cancellation completion..")
+                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
                         continue
                 else:
                     # complain to client about restricted modules
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
|
|
||||||
# end of async for, channel disconnect vis
|
# end of async for, channel disconnect vis ``trio.EndOfChannel``
|
||||||
# ``trio.EndOfChannel``
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||||
)
|
)
|
||||||
|
@@ -1043,7 +999,7 @@ class Actor:
                 raise

         finally:
-            log.runtime("root runtime nursery complete")
+            log.info("Runtime nursery complete")

             # tear down all lifetime contexts if not in guest mode
             # XXX: should this just be in the entrypoint?
@@ -1237,10 +1193,7 @@ class Actor:
         tasks = self._rpc_tasks
         if tasks:
             log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
-            for (
-                (chan, cid),
-                (scope, func, is_complete),
-            ) in tasks.copy().items():
+            for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
                 if only_chan is not None:
                     if only_chan != chan:
                         continue
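For context: the loop body ends up calling `self._cancel_task()`, which pops entries out of `self._rpc_tasks`, so both spellings iterate a `.copy()` snapshot to avoid mutating the dict mid-iteration; the change only collapses the unpacking onto one line. The snapshot idiom in isolation:

    # iterating a snapshot tolerates concurrent pops from the live dict
    for key, value in tasks.copy().items():
        tasks.pop(key, None)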
|
@ -1249,9 +1202,8 @@ class Actor:
|
||||||
if func != self._cancel_task:
|
if func != self._cancel_task:
|
||||||
await self._cancel_task(cid, chan)
|
await self._cancel_task(cid, chan)
|
||||||
|
|
||||||
if tasks:
|
log.cancel(
|
||||||
log.cancel(
|
f"Waiting for remaining rpc tasks to complete {tasks}")
|
||||||
f"Waiting for remaining rpc tasks to complete {tasks}")
|
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
||||||
def cancel_server(self) -> None:
|
def cancel_server(self) -> None:
|
||||||
|
|
|
@@ -84,15 +84,6 @@ class Portal:
         ] = None
         self._streams: Set[ReceiveMsgStream] = set()
         self.actor = current_actor()
-        self._cancel_called: bool = False
-
-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called

     async def _submit(
         self,
@@ -138,7 +129,7 @@ class Portal:
         resptype: str,
         first_msg: dict
     ) -> Any:
-        # __tracebackhide__ = True
+        __tracebackhide__ = True
         assert resptype == 'asyncfunc'  # single response

         msg = await recv_chan.receive()
@@ -154,7 +145,7 @@ class Portal:
         Return the result(s) from the remote actor's "main" task.

         """
-        # __tracebackhide__ = True
+        __tracebackhide__ = True
         # Check for non-rpc errors slapped on the
         # channel for which we always raise
         exc = self.channel._exc
@@ -206,16 +197,9 @@ class Portal:
         # we'll need to .aclose all those channels here
         await self._cancel_streams()

-    async def cancel_actor(self) -> None:
-        '''
-        Cancel the actor on the other end of this portal.
-
-        That means cancelling the "actor runtime" not just any one
-        task that's running there.
-
-        '''
-        self._cancel_called = True
-
+    async def cancel_actor(self):
+        """Cancel the actor on the other end of this portal.
+        """
         if not self.channel.connected():
             log.cancel("This portal is already closed can't cancel")
             return False
|
@ -223,8 +207,8 @@ class Portal:
|
||||||
await self._cancel_streams()
|
await self._cancel_streams()
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Sending runtime cancel msg to {self.channel.uid} @ "
|
f"Sending actor cancel request to {self.channel.uid} on "
|
||||||
f"{self.channel.raddr}")
|
f"{self.channel}")
|
||||||
try:
|
try:
|
||||||
# send cancel cmd - might not get response
|
# send cancel cmd - might not get response
|
||||||
# XXX: sure would be nice to make this work with a proper shield
|
# XXX: sure would be nice to make this work with a proper shield
|
||||||
|
|
|
@@ -31,12 +31,8 @@ from .log import get_logger
 from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
-from ._exceptions import (
-    ActorFailure,
-    RemoteActorError,
-    ContextCancelled,
-)
-from ._debug import maybe_wait_for_debugger, breakpoint
+from ._exceptions import ActorFailure, RemoteActorError
+from ._debug import maybe_wait_for_debugger


 log = get_logger('tractor')
@@ -96,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:


 async def result_from_portal(
-
     portal: Portal,
     actor: Actor,

@@ -104,7 +99,7 @@ async def result_from_portal(
     cancel_on_result: bool = False,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

-) -> tuple[Optional[Any], Optional[BaseException]]:
+) -> None:
     """
     Cancel actor gracefully once it's "main" portal's
     result arrives.
@@ -112,11 +107,9 @@ async def result_from_portal(
     Should only be called for actors spawned with `run_in_actor()`.

     """
-    # __tracebackhide__ = True
+    __tracebackhide__ = True

     uid = portal.channel.uid
-    remote_result = None
-    is_remote_result = None

     # cancel control is explicityl done by the caller
     with trio.CancelScope() as cs:
|
||||||
# a MultiError and we still send out a cancel request
|
# a MultiError and we still send out a cancel request
|
||||||
# result = await exhaust_portal(portal, actor)
|
# result = await exhaust_portal(portal, actor)
|
||||||
try:
|
try:
|
||||||
log.info(f"Waiting on final result from {actor.uid}")
|
log.debug(f"Waiting on final result from {actor.uid}")
|
||||||
|
|
||||||
# XXX: streams should never be reaped here since they should
|
# XXX: streams should never be reaped here since they should
|
||||||
# always be established and shutdown using a context manager api
|
# always be established and shutdown using a context manager api
|
||||||
result = await portal.result()
|
result = await portal.result()
|
||||||
is_remote_result = True
|
log.debug(f"Returning final result: {result}")
|
||||||
log.info(f"Returning final result: {result}")
|
|
||||||
|
|
||||||
except RemoteActorError as rerr:
|
|
||||||
# this includes real remote errors as well as
|
|
||||||
# `ContextCancelled`
|
|
||||||
is_remote_result = True
|
|
||||||
result = rerr
|
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# we reraise in the parent task via a ``trio.MultiError``
|
# we reraise in the parent task via a ``trio.MultiError``
|
||||||
is_remote_result = False
|
result = err
|
||||||
|
errors[actor.uid] = err
|
||||||
|
raise
|
||||||
|
|
||||||
|
except trio.Cancelled as err:
|
||||||
|
# lol, of course we need this too ;P
|
||||||
|
# TODO: merge with above?
|
||||||
|
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
|
||||||
result = err
|
result = err
|
||||||
# errors[actor.uid] = err
|
# errors[actor.uid] = err
|
||||||
# raise
|
# raise
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cancel_on_result:
|
||||||
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
|
if isinstance(result, Exception):
|
||||||
|
# errors[actor.uid] = result
|
||||||
|
log.warning(
|
||||||
|
f"Cancelling single-task-run {uid} after error {result}"
|
||||||
|
)
|
||||||
|
# raise result
|
||||||
|
|
||||||
return result, is_remote_result
|
else:
|
||||||
|
log.runtime(
|
||||||
|
f"Cancelling {uid} gracefully "
|
||||||
|
f"after one-time-task result {result}")
|
||||||
|
|
||||||
# except trio.Cancelled as err:
|
# an actor that was `.run_in_actor()` executes a single task
|
||||||
# # lol, of course we need this too ;P
|
# and delivers the result, then we cancel it.
|
||||||
# # TODO: merge with above?
|
# TODO: likely in the future we should just implement this using
|
||||||
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
|
# the new `open_context()` IPC api, since it's the more general
|
||||||
# result = err
|
# api and can represent this form.
|
||||||
# # errors[actor.uid] = err
|
# XXX: do we need this?
|
||||||
# raise
|
# await maybe_wait_for_debugger()
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
return result
|
||||||
# return result
|
|
||||||
|
|
||||||
|
|
||||||
async def do_hard_kill(
|
async def do_hard_kill(
|
||||||
|
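For context: on the right-hand side this helper is the whole `.run_in_actor()` reaping story — collect the one result (or error), then request actor-runtime cancellation. In terms of tractor's public API the flow is roughly the following hedged sketch (`some_task` is a hypothetical target function; exact spellings vary across versions):

    import tractor
    import trio

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.run_in_actor(some_task)
            # the nursery's exit machinery effectively performs:
            result = await portal.result()  # wait on the single task
            await portal.cancel_actor()     # then tear down the runtime

    trio.run(main)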
@@ -203,18 +204,16 @@ async def do_hard_kill(
 async def reap_proc(

     proc: trio.Process,
-    uid: tuple[str, str],
-    terminate_after: Optional[float] = None,
+    terminate_after: float = float('inf'),
     hard_kill_after: int = 0.1,

 ) -> None:
-    with trio.move_on_after(terminate_after or float('inf')) as cs:
+    with trio.move_on_after(terminate_after) as cs:
         # Wait for proc termination but **dont' yet** do
         # any out-of-ipc-land termination / process
         # killing. This is a "light" (cancellable) join,
         # the hard join is below after timeout
         await proc.wait()
-        log.info(f'Proc for {uid} terminated gracefully')

     if cs.cancelled_caught and terminate_after is not float('inf'):
         # Always "hard" join lingering sub procs since no
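For context: `reap_proc()` is a timeout-bounded join; the signature change just folds the `None`-means-infinity convention into the default value. The core pattern, sketched standalone:

    import trio

    async def join_with_timeout(
        proc: trio.Process,
        timeout: float = float('inf'),
    ) -> bool:
        with trio.move_on_after(timeout) as cs:
            await proc.wait()  # "light", cancellable join
        # False tells the caller to escalate to terminate()/kill()
        return not cs.cancelled_caught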
@@ -243,7 +242,6 @@ async def new_proc(
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
-
     *,

     graceful_kill_timeout: int = 3,
@@ -353,7 +351,6 @@ async def new_proc(
         ) as cerr:

             log.exception(f'Relaying unexpected {cerr} to nursery')
-            await breakpoint()

             # sending IPC-msg level cancel requests is expected to be
             # managed by the nursery.
|
||||||
# True, # cancel_on_result
|
# True, # cancel_on_result
|
||||||
)
|
)
|
||||||
|
|
||||||
# Graceful reap attempt - 2 cases:
|
|
||||||
# - actor nursery was cancelled in which case
|
|
||||||
# we want to try a soft reap of the actor via
|
|
||||||
# ipc cancellation and then failing that do a hard
|
|
||||||
# reap.
|
|
||||||
# - this is normal termination and we must wait indefinitely
|
|
||||||
# for ria to return and daemon actors to be cancelled
|
|
||||||
reaping_cancelled: bool = False
|
|
||||||
ria = portal in actor_nursery._cancel_after_result_on_exit
|
|
||||||
result = None
|
|
||||||
|
|
||||||
# this is the soft reap sequence. we can
|
|
||||||
# either collect results:
|
|
||||||
# - ria actors get them them via ``Portal.result()``
|
|
||||||
# - we wait forever on daemon actors until they're
|
|
||||||
# cancelled by user code via ``Portal.cancel_actor()``
|
|
||||||
# or ``ActorNursery.cancel(). in the latter case
|
|
||||||
# we have to expect another cancel here since
|
|
||||||
# the task spawning nurseries will both be cacelled
|
|
||||||
# by ``ActorNursery.cancel()``.
|
|
||||||
|
|
||||||
# OR, we're cancelled while collecting results, which
|
|
||||||
# case we need to try another soft cancel and reap attempt.
|
|
||||||
try:
|
|
||||||
log.cancel(f'Starting soft actor reap for {uid}')
|
|
||||||
cancel_scope = None
|
|
||||||
reap_timeout = None
|
|
||||||
|
|
||||||
if portal.channel.connected() and ria:
|
|
||||||
|
|
||||||
result, is_remote = await result_from_portal(
|
|
||||||
portal,
|
|
||||||
subactor,
|
|
||||||
errors,
|
|
||||||
# True, # cancel_on_result
|
|
||||||
)
|
|
||||||
if is_remote:
|
|
||||||
if isinstance(result, RemoteActorError):
|
|
||||||
# errors[actor.uid] = result
|
|
||||||
if (
|
|
||||||
portal.cancel_called and
|
|
||||||
isinstance(result, ContextCancelled)
|
|
||||||
):
|
|
||||||
log.cancel(f'{uid} received expected cancel')
|
|
||||||
errors[uid] = result
|
|
||||||
|
|
||||||
# fall through to below soft proc reap
|
|
||||||
reap_timeout = 0.5
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.warning(
|
|
||||||
f"Cancelling single-task-run {uid} after remote error {result}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# likely a real remote error propagation
|
|
||||||
# so pass up to nursery strat
|
|
||||||
should_raise = await actor_nursery._handle_err(
|
|
||||||
result,
|
|
||||||
portal=portal,
|
|
||||||
)
|
|
||||||
|
|
||||||
# propagate up to spawn nursery to be
|
|
||||||
# grouped into any multierror.
|
|
||||||
# if should_raise:
|
|
||||||
# raise result
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.runtime(
|
|
||||||
f"Cancelling {uid} gracefully "
|
|
||||||
f"after one-time-task result {result}")
|
|
||||||
|
|
||||||
# an actor that was `.run_in_actor()` executes a single task
|
|
||||||
# and delivers the result, then we cancel it.
|
|
||||||
# TODO: likely in the future we should just implement this using
|
|
||||||
# the new `open_context()` IPC api, since it's the more general
|
|
||||||
# api and can represent this form.
|
|
||||||
# XXX: do we need this?
|
|
||||||
# await maybe_wait_for_debugger()
|
|
||||||
await portal.cancel_actor()
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.exception(
|
|
||||||
f"Cancelling single-task-run {uid} after local error"
|
|
||||||
)
|
|
||||||
raise result
|
|
||||||
|
|
||||||
# soft & cancellable
|
|
||||||
await reap_proc(proc, uid, terminate_after=reap_timeout)
|
|
||||||
|
|
||||||
# except (
|
|
||||||
# ContextCancelled,
|
|
||||||
# ) as err:
|
|
||||||
# if portal.cancel_called:
|
|
||||||
# log.cancel('{uid} received expected cancel')
|
|
||||||
|
|
||||||
# # soft & cancellable
|
|
||||||
# await reap_proc(proc, uid, terminate_after=0.1)
|
|
||||||
|
|
||||||
# except (
|
|
||||||
# RemoteActorError,
|
|
||||||
# ) as err:
|
|
||||||
# reaping_cancelled = err
|
|
||||||
# log.exception(f'{uid} remote error')
|
|
||||||
# await actor_nursery._handle_err(err, portal=portal)
|
|
||||||
|
|
||||||
except (
|
|
||||||
trio.Cancelled,
|
|
||||||
) as err:
|
|
||||||
|
|
||||||
# NOTE: for now we pack the cancelleds and expect the actor
|
|
||||||
# nursery to re-raise them in a multierror but we could
|
|
||||||
# have also let them bubble up through the spawn nursery.
|
|
||||||
|
|
||||||
# in theory it's more correct to raise any
|
|
||||||
# `ContextCancelled` errors we get back from the
|
|
||||||
# `Portal.cancel_actor()` call and in that error
|
|
||||||
# have meta-data about whether we timeout out or
|
|
||||||
# actually got a cancel message back from the remote task.
|
|
||||||
|
|
||||||
# IF INSTEAD we raised *here* then this logic has to be
|
|
||||||
# handled inside the oca supervisor block and the spawn_n
|
|
||||||
# task cancelleds would have to be replaced with the remote
|
|
||||||
# task `ContextCancelled`s, *if* they ever arrive.
|
|
||||||
errors[uid] = err
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await breakpoint()
|
|
||||||
|
|
||||||
if actor_nursery.cancel_called:
|
|
||||||
log.cancel(f'{uid} soft reap cancelled by nursery')
|
|
||||||
else:
|
|
||||||
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
|
|
||||||
# this would be pretty weird and unexpected
|
|
||||||
await breakpoint()
|
|
||||||
|
|
||||||
# actor nursery wasn't cancelled before the spawn
|
|
||||||
# nursery was which likely means that there was
|
|
||||||
# an error in the actor nursery enter and the
|
|
||||||
# spawn nursery cancellation "beat" the call to
|
|
||||||
# .cancel()? that's a bug right?
|
|
||||||
|
|
||||||
# saw this with settings bugs in the ordermode pane in
|
|
||||||
# piker.
|
|
||||||
log.exception(f'{uid} soft wait error?')
|
|
||||||
raise RuntimeError(
|
|
||||||
'Task spawn nursery cancelled before actor nursery?')
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if reaping_cancelled:
|
|
||||||
assert actor_nursery.cancel_called
|
|
||||||
if actor_nursery.cancelled:
|
|
||||||
log.cancel(f'Nursery cancelled during soft wait for {uid}')
|
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await maybe_wait_for_debugger()
|
|
||||||
|
|
||||||
# XXX: we should probably just
|
|
||||||
# check for a `ContextCancelled` on portals
|
|
||||||
# here and fill them in over `trio.Cancelled` right?
|
|
||||||
|
|
||||||
# hard reap sequence with timeouts
|
|
||||||
if proc.poll() is None:
|
|
||||||
log.cancel(f'Attempting hard reap for {uid}')
|
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
|
|
||||||
# hard reap sequence
|
|
||||||
# ``Portal.cancel_actor()`` is expected to have
|
|
||||||
# been called by the supervising nursery so we
|
|
||||||
# do **not** call it here.
|
|
||||||
|
|
||||||
await reap_proc(
|
|
||||||
proc,
|
|
||||||
uid,
|
|
||||||
# this is the same as previous timeout
|
|
||||||
# setting before rewriting this spawn
|
|
||||||
# section
|
|
||||||
terminate_after=3,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# if somehow the hard reap didn't collect the child then
|
|
||||||
# we send in the big gunz.
|
|
||||||
while proc.poll() is None:
|
|
||||||
log.critical(
|
|
||||||
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
|
|
||||||
f'{proc}'
|
|
||||||
)
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await reap_proc(
|
|
||||||
proc,
|
|
||||||
uid,
|
|
||||||
terminate_after=0.1,
|
|
||||||
)
|
|
||||||
|
|
||||||
log.info(f"Joined {proc}")
|
|
||||||
|
|
||||||
# 2 cases:
|
# 2 cases:
|
||||||
# - the actor terminated gracefully
|
# - actor nursery was cancelled in which case
|
||||||
# - we're cancelled and likely need to re-raise
|
# we want to try a soft reap of the actor via
|
||||||
|
# ipc cancellation and then failing that do a hard
|
||||||
|
# reap.
|
||||||
|
# - this is normal termination and we must wait indefinitely
|
||||||
|
# for ria and daemon actors
|
||||||
|
reaping_cancelled: bool = False
|
||||||
|
ria = portal in actor_nursery._cancel_after_result_on_exit
|
||||||
|
|
||||||
# pop child entry to indicate we no longer managing this
|
# this is the soft reap sequence. we can
|
||||||
# subactor
|
# either collect results:
|
||||||
subactor, proc, portal = actor_nursery._children.pop(
|
# - ria actors get them them via ``Portal.result()``
|
||||||
subactor.uid)
|
# - we wait forever on daemon actors until they're
|
||||||
|
# cancelled by user code via ``Portal.cancel_actor()``
|
||||||
|
# or ``ActorNursery.cancel(). in the latter case
|
||||||
|
# we have to expect another cancel here since
|
||||||
|
# the task spawning nurseries will both be cacelled
|
||||||
|
# by ``ActorNursery.cancel()``.
|
||||||
|
|
||||||
if not actor_nursery._children:
|
# OR, we're cancelled while collecting results, which
|
||||||
# all subactor children have completed
|
# case we need to try another soft cancel and reap attempt.
|
||||||
log.cancel(f"{uid} reports all children complete!")
|
try:
|
||||||
|
log.cancel(f'Starting soft actor reap for {uid}')
|
||||||
|
cancel_scope = None
|
||||||
|
# async with trio.open_nursery() as nursery:
|
||||||
|
|
||||||
actor_nursery._all_children_reaped.set()
|
if portal.channel.connected() and ria:
|
||||||
|
|
||||||
spawn_n = actor_nursery._spawn_n
|
# we wait for result and cancel on completion
|
||||||
# with trio.CancelScope(shield=True):
|
await result_from_portal(
|
||||||
# await breakpoint()
|
portal,
|
||||||
if not spawn_n._closed:
|
subactor,
|
||||||
# the parent task that opened the actor nursery
|
errors,
|
||||||
# hasn't yet closed it so we cancel that task now.
|
True, # cancel_on_result
|
||||||
spawn_n.cancel_scope.cancel()
|
)
|
||||||
|
# # collect any expected ``.run_in_actor()`` results
|
||||||
|
# cancel_scope = await nursery.start(
|
||||||
|
# result_from_portal,
|
||||||
|
# portal,
|
||||||
|
# subactor,
|
||||||
|
# errors,
|
||||||
|
# True, # cancel_on_result
|
||||||
|
# )
|
||||||
|
|
||||||
# not entirely sure why we need this.. but without it
|
# soft & cancellable
|
||||||
# the reaping cancelled error is never reported upwards
|
await reap_proc(proc)
|
||||||
# to the spawn nursery?
|
|
||||||
# if reaping_cancelled:
|
# # if proc terminates before portal result
|
||||||
# raise reaping_cancelled
|
# if cancel_scope:
|
||||||
|
# cancel_scope.cancel()
|
||||||
|
|
||||||
|
except (
|
||||||
|
RemoteActorError,
|
||||||
|
) as err:
|
||||||
|
reaping_cancelled = err
|
||||||
|
log.exception(f'{uid} remote error')
|
||||||
|
await actor_nursery._handle_err(err, portal=portal)
|
||||||
|
|
||||||
|
except (
|
||||||
|
trio.Cancelled,
|
||||||
|
) as err:
|
||||||
|
reaping_cancelled = err
|
||||||
|
if actor_nursery.cancelled:
|
||||||
|
log.cancel(f'{uid} wait cancelled by nursery')
|
||||||
|
else:
|
||||||
|
log.exception(f'{uid} soft wait error?')
|
||||||
|
|
||||||
|
except (
|
||||||
|
BaseException
|
||||||
|
) as err:
|
||||||
|
reaping_cancelled = err
|
||||||
|
log.exception(f'{uid} soft reap local error')
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if reaping_cancelled:
|
||||||
|
if actor_nursery.cancelled:
|
||||||
|
log.cancel(f'Nursery cancelled during soft wait for {uid}')
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await maybe_wait_for_debugger()
|
||||||
|
|
||||||
|
# XXX: can't do this, it'll hang some tests.. no
|
||||||
|
# idea why yet.
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await actor_nursery._handle_err(
|
||||||
|
# reaping_cancelled,
|
||||||
|
# portal=portal
|
||||||
|
# )
|
||||||
|
|
||||||
|
# hard reap sequence with timeouts
|
||||||
|
if proc.poll() is None:
|
||||||
|
log.cancel(f'Attempting hard reap for {uid}')
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
|
# hard reap sequence
|
||||||
|
# ``Portal.cancel_actor()`` is expected to have
|
||||||
|
# been called by the supervising nursery so we
|
||||||
|
# do **not** call it here.
|
||||||
|
|
||||||
|
await reap_proc(
|
||||||
|
proc,
|
||||||
|
# this is the same as previous timeout
|
||||||
|
# setting before rewriting this spawn
|
||||||
|
# section
|
||||||
|
terminate_after=3,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# if somehow the hard reap didn't collect the child then
|
||||||
|
# we send in the big gunz.
|
||||||
|
while proc.poll() is None:
|
||||||
|
log.critical(
|
||||||
|
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
|
||||||
|
f'{proc}'
|
||||||
|
)
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await reap_proc(
|
||||||
|
proc,
|
||||||
|
terminate_after=0.1,
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info(f"Joined {proc}")
|
||||||
|
|
||||||
|
# 2 cases:
|
||||||
|
# - the actor terminated gracefully
|
||||||
|
# - we're cancelled and likely need to re-raise
|
||||||
|
|
||||||
|
# pop child entry to indicate we no longer managing this
|
||||||
|
# subactor
|
||||||
|
subactor, proc, portal = actor_nursery._children.pop(
|
||||||
|
subactor.uid)
|
||||||
|
if not actor_nursery._children:
|
||||||
|
log.cancel(f"{uid} reports all children complete!")
|
||||||
|
actor_nursery._all_children_reaped.set()
|
||||||
|
|
||||||
|
# not entirely sure why we need this.. but without it
|
||||||
|
# the reaping cancelled error is never reported upwards
|
||||||
|
# to the spawn nursery?
|
||||||
|
if reaping_cancelled:
|
||||||
|
raise reaping_cancelled
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# `multiprocessing`
|
# `multiprocessing`
|
||||||
|
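For context: both sides of this hunk implement the same supervision shape — a cancellable "soft" reap (collect the portal result and/or wait on the process) followed by a shielded "hard" reap with escalating force — and differ mainly in whether that sequence lives before or inside the `finally:`. The escalation ladder, condensed into a hedged sketch (`request_actor_cancel` is a hypothetical stand-in for the portal-level cancel request; the real code is interwoven with the nursery's error plumbing):

    import trio

    async def reap(proc: trio.Process, request_actor_cancel, grace: float = 3):
        try:
            await proc.wait()  # soft: cancellable graceful join
        finally:
            if proc.returncode is None:
                # hard: shield so cancellation can't interrupt teardown
                with trio.CancelScope(shield=True):
                    await request_actor_cancel()
                    with trio.move_on_after(grace):
                        await proc.wait()  # bounded join
                    if proc.returncode is None:
                        proc.kill()  # "zombie lord" last resort
                        await proc.wait()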
@@ -710,8 +626,7 @@ async def mp_new_proc(

             # no shield is required here (vs. above on the trio backend)
             # since debug mode is not supported on mp.
-            with trio.CancelScope(shield=True):
-                await actor_nursery._join_procs.wait()
+            await actor_nursery._join_procs.wait()

         finally:
             # XXX: in the case we were cancelled before the sub-proc
@@ -726,23 +641,13 @@ async def mp_new_proc(
             try:
                 # async with trio.open_nursery() as n:
                 #     n.cancel_scope.shield = True
-                print('soft mp reap')
-                # cancel_scope = await nursery.start(
-                result = await result_from_portal(
+                cancel_scope = await nursery.start(
+                    result_from_portal,
                     portal,
                     subactor,
-                    errors,
-                    # True,
+                    errors
                 )
-            # except trio.Cancelled as err:
-            except BaseException as err:
-
-                log.exception('hard mp reap')
-                with trio.CancelScope(shield=True):
-                    await actor_nursery._handle_err(err, portal=portal)
-                print('sent to nursery')
-
+            except trio.Cancelled as err:
                 cancel_exc = err

                 # if the reaping task was cancelled we may have hit
@@ -752,43 +657,31 @@ async def mp_new_proc(
                 reaping_cancelled = True

                 if proc.is_alive():
-                    with trio.CancelScope(shield=True):
-                        print('hard reaping')
-                        with trio.move_on_after(0.1) as cs:
-                            cs.shield = True
-                            await proc_waiter(proc)
+                    with trio.move_on_after(0.1) as cs:
+                        cs.shield = True
+                        await proc_waiter(proc)

                     if cs.cancelled_caught:
-                        print('pwning mp proc')
                         proc.terminate()
-        finally:

-            # if not reaping_cancelled and proc.is_alive():
-            #     await proc_waiter(proc)
+            if not reaping_cancelled and proc.is_alive():
+                await proc_waiter(proc)

             # TODO: timeout block here?
             proc.join()

             log.debug(f"Joined {proc}")
+            # pop child entry to indicate we are no longer managing subactor
+            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

-            # pop child entry to indicate we are no longer managing subactor
-            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+            # cancel result waiter that may have been spawned in
+            # tandem if not done already
+            if cancel_scope:
+                log.warning(
+                    "Cancelling existing result waiter task for "
+                    f"{subactor.uid}")
+                cancel_scope.cancel()

-            if not actor_nursery._children:
-                # all subactor children have completed
-                # log.cancel(f"{uid} reports all children complete!")
-                actor_nursery._all_children_reaped.set()
-
-            # cancel result waiter that may have been spawned in
-            # tandem if not done already
-            if cancel_scope:
-                log.warning(
-                    "Cancelling existing result waiter task for "
-                    f"{subactor.uid}")
-                cancel_scope.cancel()
-
-            if reaping_cancelled:  # let the cancellation bubble up
-                print('raising')
-                assert cancel_exc
-                raise cancel_exc
+            elif reaping_cancelled:  # let the cancellation bubble up
+                assert cancel_exc
+                raise cancel_exc
@@ -12,7 +12,7 @@ import trio
 from async_generator import asynccontextmanager

 from . import _debug
-from ._debug import maybe_wait_for_debugger, breakpoint
+from ._debug import maybe_wait_for_debugger
 from ._state import current_actor, is_main_process, is_root_process
 from .log import get_logger, get_loglevel
 from ._actor import Actor
@@ -48,19 +48,10 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
-        self._cancel_called: bool = False
         self._join_procs = trio.Event()
         self._all_children_reaped = trio.Event()
         self.errors = errors

-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Same principle as ``trio.CancelScope.cancel_called``.
-
-        '''
-        return self._cancel_called
-
     async def start_actor(
         self,
         name: str,
@@ -186,22 +177,17 @@ class ActorNursery:

         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.

         """
+        self.cancelled = True

         # entries may be poppsed by the spawning backend as
         # actors cancel individually
         childs = self._children.copy()

-        if self.cancel_called:
-            log.warning(
-                f'Nursery with children {len(childs)} already cancelled')
-            return
-
         log.cancel(
             f'Cancelling nursery in {self._actor.uid} with children\n'
             f'{childs.keys()}'
         )
-        self._cancel_called = True

         # wake up all spawn tasks to move on as those nursery
         # has ``__aexit__()``-ed
|
||||||
await maybe_wait_for_debugger()
|
await maybe_wait_for_debugger()
|
||||||
|
|
||||||
# one-cancels-all strat
|
# one-cancels-all strat
|
||||||
try:
|
async with trio.open_nursery() as cancel_sender:
|
||||||
async with trio.open_nursery() as cancel_sender:
|
for subactor, proc, portal in childs.values():
|
||||||
for subactor, proc, portal in childs.values():
|
cancel_sender.start_soon(portal.cancel_actor)
|
||||||
if not portal.cancel_called and portal.channel.connected():
|
|
||||||
cancel_sender.start_soon(portal.cancel_actor)
|
|
||||||
|
|
||||||
except trio.MultiError as err:
|
|
||||||
_err = err
|
|
||||||
log.exception(f'{self} errors during cancel')
|
|
||||||
# await breakpoint()
|
|
||||||
# # LOL, ok so multiprocessing requires this for some reason..
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await trio.lowlevel.checkpoint()
|
|
||||||
|
|
||||||
# cancel all spawner tasks
|
# cancel all spawner tasks
|
||||||
# self._spawn_n.cancel_scope.cancel()
|
# self._spawn_n.cancel_scope.cancel()
|
||||||
self.cancelled = True
|
|
||||||
|
|
||||||
async def _handle_err(
|
async def _handle_err(
|
||||||
self,
|
self,
|
||||||
err: BaseException,
|
err: BaseException,
|
||||||
portal: Optional[Portal] = None,
|
portal: Optional[Portal] = None,
|
||||||
is_ctx_error: bool = False,
|
|
||||||
|
|
||||||
) -> bool:
|
) -> None:
|
||||||
# XXX: hypothetically an error could be
|
# XXX: hypothetically an error could be
|
||||||
# raised and then a cancel signal shows up
|
# raised and then a cancel signal shows up
|
||||||
# slightly after in which case the `else:`
|
# slightly after in which case the `else:`
|
||||||
# block here might not complete? For now,
|
# block here might not complete? For now,
|
||||||
# shield both.
|
# shield both.
|
||||||
if is_ctx_error:
|
with trio.CancelScope(shield=True):
|
||||||
assert not portal
|
etype = type(err)
|
||||||
uid = self._actor.uid
|
|
||||||
else:
|
|
||||||
uid = portal.channel.uid
|
|
||||||
|
|
||||||
if err not in self.errors.values():
|
if etype in (
|
||||||
self.errors[uid] = err
|
trio.Cancelled,
|
||||||
|
KeyboardInterrupt
|
||||||
|
) or (
|
||||||
|
is_multi_cancelled(err)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
f"Nursery for {current_actor().uid} "
|
||||||
|
f"was cancelled with {etype}")
|
||||||
|
else:
|
||||||
|
log.exception(
|
||||||
|
f"Nursery for {current_actor().uid} "
|
||||||
|
f"errored with {err}, ")
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
# cancel all subactors
|
||||||
etype = type(err)
|
await self.cancel()
|
||||||
|
|
||||||
if etype in (
|
|
||||||
trio.Cancelled,
|
|
||||||
KeyboardInterrupt
|
|
||||||
) or (
|
|
||||||
is_multi_cancelled(err)
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
f"Nursery for {current_actor().uid} "
|
|
||||||
f"was cancelled with {etype}")
|
|
||||||
else:
|
|
||||||
log.error(
|
|
||||||
f"Nursery for {current_actor().uid} "
|
|
||||||
f"errored from {uid} with\n{err}")
|
|
||||||
|
|
||||||
# cancel all subactors
|
|
||||||
await self.cancel()
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
log.warning(f'Skipping duplicate error for {uid}')
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
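For context: `is_multi_cancelled()` (from tractor's `_exceptions` module) decides whether an error group is "just cancellation" so it can be logged at the cancel level instead of as a crash. A from-memory sketch of the semantics, not the verbatim implementation:

    import trio

    def is_multi_cancelled(err: BaseException) -> bool:
        # True only if a ``trio.MultiError`` is composed exclusively of
        # ``trio.Cancelled`` (possibly nested) sub-exceptions
        if isinstance(err, trio.MultiError):
            return all(
                isinstance(sub, trio.Cancelled) or is_multi_cancelled(sub)
                for sub in err.exceptions
            )
        return False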
@@ -290,7 +250,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
         # actors spawned in "daemon mode" (aka started using
         # ``ActorNursery.start_actor()``).
         src_err: Optional[BaseException] = None
-        nurse_err: Optional[BaseException] = None

         # errors from this daemon actor nursery bubble up to caller
         try:
|
@ -339,20 +298,14 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# same as a user code failure.
|
# same as a user code failure.
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
print('ERROR')
|
||||||
# anursery._join_procs.set()
|
# anursery._join_procs.set()
|
||||||
src_err = err
|
src_err = err
|
||||||
|
|
||||||
# with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
should_raise = await anursery._handle_err(err, is_ctx_error=True)
|
await anursery._handle_err(err)
|
||||||
|
raise
|
||||||
|
|
||||||
# XXX: raising here causes some cancellation
|
|
||||||
# / multierror tests to fail because of what appears to
|
|
||||||
# be double raise? we probably need to see how `trio`
|
|
||||||
# does this case..
|
|
||||||
if should_raise:
|
|
||||||
raise
|
|
||||||
|
|
||||||
# except trio.MultiError as err:
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
# nursery bubble up
|
# nursery bubble up
|
||||||
nurse_err = err
|
nurse_err = err
|
||||||
|
@@ -378,15 +331,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
         # collected in ``errors`` so cancel all actors, summarize
         # all errors and re-raise.

-        # await breakpoint()
+        if src_err and src_err not in errors.values():
+            errors[actor.uid] = src_err

         if errors:
-            # if nurse_err or src_err:
             if anursery._children:
                 raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
             # with trio.CancelScope(shield=True):
             #     await anursery.cancel()

             # use `MultiError` as needed
             if len(errors) > 1:
                 raise trio.MultiError(tuple(errors.values()))