forked from goodboy/tractor
Merge pull request #91 from goodboy/more_thorough_super_tests
More thorough basic supervision testsdrop_event_clear
commit
4d43f2564c
36
README.rst
36
README.rst
|
@ -10,7 +10,7 @@ An async-native "`actor model`_" built on trio_ and multiprocessing_.
|
||||||
|
|
||||||
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
|
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
|
||||||
.. _trio: https://github.com/python-trio/trio
|
.. _trio: https://github.com/python-trio/trio
|
||||||
.. _multiprocessing: https://docs.python.org/3/library/multiprocessing.html
|
.. _multiprocessing: https://en.wikipedia.org/wiki/Multiprocessing
|
||||||
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
|
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
|
||||||
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
||||||
.. _always propagate: https://trio.readthedocs.io/en/latest/design.html#exceptions-always-propagate
|
.. _always propagate: https://trio.readthedocs.io/en/latest/design.html#exceptions-always-propagate
|
||||||
|
@ -21,17 +21,31 @@ An async-native "`actor model`_" built on trio_ and multiprocessing_.
|
||||||
.. _chaos engineering: http://principlesofchaos.org/
|
.. _chaos engineering: http://principlesofchaos.org/
|
||||||
|
|
||||||
|
|
||||||
``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to distributed multi-core Python.
|
``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to
|
||||||
|
distributed multi-core Python.
|
||||||
|
|
||||||
``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task
|
``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run
|
||||||
tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over
|
a ``trio`` scheduled task tree (also known as an `async sandwich`_).
|
||||||
channels_ and avoid sharing any state. This model allows for highly distributed software architecture
|
*Actors* communicate by exchanging asynchronous messages_ and avoid
|
||||||
which works just as well on multiple cores as it does over many hosts.
|
sharing any state. This model allows for highly distributed software
|
||||||
|
architecture which works just as well on multiple cores as it does over
|
||||||
|
many hosts.
|
||||||
|
|
||||||
``tractor`` is an actor-model-*like* system in the sense that it adheres to the `3 axioms`_ but does
|
``tractor`` is an actor-model-*like* system in the sense that it adheres
|
||||||
not (yet) fufill all "unrequirements_" in practice. The API and design takes inspiration from pulsar_ and
|
to the `3 axioms`_ but does not (yet) fulfil all "unrequirements_" in
|
||||||
execnet_ but attempts to be more focussed on sophistication of the lower level distributed architecture as
|
practise. It is an experiment in applying `structured concurrency`_
|
||||||
well as have first class support for streaming using `async generators`_.
|
constraints on a parallel processing system where multiple Python
|
||||||
|
processes exist over many hosts but no process can outlive its parent.
|
||||||
|
In `erlang` parlance, it is an architecture where every process has
|
||||||
|
a mandatory supervisor enforced by the type system. The API design is
|
||||||
|
almost exclusively inspired by trio_'s concepts and primitives (though
|
||||||
|
we often lag a little). As a distributed computing system `tractor`
|
||||||
|
attempts to place sophistication at the correct layer such that
|
||||||
|
concurrency primitives are powerful yet simple, making it easy to build
|
||||||
|
complex systems (you can build a "worker pool" architecture but it's
|
||||||
|
definitely not required). There is first class support for inter-actor
|
||||||
|
streaming using `async generators`_ and ongoing work toward a functional
|
||||||
|
reactive style for IPC.
|
||||||
|
|
||||||
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
|
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
|
||||||
A great place to start is the `trio docs`_ and this `blog post`_.
|
A great place to start is the `trio docs`_ and this `blog post`_.
|
||||||
|
@ -56,7 +70,7 @@ Its tenets non-comprehensively include:
|
||||||
|
|
||||||
- strict adherence to the `concept-in-progress`_ of *structured concurrency*
|
- strict adherence to the `concept-in-progress`_ of *structured concurrency*
|
||||||
- no spawning of processes *willy-nilly*; causality_ is paramount!
|
- no spawning of processes *willy-nilly*; causality_ is paramount!
|
||||||
- (remote) errors `always propagate`_ back to the parent / caller
|
- (remote) errors `always propagate`_ back to the parent supervisor
|
||||||
- verbatim support for ``trio``'s cancellation_ system
|
- verbatim support for ``trio``'s cancellation_ system
|
||||||
- `shared nothing architecture`_
|
- `shared nothing architecture`_
|
||||||
- no use of *proxy* objects or shared references between processes
|
- no use of *proxy* objects or shared references between processes
|
||||||
|
|
5
setup.py
5
setup.py
|
@ -32,7 +32,7 @@ setup(
|
||||||
maintainer='Tyler Goodlet',
|
maintainer='Tyler Goodlet',
|
||||||
maintainer_email='jgbt@protonmail.com',
|
maintainer_email='jgbt@protonmail.com',
|
||||||
url='https://github.com/goodboy/tractor',
|
url='https://github.com/goodboy/tractor',
|
||||||
platforms=['linux'],
|
platforms=['linux', 'windows'],
|
||||||
packages=[
|
packages=[
|
||||||
'tractor',
|
'tractor',
|
||||||
'tractor.testing',
|
'tractor.testing',
|
||||||
|
@ -53,7 +53,8 @@ setup(
|
||||||
"Programming Language :: Python :: Implementation :: CPython",
|
"Programming Language :: Python :: Implementation :: CPython",
|
||||||
"Programming Language :: Python :: Implementation :: PyPy",
|
"Programming Language :: Python :: Implementation :: PyPy",
|
||||||
"Programming Language :: Python :: 3 :: Only",
|
"Programming Language :: Python :: 3 :: Only",
|
||||||
"Programming Language :: Python :: 3.6",
|
"Programming Language :: Python :: 3.7",
|
||||||
|
"Programming Language :: Python :: 3.8",
|
||||||
"Intended Audience :: Science/Research",
|
"Intended Audience :: Science/Research",
|
||||||
"Intended Audience :: Developers",
|
"Intended Audience :: Developers",
|
||||||
"Topic :: System :: Distributed Computing",
|
"Topic :: System :: Distributed Computing",
|
||||||
|
|
|
@ -35,5 +35,7 @@ def pytest_generate_tests(metafunc):
|
||||||
from multiprocessing import get_all_start_methods
|
from multiprocessing import get_all_start_methods
|
||||||
methods = get_all_start_methods()
|
methods = get_all_start_methods()
|
||||||
if 'fork' in methods: # fork not available on windows, so check before removing
|
if 'fork' in methods: # fork not available on windows, so check before removing
|
||||||
|
# XXX: the fork method is in general incompatible with
|
||||||
|
# trio's global scheduler state
|
||||||
methods.remove('fork')
|
methods.remove('fork')
|
||||||
metafunc.parametrize("start_method", methods, scope='module')
|
metafunc.parametrize("start_method", methods, scope='module')
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""
|
"""
|
||||||
Cancellation and error propagation
|
Cancellation and error propagation
|
||||||
"""
|
"""
|
||||||
|
import platform
|
||||||
from itertools import repeat
|
from itertools import repeat
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -10,10 +11,20 @@ import tractor
|
||||||
from conftest import tractor_test
|
from conftest import tractor_test
|
||||||
|
|
||||||
|
|
||||||
async def assert_err():
|
async def assert_err(delay=0):
|
||||||
|
await trio.sleep(delay)
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
async def sleep_forever():
|
||||||
|
await trio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
|
async def do_nuthin():
|
||||||
|
# just nick the scheduler
|
||||||
|
await trio.sleep(0)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'args_err',
|
'args_err',
|
||||||
[
|
[
|
||||||
|
@ -77,6 +88,32 @@ def test_multierror(arb_addr):
|
||||||
tractor.run(main, arbiter_addr=arb_addr)
|
tractor.run(main, arbiter_addr=arb_addr)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('delay', (0, 0.5))
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'num_subactors', range(25, 26),
|
||||||
|
)
|
||||||
|
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
|
||||||
|
"""Verify we raise a ``trio.MultiError`` out of a nursery where
|
||||||
|
more then one actor errors and also with a delay before failure
|
||||||
|
to test failure during an ongoing spawning.
|
||||||
|
"""
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as nursery:
|
||||||
|
for i in range(num_subactors):
|
||||||
|
await nursery.run_in_actor(
|
||||||
|
f'errorer{i}', assert_err, delay=delay)
|
||||||
|
|
||||||
|
with pytest.raises(trio.MultiError) as exc_info:
|
||||||
|
tractor.run(main, arbiter_addr=arb_addr)
|
||||||
|
|
||||||
|
assert exc_info.type == tractor.MultiError
|
||||||
|
err = exc_info.value
|
||||||
|
assert len(err.exceptions) == num_subactors
|
||||||
|
for exc in err.exceptions:
|
||||||
|
assert isinstance(exc, tractor.RemoteActorError)
|
||||||
|
assert exc.type == AssertionError
|
||||||
|
|
||||||
|
|
||||||
def do_nothing():
|
def do_nothing():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -133,10 +170,31 @@ async def test_cancel_infinite_streamer(start_method):
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'num_actors_and_errs',
|
'num_actors_and_errs',
|
||||||
[
|
[
|
||||||
(1, tractor.RemoteActorError, AssertionError),
|
# daemon actors sit idle while single task actors error out
|
||||||
(2, tractor.MultiError, AssertionError)
|
(1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
|
||||||
|
(2, tractor.MultiError, AssertionError, (assert_err, {}), None),
|
||||||
|
(3, tractor.MultiError, AssertionError, (assert_err, {}), None),
|
||||||
|
|
||||||
|
# 1 daemon actor errors out while single task actors sleep forever
|
||||||
|
(3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
|
||||||
|
(assert_err, {}, True)),
|
||||||
|
# daemon actors error out after brief delay while single task
|
||||||
|
# actors complete quickly
|
||||||
|
(3, tractor.RemoteActorError, AssertionError,
|
||||||
|
(do_nuthin, {}), (assert_err, {'delay': 1}, True)),
|
||||||
|
# daemon complete quickly delay while single task
|
||||||
|
# actors error after brief delay
|
||||||
|
(3, tractor.MultiError, AssertionError,
|
||||||
|
(assert_err, {'delay': 1}), (do_nuthin, {}, False)),
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'1_run_in_actor_fails',
|
||||||
|
'2_run_in_actors_fail',
|
||||||
|
'3_run_in_actors_fail',
|
||||||
|
'1_daemon_actors_fail',
|
||||||
|
'1_daemon_actors_fail_all_run_in_actors_dun_quick',
|
||||||
|
'no_daemon_actors_fail_all_run_in_actors_sleep_then_fail',
|
||||||
],
|
],
|
||||||
ids=['one_actor', 'two_actors'],
|
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_some_cancels_all(num_actors_and_errs, start_method):
|
async def test_some_cancels_all(num_actors_and_errs, start_method):
|
||||||
|
@ -145,25 +203,50 @@ async def test_some_cancels_all(num_actors_and_errs, start_method):
|
||||||
|
|
||||||
This is the first and only supervisory strategy at the moment.
|
This is the first and only supervisory strategy at the moment.
|
||||||
"""
|
"""
|
||||||
num, first_err, err_type = num_actors_and_errs
|
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
real_actors = []
|
|
||||||
for i in range(3):
|
# spawn the same number of deamon actors which should be cancelled
|
||||||
real_actors.append(await n.start_actor(
|
dactor_portals = []
|
||||||
f'actor_{i}',
|
for i in range(num_actors):
|
||||||
|
dactor_portals.append(await n.start_actor(
|
||||||
|
f'deamon_{i}',
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
))
|
))
|
||||||
|
|
||||||
for i in range(num):
|
func, kwargs = ria_func
|
||||||
|
riactor_portals = []
|
||||||
|
for i in range(num_actors):
|
||||||
# start actor(s) that will fail immediately
|
# start actor(s) that will fail immediately
|
||||||
await n.run_in_actor(f'extra_{i}', assert_err)
|
riactor_portals.append(
|
||||||
|
await n.run_in_actor(f'actor_{i}', func, **kwargs))
|
||||||
|
|
||||||
|
if da_func:
|
||||||
|
func, kwargs, expect_error = da_func
|
||||||
|
for portal in dactor_portals:
|
||||||
|
# if this function fails then we should error here
|
||||||
|
# and the nursery should teardown all other actors
|
||||||
|
try:
|
||||||
|
await portal.run(__name__, func.__name__, **kwargs)
|
||||||
|
except tractor.RemoteActorError as err:
|
||||||
|
assert err.type == err_type
|
||||||
|
# we only expect this first error to propogate
|
||||||
|
# (all other daemons are cancelled before they
|
||||||
|
# can be scheduled)
|
||||||
|
num_actors = 1
|
||||||
|
# reraise so nursery teardown is triggered
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
if expect_error:
|
||||||
|
pytest.fail(
|
||||||
|
"Deamon call should fail at checkpoint?")
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
|
|
||||||
except first_err as err:
|
except first_err as err:
|
||||||
if isinstance(err, tractor.MultiError):
|
if isinstance(err, tractor.MultiError):
|
||||||
assert len(err.exceptions) == num
|
assert len(err.exceptions) == num_actors
|
||||||
for exc in err.exceptions:
|
for exc in err.exceptions:
|
||||||
if isinstance(exc, tractor.RemoteActorError):
|
if isinstance(exc, tractor.RemoteActorError):
|
||||||
assert exc.type == err_type
|
assert exc.type == err_type
|
||||||
|
@ -176,3 +259,42 @@ async def test_some_cancels_all(num_actors_and_errs, start_method):
|
||||||
assert not n._children
|
assert not n._children
|
||||||
else:
|
else:
|
||||||
pytest.fail("Should have gotten a remote assertion error?")
|
pytest.fail("Should have gotten a remote assertion error?")
|
||||||
|
|
||||||
|
|
||||||
|
async def spawn_and_error(num) -> None:
|
||||||
|
name = tractor.current_actor().name
|
||||||
|
async with tractor.open_nursery() as nursery:
|
||||||
|
for i in range(num):
|
||||||
|
await nursery.run_in_actor(
|
||||||
|
f'{name}_errorer_{i}', assert_err
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor_test
|
||||||
|
async def test_nested_multierrors(loglevel, start_method):
|
||||||
|
"""Test that failed actor sets are wrapped in `trio.MultiError`s.
|
||||||
|
This test goes only 2 nurseries deep but we should eventually have tests
|
||||||
|
for arbitrary n-depth actor trees.
|
||||||
|
"""
|
||||||
|
if platform.system() == 'Windows':
|
||||||
|
# Windows CI seems to be partifcularly fragile on Python 3.8..
|
||||||
|
num_subactors = 2
|
||||||
|
else:
|
||||||
|
# XXX: any more then this and the forkserver will
|
||||||
|
# start bailing hard...gotta look into it
|
||||||
|
num_subactors = 4
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with tractor.open_nursery() as nursery:
|
||||||
|
|
||||||
|
for i in range(num_subactors):
|
||||||
|
await nursery.run_in_actor(
|
||||||
|
f'spawner_{i}',
|
||||||
|
spawn_and_error,
|
||||||
|
num=num_subactors,
|
||||||
|
)
|
||||||
|
except trio.MultiError as err:
|
||||||
|
assert len(err.exceptions) == num_subactors
|
||||||
|
for subexc in err.exceptions:
|
||||||
|
assert isinstance(subexc, tractor.RemoteActorError)
|
||||||
|
assert subexc.type is trio.MultiError
|
||||||
|
|
|
@ -60,7 +60,7 @@ async def say_hello_use_wait(other_actor):
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
|
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
|
||||||
async def test_trynamic_trio(func):
|
async def test_trynamic_trio(func, start_method):
|
||||||
"""Main tractor entry point, the "master" process (for now
|
"""Main tractor entry point, the "master" process (for now
|
||||||
acts as the "director").
|
acts as the "director").
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -118,7 +118,7 @@ async def _invoke(
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
except Exception as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# always ship errors back to caller
|
# always ship errors back to caller
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
|
@ -352,7 +352,8 @@ class Actor:
|
||||||
return cid, recv_chan
|
return cid, recv_chan
|
||||||
|
|
||||||
async def _process_messages(
|
async def _process_messages(
|
||||||
self, chan: Channel,
|
self,
|
||||||
|
chan: Channel,
|
||||||
treat_as_gen: bool = False,
|
treat_as_gen: bool = False,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status=trio.TASK_STATUS_IGNORED,
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
|
@ -461,7 +462,7 @@ class Actor:
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
log.error(f"{chan} form {chan.uid} broke")
|
||||||
except Exception as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# ship any "internal" exception (i.e. one from internal machinery
|
# ship any "internal" exception (i.e. one from internal machinery
|
||||||
# not from an rpc task) to parent
|
# not from an rpc task) to parent
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
|
@ -472,7 +473,7 @@ class Actor:
|
||||||
# above to trigger an error at consuming portal "checkpoints"
|
# above to trigger an error at consuming portal "checkpoints"
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
# debugging only
|
# debugging only
|
||||||
log.debug("Msg loop was cancelled")
|
log.debug(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
log.debug(
|
log.debug(
|
||||||
|
|
|
@ -5,6 +5,8 @@ import importlib
|
||||||
import builtins
|
import builtins
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
_this_mod = importlib.import_module(__name__)
|
_this_mod = importlib.import_module(__name__)
|
||||||
|
|
||||||
|
@ -14,7 +16,7 @@ class RemoteActorError(Exception):
|
||||||
"Remote actor exception bundled locally"
|
"Remote actor exception bundled locally"
|
||||||
def __init__(self, message, type_str, **msgdata):
|
def __init__(self, message, type_str, **msgdata):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
for ns in [builtins, _this_mod]:
|
for ns in [builtins, _this_mod, trio]:
|
||||||
try:
|
try:
|
||||||
self.type = getattr(ns, type_str)
|
self.type = getattr(ns, type_str)
|
||||||
break
|
break
|
||||||
|
|
|
@ -87,6 +87,7 @@ class ActorNursery:
|
||||||
event, chan = await self._actor.wait_for_peer(actor.uid)
|
event, chan = await self._actor.wait_for_peer(actor.uid)
|
||||||
portal = Portal(chan)
|
portal = Portal(chan)
|
||||||
self._children[actor.uid] = (actor, proc, portal)
|
self._children[actor.uid] = (actor, proc, portal)
|
||||||
|
|
||||||
return portal
|
return portal
|
||||||
|
|
||||||
async def run_in_actor(
|
async def run_in_actor(
|
||||||
|
@ -174,12 +175,19 @@ class ActorNursery:
|
||||||
result = await exhaust_portal(portal, actor)
|
result = await exhaust_portal(portal, actor)
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors.append(result)
|
errors.append(result)
|
||||||
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
log.warning(
|
||||||
|
f"Cancelling {portal.channel.uid} after error {result}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
||||||
|
|
||||||
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
# XXX: lol, this will never get run without a shield above..
|
||||||
log.warning(
|
# if cs.cancelled_caught:
|
||||||
"Result waiter was cancelled, process may have died")
|
# log.warning(
|
||||||
|
# "Result waiter was cancelled, process may have died")
|
||||||
|
|
||||||
async def wait_for_proc(
|
async def wait_for_proc(
|
||||||
proc: mp.Process,
|
proc: mp.Process,
|
||||||
|
@ -194,11 +202,12 @@ class ActorNursery:
|
||||||
# please god don't hang
|
# please god don't hang
|
||||||
proc.join()
|
proc.join()
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
|
# indicate we are no longer managing this subactor
|
||||||
self._children.pop(actor.uid)
|
self._children.pop(actor.uid)
|
||||||
|
|
||||||
# proc terminated, cancel result waiter that may have
|
# proc terminated, cancel result waiter that may have
|
||||||
# been spawned in tandem
|
# been spawned in tandem if not done already
|
||||||
if cancel_scope:
|
if cancel_scope: # and not portal._cancelled:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling existing result waiter task for {actor.uid}")
|
f"Cancelling existing result waiter task for {actor.uid}")
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
@ -222,11 +231,12 @@ class ActorNursery:
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
if not self.cancelled:
|
if not self.cancelled:
|
||||||
# halt here and expect to be called again once the nursery
|
# bubble up error(s) here and expect to be called again
|
||||||
# has been cancelled externally (ex. from within __aexit__()
|
# once the nursery has been cancelled externally (ex.
|
||||||
# if an error is captured from ``wait()`` then ``cancel()``
|
# from within __aexit__() if an error is caught around
|
||||||
# is called immediately after which in turn calls ``wait()``
|
# ``self.wait()`` then, ``self.cancel()`` is called
|
||||||
# again.)
|
# immediately, in the default supervisor strat, after
|
||||||
|
# which in turn ``self.wait()`` is called again.)
|
||||||
raise trio.MultiError(errors)
|
raise trio.MultiError(errors)
|
||||||
|
|
||||||
# wait on all `start_actor()` subactors to complete
|
# wait on all `start_actor()` subactors to complete
|
||||||
|
@ -259,7 +269,7 @@ class ActorNursery:
|
||||||
# os.kill(proc.pid, signal.SIGINT)
|
# os.kill(proc.pid, signal.SIGINT)
|
||||||
|
|
||||||
log.debug(f"Cancelling nursery")
|
log.debug(f"Cancelling nursery")
|
||||||
with trio.fail_after(3):
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in self._children.values():
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
|
@ -272,6 +282,10 @@ class ActorNursery:
|
||||||
await event.wait()
|
await event.wait()
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
|
||||||
|
# XXX should be impossible to get here
|
||||||
|
# unless method was called from within
|
||||||
|
# shielded cancel scope.
|
||||||
if portal is None:
|
if portal is None:
|
||||||
# cancelled while waiting on the event
|
# cancelled while waiting on the event
|
||||||
# to arrive
|
# to arrive
|
||||||
|
@ -281,10 +295,18 @@ class ActorNursery:
|
||||||
else: # there's no other choice left
|
else: # there's no other choice left
|
||||||
do_hard_kill(proc)
|
do_hard_kill(proc)
|
||||||
|
|
||||||
# spawn cancel tasks
|
# spawn cancel tasks for each sub-actor
|
||||||
assert portal
|
assert portal
|
||||||
n.start_soon(portal.cancel_actor)
|
n.start_soon(portal.cancel_actor)
|
||||||
|
|
||||||
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
# then hard kill all sub-processes
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
log.error(f"Failed to gracefully cancel {self}, hard killing!")
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
for subactor, proc, portal in self._children.values():
|
||||||
|
n.start_soon(do_hard_kill, proc)
|
||||||
|
|
||||||
# mark ourselves as having (tried to have) cancelled all subactors
|
# mark ourselves as having (tried to have) cancelled all subactors
|
||||||
self.cancelled = True
|
self.cancelled = True
|
||||||
await self.wait()
|
await self.wait()
|
||||||
|
@ -292,6 +314,9 @@ class ActorNursery:
|
||||||
async def __aexit__(self, etype, value, tb):
|
async def __aexit__(self, etype, value, tb):
|
||||||
"""Wait on all subactor's main routines to complete.
|
"""Wait on all subactor's main routines to complete.
|
||||||
"""
|
"""
|
||||||
|
# XXX: this is effectively the (for now) lone
|
||||||
|
# cancellation/supervisor strategy (one-cancels-all)
|
||||||
|
# which exactly mimicks trio's behaviour
|
||||||
if etype is not None:
|
if etype is not None:
|
||||||
try:
|
try:
|
||||||
# XXX: hypothetically an error could be raised and then
|
# XXX: hypothetically an error could be raised and then
|
||||||
|
@ -313,16 +338,16 @@ class ActorNursery:
|
||||||
raise trio.MultiError(merr.exceptions + [value])
|
raise trio.MultiError(merr.exceptions + [value])
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
# XXX: this is effectively the (for now) lone
|
|
||||||
# cancellation/supervisor strategy which exactly
|
|
||||||
# mimicks trio's behaviour
|
|
||||||
log.debug(f"Waiting on subactors {self._children} to complete")
|
log.debug(f"Waiting on subactors {self._children} to complete")
|
||||||
try:
|
try:
|
||||||
await self.wait()
|
await self.wait()
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
log.warning(f"Nursery caught {err}, cancelling")
|
log.warning(f"Nursery cancelling due to {err}")
|
||||||
await self.cancel()
|
if self._children:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await self.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
log.debug(f"Nursery teardown complete")
|
log.debug(f"Nursery teardown complete")
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue