forked from goodboy/tractor
Compare commits
65 Commits
master...zombie_lor
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | a55ea18c7d | |
Tyler Goodlet | 797bb22981 | |
Tyler Goodlet | 2c74db9cb7 | |
Tyler Goodlet | 39feb627a8 | |
Tyler Goodlet | 348423ece7 | |
Tyler Goodlet | 5eb7c4c857 | |
Tyler Goodlet | 4d30e25591 | |
Tyler Goodlet | c01d2f8aea | |
Tyler Goodlet | 8e21bb046e | |
Tyler Goodlet | 66137030d9 | |
Tyler Goodlet | cef9ab7353 | |
Tyler Goodlet | 0dcffeee0f | |
Tyler Goodlet | 8a59713d48 | |
Tyler Goodlet | 0488f5e57e | |
Tyler Goodlet | a3cdba0577 | |
Tyler Goodlet | 5048c3534f | |
Tyler Goodlet | 5df08aabb7 | |
Tyler Goodlet | 1b7cd419f2 | |
Tyler Goodlet | e32a5917a9 | |
Tyler Goodlet | 7250deb30f | |
Tyler Goodlet | 64ebb2aff4 | |
Tyler Goodlet | c02a493d8c | |
Tyler Goodlet | fb026e3747 | |
Tyler Goodlet | 2afbc3898f | |
Tyler Goodlet | f72eabd42a | |
Tyler Goodlet | 6e646a6fa6 | |
Tyler Goodlet | aa94ea5bcc | |
Tyler Goodlet | a2a4f7af09 | |
Tyler Goodlet | 6da2c3a885 | |
Tyler Goodlet | ed10f6e0c1 | |
Tyler Goodlet | b43539b252 | |
Tyler Goodlet | fc46f5b74a | |
Tyler Goodlet | efe83f78a3 | |
Tyler Goodlet | de87cb510a | |
Tyler Goodlet | e8431bffd0 | |
Tyler Goodlet | d720c6a9c2 | |
Tyler Goodlet | 732eaaf21e | |
Tyler Goodlet | c63323086c | |
Tyler Goodlet | 03ae42fa10 | |
Tyler Goodlet | 2cd3a878f0 | |
Tyler Goodlet | a237dcd020 | |
Tyler Goodlet | b4fe207369 | |
Tyler Goodlet | 9a994e2de3 | |
Tyler Goodlet | d2a810d950 | |
Tyler Goodlet | 07c2151010 | |
Tyler Goodlet | 0d825ae6d7 | |
Tyler Goodlet | 5be8c86e96 | |
Tyler Goodlet | aa069a1edc | |
Tyler Goodlet | 3c1cc90c40 | |
Tyler Goodlet | 056ca97d2a | |
Tyler Goodlet | 558ba7e008 | |
Tyler Goodlet | 1aa70da58b | |
Tyler Goodlet | 96cf4a962d | |
Tyler Goodlet | fd70965422 | |
Tyler Goodlet | 6ad819362e | |
Tyler Goodlet | 16ab14d959 | |
Tyler Goodlet | c7e03ae3b4 | |
Tyler Goodlet | 38b844fb22 | |
Tyler Goodlet | 3f8f848ce8 | |
Tyler Goodlet | 2fbc43f0c3 | |
Tyler Goodlet | 9c63cb87c7 | |
Tyler Goodlet | d7e36ad817 | |
Tyler Goodlet | 7c6f6571f1 | |
Tyler Goodlet | ebf3ad6af0 | |
Tyler Goodlet | a568d8af74 | |
@@ -423,6 +423,7 @@ channel`_!
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org

@@ -431,7 +432,7 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
@@ -20,7 +20,7 @@ async def sleep(

async def open_ctx(
    n: tractor._trionics.ActorNursery
    n: tractor._supervise.ActorNursery
):

    # spawn both actors
@@ -74,7 +74,15 @@ def pytest_configure(config):

@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
    orig = tractor.log._default_loglevel
    level = tractor.log._default_loglevel = request.config.option.loglevel

    level_from_cli = request.config.option.loglevel
    # disable built-in capture when user passes the `--ll` value
    # presuming they already know they want to see console logging
    # and don't need it repeated by pytest.
    if level_from_cli:
        request.config.option.showcapture = 'no'

    level = tractor.log._default_loglevel = level_from_cli

    yield level
    tractor.log._default_loglevel = orig
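Read on its own, the fixture change amounts to: when a log level is passed on the command line, disable pytest's own capture reporting so console logs are not printed a second time on failures. A minimal standalone sketch of that pattern (the `--ll` option registration here is an assumption; tractor's real conftest wires the option up elsewhere):

    # conftest.py -- illustrative sketch, not tractor's actual conftest
    import pytest


    def pytest_addoption(parser):
        # hypothetical registration of the `--ll` log-level option
        parser.addoption('--ll', action='store', dest='loglevel',
                         default=None, help='log level to pass through to the app')


    @pytest.fixture(scope='session', autouse=True)
    def loglevel(request):
        level_from_cli = request.config.option.loglevel
        if level_from_cli:
            # the user explicitly asked for console logging; don't have
            # pytest re-print captured output on top of it
            request.config.option.showcapture = 'no'
        yield level_from_cli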
@@ -23,9 +23,9 @@ async def sleep_forever():
    await trio.sleep_forever()


async def do_nuthin():
async def do_nuthin(sleep=0):
    # just nick the scheduler
    await trio.sleep(0)
    await trio.sleep(sleep)


@pytest.mark.parametrize(
@@ -100,6 +100,7 @@ def test_multierror(arb_addr):
@pytest.mark.parametrize('delay', (0, 0.5))
@pytest.mark.parametrize(
    'num_subactors', range(25, 26),
    # 'num_subactors', range(2, 3),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
    """Verify we raise a ``trio.MultiError`` out of a nursery where
@@ -122,16 +123,20 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
        trio.run(main)

    assert exc_info.type == tractor.MultiError
    err = exc_info.value
    exceptions = err.exceptions
    multi = exc_info.value
    exceptions = multi.exceptions

    if len(exceptions) == 2:
        # sometimes oddly now there's an embedded BrokenResourceError ?
        exceptions = exceptions[1].exceptions

    assert len(exceptions) == num_subactors

    # sometimes there's an embedded BrokenResourceError
    # next to the main multierror?
    for exc in exceptions:
        if hasattr(exc, 'exceptions'):
            multi = exc
            break

    assert len(multi.exceptions) == num_subactors

    for exc in multi.exceptions:
        assert isinstance(exc, tractor.RemoteActorError)
        assert exc.type == AssertionError
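The unwrapping logic above — descend into a nested error group when a stray ``BrokenResourceError`` wrapper shows up next to the "real" multierror — generalizes to a small helper. A sketch in plain trio terms (not part of the changeset):

    import trio


    def flatten_multierror(exc: BaseException) -> list[BaseException]:
        '''Recursively expand any ``.exceptions`` groups (``trio.MultiError``
        or exception groups) into a flat list of leaf exceptions.
        '''
        children = getattr(exc, 'exceptions', None)
        if children is None:
            return [exc]
        leaves: list[BaseException] = []
        for sub in children:
            leaves.extend(flatten_multierror(sub))
        return leaves


    # example: two leaf errors nested one level deep
    nested = trio.MultiError([
        ValueError('boom'),
        trio.MultiError([KeyError('a'), KeyError('b')]),
    ])
    assert len(flatten_multierror(nested)) == 3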
@@ -218,7 +223,7 @@ async def test_cancel_infinite_streamer(start_method):
    # daemon complete quickly delay while single task
    # actors error after brief delay
    (3, tractor.MultiError, AssertionError,
     (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
     (assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
    ],
    ids=[
        '1_run_in_actor_fails',
@@ -321,6 +326,7 @@ async def spawn_and_error(breadth, depth) -> None:
        )
        kwargs = {
            'name': f'{name}_errorer_{i}',
            # 'delay': 0.01,
        }
        await nursery.run_in_actor(*args, **kwargs)
@@ -355,9 +361,13 @@ async def test_nested_multierrors(loglevel, start_method):
                depth=depth,
            )
        except trio.MultiError as err:
            _err = err
            assert len(err.exceptions) == subactor_breadth
            for subexc in err.exceptions:

                # NOTE: use [print(f'err: {err}') for err in _err.exceptions]
                # to inspect errors from console on failure

                # verify first level actor errors are wrapped as remote
                if platform.system() == 'Windows':
@@ -381,13 +391,25 @@ async def test_nested_multierrors(loglevel, start_method):
                    # on windows sometimes spawning is just too slow and
                    # we get back the (sent) cancel signal instead
                    if platform.system() == 'Windows':
                        assert (subexc.type is trio.MultiError) or (
                            subexc.type is tractor.RemoteActorError)
                        assert subexc.type in (
                            trio.MultiError,
                            tractor.RemoteActorError,
                        )

                    else:
                        assert subexc.type is trio.MultiError
                        assert subexc.type in (
                            trio.MultiError,
                            trio.Cancelled,
                            # tractor.RemoteActorError,
                        )
                else:
                    assert (subexc.type is tractor.RemoteActorError) or (
                        subexc.type is trio.Cancelled)
                    assert subexc.type in (
                        tractor.RemoteActorError,
                        trio.Cancelled,
                    )

        else:
            pytest.fail(f'Got no error from nursery?')


@no_windows
@@ -0,0 +1,24 @@
import asyncio

import pytest
import tractor


async def sleep_and_err():
    await asyncio.sleep(0.1)
    assert 0


async def asyncio_actor():
    assert tractor.current_actor().is_infected_aio()

    await tractor.to_asyncio.run_task(sleep_and_err)


def test_infected_simple_error(arb_addr):

    async def main():
        async with tractor.open_nursery() as n:
            await n.run_in_actor(asyncio_actor, infected_asyncio=True)

    with pytest.raises(tractor.RemoteActorError) as excinfo:
        tractor.run(main, arbiter_addr=arb_addr)
@@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub(
                'streamer',
                enable_modules=[__name__],
            )
            name = 'streamer'

            even_portal = await n.run_in_actor(
                subs,
@@ -12,7 +12,7 @@ import pytest
import trio
from trio.lowlevel import current_task
import tractor
from tractor._broadcast import broadcast_receiver, Lagged
from tractor.trionics import broadcast_receiver, Lagged


@tractor.context
@@ -432,7 +432,6 @@ def test_first_recver_is_cancelled():
    tx, rx = trio.open_memory_channel(1)
    brx = broadcast_receiver(rx, 1)
    cs = trio.CancelScope()
    sequence = list(range(3))

    async def sub_and_recv():
        with cs:
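For context on why `broadcast_receiver` exists (now imported from `tractor.trionics` rather than `tractor._broadcast`): with plain trio memory channels, cloned receivers *compete* for items, so each message is seen by exactly one consumer, whereas a broadcast receiver fans every item out to all subscribers. A self-contained plain-trio sketch of the competing-consumers behaviour it is designed to avoid:

    import trio


    async def main():
        tx, rx = trio.open_memory_channel(10)
        seen: dict[str, list[int]] = {'a': [], 'b': []}

        async def consumer(name: str, chan: trio.MemoryReceiveChannel):
            async with chan:
                async for item in chan:
                    seen[name].append(item)

        async with trio.open_nursery() as n:
            # clones *compete*: each value lands with only one consumer
            n.start_soon(consumer, 'a', rx.clone())
            n.start_soon(consumer, 'b', rx.clone())
            async with tx, rx:
                for i in range(6):
                    await tx.send(i)

        # every item was consumed exactly once across both consumers
        assert sorted(seen['a'] + seen['b']) == list(range(6))


    trio.run(main)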
@@ -13,7 +13,7 @@ from ._streaming import (
    context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._trionics import open_nursery
from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._exceptions import (
    RemoteActorError,
@@ -49,6 +49,7 @@ async def _invoke(
    chan: Channel,
    func: typing.Callable,
    kwargs: Dict[str, Any],
    is_rpc: bool = True,
    task_status: TaskStatus[
        Union[trio.CancelScope, BaseException]
    ] = trio.TASK_STATUS_IGNORED,
@@ -57,7 +58,7 @@ async def _invoke(
    Invoke local func and deliver result(s) over provided channel.

    '''
    __tracebackhide__ = True
    # __tracebackhide__ = True
    treat_as_gen = False

    # possible a traceback (not sure what typing is for this..)
@@ -68,6 +69,7 @@ async def _invoke(

    ctx = Context(chan, cid)
    context: bool = False
    fname = func.__name__

    if getattr(func, '_tractor_stream_function', False):
        # handle decorated ``@tractor.stream`` async functions
@@ -163,6 +165,7 @@ async def _invoke(
                await chan.send({'return': await coro, 'cid': cid})
            except trio.Cancelled as err:
                tb = err.__traceback__
                raise

            if cs.cancelled_caught:
@@ -170,7 +173,6 @@ async def _invoke(
                # so they can be unwrapped and displayed on the caller
                # side!

                fname = func.__name__
                if ctx._cancel_called:
                    msg = f'{fname} cancelled itself'
@@ -191,9 +193,33 @@ async def _invoke(
            await chan.send({'functype': 'asyncfunc', 'cid': cid})
            with cancel_scope as cs:
                task_status.started(cs)
                try:
                    await chan.send({'return': await coro, 'cid': cid})
                except trio.Cancelled as err:
                    tb = err.__traceback__
                    raise
                # await chan.send({'return': await coro, 'cid': cid})

    except (Exception, trio.MultiError) as err:
        if cs.cancelled_caught:
            # if cs.cancel_called:
            if cs.cancel_called:
                msg = (
                    f'{fname} was remotely cancelled by its caller '
                    f'{ctx.chan.uid}'
                )
            else:
                msg = f'{fname} cancelled itself'

            raise ContextCancelled(
                msg,
                suberror_type=trio.Cancelled,
            )

    except (
        Exception,
        trio.MultiError,
        # trio.Cancelled,
    ) as err:

        if not is_multi_cancelled(err):
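The key change above is deciding the `ContextCancelled` message from `cs.cancel_called` (someone *requested* cancellation) rather than only `cs.cancelled_caught` (the scope actually absorbed a `Cancelled`). A quick illustration of the two flags in plain trio:

    import trio


    async def main():
        with trio.CancelScope() as cs:
            cs.cancel()          # cancellation requested: cancel_called -> True
            await trio.sleep(1)  # Cancelled raised at this checkpoint, caught by the scope

        assert cs.cancel_called      # a cancel was asked for...
        assert cs.cancelled_caught   # ...and the scope absorbed a Cancelled

        with trio.CancelScope() as cs2:
            pass                 # never cancelled, never even hits a checkpoint

        assert not cs2.cancel_called
        assert not cs2.cancelled_caught


    trio.run(main)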
@@ -243,10 +269,11 @@ async def _invoke(
            scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
            is_complete.set()
        except KeyError:
            if is_rpc:
                # If we're cancelled before the task returns then the
                # cancel scope will not have been inserted yet
                log.warning(
                    f"Task {func} likely errored or cancelled before it started")
                    f"Task {func} likely errored or cancelled before start")
        finally:
            if not actor._rpc_tasks:
                log.runtime("All RPC tasks have completed")
@ -280,6 +307,9 @@ class Actor:
|
|||
_parent_main_data: Dict[str, str]
|
||||
_parent_chan_cs: Optional[trio.CancelScope] = None
|
||||
|
||||
# if started on ``asycio`` running ``trio`` in guest mode
|
||||
_infected_aio: bool = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
|
@ -353,6 +383,14 @@ class Actor:
|
|||
Tuple[Any, Any, Any, Any, Any]] = None
|
||||
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
|
||||
|
||||
@property
|
||||
def cancel_called(self) -> bool:
|
||||
'''
|
||||
Same principle as ``trio.CancelScope.cancel_called``.
|
||||
|
||||
'''
|
||||
return self._cancel_called
|
||||
|
||||
async def wait_for_peer(
|
||||
self, uid: Tuple[str, str]
|
||||
) -> Tuple[trio.Event, Channel]:
|
||||
|
@ -503,8 +541,8 @@ class Actor:
|
|||
log.runtime(f"Peers is {self._peers}")
|
||||
|
||||
if not self._peers: # no more channels connected
|
||||
self._no_more_peers.set()
|
||||
log.runtime("Signalling no more peer channels")
|
||||
self._no_more_peers.set()
|
||||
|
||||
# # XXX: is this necessary (GC should do it?)
|
||||
if chan.connected():
|
||||
|
@ -538,7 +576,8 @@ class Actor:
|
|||
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
|
||||
assert send_chan.cid == cid # type: ignore
|
||||
|
||||
# if 'error' in msg:
|
||||
if 'error' in msg:
|
||||
recv_chan
|
||||
# ctx = getattr(recv_chan, '_ctx', None)
|
||||
# if ctx:
|
||||
# ctx._error_from_remote_msg(msg)
|
||||
|
@ -613,6 +652,7 @@ class Actor:
|
|||
# worked out we'll likely want to use that!
|
||||
msg = None
|
||||
nursery_cancelled_before_task: bool = False
|
||||
uid = chan.uid
|
||||
|
||||
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
||||
try:
|
||||
|
@ -636,7 +676,7 @@ class Actor:
|
|||
|
||||
log.runtime(
|
||||
f"Msg loop signalled to terminate for"
|
||||
f" {chan} from {chan.uid}")
|
||||
f" {chan} from {uid}")
|
||||
|
||||
break
|
||||
|
||||
|
@ -671,16 +711,47 @@ class Actor:
|
|||
f"{ns}.{funcname}({kwargs})")
|
||||
if ns == 'self':
|
||||
func = getattr(self, funcname)
|
||||
|
||||
if funcname == 'cancel':
|
||||
# self.cancel() was called so kill this
|
||||
# msg loop and break out into
|
||||
# ``_async_main()``
|
||||
|
||||
log.cancel(
|
||||
f"{self.uid} remote cancel msg from {uid}")
|
||||
|
||||
# don't start entire actor runtime
|
||||
# cancellation if this actor is in debug
|
||||
# mode
|
||||
pdb_complete = _debug._local_pdb_complete
|
||||
if pdb_complete:
|
||||
log.cancel(
|
||||
f'{self.uid} is in debug, wait for unlock')
|
||||
await pdb_complete.wait()
|
||||
|
||||
# we immediately start the runtime machinery
|
||||
# shutdown
|
||||
with trio.CancelScope(shield=True):
|
||||
await _invoke(
|
||||
self, cid, chan, func, kwargs, is_rpc=False
|
||||
)
|
||||
|
||||
loop_cs.cancel()
|
||||
continue
|
||||
|
||||
if funcname == '_cancel_task':
|
||||
# XXX: a special case is made here for
|
||||
# remote calls since we don't want the
|
||||
# remote actor have to know which channel
|
||||
# the task is associated with and we can't
|
||||
# pass non-primitive types between actors.
|
||||
# This means you can use:
|
||||
# Portal.run('self', '_cancel_task, cid=did)
|
||||
# without passing the `chan` arg.
|
||||
task_cid = kwargs['cid']
|
||||
log.cancel(
|
||||
f'Actor {uid} requests cancel for {task_cid}')
|
||||
|
||||
# we immediately start the runtime machinery
|
||||
# shutdown
|
||||
with trio.CancelScope(shield=True):
|
||||
kwargs['chan'] = chan
|
||||
await _invoke(
|
||||
self, cid, chan, func, kwargs, is_rpc=False
|
||||
)
|
||||
continue
|
||||
else:
|
||||
# complain to client about restricted modules
|
||||
try:
|
||||
|
@ -699,14 +770,15 @@ class Actor:
|
|||
partial(_invoke, self, cid, chan, func, kwargs),
|
||||
name=funcname,
|
||||
)
|
||||
except RuntimeError:
|
||||
except (RuntimeError, trio.MultiError):
|
||||
# avoid reporting a benign race condition
|
||||
# during actor runtime teardown.
|
||||
nursery_cancelled_before_task = True
|
||||
break
|
||||
|
||||
# never allow cancelling cancel requests (results in
|
||||
# deadlock and other weird behaviour)
|
||||
if func != self.cancel:
|
||||
# if func != self.cancel:
|
||||
if isinstance(cs, Exception):
|
||||
log.warning(
|
||||
f"Task for RPC func {func} failed with"
|
||||
|
@ -719,20 +791,12 @@ class Actor:
|
|||
# cancelled gracefully if requested
|
||||
self._rpc_tasks[(chan, cid)] = (
|
||||
cs, func, trio.Event())
|
||||
else:
|
||||
# self.cancel() was called so kill this msg loop
|
||||
# and break out into ``_async_main()``
|
||||
log.warning(
|
||||
f"Actor {self.uid} was remotely cancelled; "
|
||||
"waiting on cancellation completion..")
|
||||
await self._cancel_complete.wait()
|
||||
loop_cs.cancel()
|
||||
break
|
||||
|
||||
log.runtime(
|
||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||
else:
|
||||
# channel disconnect
|
||||
|
||||
# end of async for, channel disconnect vis
|
||||
# ``trio.EndOfChannel``
|
||||
log.runtime(
|
||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||
)
|
||||
|
@ -947,6 +1011,9 @@ class Actor:
|
|||
# Blocks here as expected until the root nursery is
|
||||
# killed (i.e. this actor is cancelled or signalled by the parent)
|
||||
except Exception as err:
|
||||
log.info("Closing all actor lifetime contexts")
|
||||
_lifetime_stack.close()
|
||||
|
||||
if not registered_with_arbiter:
|
||||
# TODO: I guess we could try to connect back
|
||||
# to the parent through a channel and engage a debugger
|
||||
|
@ -976,11 +1043,21 @@ class Actor:
|
|||
raise
|
||||
|
||||
finally:
|
||||
log.runtime("Root nursery complete")
|
||||
log.runtime("root runtime nursery complete")
|
||||
|
||||
# tear down all lifetime contexts if not in guest mode
|
||||
# XXX: should this just be in the entrypoint?
|
||||
log.cancel("Closing all actor lifetime contexts")
|
||||
log.info("Closing all actor lifetime contexts")
|
||||
|
||||
# TODO: we can't actually do this bc the debugger
|
||||
# uses the _service_n to spawn the lock task, BUT,
|
||||
# in theory if we had the root nursery surround this finally
|
||||
# block it might be actually possible to debug THIS
|
||||
# machinery in the same way as user task code?
|
||||
# if self.name == 'brokerd.ib':
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await _debug.breakpoint()
|
||||
|
||||
_lifetime_stack.close()
|
||||
|
||||
# Unregister actor from the arbiter
|
||||
|
@ -1065,7 +1142,7 @@ class Actor:
|
|||
self._service_n.start_soon(self.cancel)
|
||||
|
||||
async def cancel(self) -> bool:
|
||||
"""Cancel this actor.
|
||||
"""Cancel this actor's runtime.
|
||||
|
||||
The "deterministic" teardown sequence in order is:
|
||||
- cancel all ongoing rpc tasks by cancel scope
|
||||
|
@ -1099,7 +1176,7 @@ class Actor:
|
|||
if self._service_n:
|
||||
self._service_n.cancel_scope.cancel()
|
||||
|
||||
log.cancel(f"{self.uid} was sucessfullly cancelled")
|
||||
log.cancel(f"{self.uid} called `Actor.cancel()`")
|
||||
self._cancel_complete.set()
|
||||
return True
|
||||
|
||||
|
@ -1158,15 +1235,21 @@ class Actor:
|
|||
registered for each.
|
||||
"""
|
||||
tasks = self._rpc_tasks
|
||||
if tasks:
|
||||
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
|
||||
for (chan, cid) in tasks.copy():
|
||||
for (
|
||||
(chan, cid),
|
||||
(scope, func, is_complete),
|
||||
) in tasks.copy().items():
|
||||
if only_chan is not None:
|
||||
if only_chan != chan:
|
||||
continue
|
||||
|
||||
# TODO: this should really done in a nursery batch
|
||||
if func != self._cancel_task:
|
||||
await self._cancel_task(cid, chan)
|
||||
|
||||
if tasks:
|
||||
log.cancel(
|
||||
f"Waiting for remaining rpc tasks to complete {tasks}")
|
||||
await self._ongoing_rpc_tasks.wait()
|
||||
|
@ -1216,6 +1299,9 @@ class Actor:
|
|||
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||
return uid
|
||||
|
||||
def is_infected_aio(self) -> bool:
|
||||
return self._infected_aio
|
||||
|
||||
|
||||
class Arbiter(Actor):
|
||||
"""A special actor who knows all the other actors and always has
|
||||
|
|
@@ -19,12 +19,15 @@ def parse_ipaddr(arg):
    return (str(host), int(port))


from ._entry import _trio_main

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--uid", type=parse_uid)
    parser.add_argument("--loglevel", type=str)
    parser.add_argument("--parent_addr", type=parse_ipaddr)
    parser.add_argument("--asyncio", action='store_true')
    args = parser.parse_args()

    subactor = Actor(

@@ -36,5 +39,6 @@ if __name__ == "__main__":

    _trio_main(
        subactor,
        parent_addr=args.parent_addr
        parent_addr=args.parent_addr,
        infect_asyncio=args.asyncio,
    )
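Putting this together with the `spawn_cmd` construction further down in the spawning hunks, the parent boots each child subprocess with roughly the following command line. The module path and the exact `--uid`/`--parent_addr` string formats below are illustrative assumptions, not taken from the diff:

    import sys

    # hypothetical values for illustration only
    uid = ('worker', '3f1c9a2e')              # (name, uuid) pair
    parent_addr = ('127.0.0.1', 61320)        # where the parent is listening
    infect_asyncio = True

    spawn_cmd = [
        sys.executable, '-m', 'tractor._child',  # child entrypoint module (name assumed)
        '--uid', str(uid),
        '--parent_addr', str(parent_addr),
        '--loglevel', 'info',
    ]
    if infect_asyncio:
        # tell the child to run trio as a guest on top of an asyncio loop
        spawn_cmd.append('--asyncio')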
@@ -0,0 +1,50 @@
'''
Actor cluster helpers.

'''
from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional

import trio
import tractor


@acm
async def open_actor_cluster(

    modules: list[str],
    count: int = cpu_count(),
    names: Optional[list[str]] = None,

) -> AsyncGenerator[
    list[str],
    dict[str, tractor.Portal]
]:

    portals: dict[str, tractor.Portal] = {}
    uid = tractor.current_actor().uid

    if not names:
        suffix = '_'.join(uid)
        names = [f'worker_{i}.' + suffix for i in range(count)]

    if not len(names) == count:
        raise ValueError(
            'Number of names is {len(names)} but count it {count}')

    async with tractor.open_nursery() as an:
        async with trio.open_nursery() as n:
            for index, key in zip(range(count), names):

                async def start(i) -> None:
                    key = f'worker_{i}.' + '_'.join(uid)
                    portals[key] = await an.start_actor(
                        enable_modules=modules,
                        name=key,
                    )

                n.start_soon(start, index)

    assert len(portals) == count
    yield portals
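A hedged usage sketch of the new cluster helper. The import path `tractor._clustering` is assumed (the diff doesn't show the filename), the worker module path is illustrative, and since the helper calls `tractor.current_actor()` it must run inside an already-started actor runtime (here assumed to come from a top-level `open_root_actor()`):

    import trio
    import tractor
    from tractor._clustering import open_actor_cluster  # module path assumed


    async def main():
        # an enclosing root actor provides the runtime that
        # `current_actor()` needs inside the helper
        async with tractor.open_root_actor():
            async with open_actor_cluster(
                modules=['my_pkg.workers'],  # illustrative module path
                count=4,
            ) as portals:
                assert len(portals) == 4
                for name, portal in portals.items():
                    print(f'spawned cluster worker: {name} -> {portal}')


    trio.run(main)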
@ -2,6 +2,7 @@
|
|||
Multi-core debugging for da peeps!
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import bdb
|
||||
import sys
|
||||
from functools import partial
|
||||
|
@ -20,12 +21,14 @@ from ._exceptions import is_multi_cancelled
|
|||
try:
|
||||
# wtf: only exported when installed in dev mode?
|
||||
import pdbpp
|
||||
|
||||
except ImportError:
|
||||
# pdbpp is installed in regular mode...it monkey patches stuff
|
||||
import pdb
|
||||
assert pdb.xpm, "pdbpp is not installed?" # type: ignore
|
||||
pdbpp = pdb
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
|
@ -85,6 +88,23 @@ class PdbwTeardown(pdbpp.Pdb):
|
|||
_pdb_release_hook()
|
||||
|
||||
|
||||
def _mk_pdb() -> PdbwTeardown:
|
||||
|
||||
# XXX: setting these flags on the pdb instance are absolutely
|
||||
# critical to having ctrl-c work in the ``trio`` standard way! The
|
||||
# stdlib's pdb supports entering the current sync frame on a SIGINT,
|
||||
# with ``trio`` we pretty much never want this and if we did we can
|
||||
# handle it in the ``tractor`` task runtime.
|
||||
# global pdb
|
||||
|
||||
pdb = PdbwTeardown()
|
||||
pdb.nosigint = True
|
||||
pdb.allow_kbdint = True
|
||||
opts = (allow_kbdint, nosigint) = pdb.allow_kbdint, pdb.nosigint
|
||||
print(f'`pdbp` was configured with {opts}')
|
||||
return pdb
|
||||
|
||||
|
||||
# TODO: will be needed whenever we get to true remote debugging.
|
||||
# XXX see https://github.com/goodboy/tractor/issues/130
|
||||
|
||||
|
@ -219,7 +239,8 @@ async def _hijack_stdin_for_child(
|
|||
subactor_uid: Tuple[str, str]
|
||||
|
||||
) -> str:
|
||||
'''Hijack the tty in the root process of an actor tree such that
|
||||
'''
|
||||
Hijack the tty in the root process of an actor tree such that
|
||||
the pdbpp debugger console can be allocated to a sub-actor for repl
|
||||
bossing.
|
||||
|
||||
|
@ -254,6 +275,8 @@ async def _hijack_stdin_for_child(
|
|||
# assert await stream.receive() == 'pdb_unlock'
|
||||
|
||||
except (
|
||||
# BaseException,
|
||||
trio.MultiError,
|
||||
trio.BrokenResourceError,
|
||||
trio.Cancelled, # by local cancellation
|
||||
trio.ClosedResourceError, # by self._rx_chan
|
||||
|
@ -268,8 +291,9 @@ async def _hijack_stdin_for_child(
|
|||
|
||||
if isinstance(err, trio.Cancelled):
|
||||
raise
|
||||
|
||||
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||
finally:
|
||||
log.pdb("TTY lock released, remote task:"
|
||||
f"{task_name}:{subactor_uid}")
|
||||
|
||||
return "pdb_unlock_complete"
|
||||
|
||||
|
@ -326,7 +350,7 @@ async def _breakpoint(
|
|||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
log.error('opened stream')
|
||||
log.debug('opened stream')
|
||||
# unblock local caller
|
||||
task_status.started()
|
||||
|
||||
|
@ -343,6 +367,7 @@ async def _breakpoint(
|
|||
|
||||
except tractor.ContextCancelled:
|
||||
log.warning('Root actor cancelled debug lock')
|
||||
raise
|
||||
|
||||
finally:
|
||||
log.debug(f"Exiting debugger for actor {actor}")
|
||||
|
@ -407,11 +432,14 @@ async def _breakpoint(
|
|||
'Root actor attempting to shield-acquire active tty lock'
|
||||
f' owned by {_global_actor_in_debug}')
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
stats = _debug_lock.statistics()
|
||||
if stats.owner:
|
||||
print(f'LOCK STATS: {stats}')
|
||||
|
||||
# must shield here to avoid hitting a ``Cancelled`` and
|
||||
# a child getting stuck bc we clobbered the tty
|
||||
with trio.CancelScope(shield=True):
|
||||
await _debug_lock.acquire()
|
||||
|
||||
else:
|
||||
# may be cancelled
|
||||
await _debug_lock.acquire()
|
||||
|
@ -437,21 +465,6 @@ async def _breakpoint(
|
|||
debug_func(actor)
|
||||
|
||||
|
||||
def _mk_pdb() -> PdbwTeardown:
|
||||
|
||||
# XXX: setting these flags on the pdb instance are absolutely
|
||||
# critical to having ctrl-c work in the ``trio`` standard way! The
|
||||
# stdlib's pdb supports entering the current sync frame on a SIGINT,
|
||||
# with ``trio`` we pretty much never want this and if we did we can
|
||||
# handle it in the ``tractor`` task runtime.
|
||||
|
||||
pdb = PdbwTeardown()
|
||||
pdb.allow_kbdint = True
|
||||
pdb.nosigint = True
|
||||
|
||||
return pdb
|
||||
|
||||
|
||||
def _set_trace(actor=None):
|
||||
pdb = _mk_pdb()
|
||||
|
||||
|
@ -524,3 +537,57 @@ async def _maybe_enter_pm(err):
|
|||
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
async def maybe_wait_for_debugger() -> None:
|
||||
|
||||
global _no_remote_has_tty, _global_actor_in_debug
|
||||
|
||||
# If we error in the root but the debugger is
|
||||
# engaged we don't want to prematurely kill (and
|
||||
# thus clobber access to) the local tty since it
|
||||
# will make the pdb repl unusable.
|
||||
# Instead try to wait for pdb to be released before
|
||||
# tearing down.
|
||||
if (
|
||||
_state.debug_mode() and
|
||||
is_root_process()
|
||||
):
|
||||
|
||||
# TODO: could this make things more deterministic?
|
||||
# wait to see if a sub-actor task will be
|
||||
# scheduled and grab the tty lock on the next
|
||||
# tick?
|
||||
# await trio.testing.wait_all_tasks_blocked()
|
||||
|
||||
sub_in_debug = None
|
||||
if _global_actor_in_debug:
|
||||
sub_in_debug = tuple(_global_actor_in_debug)
|
||||
|
||||
for _ in range(1):
|
||||
with trio.CancelScope(shield=True):
|
||||
|
||||
log.pdb(
|
||||
'Polling for debug lock'
|
||||
)
|
||||
await trio.sleep(0.01)
|
||||
|
||||
debug_complete = _no_remote_has_tty
|
||||
if (
|
||||
(debug_complete and
|
||||
not debug_complete.is_set())
|
||||
):
|
||||
log.warning(
|
||||
'Root has errored but pdb is in use by '
|
||||
f'child {sub_in_debug}\n'
|
||||
'Waiting on tty lock to release..')
|
||||
|
||||
await debug_complete.wait()
|
||||
|
||||
await trio.sleep(0.01)
|
||||
continue
|
||||
else:
|
||||
log.warning(
|
||||
'Root acquired DEBUGGER'
|
||||
)
|
||||
return
|
||||
|
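The new `maybe_wait_for_debugger()` hook boils down to: if a child still holds the tty lock, wait (shielded from cancellation) for it to signal completion before tearing the actor tree down. A stripped-down sketch of that waiting pattern in plain trio:

    import trio


    async def wait_for_tty_release(debug_complete: trio.Event) -> None:
        # shield so that root-side teardown/cancellation can't clobber a
        # child's pdb repl session while it still owns the terminal
        with trio.CancelScope(shield=True):
            if not debug_complete.is_set():
                await debug_complete.wait()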
|
|
@ -9,6 +9,7 @@ import trio # type: ignore
|
|||
|
||||
from .log import get_console_log, get_logger
|
||||
from . import _state
|
||||
from .to_asyncio import run_as_asyncio_guest
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -20,6 +21,7 @@ def _mp_main(
|
|||
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
||||
start_method: str,
|
||||
parent_addr: Tuple[str, int] = None,
|
||||
infect_asyncio: bool = False,
|
||||
) -> None:
|
||||
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
||||
"""
|
||||
|
@ -45,6 +47,10 @@ def _mp_main(
|
|||
parent_addr=parent_addr
|
||||
)
|
||||
try:
|
||||
if infect_asyncio:
|
||||
actor._infected_aio = True
|
||||
run_as_asyncio_guest(trio_main)
|
||||
else:
|
||||
trio.run(trio_main)
|
||||
except KeyboardInterrupt:
|
||||
pass # handle it the same way trio does?
|
||||
|
@ -57,15 +63,17 @@ def _trio_main(
|
|||
actor: 'Actor', # type: ignore
|
||||
*,
|
||||
parent_addr: Tuple[str, int] = None,
|
||||
infect_asyncio: bool = False,
|
||||
) -> None:
|
||||
"""Entry point for a `trio_run_in_process` subactor.
|
||||
"""
|
||||
# Disable sigint handling in children;
|
||||
# we don't need it thanks to our cancellation machinery.
|
||||
# signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
Entry point for a `trio_run_in_process` subactor.
|
||||
|
||||
"""
|
||||
log.info(f"Started new trio process for {actor.uid}")
|
||||
|
||||
# Disable sigint handling in children?
|
||||
# signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
if actor.loglevel is not None:
|
||||
log.info(
|
||||
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
@@ -83,6 +91,10 @@ def _trio_main(
    )

    try:
        if infect_asyncio:
            actor._infected_aio = True
            run_as_asyncio_guest(trio_main)
        else:
            trio.run(trio_main)
    except KeyboardInterrupt:
        log.warning(f"Actor {actor.uid} received KBI")
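`run_as_asyncio_guest(trio_main)` is the "infected asyncio" path: instead of `trio.run()`, trio is started as a *guest* on top of a running asyncio loop. tractor's wrapper isn't shown in this diff, but the underlying trio API is public; a minimal, self-contained sketch of hosting a trio run on asyncio (illustrative, not tractor's implementation):

    import asyncio
    import trio


    async def trio_main() -> str:
        await trio.sleep(0.1)
        return 'trio ran as an asyncio guest'


    def main() -> str:
        async def asyncio_host():
            loop = asyncio.get_running_loop()
            done = asyncio.Event()
            result = None

            def on_trio_done(outcome):
                # invoked on the asyncio loop once the trio run finishes
                nonlocal result
                result = outcome
                done.set()

            trio.lowlevel.start_guest_run(
                trio_main,
                run_sync_soon_threadsafe=loop.call_soon_threadsafe,
                done_callback=on_trio_done,
            )
            await done.wait()
            return result.unwrap()  # re-raises if trio_main errored

        return asyncio.run(asyncio_host())


    print(main())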
|
@ -84,6 +84,15 @@ class Portal:
|
|||
] = None
|
||||
self._streams: Set[ReceiveMsgStream] = set()
|
||||
self.actor = current_actor()
|
||||
self._cancel_called: bool = False
|
||||
|
||||
@property
|
||||
def cancel_called(self) -> bool:
|
||||
'''
|
||||
Same principle as ``trio.CancelScope.cancel_called``.
|
||||
|
||||
'''
|
||||
return self._cancel_called
|
||||
|
||||
async def _submit(
|
||||
self,
|
||||
|
@ -129,6 +138,7 @@ class Portal:
|
|||
resptype: str,
|
||||
first_msg: dict
|
||||
) -> Any:
|
||||
# __tracebackhide__ = True
|
||||
assert resptype == 'asyncfunc' # single response
|
||||
|
||||
msg = await recv_chan.receive()
|
||||
@@ -140,8 +150,11 @@ class Portal:
            raise unpack_error(msg, self.channel)

    async def result(self) -> Any:
        """Return the result(s) from the remote actor's "main" task.
        """
        Return the result(s) from the remote actor's "main" task.

        """
        # __tracebackhide__ = True
        # Check for non-rpc errors slapped on the
        # channel for which we always raise
        exc = self.channel._exc
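For context, `Portal.result()` is the user-facing way to collect a one-shot actor's return value, matching the `run_in_actor()` pattern used throughout the tests above. A rough usage sketch based on tractor's documented API (exact kwargs may differ between versions):

    import tractor


    async def add(a: int, b: int) -> int:
        return a + b


    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.run_in_actor(add, a=1, b=2)
            # the single remote task's return value comes back via the portal
            assert await portal.result() == 3


    if __name__ == '__main__':
        tractor.run(main)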
@ -193,9 +206,16 @@ class Portal:
|
|||
# we'll need to .aclose all those channels here
|
||||
await self._cancel_streams()
|
||||
|
||||
async def cancel_actor(self):
|
||||
"""Cancel the actor on the other end of this portal.
|
||||
"""
|
||||
async def cancel_actor(self) -> None:
|
||||
'''
|
||||
Cancel the actor on the other end of this portal.
|
||||
|
||||
That means cancelling the "actor runtime" not just any one
|
||||
task that's running there.
|
||||
|
||||
'''
|
||||
self._cancel_called = True
|
||||
|
||||
if not self.channel.connected():
|
||||
log.cancel("This portal is already closed can't cancel")
|
||||
return False
|
||||
|
@ -203,8 +223,8 @@ class Portal:
|
|||
await self._cancel_streams()
|
||||
|
||||
log.cancel(
|
||||
f"Sending actor cancel request to {self.channel.uid} on "
|
||||
f"{self.channel}")
|
||||
f"Sending runtime cancel msg to {self.channel.uid} @ "
|
||||
f"{self.channel.raddr}")
|
||||
try:
|
||||
# send cancel cmd - might not get response
|
||||
# XXX: sure would be nice to make this work with a proper shield
|
||||
@@ -238,7 +238,7 @@ def run(


def run_daemon(
    rpc_module_paths: List[str],
    enable_modules: List[str],
    **kwargs
) -> None:
    """Spawn daemon actor which will respond to RPC.

@@ -247,9 +247,9 @@ def run_daemon(
    ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
    is meant to run forever responding to RPC requests.
    """
    kwargs['rpc_module_paths'] = list(rpc_module_paths)
    kwargs['enable_modules'] = list(enable_modules)

    for path in rpc_module_paths:
    for path in enable_modules:
        importlib.import_module(path)

    return run(partial(trio.sleep, float('inf')), **kwargs)
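With this rename a daemon actor is described by its enabled module list rather than the old `rpc_module_paths` argument. A hedged usage sketch (the module path is illustrative and `run_daemon` is assumed to be exported at the package top level, as it is in released tractor versions):

    import tractor

    # blocks forever, serving RPC requests for functions defined in the
    # listed module(s); extra kwargs are forwarded on to ``tractor.run()``
    tractor.run_daemon(enable_modules=['my_pkg.rpc_handlers'])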
|
|
@ -1,6 +1,7 @@
|
|||
"""
|
||||
Machinery for actor process spawning using multiple backends.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
import multiprocessing as mp
|
||||
import platform
|
||||
|
@ -8,7 +9,6 @@ from typing import Any, Dict, Optional
|
|||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from async_generator import asynccontextmanager
|
||||
|
||||
try:
|
||||
from multiprocessing import semaphore_tracker # type: ignore
|
||||
|
@ -31,7 +31,12 @@ from .log import get_logger
|
|||
from ._portal import Portal
|
||||
from ._actor import Actor
|
||||
from ._entry import _mp_main
|
||||
from ._exceptions import ActorFailure
|
||||
from ._exceptions import (
|
||||
ActorFailure,
|
||||
RemoteActorError,
|
||||
ContextCancelled,
|
||||
)
|
||||
from ._debug import maybe_wait_for_debugger, breakpoint
|
||||
|
||||
|
||||
log = get_logger('tractor')
|
||||
|
@ -90,95 +95,173 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
|
|||
return _ctx
|
||||
|
||||
|
||||
async def exhaust_portal(
|
||||
portal: Portal,
|
||||
actor: Actor
|
||||
) -> Any:
|
||||
"""Pull final result from portal (assuming it has one).
|
||||
async def result_from_portal(
|
||||
|
||||
If the main task is an async generator do our best to consume
|
||||
what's left of it.
|
||||
"""
|
||||
try:
|
||||
log.debug(f"Waiting on final result from {actor.uid}")
|
||||
|
||||
# XXX: streams should never be reaped here since they should
|
||||
# always be established and shutdown using a context manager api
|
||||
final = await portal.result()
|
||||
|
||||
except (Exception, trio.MultiError) as err:
|
||||
# we reraise in the parent task via a ``trio.MultiError``
|
||||
return err
|
||||
except trio.Cancelled as err:
|
||||
# lol, of course we need this too ;P
|
||||
# TODO: merge with above?
|
||||
log.warning(f"Cancelled result waiter for {portal.actor.uid}")
|
||||
return err
|
||||
else:
|
||||
log.debug(f"Returning final result: {final}")
|
||||
return final
|
||||
|
||||
|
||||
async def cancel_on_completion(
|
||||
portal: Portal,
|
||||
actor: Actor,
|
||||
|
||||
errors: Dict[Tuple[str, str], Exception],
|
||||
cancel_on_result: bool = False,
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
"""Cancel actor gracefully once it's "main" portal's
|
||||
|
||||
) -> tuple[Optional[Any], Optional[BaseException]]:
|
||||
"""
|
||||
Cancel actor gracefully once it's "main" portal's
|
||||
result arrives.
|
||||
|
||||
Should only be called for actors spawned with `run_in_actor()`.
|
||||
"""
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
"""
|
||||
# __tracebackhide__ = True
|
||||
|
||||
uid = portal.channel.uid
|
||||
remote_result = None
|
||||
is_remote_result = None
|
||||
|
||||
# cancel control is explicitly done by the caller
|
||||
with trio.CancelScope() as cs:
|
||||
task_status.started(cs)
|
||||
|
||||
# if this call errors we store the exception for later
|
||||
# in ``errors`` which will be reraised inside
|
||||
# a MultiError and we still send out a cancel request
|
||||
result = await exhaust_portal(portal, actor)
|
||||
if isinstance(result, Exception):
|
||||
errors[actor.uid] = result
|
||||
log.warning(
|
||||
f"Cancelling {portal.channel.uid} after error {result}"
|
||||
)
|
||||
# result = await exhaust_portal(portal, actor)
|
||||
try:
|
||||
log.info(f"Waiting on final result from {actor.uid}")
|
||||
|
||||
else:
|
||||
log.runtime(
|
||||
f"Cancelling {portal.channel.uid} gracefully "
|
||||
f"after result {result}")
|
||||
# XXX: streams should never be reaped here since they should
|
||||
# always be established and shutdown using a context manager api
|
||||
result = await portal.result()
|
||||
is_remote_result = True
|
||||
log.info(f"Returning final result: {result}")
|
||||
|
||||
# cancel the process now that we have a final result
|
||||
await portal.cancel_actor()
|
||||
except RemoteActorError as rerr:
|
||||
# this includes real remote errors as well as
|
||||
# `ContextCancelled`
|
||||
is_remote_result = True
|
||||
result = rerr
|
||||
|
||||
except (Exception, trio.MultiError) as err:
|
||||
# we reraise in the parent task via a ``trio.MultiError``
|
||||
is_remote_result = False
|
||||
result = err
|
||||
# errors[actor.uid] = err
|
||||
# raise
|
||||
|
||||
if cs.cancelled_caught:
|
||||
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
|
||||
|
||||
return result, is_remote_result
|
||||
|
||||
# except trio.Cancelled as err:
|
||||
# # lol, of course we need this too ;P
|
||||
# # TODO: merge with above?
|
||||
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
|
||||
# result = err
|
||||
# # errors[actor.uid] = err
|
||||
# raise
|
||||
|
||||
|
||||
# return result
|
||||
|
||||
|
||||
async def do_hard_kill(
|
||||
|
||||
proc: trio.Process,
|
||||
timeout: float,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Hard kill a process with timeout.
|
||||
|
||||
'''
|
||||
log.debug(f"Hard killing {proc}")
|
||||
# NOTE: this timeout used to do nothing since we were shielding
|
||||
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
||||
# never release until the process exits, now it acts as
|
||||
# a hard-kill time ultimatum.
|
||||
with trio.move_on_after(3) as cs:
|
||||
with trio.move_on_after(timeout) as cs:
|
||||
|
||||
# NOTE: This ``__aexit__()`` shields internally.
|
||||
async with proc: # calls ``trio.Process.aclose()``
|
||||
# NOTE: This ``__aexit__()`` shields internally and originally
|
||||
# would tear down stdstreams via ``trio.Process.aclose()``.
|
||||
async with proc:
|
||||
log.debug(f"Terminating {proc}")
|
||||
|
||||
if cs.cancelled_caught:
|
||||
|
||||
# this is a "softer" kill that we should probably use
|
||||
# eventually and let the zombie lord do the `.kill()`
|
||||
# proc.terminate()
|
||||
|
||||
# XXX: should pretty much never get here unless we have
|
||||
# to move the bits from ``proc.__aexit__()`` out and
|
||||
# into here.
|
||||
log.critical(f"HARD KILLING {proc}")
|
||||
log.critical(f"{timeout} timeout, HARD KILLING {proc}")
|
||||
proc.kill()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def spawn_subactor(
|
||||
subactor: 'Actor',
|
||||
async def reap_proc(
|
||||
|
||||
proc: trio.Process,
|
||||
uid: tuple[str, str],
|
||||
terminate_after: Optional[float] = None,
|
||||
hard_kill_after: int = 0.1,
|
||||
|
||||
) -> None:
|
||||
with trio.move_on_after(terminate_after or float('inf')) as cs:
|
||||
# Wait for proc termination but **don't yet** do
|
||||
# any out-of-ipc-land termination / process
|
||||
# killing. This is a "light" (cancellable) join,
|
||||
# the hard join is below after timeout
|
||||
await proc.wait()
|
||||
log.info(f'Proc for {uid} terminated gracefully')
|
||||
|
||||
if cs.cancelled_caught and terminate_after is not float('inf'):
|
||||
# Always "hard" join lingering sub procs since no
|
||||
# actor zombies are allowed!
|
||||
log.warning(
|
||||
# f'Failed to gracefully terminate {subactor.uid}')
|
||||
f'Failed to gracefully terminate {proc}\n'
|
||||
f"Attempting to hard kill {proc}")
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
# XXX: do this **after**
|
||||
# cancellation/teardown to avoid killing the
|
||||
# process too early since trio does this
|
||||
# internally on ``__aexit__()``
|
||||
await do_hard_kill(proc, hard_kill_after)
|
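The `reap_proc()`/`do_hard_kill()` pair above implements a "soft join, then terminate, then kill" policy for child processes. The same shape in a self-contained plain-trio sketch (timeouts and the POSIX `sleep` command are illustrative):

    import trio


    async def reap(proc: trio.Process, soft_timeout: float = 3.0) -> None:
        # 1) soft: give the child a chance to exit on its own
        with trio.move_on_after(soft_timeout) as cs:
            await proc.wait()

        if cs.cancelled_caught:
            # 2) hard: no zombies allowed -- terminate, then kill if it
            #    still hasn't gone away after a short grace period
            with trio.CancelScope(shield=True):
                proc.terminate()
                with trio.move_on_after(1) as kill_cs:
                    await proc.wait()
                if kill_cs.cancelled_caught:
                    proc.kill()
                    await proc.wait()


    async def main():
        proc = await trio.open_process(['sleep', '10'])
        await reap(proc, soft_timeout=0.5)
        assert proc.returncode is not None


    trio.run(main)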
||||
async def new_proc(
|
||||
|
||||
name: str,
|
||||
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||
subactor: Actor,
|
||||
errors: Dict[Tuple[str, str], Exception],
|
||||
|
||||
# passed through to actor main
|
||||
bind_addr: Tuple[str, int],
|
||||
parent_addr: Tuple[str, int],
|
||||
):
|
||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||
|
||||
*,
|
||||
|
||||
graceful_kill_timeout: int = 3,
|
||||
infect_asyncio: bool = False,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||
|
||||
) -> None:
|
||||
"""
|
||||
Create a new ``multiprocessing.Process`` using the
|
||||
spawn method as configured using ``try_set_start_method()``.
|
||||
|
||||
"""
|
||||
# mark the new actor with the global spawn method
|
||||
subactor._spawn_method = _spawn_method
|
||||
uid = subactor.uid
|
||||
|
||||
if _spawn_method == 'trio':
|
||||
|
||||
spawn_cmd = [
|
||||
sys.executable,
|
||||
"-m",
|
||||
|
@ -190,7 +273,7 @@ async def spawn_subactor(
|
|||
# the OS; it otherwise can be passed via the parent channel if
|
||||
# we prefer in the future (for privacy).
|
||||
"--uid",
|
||||
str(subactor.uid),
|
||||
str(uid),
|
||||
# Address the child must connect to on startup
|
||||
"--parent_addr",
|
||||
str(parent_addr)
|
||||
|
@ -202,57 +285,46 @@ async def spawn_subactor(
|
|||
subactor.loglevel
|
||||
]
|
||||
|
||||
# Tell child to run in guest mode on top of ``asyncio`` loop
|
||||
if infect_asyncio:
|
||||
spawn_cmd.append("--asyncio")
|
||||
|
||||
proc = await trio.open_process(spawn_cmd)
|
||||
|
||||
log.info(f"Started {proc}")
|
||||
portal: Optional[Portal] = None
|
||||
|
||||
# handle cancellation during child connect-back, kill
|
||||
# any cancelled spawn sequence immediately.
|
||||
try:
|
||||
yield proc
|
||||
|
||||
finally:
|
||||
log.runtime(f"Attempting to kill {proc}")
|
||||
|
||||
# XXX: do this **after** cancellation/teardown
|
||||
# to avoid killing the process too early
|
||||
# since trio does this internally on ``__aexit__()``
|
||||
|
||||
await do_hard_kill(proc)
|
||||
|
||||
|
||||
async def new_proc(
|
||||
name: str,
|
||||
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||
subactor: Actor,
|
||||
errors: Dict[Tuple[str, str], Exception],
|
||||
# passed through to actor main
|
||||
bind_addr: Tuple[str, int],
|
||||
parent_addr: Tuple[str, int],
|
||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||
) -> None:
|
||||
"""Create a new ``multiprocessing.Process`` using the
|
||||
spawn method as configured using ``try_set_start_method()``.
|
||||
"""
|
||||
cancel_scope = None
|
||||
|
||||
# mark the new actor with the global spawn method
|
||||
subactor._spawn_method = _spawn_method
|
||||
|
||||
if _spawn_method == 'trio':
|
||||
async with trio.open_nursery() as nursery:
|
||||
async with spawn_subactor(
|
||||
subactor,
|
||||
parent_addr,
|
||||
) as proc:
|
||||
log.runtime(f"Started {proc}")
|
||||
|
||||
# wait for actor to spawn and connect back to us
|
||||
# channel should have handshake completed by the
|
||||
# local actor by the time we get a ref to it
|
||||
event, chan = await actor_nursery._actor.wait_for_peer(
|
||||
subactor.uid)
|
||||
|
||||
except trio.Cancelled:
|
||||
# reap un-contacted process which are started
|
||||
# but never setup a connection to parent.
|
||||
log.warning(f'Spawning aborted due to cancel {proc}')
|
||||
with trio.CancelScope(shield=True):
|
||||
await do_hard_kill(proc, 0.1)
|
||||
|
||||
# TODO: should we have a custom error for this maybe derived
|
||||
# from ``subprocess``?
|
||||
raise
|
||||
|
||||
# the child successfully connected back to us.
|
||||
actor_nursery_cancel_called = None
|
||||
portal = Portal(chan)
|
||||
actor_nursery._children[subactor.uid] = (
|
||||
subactor, proc, portal)
|
||||
|
||||
# track child in current nursery
|
||||
curr_actor = current_actor()
|
||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||
|
||||
try:
|
||||
# send additional init params
|
||||
await chan.send({
|
||||
"_parent_main_data": subactor._parent_main_data,
|
||||
|
@ -263,54 +335,266 @@ async def new_proc(
|
|||
"_runtime_vars": _runtime_vars,
|
||||
})
|
||||
|
||||
# track subactor in current nursery
|
||||
curr_actor = current_actor()
|
||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||
|
||||
# resume caller at next checkpoint now that child is up
|
||||
task_status.started(portal)
|
||||
|
||||
# wait for ActorNursery.wait() to be called
|
||||
# this either completes or is cancelled and should only
|
||||
# **and always** be set once the actor nursery has errored
|
||||
# or exited.
|
||||
with trio.CancelScope(shield=True):
|
||||
await actor_nursery._join_procs.wait()
|
||||
|
||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||
cancel_scope = await nursery.start(
|
||||
cancel_on_completion,
|
||||
except (
|
||||
BaseException
|
||||
# trio.Cancelled,
|
||||
# KeyboardInterrupt,
|
||||
# trio.MultiError,
|
||||
# RuntimeError,
|
||||
) as cerr:
|
||||
|
||||
log.exception(f'Relaying unexpected {cerr} to nursery')
|
||||
await breakpoint()
|
||||
|
||||
# sending IPC-msg level cancel requests is expected to be
|
||||
# managed by the nursery.
|
||||
with trio.CancelScope(shield=True):
|
||||
await actor_nursery._handle_err(err, portal=portal)
|
||||
|
||||
if portal.channel.connected():
|
||||
if ria:
|
||||
# this may raise which we want right?
|
||||
await result_from_portal(
|
||||
portal,
|
||||
subactor,
|
||||
errors
|
||||
errors,
|
||||
# True, # cancel_on_result
|
||||
)
|
||||
|
||||
# Wait for proc termination but **don't yet** call
|
||||
# ``trio.Process.__aexit__()`` (it tears down stdio
|
||||
# which will kill any waiting remote pdb trace).
|
||||
# Graceful reap attempt - 2 cases:
|
||||
# - actor nursery was cancelled in which case
|
||||
# we want to try a soft reap of the actor via
|
||||
# ipc cancellation and then failing that do a hard
|
||||
# reap.
|
||||
# - this is normal termination and we must wait indefinitely
|
||||
# for ria to return and daemon actors to be cancelled
|
||||
reaping_cancelled: bool = False
|
||||
ria = portal in actor_nursery._cancel_after_result_on_exit
|
||||
result = None
|
||||
|
||||
# TODO: No idea how we can enforce zombie
|
||||
# reaping more stringently without the shield
|
||||
# we used to have below...
|
||||
# this is the soft reap sequence. we can
|
||||
# either collect results:
|
||||
# - ria actors get them them via ``Portal.result()``
|
||||
# - we wait forever on daemon actors until they're
|
||||
# cancelled by user code via ``Portal.cancel_actor()``
|
||||
# or ``ActorNursery.cancel(). in the latter case
|
||||
# we have to expect another cancel here since
|
||||
# the task spawning nurseries will both be cancelled
|
||||
# by ``ActorNursery.cancel()``.
|
||||
|
||||
# with trio.CancelScope(shield=True):
|
||||
# async with proc:
|
||||
# OR, we're cancelled while collecting results, which
|
||||
# case we need to try another soft cancel and reap attempt.
|
||||
try:
|
||||
log.cancel(f'Starting soft actor reap for {uid}')
|
||||
cancel_scope = None
|
||||
reap_timeout = None
|
||||
|
||||
# Always "hard" join sub procs since no actor zombies
|
||||
# are allowed!
|
||||
if portal.channel.connected() and ria:
|
||||
|
||||
# this is a "light" (cancellable) join, the hard join is
|
||||
# in the enclosing scope (see above).
|
||||
await proc.wait()
|
||||
result, is_remote = await result_from_portal(
|
||||
portal,
|
||||
subactor,
|
||||
errors,
|
||||
# True, # cancel_on_result
|
||||
)
|
||||
if is_remote:
|
||||
if isinstance(result, RemoteActorError):
|
||||
# errors[actor.uid] = result
|
||||
if (
|
||||
portal.cancel_called and
|
||||
isinstance(result, ContextCancelled)
|
||||
):
|
||||
log.cancel(f'{uid} received expected cancel')
|
||||
errors[uid] = result
|
||||
|
||||
log.debug(f"Joined {proc}")
|
||||
# pop child entry to indicate we no longer managing this subactor
|
||||
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
|
||||
# fall through to below soft proc reap
|
||||
reap_timeout = 0.5
|
||||
|
||||
# cancel result waiter that may have been spawned in
|
||||
# tandem if not done already
|
||||
if cancel_scope:
|
||||
else:
|
||||
log.warning(
|
||||
"Cancelling existing result waiter task for "
|
||||
f"{subactor.uid}")
|
||||
cancel_scope.cancel()
|
||||
f"Cancelling single-task-run {uid} after remote error {result}"
|
||||
)
|
||||
|
||||
# likely a real remote error propagation
|
||||
# so pass up to nursery strat
|
||||
should_raise = await actor_nursery._handle_err(
|
||||
result,
|
||||
portal=portal,
|
||||
)
|
||||
|
||||
# propagate up to spawn nursery to be
|
||||
# grouped into any multierror.
|
||||
# if should_raise:
|
||||
# raise result
|
||||
|
||||
else:
|
||||
log.runtime(
|
||||
f"Cancelling {uid} gracefully "
|
||||
f"after one-time-task result {result}")
|
||||
|
||||
# an actor that was `.run_in_actor()` executes a single task
|
||||
# and delivers the result, then we cancel it.
|
||||
# TODO: likely in the future we should just implement this using
|
||||
# the new `open_context()` IPC api, since it's the more general
|
||||
# api and can represent this form.
|
||||
# XXX: do we need this?
|
||||
# await maybe_wait_for_debugger()
|
||||
await portal.cancel_actor()
|
||||
|
||||
else:
|
||||
log.exception(
|
||||
f"Cancelling single-task-run {uid} after local error"
|
||||
)
|
||||
raise result
|
||||
|
||||
# soft & cancellable
|
||||
await reap_proc(proc, uid, terminate_after=reap_timeout)
|
||||
|
||||
# except (
|
||||
# ContextCancelled,
|
||||
# ) as err:
|
||||
# if portal.cancel_called:
|
||||
# log.cancel('{uid} received expected cancel')
|
||||
|
||||
# # soft & cancellable
|
||||
# await reap_proc(proc, uid, terminate_after=0.1)
|
||||
|
||||
# except (
|
||||
# RemoteActorError,
|
||||
# ) as err:
|
||||
# reaping_cancelled = err
|
||||
# log.exception(f'{uid} remote error')
|
||||
# await actor_nursery._handle_err(err, portal=portal)
|
||||
|
||||
except (
|
||||
trio.Cancelled,
|
||||
) as err:
|
||||
|
||||
# NOTE: for now we pack the cancelleds and expect the actor
|
||||
# nursery to re-raise them in a multierror but we could
|
||||
# have also let them bubble up through the spawn nursery.
|
||||
|
||||
# in theory it's more correct to raise any
|
||||
# `ContextCancelled` errors we get back from the
|
||||
# `Portal.cancel_actor()` call and in that error
|
||||
# have meta-data about whether we timeout out or
|
||||
# actually got a cancel message back from the remote task.
|
||||
|
||||
# IF INSTEAD we raised *here* then this logic has to be
|
||||
# handled inside the oca supervisor block and the spawn_n
|
||||
# task cancelleds would have to be replaced with the remote
|
||||
# task `ContextCancelled`s, *if* they ever arrive.
|
||||
errors[uid] = err
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await breakpoint()
|
||||
|
||||
if actor_nursery.cancel_called:
|
||||
log.cancel(f'{uid} soft reap cancelled by nursery')
|
||||
else:
|
||||
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
|
||||
# this would be pretty weird and unexpected
|
||||
await breakpoint()
|
||||
|
||||
# actor nursery wasn't cancelled before the spawn
|
||||
# nursery was which likely means that there was
|
||||
# an error in the actor nursery enter and the
|
||||
# spawn nursery cancellation "beat" the call to
|
||||
# .cancel()? that's a bug right?
|
||||
|
||||
# saw this with settings bugs in the ordermode pane in
|
||||
# piker.
|
||||
log.exception(f'{uid} soft wait error?')
|
||||
raise RuntimeError(
|
||||
'Task spawn nursery cancelled before actor nursery?')
|
||||
|
||||
finally:
|
||||
if reaping_cancelled:
|
||||
assert actor_nursery.cancel_called
|
||||
if actor_nursery.cancelled:
|
||||
log.cancel(f'Nursery cancelled during soft wait for {uid}')
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
await maybe_wait_for_debugger()
|
||||
|
||||
# XXX: we should probably just
|
||||
# check for a `ContextCancelled` on portals
|
||||
# here and fill them in over `trio.Cancelled` right?
|
||||
|
||||
# hard reap sequence with timeouts
|
||||
if proc.poll() is None:
|
||||
log.cancel(f'Attempting hard reap for {uid}')
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
|
||||
# hard reap sequence
|
||||
# ``Portal.cancel_actor()`` is expected to have
|
||||
# been called by the supervising nursery so we
|
||||
# do **not** call it here.
|
||||
|
||||
await reap_proc(
|
||||
proc,
|
||||
uid,
|
||||
# this is the same as previous timeout
|
||||
# setting before rewriting this spawn
|
||||
# section
|
||||
terminate_after=3,
|
||||
)
|
||||
|
||||
|
||||
# if somehow the hard reap didn't collect the child then
|
||||
# we send in the big gunz.
|
||||
while proc.poll() is None:
|
||||
log.critical(
|
||||
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
|
||||
f'{proc}'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await reap_proc(
|
||||
proc,
|
||||
uid,
|
||||
terminate_after=0.1,
|
||||
)
|
||||
|
||||
log.info(f"Joined {proc}")
|
||||
|
||||
# 2 cases:
|
||||
# - the actor terminated gracefully
|
||||
# - we're cancelled and likely need to re-raise
|
||||
|
||||
# pop child entry to indicate we no longer managing this
|
||||
# subactor
|
||||
subactor, proc, portal = actor_nursery._children.pop(
|
||||
subactor.uid)
|
||||
|
||||
if not actor_nursery._children:
|
||||
# all subactor children have completed
|
||||
log.cancel(f"{uid} reports all children complete!")
|
||||
|
||||
actor_nursery._all_children_reaped.set()
|
||||
|
||||
spawn_n = actor_nursery._spawn_n
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await breakpoint()
|
||||
if not spawn_n._closed:
|
||||
# the parent task that opened the actor nursery
|
||||
# hasn't yet closed it so we cancel that task now.
|
||||
spawn_n.cancel_scope.cancel()
|
||||
|
||||
# not entirely sure why we need this.. but without it
|
||||
# the reaping cancelled error is never reported upwards
|
||||
# to the spawn nursery?
|
||||
# if reaping_cancelled:
|
||||
# raise reaping_cancelled
|
||||
|
||||
else:
|
||||
# `multiprocessing`
|
||||
# async with trio.open_nursery() as nursery:
|
||||
|
@ -323,6 +607,7 @@ async def new_proc(
|
|||
bind_addr=bind_addr,
|
||||
parent_addr=parent_addr,
|
||||
_runtime_vars=_runtime_vars,
|
||||
infect_asyncio=infect_asyncio,
|
||||
task_status=task_status,
|
||||
)
|
||||
|
||||
|
@ -338,6 +623,7 @@ async def mp_new_proc(
|
|||
parent_addr: Tuple[str, int],
|
||||
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
infect_asyncio: bool = False,
|
||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||
|
||||
) -> None:
|
||||
|
@ -383,6 +669,7 @@ async def mp_new_proc(
|
|||
fs_info,
|
||||
start_method,
|
||||
parent_addr,
|
||||
infect_asyncio,
|
||||
),
|
||||
# daemon=True,
|
||||
name=name,
|
||||
|
@ -420,6 +707,10 @@ async def mp_new_proc(
|
|||
# while user code is still doing it's thing. Only after the
|
||||
# nursery block closes do we allow subactor results to be
|
||||
# awaited and reported upwards to the supervisor.
|
||||
|
||||
# no shield is required here (vs. above on the trio backend)
|
||||
# since debug mode is not supported on mp.
|
||||
with trio.CancelScope(shield=True):
|
||||
await actor_nursery._join_procs.wait()
|
||||
|
||||
finally:
|
||||
|
@ -435,13 +726,23 @@ async def mp_new_proc(
        try:
            # async with trio.open_nursery() as n:
            #     n.cancel_scope.shield = True
            cancel_scope = await nursery.start(
                cancel_on_completion,
            print('soft mp reap')
            # cancel_scope = await nursery.start(
            result = await result_from_portal(
                portal,
                subactor,
                errors
                errors,
                # True,
            )
        except trio.Cancelled as err:

        # except trio.Cancelled as err:
        except BaseException as err:

            log.exception('hard mp reap')
            with trio.CancelScope(shield=True):
                await actor_nursery._handle_err(err, portal=portal)
            print('sent to nursery')

            cancel_exc = err

            # if the reaping task was cancelled we may have hit
|
|||
reaping_cancelled = True
|
||||
|
||||
if proc.is_alive():
|
||||
with trio.CancelScope(shield=True):
|
||||
print('hard reaping')
|
||||
with trio.move_on_after(0.1) as cs:
|
||||
cs.shield = True
|
||||
await proc_waiter(proc)
|
||||
|
||||
if cs.cancelled_caught:
|
||||
print('pwning mp proc')
|
||||
proc.terminate()
|
||||
finally:
|
||||
|
||||
if not reaping_cancelled and proc.is_alive():
|
||||
await proc_waiter(proc)
|
||||
# if not reaping_cancelled and proc.is_alive():
|
||||
# await proc_waiter(proc)
|
||||
|
||||
# TODO: timeout block here?
|
||||
proc.join()
|
||||
|
||||
log.debug(f"Joined {proc}")
|
||||
|
||||
# pop child entry to indicate we are no longer managing subactor
|
||||
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
|
||||
|
||||
if not actor_nursery._children:
|
||||
# all subactor children have completed
|
||||
# log.cancel(f"{uid} reports all children complete!")
|
||||
actor_nursery._all_children_reaped.set()
|
||||
|
||||
|
||||
# cancel result waiter that may have been spawned in
|
||||
# tandem if not done already
|
||||
if cancel_scope:
|
||||
|
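The hunk above bounds the `multiprocessing` reap with a shielded `trio.move_on_after()` before falling back to `terminate()`. A rough sketch of that pattern, with a polling loop standing in for the diff's `proc_waiter()` helper:

```python
import multiprocessing as mp
import trio


async def bounded_mp_reap(proc: mp.Process) -> None:
    # shielded, bounded wait: even while being cancelled, give the
    # child a brief window to exit on its own before killing it
    with trio.move_on_after(0.1) as cs:
        cs.shield = True
        # stand-in for the diff's ``proc_waiter()`` helper
        while proc.is_alive():
            await trio.sleep(0.01)

    if cs.cancelled_caught:
        # the child didn't exit in time: kill and collect it
        proc.terminate()
        proc.join()
```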
@ -476,6 +788,7 @@ async def mp_new_proc(
                    f"{subactor.uid}")
                cancel_scope.cancel()

            elif reaping_cancelled:  # let the cancellation bubble up
            if reaping_cancelled:  # let the cancellation bubble up
                print('raising')
                assert cancel_exc
                raise cancel_exc
@ -19,8 +19,8 @@ import trio
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver


log = get_logger(__name__)
@ -0,0 +1,457 @@
"""
``trio`` inspired apis and helpers
"""
from functools import partial
import inspect
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional
import typing
import warnings

import trio
from async_generator import asynccontextmanager

from . import _debug
from ._debug import maybe_wait_for_debugger, breakpoint
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn


log = get_logger(__name__)

_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)


class ActorNursery:
    """Spawn scoped subprocess actors.
    """
    def __init__(
        self,
        actor: Actor,
        spawn_nursery: trio.Nursery,
        errors: Dict[Tuple[str, str], Exception],
    ) -> None:
        # self.supervisor = supervisor  # TODO
        self._actor: Actor = actor
        self._spawn_n = spawn_nursery
        self._children: Dict[
            Tuple[str, str],
            Tuple[Actor, mp.Process, Optional[Portal]]
        ] = {}
        # portals spawned with ``run_in_actor()`` are
        # cancelled when their "main" result arrives
        self._cancel_after_result_on_exit: set = set()
        self.cancelled: bool = False
        self._cancel_called: bool = False
        self._join_procs = trio.Event()
        self._all_children_reaped = trio.Event()
        self.errors = errors

    @property
    def cancel_called(self) -> bool:
        '''
        Same principle as ``trio.CancelScope.cancel_called``.

        '''
        return self._cancel_called

    async def start_actor(
        self,
        name: str,
        *,
        bind_addr: Tuple[str, int] = _default_bind_addr,
        rpc_module_paths: List[str] = None,
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        nursery: trio.Nursery = None,
        infect_asyncio: bool = False,
        debug_mode: Optional[bool] = None,
    ) -> Portal:
        loglevel = loglevel or self._actor.loglevel or get_loglevel()

        # configure and pass runtime state
        _rtv = _state._runtime_vars.copy()
        _rtv['_is_root'] = False

        # allow setting debug policy per actor
        if debug_mode is not None:
            _rtv['_debug_mode'] = debug_mode

        enable_modules = enable_modules or []

        if rpc_module_paths:
            warnings.warn(
                "`rpc_module_paths` is now deprecated, use "
                " `enable_modules` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_modules.extend(rpc_module_paths)

        subactor = Actor(
            name,
            # modules allowed to invoke funcs from
            enable_modules=enable_modules,
            loglevel=loglevel,
            arbiter_addr=current_actor()._arb_addr,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr

        # start a task to spawn a process
        # blocks until process has been started and a portal setup
        nursery = nursery or self._spawn_n

        # XXX: the type ignore is actually due to a `mypy` bug
        return await nursery.start(  # type: ignore
            partial(
                _spawn.new_proc,
                name,
                self,
                subactor,
                self.errors,
                bind_addr,
                parent_addr,
                _rtv,  # run time vars
                infect_asyncio=infect_asyncio,
            )
        )

    async def run_in_actor(
        self,
        fn: typing.Callable,
        *,
        name: Optional[str] = None,
        bind_addr: Tuple[str, int] = _default_bind_addr,
        rpc_module_paths: Optional[List[str]] = None,
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        infect_asyncio: bool = False,
        **kwargs,  # explicit args to ``fn``
    ) -> Portal:
        """Spawn a new actor, run a lone task, then terminate the actor and
        return its result.

        Actors spawned using this method are kept alive at nursery teardown
        until the task spawned by executing ``fn`` completes at which point
        the actor is terminated.
        """
        mod_path = fn.__module__

        if name is None:
            # use the explicit function name if not provided
            name = fn.__name__

        portal = await self.start_actor(
            name,
            enable_modules=[mod_path] + (
                enable_modules or rpc_module_paths or []
            ),
            bind_addr=bind_addr,
            loglevel=loglevel,
            # use the run_in_actor nursery
            nursery=self._spawn_n,
            infect_asyncio=infect_asyncio,
        )

        # XXX: don't allow stream funcs
        if not (
            inspect.iscoroutinefunction(fn) and
            not getattr(fn, '_tractor_stream_function', False)
        ):
            raise TypeError(f'{fn} must be an async function!')

        # this marks the actor to be cancelled after its portal result
        # is retrieved, see logic in `open_nursery()` below.
        self._cancel_after_result_on_exit.add(portal)
        await portal._submit_for_result(
            mod_path,
            fn.__name__,
            **kwargs
        )
        return portal

    async def cancel(
        self,
    ) -> None:
        """
        Cancel this nursery by instructing each subactor to cancel
        itself and wait for all subactors to terminate.

        If ``hard_kill`` is set to ``True`` then kill the processes
        directly without any far end graceful ``trio`` cancellation.

        """
        # entries may be popped by the spawning backend as
        # actors cancel individually
        childs = self._children.copy()

        if self.cancel_called:
            log.warning(
                f'Nursery with children {len(childs)} already cancelled')
            return

        log.cancel(
            f'Cancelling nursery in {self._actor.uid} with children\n'
            f'{childs.keys()}'
        )
        self._cancel_called = True

        # wake up all spawn tasks to move on as this nursery
        # has ``__aexit__()``-ed
        self._join_procs.set()

        await maybe_wait_for_debugger()

        # one-cancels-all strat
        try:
            async with trio.open_nursery() as cancel_sender:
                for subactor, proc, portal in childs.values():
                    if not portal.cancel_called and portal.channel.connected():
                        cancel_sender.start_soon(portal.cancel_actor)

        except trio.MultiError as err:
            _err = err
            log.exception(f'{self} errors during cancel')
            # await breakpoint()
            # # LOL, ok so multiprocessing requires this for some reason..
            # with trio.CancelScope(shield=True):
            #     await trio.lowlevel.checkpoint()

        # cancel all spawner tasks
        # self._spawn_n.cancel_scope.cancel()
        self.cancelled = True

    async def _handle_err(
        self,
        err: BaseException,
        portal: Optional[Portal] = None,
        is_ctx_error: bool = False,

    ) -> bool:
        # XXX: hypothetically an error could be
        # raised and then a cancel signal shows up
        # slightly after in which case the `else:`
        # block here might not complete?  For now,
        # shield both.
        if is_ctx_error:
            assert not portal
            uid = self._actor.uid
        else:
            uid = portal.channel.uid

        if err not in self.errors.values():
            self.errors[uid] = err

            with trio.CancelScope(shield=True):
                etype = type(err)

                if etype in (
                    trio.Cancelled,
                    KeyboardInterrupt
                ) or (
                    is_multi_cancelled(err)
                ):
                    log.cancel(
                        f"Nursery for {current_actor().uid} "
                        f"was cancelled with {etype}")
                else:
                    log.error(
                        f"Nursery for {current_actor().uid} "
                        f"errored from {uid} with\n{err}")

                # cancel all subactors
                await self.cancel()

            return True

        log.warning(f'Skipping duplicate error for {uid}')
        return False


@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
    actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:

    # the collection of errors retrieved from spawned sub-actors
    errors: Dict[Tuple[str, str], Exception] = {}

    # This is the outermost level "daemon actor" nursery. It is awaited
    # **after** the below inner "run in actor nursery". This allows for
    # handling errors that are generated by the inner nursery in
    # a supervisor strategy **before** blocking indefinitely to wait for
    # actors spawned in "daemon mode" (aka started using
    # ``ActorNursery.start_actor()``).
    src_err: Optional[BaseException] = None
    nurse_err: Optional[BaseException] = None

    # errors from this daemon actor nursery bubble up to caller
    try:
        async with trio.open_nursery() as spawn_n:
            # try:

            # This is the inner level "run in actor" nursery. It is
            # awaited first since actors spawned in this way (using
            # ``ActorNursery.run_in_actor()``) are expected to only
            # return a single result and then complete (i.e. be cancelled
            # gracefully). Errors collected from these actors are
            # immediately raised for handling by a supervisor strategy.
            # As such if the strategy propagates any error(s) upwards
            # the above "daemon actor" nursery will be notified.

            anursery = ActorNursery(
                actor,
                spawn_n,
                errors
            )
            # spawning of actors happens in the caller's scope
            # after we yield upwards
            try:
                yield anursery

                log.runtime(
                    f"Waiting on subactors {anursery._children} "
                    "to complete"
                )

                # signal all process monitor tasks to conduct
                # hard join phase.
                # await maybe_wait_for_debugger()
                # log.error('join trigger NORMAL')
                anursery._join_procs.set()

                # NOTE: there are 2 cases for error propagation:
                # - an actor which is ``.run_in_actor()`` invoked
                #   runs a single task and reports the error upwards
                # - the top level task which opened this nursery (in the
                #   parent actor) raises. In this case the raise can come
                #   from a variety of places:
                #   - user task code unrelated to the nursery/child actors
                #   - a ``RemoteActorError`` propagated up through the
                #     portal api from a child actor which will look the exact
                #     same as a user code failure.

            except BaseException as err:
                # anursery._join_procs.set()
                src_err = err

                # with trio.CancelScope(shield=True):
                should_raise = await anursery._handle_err(err, is_ctx_error=True)

                # XXX: raising here causes some cancellation
                # / multierror tests to fail because of what appears to
                # be double raise? we probably need to see how `trio`
                # does this case..
                if should_raise:
                    raise

    # except trio.MultiError as err:
    except BaseException as err:
        # nursery bubble up
        nurse_err = err

        # do not double cancel subactors
        if not anursery.cancelled:
            await anursery._handle_err(err)

        raise

    finally:
        if anursery._children:
            log.cancel(f'Waiting on remaining children {anursery._children}')
            with trio.CancelScope(shield=True):
                await anursery._all_children_reaped.wait()

        log.cancel(f'All children complete for {anursery}')

        # No errors were raised while awaiting ".run_in_actor()"
        # actors but those actors may have returned remote errors as
        # results (meaning they errored remotely and have relayed
        # those errors back to this parent actor). The errors are
        # collected in ``errors`` so cancel all actors, summarize
        # all errors and re-raise.

        # await breakpoint()
        if errors:
            # if nurse_err or src_err:
            if anursery._children:
                raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
                # with trio.CancelScope(shield=True):
                #     await anursery.cancel()

            # use `MultiError` as needed
            if len(errors) > 1:
                raise trio.MultiError(tuple(errors.values()))
            else:
                raise list(errors.values())[0]

        log.cancel(f'{anursery} terminated gracefully')

        # XXX: honestly no idea why this is needed but sure..
        if isinstance(src_err, KeyboardInterrupt) and anursery.cancelled:
            raise src_err


@asynccontextmanager
async def open_nursery(
    **kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
    """
    Create and yield a new ``ActorNursery`` to be used for spawning
    structured concurrent subactors.

    When an actor is spawned a new trio task is started which
    invokes one of the process spawning backends to create and start
    a new subprocess. These tasks are started by one of two nurseries
    detailed below. The reason for spawning processes from within
    a new task is because ``trio_run_in_process`` itself creates a new
    internal nursery and the same task that opens a nursery **must**
    close it. It turns out this approach is probably more correct
    anyway since it is more clear from the following nested nurseries
    which cancellation scopes correspond to each spawned subactor set.

    """
    implicit_runtime = False

    actor = current_actor(err_on_no_runtime=False)

    try:
        if actor is None and is_main_process():

            # if we are the parent process start the
            # actor runtime implicitly
            log.info("Starting actor runtime!")

            # mark us for teardown on exit
            implicit_runtime = True

            async with open_root_actor(**kwargs) as actor:
                assert actor is current_actor()

                # try:
                async with _open_and_supervise_one_cancels_all_nursery(
                    actor
                ) as anursery:
                    yield anursery

        else:  # sub-nursery case

            async with _open_and_supervise_one_cancels_all_nursery(
                actor
            ) as anursery:
                yield anursery

    finally:
        log.debug("Nursery teardown complete")

        # shutdown runtime if it was started
        if implicit_runtime:
            log.info("Shutting down actor tree")
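From user code the rewritten nursery is exercised the same way as before. A minimal usage sketch (assuming the usual `tractor.open_nursery()` entrypoint and `Portal.result()` for collecting the subactor's return value):

```python
import tractor
import trio


async def add(x: int, y: int) -> int:
    return x + y


async def main() -> None:
    async with tractor.open_nursery() as n:
        # spawn a subactor, run a single task in it, then reap it
        portal = await n.run_in_actor(add, x=1, y=2)
        # ``Portal.result()`` is assumed here for collecting the value
        assert await portal.result() == 3


if __name__ == '__main__':
    trio.run(main)
```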
@ -1,440 +0,0 @@
"""
``trio`` inspired apis and helpers
"""
from functools import partial
import inspect
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional
import typing
import warnings

import trio
from async_generator import asynccontextmanager

from . import _debug
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn


log = get_logger(__name__)

_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)


class ActorNursery:
    """Spawn scoped subprocess actors.
    """
    def __init__(
        self,
        actor: Actor,
        ria_nursery: trio.Nursery,
        da_nursery: trio.Nursery,
        errors: Dict[Tuple[str, str], Exception],
    ) -> None:
        # self.supervisor = supervisor  # TODO
        self._actor: Actor = actor
        self._ria_nursery = ria_nursery
        self._da_nursery = da_nursery
        self._children: Dict[
            Tuple[str, str],
            Tuple[Actor, mp.Process, Optional[Portal]]
        ] = {}
        # portals spawned with ``run_in_actor()`` are
        # cancelled when their "main" result arrives
        self._cancel_after_result_on_exit: set = set()
        self.cancelled: bool = False
        self._join_procs = trio.Event()
        self.errors = errors

    async def start_actor(
        self,
        name: str,
        *,
        bind_addr: Tuple[str, int] = _default_bind_addr,
        rpc_module_paths: List[str] = None,
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        nursery: trio.Nursery = None,
    ) -> Portal:
        loglevel = loglevel or self._actor.loglevel or get_loglevel()

        # configure and pass runtime state
        _rtv = _state._runtime_vars.copy()
        _rtv['_is_root'] = False

        enable_modules = enable_modules or []

        if rpc_module_paths:
            warnings.warn(
                "`rpc_module_paths` is now deprecated, use "
                " `enable_modules` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_modules.extend(rpc_module_paths)

        subactor = Actor(
            name,
            # modules allowed to invoke funcs from
            enable_modules=enable_modules,
            loglevel=loglevel,
            arbiter_addr=current_actor()._arb_addr,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr

        # start a task to spawn a process
        # blocks until process has been started and a portal setup
        nursery = nursery or self._da_nursery

        # XXX: the type ignore is actually due to a `mypy` bug
        return await nursery.start(  # type: ignore
            partial(
                _spawn.new_proc,
                name,
                self,
                subactor,
                self.errors,
                bind_addr,
                parent_addr,
                _rtv,  # run time vars
            )
        )

    async def run_in_actor(
        self,
        fn: typing.Callable,
        *,
        name: Optional[str] = None,
        bind_addr: Tuple[str, int] = _default_bind_addr,
        rpc_module_paths: Optional[List[str]] = None,
        enable_modules: List[str] = None,
        loglevel: str = None,  # set log level per subactor
        **kwargs,  # explicit args to ``fn``
    ) -> Portal:
        """Spawn a new actor, run a lone task, then terminate the actor and
        return its result.

        Actors spawned using this method are kept alive at nursery teardown
        until the task spawned by executing ``fn`` completes at which point
        the actor is terminated.
        """
        mod_path = fn.__module__

        if name is None:
            # use the explicit function name if not provided
            name = fn.__name__

        portal = await self.start_actor(
            name,
            enable_modules=[mod_path] + (
                enable_modules or rpc_module_paths or []
            ),
            bind_addr=bind_addr,
            loglevel=loglevel,
            # use the run_in_actor nursery
            nursery=self._ria_nursery,
        )

        # XXX: don't allow stream funcs
        if not (
            inspect.iscoroutinefunction(fn) and
            not getattr(fn, '_tractor_stream_function', False)
        ):
            raise TypeError(f'{fn} must be an async function!')

        # this marks the actor to be cancelled after its portal result
        # is retrieved, see logic in `open_nursery()` below.
        self._cancel_after_result_on_exit.add(portal)
        await portal._submit_for_result(
            mod_path,
            fn.__name__,
            **kwargs
        )
        return portal

    async def cancel(self, hard_kill: bool = False) -> None:
        """Cancel this nursery by instructing each subactor to cancel
        itself and wait for all subactors to terminate.

        If ``hard_kill`` is set to ``True`` then kill the processes
        directly without any far end graceful ``trio`` cancellation.
        """
        self.cancelled = True

        log.cancel(f"Cancelling nursery in {self._actor.uid}")
        with trio.move_on_after(3) as cs:

            async with trio.open_nursery() as nursery:

                for subactor, proc, portal in self._children.values():

                    # TODO: are we ever even going to use this or
                    # is the spawning backend responsible for such
                    # things? I'm thinking latter.
                    if hard_kill:
                        proc.terminate()

                    else:
                        if portal is None:  # actor hasn't fully spawned yet
                            event = self._actor._peer_connected[subactor.uid]
                            log.warning(
                                f"{subactor.uid} wasn't finished spawning?")

                            await event.wait()

                            # channel/portal should now be up
                            _, _, portal = self._children[subactor.uid]

                            # XXX should be impossible to get here
                            # unless method was called from within
                            # shielded cancel scope.
                            if portal is None:
                                # cancelled while waiting on the event
                                # to arrive
                                chan = self._actor._peers[subactor.uid][-1]
                                if chan:
                                    portal = Portal(chan)
                                else:  # there's no other choice left
                                    proc.terminate()

                        # spawn cancel tasks for each sub-actor
                        assert portal
                        nursery.start_soon(portal.cancel_actor)

        # if we cancelled the cancel (we hung cancelling remote actors)
        # then hard kill all sub-processes
        if cs.cancelled_caught:
            log.error(
                f"Failed to cancel {self}\nHard killing process tree!")
            for subactor, proc, portal in self._children.values():
                log.warning(f"Hard killing process {proc}")
                proc.terminate()

        # mark ourselves as having (tried to have) cancelled all subactors
        self._join_procs.set()


@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
    actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:

    # the collection of errors retrieved from spawned sub-actors
    errors: Dict[Tuple[str, str], Exception] = {}

    # This is the outermost level "daemon actor" nursery. It is awaited
    # **after** the below inner "run in actor nursery". This allows for
    # handling errors that are generated by the inner nursery in
    # a supervisor strategy **before** blocking indefinitely to wait for
    # actors spawned in "daemon mode" (aka started using
    # ``ActorNursery.start_actor()``).

    # errors from this daemon actor nursery bubble up to caller
    async with trio.open_nursery() as da_nursery:
        try:
            # This is the inner level "run in actor" nursery. It is
            # awaited first since actors spawned in this way (using
            # ``ActorNursery.run_in_actor()``) are expected to only
            # return a single result and then complete (i.e. be cancelled
            # gracefully). Errors collected from these actors are
            # immediately raised for handling by a supervisor strategy.
            # As such if the strategy propagates any error(s) upwards
            # the above "daemon actor" nursery will be notified.
            async with trio.open_nursery() as ria_nursery:

                anursery = ActorNursery(
                    actor,
                    ria_nursery,
                    da_nursery,
                    errors
                )
                try:
                    # spawning of actors happens in the caller's scope
                    # after we yield upwards
                    yield anursery

                    log.runtime(
                        f"Waiting on subactors {anursery._children} "
                        "to complete"
                    )

                    # Last bit before first nursery block ends in the case
                    # where we didn't error in the caller's scope

                    # signal all process monitor tasks to conduct
                    # hard join phase.
                    anursery._join_procs.set()

                except BaseException as err:

                    # If we error in the root but the debugger is
                    # engaged we don't want to prematurely kill (and
                    # thus clobber access to) the local tty since it
                    # will make the pdb repl unusable.
                    # Instead try to wait for pdb to be released before
                    # tearing down.
                    if is_root_process():

                        # TODO: could this make things more deterministic?
                        # wait to see if a sub-actor task will be
                        # scheduled and grab the tty lock on the next
                        # tick?
                        # await trio.testing.wait_all_tasks_blocked()

                        debug_complete = _debug._no_remote_has_tty
                        if (
                            debug_complete and
                            not debug_complete.is_set()
                        ):
                            log.warning(
                                'Root has errored but pdb is in use by '
                                f'child {_debug._global_actor_in_debug}\n'
                                'Waiting on tty lock to release..')

                            # with trio.CancelScope(shield=True):
                            await debug_complete.wait()

                    # if the caller's scope errored then we activate our
                    # one-cancels-all supervisor strategy (don't
                    # worry more are coming).
                    anursery._join_procs.set()

                    try:
                        # XXX: hypothetically an error could be
                        # raised and then a cancel signal shows up
                        # slightly after in which case the `else:`
                        # block here might not complete?  For now,
                        # shield both.
                        with trio.CancelScope(shield=True):
                            etype = type(err)
                            if etype in (
                                trio.Cancelled,
                                KeyboardInterrupt
                            ) or (
                                is_multi_cancelled(err)
                            ):
                                log.cancel(
                                    f"Nursery for {current_actor().uid} "
                                    f"was cancelled with {etype}")
                            else:
                                log.exception(
                                    f"Nursery for {current_actor().uid} "
                                    f"errored with {err}, ")

                            # cancel all subactors
                            await anursery.cancel()

                    except trio.MultiError as merr:
                        # If we receive additional errors while waiting on
                        # remaining subactors that were cancelled,
                        # aggregate those errors with the original error
                        # that triggered this teardown.
                        if err not in merr.exceptions:
                            raise trio.MultiError(merr.exceptions + [err])
                    else:
                        raise

                # ria_nursery scope end

        # XXX: do we need a `trio.Cancelled` catch here as well?
        # this is the catch around the ``.run_in_actor()`` nursery
        except (

            Exception,
            trio.MultiError,
            trio.Cancelled

        ) as err:

            # If actor-local error was raised while waiting on
            # ".run_in_actor()" actors then we also want to cancel all
            # remaining sub-actors (due to our lone strategy:
            # one-cancels-all).
            log.cancel(f"Nursery cancelling due to {err}")
            if anursery._children:
                with trio.CancelScope(shield=True):
                    await anursery.cancel()
            raise
        finally:
            # No errors were raised while awaiting ".run_in_actor()"
            # actors but those actors may have returned remote errors as
            # results (meaning they errored remotely and have relayed
            # those errors back to this parent actor). The errors are
            # collected in ``errors`` so cancel all actors, summarize
            # all errors and re-raise.
            if errors:
                if anursery._children:
                    with trio.CancelScope(shield=True):
                        await anursery.cancel()

                # use `MultiError` as needed
                if len(errors) > 1:
                    raise trio.MultiError(tuple(errors.values()))
                else:
                    raise list(errors.values())[0]

        # ria_nursery scope end - nursery checkpoint

    # after nursery exit


@asynccontextmanager
async def open_nursery(
    **kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
    """Create and yield a new ``ActorNursery`` to be used for spawning
    structured concurrent subactors.

    When an actor is spawned a new trio task is started which
    invokes one of the process spawning backends to create and start
    a new subprocess. These tasks are started by one of two nurseries
    detailed below. The reason for spawning processes from within
    a new task is because ``trio_run_in_process`` itself creates a new
    internal nursery and the same task that opens a nursery **must**
    close it. It turns out this approach is probably more correct
    anyway since it is more clear from the following nested nurseries
    which cancellation scopes correspond to each spawned subactor set.
    """
    implicit_runtime = False

    actor = current_actor(err_on_no_runtime=False)

    try:
        if actor is None and is_main_process():

            # if we are the parent process start the
            # actor runtime implicitly
            log.info("Starting actor runtime!")

            # mark us for teardown on exit
            implicit_runtime = True

            async with open_root_actor(**kwargs) as actor:
                assert actor is current_actor()

                # try:
                async with _open_and_supervise_one_cancels_all_nursery(
                    actor
                ) as anursery:
                    yield anursery

        else:  # sub-nursery case

            async with _open_and_supervise_one_cancels_all_nursery(
                actor
            ) as anursery:
                yield anursery

    finally:
        log.debug("Nursery teardown complete")

        # shutdown runtime if it was started
        if implicit_runtime:
            log.info("Shutting down actor tree")
@ -121,6 +121,7 @@ def pub(
    wrapped: typing.Callable = None,
    *,
    tasks: Set[str] = set(),
    send_on_connect: Any = None,
):
    """Publisher async generator decorator.

@ -206,7 +207,7 @@ def pub(

    # handle the decorator not called with () case
    if wrapped is None:
        return partial(pub, tasks=tasks)
        return partial(pub, tasks=tasks, send_on_connect=send_on_connect)

    task2lock: Dict[str, trio.StrictFIFOLock] = {}

@ -249,6 +250,11 @@ def pub(

        try:
            modify_subs(topics2ctxs, topics, ctx)

            # if specified send the startup message back to consumer
            if send_on_connect is not None:
                await ctx.send_yield(send_on_connect)

            # block and let existing feed task deliver
            # stream data until it is cancelled in which case
            # the next waiting task will take over and spawn it again
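For context, a hypothetical publisher using the new `send_on_connect` argument might look like the following sketch; the `get_topics` callable and the `{topic: data}` yield convention are assumptions about the `pub` decorator's contract, not something shown in this hunk:

```python
import trio
import tractor


# hypothetical publisher: ``send_on_connect`` (added in this diff) is
# delivered to each consumer as soon as it subscribes
@tractor.msg.pub(send_on_connect={'status': 'connected'})
async def ticker(get_topics, rate: float = 1.0):
    count = 0
    while True:
        # fan the current count out to every currently requested topic
        yield {topic: count for topic in get_topics()}
        count += 1
        await trio.sleep(rate)
```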
@ -0,0 +1,300 @@
'''
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.

'''
import asyncio
from contextlib import asynccontextmanager as acm
import inspect
from typing import (
    Any,
    Callable,
    AsyncIterator,
    Awaitable,
    Optional,
)

import trio

from .log import get_logger, get_console_log
from ._state import current_actor

log = get_logger(__name__)


__all__ = ['run_task', 'run_as_asyncio_guest']


# async def consume_asyncgen(
#     to_trio: trio.MemorySendChannel,
#     coro: AsyncIterator,
# ) -> None:
#     """Stream async generator results back to ``trio``.

#     ``from_trio`` might eventually be used here for
#     bidirectional streaming.
#     """
#     async for item in coro:
#         to_trio.send_nowait(item)


def _run_asyncio_task(
    func: Callable,
    *,
    qsize: int = 1,
    # _treat_as_stream: bool = False,
    provide_channels: bool = False,
    **kwargs,

) -> Any:
    """
    Run an ``asyncio`` async function or generator in a task, return
    or stream the result back to ``trio``.

    """
    if not current_actor().is_infected_aio():
        raise RuntimeError("`infect_asyncio` mode is not enabled!?")

    # ITC (inter task comms)
    from_trio = asyncio.Queue(qsize)  # type: ignore
    to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore

    from_aio._err = None

    args = tuple(inspect.getfullargspec(func).args)

    if getattr(func, '_tractor_steam_function', None):
        # the assumption is that the target async routine accepts the
        # send channel then it intends to yield more than one return
        # value otherwise it would just return ;P
        # _treat_as_stream = True
        assert qsize > 1

    if provide_channels:
        assert 'to_trio' in args

    # allow target func to accept/stream results manually by name
    if 'to_trio' in args:
        kwargs['to_trio'] = to_trio

    if 'from_trio' in args:
        kwargs['from_trio'] = from_trio

    coro = func(**kwargs)

    cancel_scope = trio.CancelScope()
    aio_task_complete = trio.Event()
    aio_err: Optional[BaseException] = None

    async def wait_on_coro_final_result(
        to_trio: trio.MemorySendChannel,
        coro: Awaitable,
        aio_task_complete: trio.Event,

    ) -> None:
        """
        Await ``coro`` and relay result back to ``trio``.

        """
        nonlocal aio_err
        orig = result = id(coro)
        try:
            result = await coro
        except BaseException as err:
            aio_err = err
            from_aio._err = aio_err
        finally:
            aio_task_complete.set()
            if result != orig and aio_err is None:
                to_trio.send_nowait(result)

    # start the asyncio task we submitted from trio
    if inspect.isawaitable(coro):
        task = asyncio.create_task(
            wait_on_coro_final_result(to_trio, coro, aio_task_complete)
        )

    # elif inspect.isasyncgen(coro):
    #     task = asyncio.create_task(consume_asyncgen(to_trio, coro))

    else:
        raise TypeError(f"No support for invoking {coro}")

    def cancel_trio(task):
        """Cancel the calling ``trio`` task on error.
        """
        nonlocal aio_err
        try:
            aio_err = task.exception()
        except asyncio.CancelledError as cerr:
            aio_err = cerr

        if aio_err:
            log.exception(f"asyncio task errored:\n{aio_err}")
            from_aio._err = aio_err
            cancel_scope.cancel()
            from_aio.close()

    task.add_done_callback(cancel_trio)

    return task, from_aio, to_trio, cancel_scope, aio_task_complete


async def run_task(
    func: Callable,
    *,

    qsize: int = 2**10,
    # _treat_as_stream: bool = False,
    **kwargs,

) -> Any:
    """Run an ``asyncio`` async function or generator in a task, return
    or stream the result back to ``trio``.

    """
    # simple async func
    try:
        task, from_aio, to_trio, cs, _ = _run_asyncio_task(
            func,
            qsize=1,
            **kwargs,
        )

        # return single value
        with cs:
            # naively expect the mem chan api to do the job
            # of handling cross-framework cancellations / errors
            return await from_aio.receive()

        if cs.cancelled_caught:
            # always raise from any captured asyncio error
            if from_aio._err:
                raise from_aio._err

    # Do we need this?
    except BaseException as err:

        aio_err = from_aio._err

        if aio_err is not None:
            # always raise from any captured asyncio error
            raise err from aio_err
        else:
            raise

    # except trio.Cancelled:
    #     raise
    finally:
        if not task.done():
            task.cancel()


# TODO: explicit api for the streaming case where
# we pull from the mem chan in an async generator?
# This ends up looking more like our ``Portal.open_stream_from()``
# NB: code below is untested.

# async def _start_and_sync_aio_task(
#     from_trio,
#     to_trio,
#     from_aio,


@acm
async def open_channel_from(

    target: Callable[[Any, ...], Any],
    **kwargs,

) -> AsyncIterator[Any]:

    try:
        task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task(
            target,
            qsize=2**8,
            provide_channels=True,
            **kwargs,
        )

        with cs:
            # sync to "started()" call.
            first = await from_aio.receive()
            # stream values upward
            async with from_aio:
                yield first, from_aio
                # await aio_task_complete.wait()

    except BaseException as err:

        aio_err = from_aio._err

        if aio_err is not None:
            # always raise from any captured asyncio error
            raise err from aio_err
        else:
            raise

    finally:
        if cs.cancelled_caught:
            # always raise from any captured asyncio error
            if from_aio._err:
                raise from_aio._err

        if not task.done():
            task.cancel()


def run_as_asyncio_guest(
    trio_main: Callable,
) -> None:
    """Entry for an "infected ``asyncio`` actor".

    Uh, oh. :o

    It looks like your event loop has caught a case of the ``trio``s.

    :()

    Don't worry, we've heard you'll barely notice. You might hallucinate
    a few more propagating errors and feel like your digestion has
    slowed, but if anything gets too bad your parents will know about
    it.

    :)

    """
    # Disable sigint handling in children?
    # import signal
    # signal.signal(signal.SIGINT, signal.SIG_IGN)

    get_console_log('runtime')

    async def aio_main(trio_main):

        loop = asyncio.get_running_loop()
        trio_done_fut = asyncio.Future()

        def trio_done_callback(main_outcome):

            print(f"trio_main finished: {main_outcome!r}")
            trio_done_fut.set_result(main_outcome)

        # start the infection: run trio on the asyncio loop in "guest mode"
        log.info(f"Infecting asyncio process with {trio_main}")

        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )
        (await trio_done_fut).unwrap()

    # might as well if it's installed.
    try:
        import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    except ImportError:
        pass

    asyncio.run(aio_main(trio_main))
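A rough usage sketch of the channel api added above, to be run from a `trio` task inside an actor spawned with `infect_asyncio=True`; the `tractor.to_asyncio` import path and the `'started'` handshake value are assumptions based on this diff, not a documented interface:

```python
import asyncio

# assumed import path for the new module added above
from tractor import to_asyncio


async def aio_counter(to_trio) -> None:
    # asyncio-side target: the first send doubles as the "started"
    # handshake which ``open_channel_from()`` waits for
    to_trio.send_nowait('started')
    for i in range(3):
        await asyncio.sleep(0.1)
        to_trio.send_nowait(i)


async def stream_from_aio() -> None:
    # must run inside an actor spawned with ``infect_asyncio=True``
    async with to_asyncio.open_channel_from(aio_counter) as (first, chan):
        assert first == 'started'
        for _ in range(3):
            print('relayed from asyncio:', await chan.receive())
```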
@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.

'''
from ._mngrs import async_enter_all
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged


__all__ = [
    'async_enter_all',
    'broadcast_receiver',
    'BroadcastReceiver',
    'Lagged',
]
@ -0,0 +1,64 @@
'''
Async context manager primitives with hard ``trio``-aware semantics

'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_sleep(

    mngr: AsyncContextManager[T],
    to_yield: dict[int, T],
    all_entered: trio.Event,
    # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> T:
    '''Open the async context manager, deliver its value
    to this task's spawner and sleep until cancelled.

    '''
    async with mngr as value:
        to_yield[id(mngr)] = value

        if all(to_yield.values()):
            all_entered.set()

        # sleep until cancelled
        await trio.sleep_forever()


@acm
async def async_enter_all(

    *mngrs: tuple[AsyncContextManager[T]],

) -> tuple[T]:

    to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)

    all_entered = trio.Event()

    async with trio.open_nursery() as n:
        for mngr in mngrs:
            n.start_soon(
                _enter_and_sleep,
                mngr,
                to_yield,
                all_entered,
            )

        # deliver control once all managers have started up
        await all_entered.wait()
        yield tuple(to_yield.values())

        # tear down all sleeper tasks thus triggering individual
        # mngr ``__aexit__()``s.
        n.cancel_scope.cancel()
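A small usage sketch of `async_enter_all()` as defined above; the `tractor.trionics` import path follows the package `__init__` shown earlier, while `open_resource()` is just an illustrative context manager:

```python
from contextlib import asynccontextmanager as acm

import trio

# import path per the package ``__init__`` diff above
from tractor.trionics import async_enter_all


@acm
async def open_resource(name: str):
    # stand-in for any async context manager (a portal, a stream, ...)
    print(f'entering {name}')
    yield name
    print(f'exiting {name}')


async def main() -> None:
    # enter all managers concurrently; their yielded values come back
    # as one tuple once every manager has entered
    async with async_enter_all(
        open_resource('a'),
        open_resource('b'),
        open_resource('c'),
    ) as values:
        assert values == ('a', 'b', 'c')


trio.run(main)
```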