Compare commits


65 Commits

Author SHA1 Message Date
Tyler Goodlet a55ea18c7d Support a delay in daemon actor noops 2021-10-12 12:05:03 -04:00
Tyler Goodlet 797bb22981 Lol, fix sub-actor case 2021-10-12 12:03:57 -04:00
Tyler Goodlet 2c74db9cb7 Relay `ContextCancelled` for `Portal.run()` cancelled remote tasks 2021-10-12 12:03:15 -04:00
Tyler Goodlet 39feb627a8 Disable frame hides in portal temporarily 2021-10-12 12:02:04 -04:00
Tyler Goodlet 348423ece7 Let `ActorNursery` choose whether to raise remote errors
- Don't raise inside `result_from_portal()` and instead return a flag that
  indicates whether the error was remote or not.
- Stick the soft reap sequence outside a `finally:`.
- Do error tracking in `ActorNursery._handle_err() -> bool:` to avoid duplicate
  raises on close (roughly sketched below).
- Add `ActorNursery.cancel_called: bool`.
- Accept a cancelled soft reap and toss in some logging for now to begin
  figuring out races between the spawner nursery and the enter block being
  the source of an error that causes actor nursery cancellation.
- Cancel the spawn nursery if all procs complete but the nursery hasn't
  been closed (pretty sure this isn't correct nor working; the nursery
  should always be closed in order for the join-procs event to have
  arrived).
- Tossed in some code for the mp backend but none of it works (or is
  tested) and it likely needs to be rewritten like the trio spawner.

  All still very WIP in case that wasn't clear XD
2021-10-12 11:38:19 -04:00
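
A rough illustration of the error-tracking idea from the bullets above; the method and attribute names mirror the commit text but the body is purely hypothetical, not the actual implementation:

class ActorNursery:
    # hypothetical sketch: dedupe error handling so close/teardown paths
    # don't re-raise an error that was already relayed once
    cancel_called: bool = False
    _error_handled: bool = False

    async def _handle_err(
        self,
        err: BaseException,
        portal=None,
    ) -> bool:
        # return True only for the first error seen; later callers
        # (e.g. the soft reap or the nursery close) can skip re-raising
        if not self._error_handled:
            self._error_handled = True
            self.cancel_called = True
            # ...request cancellation of all child actors here...
            return True
        return False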
Tyler Goodlet 5eb7c4c857 Disable showing log capture when `--ll` is passed 2021-10-11 21:53:54 -04:00
Tyler Goodlet 4d30e25591 Don't alert nursery on expected cancel result 2021-10-10 16:47:17 -04:00
Tyler Goodlet c01d2f8aea Don't double request a cancelled actor 2021-10-10 16:46:29 -04:00
Tyler Goodlet 8e21bb046e Add `Portal.cancel_called` 2021-10-10 16:43:32 -04:00
Tyler Goodlet 66137030d9 Try out always delivering `ContextCancelled`
Previously, on a task cancel request there was no real response other
than the `None` returned from the `Actor._cancel_task()` method, and
sometimes that could get lost if the cancel task was itself cancelled by
a runtime cancel request (i.e. an "actor cancel"). Instead let's try
always checking if the task's cancel scope is cancelled and, if so, relay
a `ContextCancelled` back to the caller which can then be explicitly
handled by actor nursery machinery as well as individual cancel APIs
(`Portal.cancel_actor()`, and maybe later, if we decide to expose it, the
`tractor.Context` on every `Portal.run()` call).

Also,
- fix up a bunch of cancellation related logging
- add an `Actor.cancel_called` flag much like `trio`'s cancel scope
2021-10-10 16:36:19 -04:00
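
A condensed sketch of the relay logic described above (the full version lives in `Actor._invoke()` in the diff further down; the `ContextCancelled(..., suberror_type=...)` call matches that diff, the rest is simplified and the function name is illustrative):

import trio
from tractor._exceptions import ContextCancelled  # per this changeset

async def invoke_and_relay(coro, ctx, fname: str):
    # run the target coroutine inside its own cancel scope
    with trio.CancelScope() as cs:
        result = await coro

    if cs.cancelled_caught:
        # the task was cancelled: relay an explicit error to the caller
        # instead of silently returning ``None``
        if cs.cancel_called:
            msg = f'{fname} was remotely cancelled by its caller {ctx.chan.uid}'
        else:
            msg = f'{fname} cancelled itself'
        raise ContextCancelled(msg, suberror_type=trio.Cancelled)

    return result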
Tyler Goodlet cef9ab7353 Handle varied multierror order with broker resource err 2021-10-10 13:07:15 -04:00
Tyler Goodlet 0dcffeee0f Tweaks to get us down to 3 failed cancel tests
The remaining errors all have to do with not reproducing the previous
format for collected `.run_in_actor()` errors as `MultiError`s.
Not even sure at this point if the whole collect-single-task-results-and-bubble
approach should be a thing, but trying to keep the support for now I guess.

There are still issues with a hang in the pub sub tests and the one
debugger test has a different outcome due to the root getting the lock
from the breakpoint-forever child too quickly.

- go back to raising portal results-that-are-errors in the spawn task
- go back to shielding the nursery close / proc join event
  - report any error on this shielded join and relay it to the nursery handler
    method (which should be customizable in the future for alternate
    strats than OCA) as well as try to collect the ria (run-in-actor) result
- drop async (via nursery) ria result collection, just do it sync with
  the soft `proc.wait()` reap immediately after, which should work
  presuming that the ipc connection will break on process termination
  anyway; it'll mean no multierror to deal with and no cancel scope
  to manage on the ria reaper task (a reduced sketch follows below).
2021-10-10 13:06:27 -04:00
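
The soft-reap-then-hard-kill idea from the last bullet, reduced to a sketch (simplified from the `reap_proc()`/`do_hard_kill()` code in the spawner diff below; the timeout value is illustrative):

import trio

async def reap_proc(
    proc: trio.Process,
    grace_period: float = 3,
) -> None:
    with trio.move_on_after(grace_period) as cs:
        # "soft" reap: just wait for the child to exit on its own,
        # which it should once its ipc connection goes down
        await proc.wait()

    if cs.cancelled_caught:
        # grace period expired; escalate to a hard kill, shielded so
        # the teardown can't itself be interrupted part way through
        with trio.CancelScope(shield=True):
            proc.kill()
            await proc.wait()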
Tyler Goodlet 8a59713d48 Re-route errors from spawn tasks and mngr task to handler 2021-10-10 11:54:19 -04:00
Tyler Goodlet 0488f5e57e Drop to a 2 polls for root debugging check 2021-10-10 11:52:24 -04:00
Tyler Goodlet a3cdba0577 Do immediate remote task cancels
As with `Actor.cancel()` requests, do the same for
`Actor._cancel_task()` but use `_invoke()` to ensure
correct msg transactions with the caller. Don't cancel task
cancels during a cancel-all-tasks operation, in an attempt at
more determinism.
2021-10-10 11:42:32 -04:00
Tyler Goodlet 5048c3534f Re-raise KBI's i guess? 2021-10-08 21:01:51 -04:00
Tyler Goodlet 5df08aabb7 Hide some portal stack layers in tracebacks 2021-10-08 18:53:54 -04:00
Tyler Goodlet 1b7cd419f2 Drop old portal helper 2021-10-08 18:20:57 -04:00
Tyler Goodlet e32a5917a9 Don't whine about ; it ain't rpc 2021-10-08 18:20:08 -04:00
Tyler Goodlet 7250deb30f Make OCA nursery **not** a multiplexed mindfuck 2021-10-08 18:18:00 -04:00
Tyler Goodlet 64ebb2aff4 WIP rework trio spawner to include cancellation logic; not correct yet.. 2021-10-08 18:14:44 -04:00
Tyler Goodlet c02a493d8c Add a maybe-open-debugger helper 2021-10-08 18:13:55 -04:00
Tyler Goodlet fb026e3747 First draft: `.to_asyncio.open_channel_from()` 2021-10-07 23:14:34 -04:00
Tyler Goodlet 2afbc3898f Make actor runtime cancellation immediate 2021-10-07 23:13:47 -04:00
Tyler Goodlet f72eabd42a Drop breakpoint owned lock 2021-10-06 17:05:58 -04:00
Tyler Goodlet 6e646a6fa6 Always cancel the asyncio task? 2021-10-06 17:05:58 -04:00
Tyler Goodlet aa94ea5bcc WIP, add back in root shield, print out pdb sigint opts 2021-10-06 17:05:58 -04:00
Tyler Goodlet a2a4f7af09 Test non-shielding root lock acquire on breakpoint entry 2021-10-06 17:05:58 -04:00
Tyler Goodlet 6da2c3a885 Drop old implementation cruft 2021-10-06 17:05:58 -04:00
Tyler Goodlet ed10f6e0c1 Fix error propagation on asyncio streaming tasks 2021-10-06 17:05:58 -04:00
Tyler Goodlet b43539b252 Drop bad .close() call 2021-10-06 17:05:58 -04:00
Tyler Goodlet fc46f5b74a Proxy asyncio cancelleds as well 2021-10-06 17:05:58 -04:00
Tyler Goodlet efe83f78a3 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstream clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, keep that
special path coded in until it becomes a problem for ensuring zombie
reaps.
2021-10-06 17:05:58 -04:00
Tyler Goodlet de87cb510a WIP redo asyncio async gen streaming 2021-10-06 17:05:58 -04:00
Tyler Goodlet e8431bffd0 Support asyncio actors with the trio spawner backend 2021-10-06 17:05:58 -04:00
Tyler Goodlet d720c6a9c2 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-10-06 17:05:58 -04:00
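
The built-in being overridden here is Python's `sys.breakpointhook` / `PYTHONBREAKPOINT` machinery; a minimal sketch of the general wiring (the hook body below is a placeholder, not tractor's actual debugger entry):

import sys

def _sync_breakpoint_hook(*args, **kwargs) -> None:
    # placeholder: a real hook would hand off to a trio/tractor aware
    # debugger entrypoint so the repl plays nicely with the run loop
    import pdb
    pdb.set_trace()

# either assign the hook directly at runtime...
sys.breakpointhook = _sync_breakpoint_hook

# ...or point the interpreter at it before startup:
# PYTHONBREAKPOINT=mypkg.debug._sync_breakpoint_hook python app.py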
Tyler Goodlet 732eaaf21e Support asyncio actors with the trio spawner backend 2021-10-06 17:05:58 -04:00
Tyler Goodlet c63323086c Link to SC on wikipedia 2021-10-06 17:05:58 -04:00
Tyler Goodlet 03ae42fa10 Add per actor debug mode toggle 2021-10-06 17:05:58 -04:00
Tyler Goodlet 2cd3a878f0 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-10-06 17:05:58 -04:00
Tyler Goodlet a237dcd020 Pass func refs 2021-10-06 17:05:58 -04:00
Tyler Goodlet b4fe207369 Add initial infected asyncio error propagation test 2021-10-06 17:05:58 -04:00
Tyler Goodlet 9a994e2de3 Raise any asyncio errors if in trio task on cancel 2021-10-06 17:05:58 -04:00
Tyler Goodlet d2a810d950 Raise from asyncio error; fixes mypy 2021-10-06 17:05:58 -04:00
Tyler Goodlet 07c2151010 Tweak log msg 2021-10-06 17:05:58 -04:00
Tyler Goodlet 0d825ae6d7 Log error 2021-10-06 17:05:58 -04:00
Tyler Goodlet 5be8c86e96 Support asyncio actors with the trio spawner backend 2021-10-06 17:05:58 -04:00
Tyler Goodlet aa069a1edc Revert removal of `infect_asyncio` in nursery start methods 2021-10-06 17:05:58 -04:00
Tyler Goodlet 3c1cc90c40 Attempt to make mypy happy.. 2021-10-06 17:05:58 -04:00
Tyler Goodlet 056ca97d2a Add an obnoxious error message on internal failures 2021-10-06 17:05:58 -04:00
Tyler Goodlet 558ba7e008 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2021-10-06 17:05:58 -04:00
Tyler Goodlet 1aa70da58b Drop entrypoints from `Actor` 2021-10-06 17:05:58 -04:00
Tyler Goodlet 96cf4a962d Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2021-10-06 17:05:58 -04:00
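
For reference, `trio`'s guest mode is driven via `trio.lowlevel.start_guest_run()`; a rough sketch of what a `run_as_asyncio_guest()`-style wrapper does (simplified, error handling elided):

import asyncio
import trio

def run_as_asyncio_guest(trio_main) -> None:
    # host an entire trio run inside an asyncio event loop
    async def aio_main() -> None:
        loop = asyncio.get_running_loop()
        trio_done = loop.create_future()

        def done_callback(outcome) -> None:
            # called by trio when the guest run finishes; ``outcome``
            # wraps either the return value or the raised error
            trio_done.set_result(outcome)

        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=done_callback,
        )
        # wait for the trio side to finish, then unwrap its outcome
        (await trio_done).unwrap()

    asyncio.run(aio_main())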
Tyler Goodlet fd70965422 Propagate any spawned `asyncio` task error upwards
This should mostly maintain top level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes make
sure to cancel the pertaining `trio` cancel scope and raise any error
that may have resulted.

Resolves #120
2021-10-06 17:05:58 -04:00
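
Roughly the principle being described, as a sketch (names are assumed; it also glosses over the guest-mode detail that makes touching a trio cancel scope from an asyncio callback workable here, namely that both loops share one thread):

import asyncio
from typing import Optional
import trio

async def wait_on_aio_task(coro) -> None:
    # spawn the asyncio task and tie its lifetime to a trio cancel scope
    task = asyncio.ensure_future(coro)
    cancel_scope = trio.CancelScope()
    aio_err: Optional[BaseException] = None

    def on_done(t: asyncio.Task) -> None:
        nonlocal aio_err
        if not t.cancelled():
            aio_err = t.exception()
        # wake the waiting trio task regardless of outcome
        cancel_scope.cancel()

    task.add_done_callback(on_done)

    with cancel_scope:
        await trio.sleep_forever()

    if aio_err is not None:
        # maintain SC: the asyncio error surfaces in the trio caller
        raise aio_err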
Tyler Goodlet 6ad819362e Add a @pub kwarg to allow specifying a "startup response message" 2021-10-06 17:05:58 -04:00
Tyler Goodlet 16ab14d959 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-10-06 17:05:58 -04:00
Tyler Goodlet c7e03ae3b4 Fix *args-like type annot 2021-10-06 17:02:38 -04:00
Tyler Goodlet 38b844fb22 Lul, fix everything for cluster helper 2021-10-06 17:02:38 -04:00
Tyler Goodlet 3f8f848ce8 Fix type path to new `_supervise` mod 2021-10-06 17:02:38 -04:00
Tyler Goodlet 2fbc43f0c3 Expose `Lagged` for broadcasting 2021-10-06 17:02:38 -04:00
Tyler Goodlet 9c63cb87c7 Fix top level nursery import 2021-10-06 17:02:38 -04:00
Tyler Goodlet d7e36ad817 Add an async actor cluster spawner prototype 2021-10-06 17:02:38 -04:00
Tyler Goodlet 7c6f6571f1 Move broadcast channel parts into trionics 2021-10-06 17:02:38 -04:00
Tyler Goodlet ebf3ad6af0 Start `trionics` sub-pkg with `async_enter_all()`
Since it seems we're building out more and more higher level primitives
in order to support certain parallel style actor trees and messaging
patterns (eg. task broadcast channels), we might as well start a new
sub-package for purely `trio` constructions. We hereby dub this
the realm of `trionics` (like electronics but for trios instead of
electrons).

To kick things off, add an `async_enter_all()` concurrent
exit-stack-like context manager API which will concurrently spawn
a sequence of provided async context managers and deliver their ordered
results but with proper support for `trio` cancellation semantics.
The stdlib's `AsyncExitStack` is not compatible with nurseries nor
`trio` tasks (which get cancelled) since a task will be suspended on
the stack after push and does not ever hit a checkpoint until the stack
is closed.
2021-10-06 17:02:38 -04:00
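
A minimal sketch of the `async_enter_all()` idea (simplified: no error handling, and it assumes every manager enters successfully):

from contextlib import asynccontextmanager
from typing import AsyncContextManager, Sequence

import trio

@asynccontextmanager
async def async_enter_all(mngrs: Sequence[AsyncContextManager]):
    results = [None] * len(mngrs)
    entered = [trio.Event() for _ in mngrs]
    teardown = trio.Event()

    async def _enter(i: int, mngr: AsyncContextManager) -> None:
        # each manager gets its own task so the ``__aenter__``s run concurrently
        async with mngr as value:
            results[i] = value
            entered[i].set()
            # hold the manager open until the caller's block exits
            await teardown.wait()

    async with trio.open_nursery() as n:
        for i, mngr in enumerate(mngrs):
            n.start_soon(_enter, i, mngr)
        # wait for every manager to be entered, keeping results ordered
        for ev in entered:
            await ev.wait()
        try:
            yield tuple(results)
        finally:
            teardown.set()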
Tyler Goodlet a568d8af74 Rename the nursery module to `_supervise` 2021-10-06 17:02:38 -04:00
24 changed files with 1750 additions and 742 deletions

View File

@@ -423,6 +423,7 @@ channel`_!
 .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
+.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
 .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
 .. _trio gitter channel: https://gitter.im/python-trio/general
 .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
@@ -431,7 +432,7 @@ channel`_!
 .. _messages: https://en.wikipedia.org/wiki/Message_passing
 .. _trio docs: https://trio.readthedocs.io/en/latest/
 .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
-.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
+.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
 .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
 .. _async generators: https://www.python.org/dev/peps/pep-0525/
 .. _trio-parallel: https://github.com/richardsheridan/trio-parallel

View File

@@ -20,7 +20,7 @@ async def sleep(
 async def open_ctx(
-    n: tractor._trionics.ActorNursery
+    n: tractor._supervise.ActorNursery
 ):
     # spawn both actors

View File

@@ -74,7 +74,15 @@ def pytest_configure(config):
 @pytest.fixture(scope='session', autouse=True)
 def loglevel(request):
     orig = tractor.log._default_loglevel
-    level = tractor.log._default_loglevel = request.config.option.loglevel
+    level_from_cli = request.config.option.loglevel
+    # disable built-in capture when user passes the `--ll` value
+    # presuming they already know they want to see console logging
+    # and don't need it repeated by pytest.
+    if level_from_cli:
+        request.config.option.showcapture = 'no'
+    level = tractor.log._default_loglevel = level_from_cli
     yield level
     tractor.log._default_loglevel = orig

View File

@@ -23,9 +23,9 @@ async def sleep_forever():
     await trio.sleep_forever()
-async def do_nuthin():
+async def do_nuthin(sleep=0):
     # just nick the scheduler
-    await trio.sleep(0)
+    await trio.sleep(sleep)
 @pytest.mark.parametrize(
@@ -100,6 +100,7 @@ def test_multierror(arb_addr):
 @pytest.mark.parametrize('delay', (0, 0.5))
 @pytest.mark.parametrize(
     'num_subactors', range(25, 26),
+    # 'num_subactors', range(2, 3),
 )
 def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
     """Verify we raise a ``trio.MultiError`` out of a nursery where
@@ -122,16 +123,20 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
         trio.run(main)
     assert exc_info.type == tractor.MultiError
-    err = exc_info.value
-    exceptions = err.exceptions
+    multi = exc_info.value
+    exceptions = multi.exceptions
     if len(exceptions) == 2:
-        # sometimes oddly now there's an embedded BrokenResourceError ?
-        exceptions = exceptions[1].exceptions
+        # sometimes there's an embedded BrokenResourceError
+        # next to the main multierror?
+        for exc in exceptions:
+            if hasattr(exc, 'exceptions'):
+                multi = exc
+                break
-    assert len(exceptions) == num_subactors
-    for exc in exceptions:
+    assert len(multi.exceptions) == num_subactors
+    for exc in multi.exceptions:
         assert isinstance(exc, tractor.RemoteActorError)
         assert exc.type == AssertionError
@@ -218,7 +223,7 @@ async def test_cancel_infinite_streamer(start_method):
     # daemon complete quickly delay while single task
     # actors error after brief delay
     (3, tractor.MultiError, AssertionError,
-     (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
+     (assert_err, {'delay': 1}), (do_nuthin, {'sleep': 0}, False)),
 ],
 ids=[
     '1_run_in_actor_fails',
@@ -321,6 +326,7 @@ async def spawn_and_error(breadth, depth) -> None:
             )
             kwargs = {
                 'name': f'{name}_errorer_{i}',
+                # 'delay': 0.01,
             }
             await nursery.run_in_actor(*args, **kwargs)
@@ -355,9 +361,13 @@ async def test_nested_multierrors(loglevel, start_method):
                 depth=depth,
             )
         except trio.MultiError as err:
+            _err = err
             assert len(err.exceptions) == subactor_breadth
             for subexc in err.exceptions:
+                # NOTE: use [print(f'err: {err}') for err in _err.exceptions]
+                # to inspect errors from console on failure
                 # verify first level actor errors are wrapped as remote
                 if platform.system() == 'Windows':
@@ -381,13 +391,25 @@ async def test_nested_multierrors(loglevel, start_method):
                     # on windows sometimes spawning is just too slow and
                     # we get back the (sent) cancel signal instead
                     if platform.system() == 'Windows':
-                        assert (subexc.type is trio.MultiError) or (
-                            subexc.type is tractor.RemoteActorError)
+                        assert subexc.type in (
+                            trio.MultiError,
+                            tractor.RemoteActorError,
+                        )
                     else:
-                        assert subexc.type is trio.MultiError
+                        assert subexc.type in (
+                            trio.MultiError,
+                            trio.Cancelled,
+                            # tractor.RemoteActorError,
+                        )
                 else:
-                    assert (subexc.type is tractor.RemoteActorError) or (
-                        subexc.type is trio.Cancelled)
+                    assert subexc.type in (
+                        tractor.RemoteActorError,
+                        trio.Cancelled,
+                    )
+        else:
+            pytest.fail(f'Got no error from nursery?')
@no_windows @no_windows

View File

@@ -0,0 +1,24 @@
+import asyncio
+
+import pytest
+import tractor
+
+
+async def sleep_and_err():
+    await asyncio.sleep(0.1)
+    assert 0
+
+
+async def asyncio_actor():
+    assert tractor.current_actor().is_infected_aio()
+    await tractor.to_asyncio.run_task(sleep_and_err)
+
+
+def test_infected_simple_error(arb_addr):
+
+    async def main():
+        async with tractor.open_nursery() as n:
+            await n.run_in_actor(asyncio_actor, infected_asyncio=True)
+
+    with pytest.raises(tractor.RemoteActorError) as excinfo:
+        tractor.run(main, arbiter_addr=arb_addr)

View File

@@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub(
             'streamer',
             enable_modules=[__name__],
         )
+        name = 'streamer'
         even_portal = await n.run_in_actor(
             subs,

View File

@@ -12,7 +12,7 @@ import pytest
 import trio
 from trio.lowlevel import current_task
 import tractor
-from tractor._broadcast import broadcast_receiver, Lagged
+from tractor.trionics import broadcast_receiver, Lagged
 @tractor.context
@@ -432,7 +432,6 @@ def test_first_recver_is_cancelled():
     tx, rx = trio.open_memory_channel(1)
     brx = broadcast_receiver(rx, 1)
     cs = trio.CancelScope()
-    sequence = list(range(3))
     async def sub_and_recv():
         with cs:

View File

@@ -13,7 +13,7 @@ from ._streaming import (
     context,
 )
 from ._discovery import get_arbiter, find_actor, wait_for_actor
-from ._trionics import open_nursery
+from ._supervise import open_nursery
 from ._state import current_actor, is_root_process
 from ._exceptions import (
     RemoteActorError,

View File

@ -49,6 +49,7 @@ async def _invoke(
chan: Channel, chan: Channel,
func: typing.Callable, func: typing.Callable,
kwargs: Dict[str, Any], kwargs: Dict[str, Any],
is_rpc: bool = True,
task_status: TaskStatus[ task_status: TaskStatus[
Union[trio.CancelScope, BaseException] Union[trio.CancelScope, BaseException]
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
@ -57,7 +58,7 @@ async def _invoke(
Invoke local func and deliver result(s) over provided channel. Invoke local func and deliver result(s) over provided channel.
''' '''
__tracebackhide__ = True # __tracebackhide__ = True
treat_as_gen = False treat_as_gen = False
# possible a traceback (not sure what typing is for this..) # possible a traceback (not sure what typing is for this..)
@ -68,6 +69,7 @@ async def _invoke(
ctx = Context(chan, cid) ctx = Context(chan, cid)
context: bool = False context: bool = False
fname = func.__name__
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions # handle decorated ``@tractor.stream`` async functions
@ -163,6 +165,7 @@ async def _invoke(
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err: except trio.Cancelled as err:
tb = err.__traceback__ tb = err.__traceback__
raise
if cs.cancelled_caught: if cs.cancelled_caught:
@ -170,7 +173,6 @@ async def _invoke(
# so they can be unwrapped and displayed on the caller # so they can be unwrapped and displayed on the caller
# side! # side!
fname = func.__name__
if ctx._cancel_called: if ctx._cancel_called:
msg = f'{fname} cancelled itself' msg = f'{fname} cancelled itself'
@ -191,9 +193,33 @@ async def _invoke(
await chan.send({'functype': 'asyncfunc', 'cid': cid}) await chan.send({'functype': 'asyncfunc', 'cid': cid})
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err:
tb = err.__traceback__
raise
# await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: if cs.cancelled_caught:
# if cs.cancel_called:
if cs.cancel_called:
msg = (
f'{fname} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
else:
msg = f'{fname} cancelled itself'
raise ContextCancelled(
msg,
suberror_type=trio.Cancelled,
)
except (
Exception,
trio.MultiError,
# trio.Cancelled,
) as err:
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
@ -243,10 +269,11 @@ async def _invoke(
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
is_complete.set() is_complete.set()
except KeyError: except KeyError:
if is_rpc:
# If we're cancelled before the task returns then the # If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
log.warning( log.warning(
f"Task {func} likely errored or cancelled before it started") f"Task {func} likely errored or cancelled before start")
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed") log.runtime("All RPC tasks have completed")
@ -280,6 +307,9 @@ class Actor:
_parent_main_data: Dict[str, str] _parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None _parent_chan_cs: Optional[trio.CancelScope] = None
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__( def __init__(
self, self,
name: str, name: str,
@ -353,6 +383,14 @@ class Actor:
Tuple[Any, Any, Any, Any, Any]] = None Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def wait_for_peer( async def wait_for_peer(
self, uid: Tuple[str, str] self, uid: Tuple[str, str]
) -> Tuple[trio.Event, Channel]: ) -> Tuple[trio.Event, Channel]:
@ -503,8 +541,8 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected if not self._peers: # no more channels connected
self._no_more_peers.set()
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?) # # XXX: is this necessary (GC should do it?)
if chan.connected(): if chan.connected():
@ -538,7 +576,8 @@ class Actor:
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
assert send_chan.cid == cid # type: ignore assert send_chan.cid == cid # type: ignore
# if 'error' in msg: if 'error' in msg:
recv_chan
# ctx = getattr(recv_chan, '_ctx', None) # ctx = getattr(recv_chan, '_ctx', None)
# if ctx: # if ctx:
# ctx._error_from_remote_msg(msg) # ctx._error_from_remote_msg(msg)
@ -613,6 +652,7 @@ class Actor:
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
msg = None msg = None
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
uid = chan.uid
log.runtime(f"Entering msg loop for {chan} from {chan.uid}") log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try: try:
@ -636,7 +676,7 @@ class Actor:
log.runtime( log.runtime(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {uid}")
break break
@ -671,16 +711,47 @@ class Actor:
f"{ns}.{funcname}({kwargs})") f"{ns}.{funcname}({kwargs})")
if ns == 'self': if ns == 'self':
func = getattr(self, funcname) func = getattr(self, funcname)
if funcname == 'cancel':
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
log.cancel(
f"{self.uid} remote cancel msg from {uid}")
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
pdb_complete = _debug._local_pdb_complete
if pdb_complete:
log.cancel(
f'{self.uid} is in debug, wait for unlock')
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
loop_cs.cancel()
continue
if funcname == '_cancel_task': if funcname == '_cancel_task':
# XXX: a special case is made here for task_cid = kwargs['cid']
# remote calls since we don't want the log.cancel(
# remote actor have to know which channel f'Actor {uid} requests cancel for {task_cid}')
# the task is associated with and we can't
# pass non-primitive types between actors. # we immediately start the runtime machinery
# This means you can use: # shutdown
# Portal.run('self', '_cancel_task, cid=did) with trio.CancelScope(shield=True):
# without passing the `chan` arg.
kwargs['chan'] = chan kwargs['chan'] = chan
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
continue
else: else:
# complain to client about restricted modules # complain to client about restricted modules
try: try:
@ -699,14 +770,15 @@ class Actor:
partial(_invoke, self, cid, chan, func, kwargs), partial(_invoke, self, cid, chan, func, kwargs),
name=funcname, name=funcname,
) )
except RuntimeError: except (RuntimeError, trio.MultiError):
# avoid reporting a benign race condition # avoid reporting a benign race condition
# during actor runtime teardown. # during actor runtime teardown.
nursery_cancelled_before_task = True nursery_cancelled_before_task = True
break
# never allow cancelling cancel requests (results in # never allow cancelling cancel requests (results in
# deadlock and other weird behaviour) # deadlock and other weird behaviour)
if func != self.cancel: # if func != self.cancel:
if isinstance(cs, Exception): if isinstance(cs, Exception):
log.warning( log.warning(
f"Task for RPC func {func} failed with" f"Task for RPC func {func} failed with"
@ -719,20 +791,12 @@ class Actor:
# cancelled gracefully if requested # cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = ( self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event()) cs, func, trio.Event())
else:
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.warning(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
log.runtime( log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
else:
# channel disconnect # end of async for, channel disconnect vis
# ``trio.EndOfChannel``
log.runtime( log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks" f"{chan} for {chan.uid} disconnected, cancelling tasks"
) )
@ -947,6 +1011,9 @@ class Actor:
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err: except Exception as err:
log.info("Closing all actor lifetime contexts")
_lifetime_stack.close()
if not registered_with_arbiter: if not registered_with_arbiter:
# TODO: I guess we could try to connect back # TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger # to the parent through a channel and engage a debugger
@ -976,11 +1043,21 @@ class Actor:
raise raise
finally: finally:
log.runtime("Root nursery complete") log.runtime("root runtime nursery complete")
# tear down all lifetime contexts if not in guest mode # tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint? # XXX: should this just be in the entrypoint?
log.cancel("Closing all actor lifetime contexts") log.info("Closing all actor lifetime contexts")
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if self.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
_lifetime_stack.close() _lifetime_stack.close()
# Unregister actor from the arbiter # Unregister actor from the arbiter
@ -1065,7 +1142,7 @@ class Actor:
self._service_n.start_soon(self.cancel) self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool: async def cancel(self) -> bool:
"""Cancel this actor. """Cancel this actor's runtime.
The "deterministic" teardown sequence in order is: The "deterministic" teardown sequence in order is:
- cancel all ongoing rpc tasks by cancel scope - cancel all ongoing rpc tasks by cancel scope
@ -1099,7 +1176,7 @@ class Actor:
if self._service_n: if self._service_n:
self._service_n.cancel_scope.cancel() self._service_n.cancel_scope.cancel()
log.cancel(f"{self.uid} was sucessfullly cancelled") log.cancel(f"{self.uid} called `Actor.cancel()`")
self._cancel_complete.set() self._cancel_complete.set()
return True return True
@ -1158,15 +1235,21 @@ class Actor:
registered for each. registered for each.
""" """
tasks = self._rpc_tasks tasks = self._rpc_tasks
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid) in tasks.copy(): for (
(chan, cid),
(scope, func, is_complete),
) in tasks.copy().items():
if only_chan is not None: if only_chan is not None:
if only_chan != chan: if only_chan != chan:
continue continue
# TODO: this should really done in a nursery batch # TODO: this should really done in a nursery batch
if func != self._cancel_task:
await self._cancel_task(cid, chan) await self._cancel_task(cid, chan)
if tasks:
log.cancel( log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
@ -1216,6 +1299,9 @@ class Actor:
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has

View File

@@ -19,12 +19,15 @@ def parse_ipaddr(arg):
     return (str(host), int(port))
+from ._entry import _trio_main
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--uid", type=parse_uid)
     parser.add_argument("--loglevel", type=str)
     parser.add_argument("--parent_addr", type=parse_ipaddr)
+    parser.add_argument("--asyncio", action='store_true')
     args = parser.parse_args()
     subactor = Actor(
@@ -36,5 +39,6 @@ if __name__ == "__main__":
     _trio_main(
         subactor,
-        parent_addr=args.parent_addr
+        parent_addr=args.parent_addr,
+        infect_asyncio=args.asyncio,
     )

View File

@@ -0,0 +1,50 @@
+'''
+Actor cluster helpers.
+
+'''
+from contextlib import asynccontextmanager as acm
+from multiprocessing import cpu_count
+from typing import AsyncGenerator, Optional
+
+import trio
+import tractor
+
+
+@acm
+async def open_actor_cluster(
+    modules: list[str],
+    count: int = cpu_count(),
+    names: Optional[list[str]] = None,
+) -> AsyncGenerator[
+    list[str],
+    dict[str, tractor.Portal]
+]:
+    portals: dict[str, tractor.Portal] = {}
+    uid = tractor.current_actor().uid
+
+    if not names:
+        suffix = '_'.join(uid)
+        names = [f'worker_{i}.' + suffix for i in range(count)]
+
+    if not len(names) == count:
+        raise ValueError(
+            'Number of names is {len(names)} but count it {count}')
+
+    async with tractor.open_nursery() as an:
+        async with trio.open_nursery() as n:
+            for index, key in zip(range(count), names):
+
+                async def start(i) -> None:
+                    key = f'worker_{i}.' + '_'.join(uid)
+                    portals[key] = await an.start_actor(
+                        enable_modules=modules,
+                        name=key,
+                    )
+
+                n.start_soon(start, index)
+
+        assert len(portals) == count
+        yield portals
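
Hypothetical usage of the helper above (the compare view doesn't show file paths, so the import below assumes the new module is `tractor._clusters`):

import tractor
from tractor._clusters import open_actor_cluster  # assumed module path

async def main() -> None:
    async with open_actor_cluster(
        modules=['__main__'],
        count=4,
    ) as portals:
        # one portal per spawned worker actor, keyed by generated name
        for name, portal in portals.items():
            print(f'spawned {name} -> {portal.channel.uid}')

if __name__ == '__main__':
    # ``tractor.run()`` (still present in this changeset) boots the root actor
    tractor.run(main)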

View File

@ -2,6 +2,7 @@
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
""" """
from __future__ import annotations
import bdb import bdb
import sys import sys
from functools import partial from functools import partial
@ -20,12 +21,14 @@ from ._exceptions import is_multi_cancelled
try: try:
# wtf: only exported when installed in dev mode? # wtf: only exported when installed in dev mode?
import pdbpp import pdbpp
except ImportError: except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff # pdbpp is installed in regular mode...it monkey patches stuff
import pdb import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore assert pdb.xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -85,6 +88,23 @@ class PdbwTeardown(pdbpp.Pdb):
_pdb_release_hook() _pdb_release_hook()
def _mk_pdb() -> PdbwTeardown:
# XXX: setting these flags on the pdb instance are absolutely
# critical to having ctrl-c work in the ``trio`` standard way! The
# stdlib's pdb supports entering the current sync frame on a SIGINT,
# with ``trio`` we pretty much never want this and if we did we can
# handle it in the ``tractor`` task runtime.
# global pdb
pdb = PdbwTeardown()
pdb.nosigint = True
pdb.allow_kbdint = True
opts = (allow_kbdint, nosigint) = pdb.allow_kbdint, pdb.nosigint
print(f'`pdbp` was configured with {opts}')
return pdb
# TODO: will be needed whenever we get to true remote debugging. # TODO: will be needed whenever we get to true remote debugging.
# XXX see https://github.com/goodboy/tractor/issues/130 # XXX see https://github.com/goodboy/tractor/issues/130
@ -219,7 +239,8 @@ async def _hijack_stdin_for_child(
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> str: ) -> str:
'''Hijack the tty in the root process of an actor tree such that '''
Hijack the tty in the root process of an actor tree such that
the pdbpp debugger console can be allocated to a sub-actor for repl the pdbpp debugger console can be allocated to a sub-actor for repl
bossing. bossing.
@ -254,6 +275,8 @@ async def _hijack_stdin_for_child(
# assert await stream.receive() == 'pdb_unlock' # assert await stream.receive() == 'pdb_unlock'
except ( except (
# BaseException,
trio.MultiError,
trio.BrokenResourceError, trio.BrokenResourceError,
trio.Cancelled, # by local cancellation trio.Cancelled, # by local cancellation
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan
@ -268,8 +291,9 @@ async def _hijack_stdin_for_child(
if isinstance(err, trio.Cancelled): if isinstance(err, trio.Cancelled):
raise raise
finally:
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}") log.pdb("TTY lock released, remote task:"
f"{task_name}:{subactor_uid}")
return "pdb_unlock_complete" return "pdb_unlock_complete"
@ -326,7 +350,7 @@ async def _breakpoint(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
log.error('opened stream') log.debug('opened stream')
# unblock local caller # unblock local caller
task_status.started() task_status.started()
@ -343,6 +367,7 @@ async def _breakpoint(
except tractor.ContextCancelled: except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock') log.warning('Root actor cancelled debug lock')
raise
finally: finally:
log.debug(f"Exiting debugger for actor {actor}") log.debug(f"Exiting debugger for actor {actor}")
@ -407,11 +432,14 @@ async def _breakpoint(
'Root actor attempting to shield-acquire active tty lock' 'Root actor attempting to shield-acquire active tty lock'
f' owned by {_global_actor_in_debug}') f' owned by {_global_actor_in_debug}')
with trio.CancelScope(shield=True): stats = _debug_lock.statistics()
if stats.owner:
print(f'LOCK STATS: {stats}')
# must shield here to avoid hitting a ``Cancelled`` and # must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty # a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await _debug_lock.acquire() await _debug_lock.acquire()
else: else:
# may be cancelled # may be cancelled
await _debug_lock.acquire() await _debug_lock.acquire()
@ -437,21 +465,6 @@ async def _breakpoint(
debug_func(actor) debug_func(actor)
def _mk_pdb() -> PdbwTeardown:
# XXX: setting these flags on the pdb instance are absolutely
# critical to having ctrl-c work in the ``trio`` standard way! The
# stdlib's pdb supports entering the current sync frame on a SIGINT,
# with ``trio`` we pretty much never want this and if we did we can
# handle it in the ``tractor`` task runtime.
pdb = PdbwTeardown()
pdb.allow_kbdint = True
pdb.nosigint = True
return pdb
def _set_trace(actor=None): def _set_trace(actor=None):
pdb = _mk_pdb() pdb = _mk_pdb()
@ -524,3 +537,57 @@ async def _maybe_enter_pm(err):
else: else:
return False return False
async def maybe_wait_for_debugger() -> None:
global _no_remote_has_tty, _global_actor_in_debug
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
if (
_state.debug_mode() and
is_root_process()
):
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# await trio.testing.wait_all_tasks_blocked()
sub_in_debug = None
if _global_actor_in_debug:
sub_in_debug = tuple(_global_actor_in_debug)
for _ in range(1):
with trio.CancelScope(shield=True):
log.pdb(
'Polling for debug lock'
)
await trio.sleep(0.01)
debug_complete = _no_remote_has_tty
if (
(debug_complete and
not debug_complete.is_set())
):
log.warning(
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..')
await debug_complete.wait()
await trio.sleep(0.01)
continue
else:
log.warning(
'Root acquired DEBUGGER'
)
return

View File

@ -9,6 +9,7 @@ import trio # type: ignore
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__) log = get_logger(__name__)
@ -20,6 +21,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any], forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: str,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run`` """The routine called *after fork* which invokes a fresh ``trio.run``
""" """
@ -45,6 +47,10 @@ def _mp_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? pass # handle it the same way trio does?
@ -57,15 +63,17 @@ def _trio_main(
actor: 'Actor', # type: ignore actor: 'Actor', # type: ignore
*, *,
parent_addr: Tuple[str, int] = None, parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor.
""" """
# Disable sigint handling in children; Entry point for a `trio_run_in_process` subactor.
# we don't need it thanks to our cancellation machinery.
# signal.signal(signal.SIGINT, signal.SIG_IGN)
"""
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
# Disable sigint handling in children?
# signal.signal(signal.SIGINT, signal.SIG_IGN)
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -83,6 +91,10 @@ def _trio_main(
) )
try: try:
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI") log.warning(f"Actor {actor.uid} received KBI")

View File

@ -84,6 +84,15 @@ class Portal:
] = None ] = None
self._streams: Set[ReceiveMsgStream] = set() self._streams: Set[ReceiveMsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
self._cancel_called: bool = False
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
async def _submit( async def _submit(
self, self,
@ -129,6 +138,7 @@ class Portal:
resptype: str, resptype: str,
first_msg: dict first_msg: dict
) -> Any: ) -> Any:
# __tracebackhide__ = True
assert resptype == 'asyncfunc' # single response assert resptype == 'asyncfunc' # single response
msg = await recv_chan.receive() msg = await recv_chan.receive()
@ -140,8 +150,11 @@ class Portal:
raise unpack_error(msg, self.channel) raise unpack_error(msg, self.channel)
async def result(self) -> Any: async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task.
""" """
Return the result(s) from the remote actor's "main" task.
"""
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -193,9 +206,16 @@ class Portal:
# we'll need to .aclose all those channels here # we'll need to .aclose all those channels here
await self._cancel_streams() await self._cancel_streams()
async def cancel_actor(self): async def cancel_actor(self) -> None:
"""Cancel the actor on the other end of this portal. '''
""" Cancel the actor on the other end of this portal.
That means cancelling the "actor runtime" not just any one
task that's running there.
'''
self._cancel_called = True
if not self.channel.connected(): if not self.channel.connected():
log.cancel("This portal is already closed can't cancel") log.cancel("This portal is already closed can't cancel")
return False return False
@ -203,8 +223,8 @@ class Portal:
await self._cancel_streams() await self._cancel_streams()
log.cancel( log.cancel(
f"Sending actor cancel request to {self.channel.uid} on " f"Sending runtime cancel msg to {self.channel.uid} @ "
f"{self.channel}") f"{self.channel.raddr}")
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield # XXX: sure would be nice to make this work with a proper shield

View File

@@ -238,7 +238,7 @@ def run(
 def run_daemon(
-    rpc_module_paths: List[str],
+    enable_modules: List[str],
     **kwargs
 ) -> None:
     """Spawn daemon actor which will respond to RPC.
@@ -247,9 +247,9 @@ def run_daemon(
     ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
     is meant to run forever responding to RPC requests.
     """
-    kwargs['rpc_module_paths'] = list(rpc_module_paths)
-    for path in rpc_module_paths:
+    kwargs['enable_modules'] = list(enable_modules)
+    for path in enable_modules:
         importlib.import_module(path)
     return run(partial(trio.sleep, float('inf')), **kwargs)

View File

@ -1,6 +1,7 @@
""" """
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations
import sys import sys
import multiprocessing as mp import multiprocessing as mp
import platform import platform
@ -8,7 +9,6 @@ from typing import Any, Dict, Optional
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import asynccontextmanager
try: try:
from multiprocessing import semaphore_tracker # type: ignore from multiprocessing import semaphore_tracker # type: ignore
@ -31,7 +31,12 @@ from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor from ._actor import Actor
from ._entry import _mp_main from ._entry import _mp_main
from ._exceptions import ActorFailure from ._exceptions import (
ActorFailure,
RemoteActorError,
ContextCancelled,
)
from ._debug import maybe_wait_for_debugger, breakpoint
log = get_logger('tractor') log = get_logger('tractor')
@ -90,95 +95,173 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx return _ctx
async def exhaust_portal( async def result_from_portal(
portal: Portal,
actor: Actor
) -> Any:
"""Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
final = await portal.result()
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}")
return err
else:
log.debug(f"Returning final result: {final}")
return final
async def cancel_on_completion(
portal: Portal, portal: Portal,
actor: Actor, actor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
cancel_on_result: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's ) -> tuple[Optional[Any], Optional[BaseException]]:
"""
Cancel actor gracefully once it's "main" portal's
result arrives. result arrives.
Should only be called for actors spawned with `run_in_actor()`. Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
"""
# __tracebackhide__ = True
uid = portal.channel.uid
remote_result = None
is_remote_result = None
# cancel control is explicityl done by the caller
with trio.CancelScope() as cs:
task_status.started(cs) task_status.started(cs)
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request # a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor) # result = await exhaust_portal(portal, actor)
if isinstance(result, Exception): try:
errors[actor.uid] = result log.info(f"Waiting on final result from {actor.uid}")
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else: # XXX: streams should never be reaped here since they should
log.runtime( # always be established and shutdown using a context manager api
f"Cancelling {portal.channel.uid} gracefully " result = await portal.result()
f"after result {result}") is_remote_result = True
log.info(f"Returning final result: {result}")
# cancel the process now that we have a final result except RemoteActorError as rerr:
await portal.cancel_actor() # this includes real remote errors as well as
# `ContextCancelled`
is_remote_result = True
result = rerr
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
is_remote_result = False
result = err
# errors[actor.uid] = err
# raise
if cs.cancelled_caught:
log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
return result, is_remote_result
# except trio.Cancelled as err:
# # lol, of course we need this too ;P
# # TODO: merge with above?
# log.warning(f"Cancelled `Portal.result()` waiter for {uid}")
# result = err
# # errors[actor.uid] = err
# raise
# return result
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
timeout: float,
) -> None: ) -> None:
'''
Hard kill a process with timeout.
'''
log.debug(f"Hard killing {proc}")
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
with trio.move_on_after(3) as cs: with trio.move_on_after(timeout) as cs:
# NOTE: This ``__aexit__()`` shields internally. # NOTE: This ``__aexit__()`` shields internally and originally
async with proc: # calls ``trio.Process.aclose()`` # would tear down stdstreams via ``trio.Process.aclose()``.
async with proc:
log.debug(f"Terminating {proc}") log.debug(f"Terminating {proc}")
if cs.cancelled_caught: if cs.cancelled_caught:
# this is a "softer" kill that we should probably use
# eventually and let the zombie lord do the `.kill()`
# proc.terminate()
# XXX: should pretty much never get here unless we have # XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and # to move the bits from ``proc.__aexit__()`` out and
# into here. # into here.
log.critical(f"HARD KILLING {proc}") log.critical(f"{timeout} timeout, HARD KILLING {proc}")
proc.kill() proc.kill()
@asynccontextmanager async def reap_proc(
async def spawn_subactor(
subactor: 'Actor', proc: trio.Process,
uid: tuple[str, str],
terminate_after: Optional[float] = None,
hard_kill_after: int = 0.1,
) -> None:
with trio.move_on_after(terminate_after or float('inf')) as cs:
# Wait for proc termination but **dont' yet** do
# any out-of-ipc-land termination / process
# killing. This is a "light" (cancellable) join,
# the hard join is below after timeout
await proc.wait()
log.info(f'Proc for {uid} terminated gracefully')
if cs.cancelled_caught and terminate_after is not float('inf'):
# Always "hard" join lingering sub procs since no
# actor zombies are allowed!
log.warning(
# f'Failed to gracefully terminate {subactor.uid}')
f'Failed to gracefully terminate {proc}\n'
f"Attempting to hard kill {proc}")
with trio.CancelScope(shield=True):
# XXX: do this **after**
# cancellation/tearfown to avoid killing the
# process too early since trio does this
# internally on ``__aexit__()``
await do_hard_kill(proc, hard_kill_after)
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
): _runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
graceful_kill_timeout: int = 3,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""
Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
uid = subactor.uid
if _spawn_method == 'trio':
spawn_cmd = [ spawn_cmd = [
sys.executable, sys.executable,
"-m", "-m",
@ -190,7 +273,7 @@ async def spawn_subactor(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
str(subactor.uid), str(uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -202,57 +285,46 @@ async def spawn_subactor(
subactor.loglevel subactor.loglevel
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
log.info(f"Started {proc}")
portal: Optional[Portal] = None
# handle cancellation during child connect-back, kill
# any cancelled spawn sequence immediately.
try: try:
yield proc
finally:
log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
await do_hard_kill(proc)
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
cancel_scope = None
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
parent_addr,
) as proc:
log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
except trio.Cancelled:
# reap un-contacted process which are started
# but never setup a connection to parent.
log.warning(f'Spawning aborted due to cancel {proc}')
with trio.CancelScope(shield=True):
await do_hard_kill(proc, 0.1)
# TODO: should we have a custom error for this maybe derived
# from ``subprocess``?
raise
# the child successfully connected back to us.
actor_nursery_cancel_called = None
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
# track child in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
try:
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
@ -263,54 +335,266 @@ async def new_proc(
"_runtime_vars": _runtime_vars, "_runtime_vars": _runtime_vars,
}) })
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
# this either completes or is cancelled and should only
# **and always** be set once the actor nursery has errored
# or exited.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
cancel_on_completion,
except (
BaseException
# trio.Cancelled,
# KeyboardInterrupt,
# trio.MultiError,
# RuntimeError,
) as cerr:
log.exception(f'Relaying unexpected {cerr} to nursery')
await breakpoint()
# sending IPC-msg level cancel requests is expected to be
# managed by the nursery.
with trio.CancelScope(shield=True):
await actor_nursery._handle_err(cerr, portal=portal)
if portal.channel.connected():
if ria:
# this may raise which we want right?
await result_from_portal(
portal,
subactor,
errors,
# True, # cancel_on_result
)
# Wait for proc termination but **don't yet** call
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).
# Graceful reap attempt - 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria to return and daemon actors to be cancelled
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
result = None
# TODO: No idea how we can enforce zombie
# reaping more stringently without the shield
# we used to have below...
# this is the soft reap sequence. we can
# either collect results:
# - ria actors get them via ``Portal.result()``
# - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel()``. in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cancelled
# by ``ActorNursery.cancel()``.
# with trio.CancelScope(shield=True):
# async with proc:
# OR, we're cancelled while collecting results, in which
# case we need to try another soft cancel and reap attempt.
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
reap_timeout = None
# Always "hard" join sub procs since no actor zombies if portal.channel.connected() and ria:
# are allowed!
# this is a "light" (cancellable) join, the hard join is result, is_remote = await result_from_portal(
# in the enclosing scope (see above). portal,
await proc.wait() subactor,
errors,
# True, # cancel_on_result
)
if is_remote:
if isinstance(result, RemoteActorError):
# errors[actor.uid] = result
if (
portal.cancel_called and
isinstance(result, ContextCancelled)
):
log.cancel(f'{uid} received expected cancel')
errors[uid] = result
log.debug(f"Joined {proc}") # fall through to below soft proc reap
# pop child entry to indicate we no longer managing this subactor reap_timeout = 0.5
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in else:
# tandem if not done already
if cancel_scope:
log.warning( log.warning(
"Cancelling existing result waiter task for " f"Cancelling single-task-run {uid} after remote error {result}"
f"{subactor.uid}") )
cancel_scope.cancel()
# likely a real remote error propagation
# so pass up to nursery strat
should_raise = await actor_nursery._handle_err(
result,
portal=portal,
)
# propagate up to spawn nursery to be
# grouped into any multierror.
# if should_raise:
# raise result
else:
log.runtime(
f"Cancelling {uid} gracefully "
f"after one-time-task result {result}")
# an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor()
else:
log.exception(
f"Cancelling single-task-run {uid} after local error"
)
raise result
# soft & cancellable
await reap_proc(proc, uid, terminate_after=reap_timeout)
# except (
# ContextCancelled,
# ) as err:
# if portal.cancel_called:
# log.cancel('{uid} received expected cancel')
# # soft & cancellable
# await reap_proc(proc, uid, terminate_after=0.1)
# except (
# RemoteActorError,
# ) as err:
# reaping_cancelled = err
# log.exception(f'{uid} remote error')
# await actor_nursery._handle_err(err, portal=portal)
except (
trio.Cancelled,
) as err:
# NOTE: for now we pack the cancelleds and expect the actor
# nursery to re-raise them in a multierror but we could
# have also let them bubble up through the spawn nursery.
# in theory it's more correct to raise any
# `ContextCancelled` errors we get back from the
# `Portal.cancel_actor()` call and in that error
# have meta-data about whether we timed out or
# actually got a cancel message back from the remote task.
# IF INSTEAD we raised *here* then this logic has to be
# handled inside the oca supervisor block and the spawn_n
# task cancelleds would have to be replaced with the remote
# task `ContextCancelled`s, *if* they ever arrive.
errors[uid] = err
# with trio.CancelScope(shield=True):
# await breakpoint()
if actor_nursery.cancel_called:
log.cancel(f'{uid} soft reap cancelled by nursery')
else:
if not actor_nursery._spawn_n.cancel_scope.cancel_called:
# this would be pretty weird and unexpected
await breakpoint()
# actor nursery wasn't cancelled before the spawn
# nursery was which likely means that there was
# an error in the actor nursery enter and the
# spawn nursery cancellation "beat" the call to
# .cancel()? that's a bug right?
# saw this with settings bugs in the ordermode pane in
# piker.
log.exception(f'{uid} soft wait error?')
raise RuntimeError(
'Task spawn nursery cancelled before actor nursery?')
finally:
if reaping_cancelled:
assert actor_nursery.cancel_called
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft wait for {uid}')
with trio.CancelScope(shield=True):
await maybe_wait_for_debugger()
# XXX: we should probably just
# check for a `ContextCancelled` on portals
# here and fill them in over `trio.Cancelled` right?
# hard reap sequence with timeouts
if proc.poll() is None:
log.cancel(f'Attempting hard reap for {uid}')
with trio.CancelScope(shield=True):
# hard reap sequence
# ``Portal.cancel_actor()`` is expected to have
# been called by the supervising nursery so we
# do **not** call it here.
await reap_proc(
proc,
uid,
# this is the same as previous timeout
# setting before rewriting this spawn
# section
terminate_after=3,
)
# if somehow the hard reap didn't collect the child then
# we send in the big gunz.
while proc.poll() is None:
log.critical(
f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n'
f'{proc}'
)
with trio.CancelScope(shield=True):
await reap_proc(
proc,
uid,
terminate_after=0.1,
)
log.info(f"Joined {proc}")
# 2 cases:
# - the actor terminated gracefully
# - we're cancelled and likely need to re-raise
# pop child entry to indicate we no longer managing this
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
# all subactor children have completed
log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
spawn_n = actor_nursery._spawn_n
# with trio.CancelScope(shield=True):
# await breakpoint()
if not spawn_n._closed:
# the parent task that opened the actor nursery
# hasn't yet closed it so we cancel that task now.
spawn_n.cancel_scope.cancel()
# not entirely sure why we need this.. but without it
# the reaping cancelled error is never reported upwards
# to the spawn nursery?
# if reaping_cancelled:
# raise reaping_cancelled
else:
# `multiprocessing`
# async with trio.open_nursery() as nursery:
@ -323,6 +607,7 @@ async def new_proc(
bind_addr=bind_addr,
parent_addr=parent_addr,
_runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status,
)
@ -338,6 +623,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -383,6 +669,7 @@ async def mp_new_proc(
fs_info,
start_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,
@ -420,6 +707,10 @@ async def mp_new_proc(
# while user code is still doing its thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
# no shield is required here (vs. above on the trio backend)
# since debug mode is not supported on mp.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
finally:
@ -435,13 +726,23 @@ async def mp_new_proc(
try:
# async with trio.open_nursery() as n:
# n.cancel_scope.shield = True
cancel_scope = await nursery.start(
cancel_on_completion,
print('soft mp reap')
# cancel_scope = await nursery.start(
result = await result_from_portal(
portal,
subactor,
errors,
# True,
)
except trio.Cancelled as err:
# except trio.Cancelled as err:
except BaseException as err:
log.exception('hard mp reap')
with trio.CancelScope(shield=True):
await actor_nursery._handle_err(err, portal=portal)
print('sent to nursery')
cancel_exc = err
# if the reaping task was cancelled we may have hit
@ -451,23 +752,34 @@ async def mp_new_proc(
reaping_cancelled = True
if proc.is_alive():
with trio.CancelScope(shield=True):
print('hard reaping')
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
print('pwning mp proc')
proc.terminate()
finally:
if not reaping_cancelled and proc.is_alive():
await proc_waiter(proc)
# if not reaping_cancelled and proc.is_alive():
# await proc_waiter(proc)
# TODO: timeout block here?
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
if not actor_nursery._children:
# all subactor children have completed
# log.cancel(f"{uid} reports all children complete!")
actor_nursery._all_children_reaped.set()
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
@ -476,6 +788,7 @@ async def mp_new_proc(
f"{subactor.uid}") f"{subactor.uid}")
cancel_scope.cancel() cancel_scope.cancel()
elif reaping_cancelled: # let the cancellation bubble up
if reaping_cancelled: # let the cancellation bubble up
print('raising')
assert cancel_exc
raise cancel_exc

View File

@ -19,8 +19,8 @@ import trio
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
log = get_logger(__name__)

View File

@ -0,0 +1,457 @@
"""
``trio`` inspired apis and helpers
"""
from functools import partial
import inspect
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional
import typing
import warnings
import trio
from async_generator import asynccontextmanager
from . import _debug
from ._debug import maybe_wait_for_debugger, breakpoint
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn
log = get_logger(__name__)
_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(
self,
actor: Actor,
spawn_nursery: trio.Nursery,
errors: Dict[Tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._spawn_n = spawn_nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._cancel_called: bool = False
self._join_procs = trio.Event()
self._all_children_reaped = trio.Event()
self.errors = errors
@property
def cancel_called(self) -> bool:
'''
Same principle as ``trio.CancelScope.cancel_called``.
'''
return self._cancel_called
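# The flag mirrors ``trio.CancelScope.cancel_called``: it records that
# cancellation was *requested*, not that teardown has completed (that
# is what ``.cancelled`` / ``_all_children_reaped`` convey). A quick
# sketch of the analogous ``trio`` semantics this is modelled on:
import trio

async def _cancel_called_demo() -> None:
    with trio.CancelScope() as cs:
        cs.cancel()
        # the request is recorded immediately..
        assert cs.cancel_called
        # ..but nothing is actually cancelled until a checkpoint runs
        assert not cs.cancelled_caught
        await trio.sleep(1)  # checkpoint -> Cancelled raised here
    # only once the scope exits do we know the cancel took effect
    assert cs.cancelled_caught

trio.run(_cancel_called_demo)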
async def start_actor(
self,
name: str,
*,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
infect_asyncio: bool = False,
debug_mode: Optional[bool] = None,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
# allow setting debug policy per actor
if debug_mode is not None:
_rtv['_debug_mode'] = debug_mode
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
subactor = Actor(
name,
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
assert parent_addr
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._spawn_n
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
partial(
_spawn.new_proc,
name,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
)
)
async def run_in_actor(
self,
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point
the actor is terminated.
"""
mod_path = fn.__module__
if name is None:
# use the explicit function name if not provided
name = fn.__name__
portal = await self.start_actor(
name,
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._spawn_n,
infect_asyncio=infect_asyncio,
)
# XXX: don't allow stream funcs
if not (
inspect.iscoroutinefunction(fn) and
not getattr(fn, '_tractor_stream_function', False)
):
raise TypeError(f'{fn} must be an async function!')
# this marks the actor to be cancelled after its portal result
# is retrieved, see logic in `open_nursery()` below.
self._cancel_after_result_on_exit.add(portal)
await portal._submit_for_result(
mod_path,
fn.__name__,
**kwargs
)
return portal
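# For reference, the intended ``run_in_actor()`` flow is: spawn, await
# the lone result through the returned portal, then let the nursery
# machinery cancel the child. A hedged usage sketch (the ``cube``
# function is made up and exact teardown ordering may differ in this
# WIP branch):
import trio
import tractor

async def cube(x: int) -> int:
    # executed remotely in the spawned subactor
    return x ** 3

async def _run_in_actor_demo() -> None:
    async with tractor.open_nursery() as n:
        portal = await n.run_in_actor(cube, x=3)
        # the single result is relayed back; the child is then cancelled
        assert await portal.result() == 27

if __name__ == '__main__':
    trio.run(_run_in_actor_demo)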
async def cancel(
self,
) -> None:
"""
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
# entries may be popped by the spawning backend as
# actors cancel individually
childs = self._children.copy()
if self.cancel_called:
log.warning(
f'Nursery with children {len(childs)} already cancelled')
return
log.cancel(
f'Cancelling nursery in {self._actor.uid} with children\n'
f'{childs.keys()}'
)
self._cancel_called = True
# wake up all spawn tasks to move on as if the nursery
# has ``__aexit__()``-ed
self._join_procs.set()
await maybe_wait_for_debugger()
# one-cancels-all strat
try:
async with trio.open_nursery() as cancel_sender:
for subactor, proc, portal in childs.values():
if not portal.cancel_called and portal.channel.connected():
cancel_sender.start_soon(portal.cancel_actor)
except trio.MultiError as err:
_err = err
log.exception(f'{self} errors during cancel')
# await breakpoint()
# # LOL, ok so multiprocessing requires this for some reason..
# with trio.CancelScope(shield=True):
# await trio.lowlevel.checkpoint()
# cancel all spawner tasks
# self._spawn_n.cancel_scope.cancel()
self.cancelled = True
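# The fan-out above (one ``start_soon()`` per connected portal inside
# a fresh nursery) is a generic trio pattern: issue N cancel requests
# concurrently and let any failures aggregate into a ``MultiError``.
# A stripped-down sketch with a stand-in for ``Portal.cancel_actor()``:
import trio

async def _cancel_one(name: str) -> None:
    # stand-in for ``Portal.cancel_actor()``: pretend to send a cancel
    # request and wait for the far end to ack it
    await trio.sleep(0.1)
    print(f'{name} cancelled')

async def _cancel_all(names: list[str]) -> None:
    try:
        async with trio.open_nursery() as n:
            for name in names:
                n.start_soon(_cancel_one, name)
    except trio.MultiError as err:
        # any failed cancel requests surface here as a group
        print(f'errors during cancel: {err}')

trio.run(_cancel_all, ['a', 'b', 'c'])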
async def _handle_err(
self,
err: BaseException,
portal: Optional[Portal] = None,
is_ctx_error: bool = False,
) -> bool:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
if is_ctx_error:
assert not portal
uid = self._actor.uid
else:
uid = portal.channel.uid
if err not in self.errors.values():
self.errors[uid] = err
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.error(
f"Nursery for {current_actor().uid} "
f"errored from {uid} with\n{err}")
# cancel all subactors
await self.cancel()
return True
log.warning(f'Skipping duplicate error for {uid}')
return False
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retrieved from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "daemon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
src_err: Optional[BaseException] = None
nurse_err: Optional[BaseException] = None
# errors from this daemon actor nursery bubble up to caller
try:
async with trio.open_nursery() as spawn_n:
# try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNursery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be cancelled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
anursery = ActorNursery(
actor,
spawn_n,
errors
)
# spawning of actors happens in the caller's scope
# after we yield upwards
try:
yield anursery
log.runtime(
f"Waiting on subactors {anursery._children} "
"to complete"
)
# signal all process monitor tasks to conduct
# hard join phase.
# await maybe_wait_for_debugger()
# log.error('joing trigger NORMAL')
anursery._join_procs.set()
# NOTE: there are 2 cases for error propagation:
# - an actor which is ``.run_in_actor()`` invoked
# runs a single task and reports the error upwards
# - the top level task which opened this nursery (in the
# parent actor) raises. In this case the raise can come
# from a variety of places:
# - user task code unrelated to the nursery/child actors
# - a ``RemoteActorError`` propagated up through the
# portal api from a child actor which will look the exact
# same as a user code failure.
except BaseException as err:
# anursery._join_procs.set()
src_err = err
# with trio.CancelScope(shield=True):
should_raise = await anursery._handle_err(err, is_ctx_error=True)
# XXX: raising here causes some cancellation
# / multierror tests to fail because of what appears to
# be double raise? we probably need to see how `trio`
# does this case..
if should_raise:
raise
# except trio.MultiError as err:
except BaseException as err:
# nursery bubble up
nurse_err = err
# do not double cancel subactors
if not anursery.cancelled:
await anursery._handle_err(err)
raise
finally:
if anursery._children:
log.cancel(f'Waiting on remaining children {anursery._children}')
with trio.CancelScope(shield=True):
await anursery._all_children_reaped.wait()
log.cancel(f'All children complete for {anursery}')
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
# await breakpoint()
if errors:
# if nurse_err or src_err:
if anursery._children:
raise RuntimeError("WHERE TF IS THE ZOMBIE LORD!?!?!")
# with trio.CancelScope(shield=True):
# await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
log.cancel(f'{anursery} terminated gracefully')
# XXX" honestly no idea why this is needed but sure..
if isinstance(src_err, KeyboardInterrupt) and anursery.cancelled:
raise src_err
@asynccontextmanager
async def open_nursery(
**kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
"""
Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
When an actor is spawned a new trio task is started which
invokes one of the process spawning backends to create and start
a new subprocess. These tasks are started by one of two nurseries
detailed below. The reason for spawning processes from within
a new task is because ``trio_run_in_process`` itself creates a new
internal nursery and the same task that opens a nursery **must**
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
implicit_runtime = False
actor = current_actor(err_on_no_runtime=False)
try:
if actor is None and is_main_process():
# if we are the parent process start the
# actor runtime implicitly
log.info("Starting actor runtime!")
# mark us for teardown on exit
implicit_runtime = True
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
# try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
else: # sub-nursery case
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:
log.debug("Nursery teardown complete")
# shutdown runtime if it was started
if implicit_runtime:
log.info("Shutting down actor tree")

View File

@ -1,440 +0,0 @@
"""
``trio`` inspired apis and helpers
"""
from functools import partial
import inspect
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional
import typing
import warnings
import trio
from async_generator import asynccontextmanager
from . import _debug
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn
log = get_logger(__name__)
_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(
self,
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[Tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._join_procs = trio.Event()
self.errors = errors
async def start_actor(
self,
name: str,
*,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
subactor = Actor(
name,
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
assert parent_addr
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
partial(
_spawn.new_proc,
name,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
_rtv, # run time vars
)
)
async def run_in_actor(
self,
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point
the actor is terminated.
"""
mod_path = fn.__module__
if name is None:
# use the explicit function name if not provided
name = fn.__name__
portal = await self.start_actor(
name,
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
)
# XXX: don't allow stream funcs
if not (
inspect.iscoroutinefunction(fn) and
not getattr(fn, '_tractor_stream_function', False)
):
raise TypeError(f'{fn} must be an async function!')
# this marks the actor to be cancelled after its portal result
# is retreived, see logic in `open_nursery()` below.
self._cancel_after_result_on_exit.add(portal)
await portal._submit_for_result(
mod_path,
fn.__name__,
**kwargs
)
return portal
async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
self.cancelled = True
log.cancel(f"Cancelling nursery in {self._actor.uid}")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
# TODO: are we ever even going to use this or
# is the spawning backend responsible for such
# things? I'm thinking latter.
if hard_kill:
proc.terminate()
else:
if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid]
log.warning(
f"{subactor.uid} wasn't finished spawning?")
await event.wait()
# channel/portal should now be up
_, _, portal = self._children[subactor.uid]
# XXX should be impossible to get here
# unless method was called from within
# shielded cancel scope.
if portal is None:
# cancelled while waiting on the event
# to arrive
chan = self._actor._peers[subactor.uid][-1]
if chan:
portal = Portal(chan)
else: # there's no other choice left
proc.terminate()
# spawn cancel tasks for each sub-actor
assert portal
nursery.start_soon(portal.cancel_actor)
# if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(
f"Failed to cancel {self}\nHard killing process tree!")
for subactor, proc, portal in self._children.values():
log.warning(f"Hard killing process {proc}")
proc.terminate()
# mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set()
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.runtime(
f"Waiting on subactors {anursery._children} "
"to complete"
)
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set()
except BaseException as err:
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
if is_root_process():
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# await trio.testing.wait_all_tasks_blocked()
debug_complete = _debug._no_remote_has_tty
if (
debug_complete and
not debug_complete.is_set()
):
log.warning(
'Root has errored but pdb is in use by '
f'child {_debug._global_actor_in_debug}\n'
'Waiting on tty lock to release..')
# with trio.CancelScope(shield=True):
await debug_complete.wait()
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery
except (
Exception,
trio.MultiError,
trio.Cancelled
) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.cancel(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
@asynccontextmanager
async def open_nursery(
**kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
When an actor is spawned a new trio task is started which
invokes one of the process spawning backends to create and start
a new subprocess. These tasks are started by one of two nurseries
detailed below. The reason for spawning processes from within
a new task is because ``trio_run_in_process`` itself creates a new
internal nursery and the same task that opens a nursery **must**
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
implicit_runtime = False
actor = current_actor(err_on_no_runtime=False)
try:
if actor is None and is_main_process():
# if we are the parent process start the
# actor runtime implicitly
log.info("Starting actor runtime!")
# mark us for teardown on exit
implicit_runtime = True
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
# try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
else: # sub-nursery case
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:
log.debug("Nursery teardown complete")
# shutdown runtime if it was started
if implicit_runtime:
log.info("Shutting down actor tree")

View File

@ -121,6 +121,7 @@ def pub(
wrapped: typing.Callable = None,
*,
tasks: Set[str] = set(),
send_on_connect: Any = None,
):
"""Publisher async generator decorator.
@ -206,7 +207,7 @@ def pub(
# handle the decorator not called with () case
if wrapped is None:
return partial(pub, tasks=tasks)
return partial(pub, tasks=tasks, send_on_connect=send_on_connect)
task2lock: Dict[str, trio.StrictFIFOLock] = {}
@ -249,6 +250,11 @@ def pub(
try:
modify_subs(topics2ctxs, topics, ctx)
# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)
# block and let existing feed task deliver
# stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again

View File

@ -0,0 +1,300 @@
'''
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
'''
import asyncio
from contextlib import asynccontextmanager as acm
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
Optional,
)
import trio
from .log import get_logger, get_console_log
from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
# async def consume_asyncgen(
# to_trio: trio.MemorySendChannel,
# coro: AsyncIterator,
# ) -> None:
# """Stream async generator results back to ``trio``.
# ``from_trio`` might eventually be used here for
# bidirectional streaming.
# """
# async for item in coro:
# to_trio.send_nowait(item)
def _run_asyncio_task(
func: Callable,
*,
qsize: int = 1,
# _treat_as_stream: bool = False,
provide_channels: bool = False,
**kwargs,
) -> Any:
"""
Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
if not current_actor().is_infected_aio():
raise RuntimeError("`infect_asyncio` mode is not enabled!?")
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
from_aio._err = None
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more than one return
# value otherwise it would just return ;P
# _treat_as_stream = True
assert qsize > 1
if provide_channels:
assert 'to_trio' in args
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
cancel_scope = trio.CancelScope()
aio_task_complete = trio.Event()
aio_err: Optional[BaseException] = None
async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
aio_task_complete: trio.Event,
) -> None:
"""
Await ``coro`` and relay result back to ``trio``.
"""
nonlocal aio_err
orig = result = id(coro)
try:
result = await coro
except BaseException as err:
aio_err = err
from_aio._err = aio_err
finally:
aio_task_complete.set()
if result != orig and aio_err is None:
to_trio.send_nowait(result)
# start the asyncio task we submitted from trio
if inspect.isawaitable(coro):
task = asyncio.create_task(
wait_on_coro_final_result(to_trio, coro, aio_task_complete)
)
# elif inspect.isasyncgen(coro):
# task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else:
raise TypeError(f"No support for invoking {coro}")
def cancel_trio(task):
"""Cancel the calling ``trio`` task on error.
"""
nonlocal aio_err
try:
aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}")
from_aio._err = aio_err
cancel_scope.cancel()
from_aio.close()
task.add_done_callback(cancel_trio)
return task, from_aio, to_trio, cancel_scope, aio_task_complete
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
# _treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# simple async func
try:
task, from_aio, to_trio, cs, _ = _run_asyncio_task(
func,
qsize=1,
**kwargs,
)
# return single value
with cs:
# naively expect the mem chan api to do the job
# of handling cross-framework cancellations / errors
return await from_aio.receive()
if cs.cancelled_caught:
# always raise from any captured asyncio error
if from_aio._err:
raise from_aio._err
# Do we need this?
except BaseException as err:
aio_err = from_aio._err
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
# except trio.Cancelled:
# raise
finally:
if not task.done():
task.cancel()
# TODO: explicit api for the streaming case where
# we pull from the mem chan in an async generator?
# This ends up looking more like our ``Portal.open_stream_from()``
# NB: code below is untested.
# async def _start_and_sync_aio_task(
# from_trio,
# to_trio,
# from_aio,
@acm
async def open_channel_from(
target: Callable[[Any, ...], Any],
**kwargs,
) -> AsyncIterator[Any]:
try:
task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task(
target,
qsize=2**8,
provide_channels=True,
**kwargs,
)
with cs:
# sync to "started()" call.
first = await from_aio.receive()
# stream values upward
async with from_aio:
yield first, from_aio
# await aio_task_complete.wait()
except BaseException as err:
aio_err = from_aio._err
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
finally:
if cs.cancelled_caught:
# always raise from any captured asyncio error
if from_aio._err:
raise from_aio._err
if not task.done():
task.cancel()
def run_as_asyncio_guest(
trio_main: Callable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything gets too bad your parents will know about
it.
:)
"""
# Disable sigint handling in children?
# import signal
# signal.signal(signal.SIGINT, signal.SIG_IGN)
get_console_log('runtime')
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))
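# The guest-mode entry above boils down to the stock
# ``trio.lowlevel.start_guest_run()`` recipe: hand trio the host loop's
# ``call_soon_threadsafe`` and collect the final outcome in a future.
# A bare-bones version with no actor machinery involved:
import asyncio
import trio

async def _trio_main() -> str:
    await trio.sleep(0.1)
    return 'trio ran inside asyncio'

async def _aio_main() -> None:
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def _on_done(outcome) -> None:
        # ``outcome`` is an ``outcome.Outcome`` wrapping the trio result
        done.set_result(outcome)

    trio.lowlevel.start_guest_run(
        _trio_main,
        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
        done_callback=_on_done,
    )
    print((await done).unwrap())

asyncio.run(_aio_main())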

View File

@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import async_enter_all
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
__all__ = [
'async_enter_all',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
]

View File

@ -0,0 +1,64 @@
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm
import trio
# A regular invariant generic type
T = TypeVar("T")
async def _enter_and_sleep(
mngr: AsyncContextManager[T],
to_yield: dict[int, T],
all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> T:
'''Open the async context manager, deliver its value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
to_yield[id(mngr)] = value
if all(to_yield.values()):
all_entered.set()
# sleep until cancelled
await trio.sleep_forever()
@acm
async def async_enter_all(
*mngrs: tuple[AsyncContextManager[T]],
) -> tuple[T]:
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
mngr,
to_yield,
all_entered,
)
# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())
# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()
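# A hedged usage sketch for ``async_enter_all()``: enter several async
# context managers concurrently and receive their yielded values as a
# tuple once every one has started. The ``open_resource()`` manager is
# a made-up stand-in and the ``tractor.trionics`` import path is an
# assumption about where this module lands:
from contextlib import asynccontextmanager as acm

import trio

from tractor.trionics import async_enter_all

@acm
async def open_resource(name: str):
    # simulate a slow, concurrent startup
    await trio.sleep(0.1)
    yield f'{name}-ready'

async def _enter_all_demo() -> None:
    async with async_enter_all(
        open_resource('db'),
        open_resource('cache'),
    ) as (db, cache):
        assert (db, cache) == ('db-ready', 'cache-ready')

trio.run(_enter_all_demo)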