forked from goodboy/tractor
1
0
Fork 0

Merge pull request #198 from goodboy/kinda_drop_run

Kinda drop run
first_pypi_release
goodboy 2021-02-25 09:09:41 -05:00 committed by GitHub
commit 49a02e6700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 138 deletions

View File

@ -17,10 +17,13 @@ async def name_error():
async def main(): async def main():
"""Test breakpoint in a streaming actor. """Test breakpoint in a streaming actor.
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
loglevel='error',
) as n:
p0 = await n.start_actor('bp_forever', rpc_module_paths=[__name__]) p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
p1 = await n.start_actor('name_error', rpc_module_paths=[__name__]) p1 = await n.start_actor('name_error', enable_modules=[__name__])
# retreive results # retreive results
stream = await p0.run(breakpoint_forever) stream = await p0.run(breakpoint_forever)
@ -28,4 +31,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True, loglevel='error') trio.run(main)

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
@ -50,7 +51,9 @@ async def main():
python -m tractor._child --uid ('spawn_until_0', 'de918e6d ...) python -m tractor._child --uid ('spawn_until_0', 'de918e6d ...)
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
) as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor( portal = await n.run_in_actor(
@ -70,4 +73,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -82,7 +82,7 @@ def test_root_actor_error(spawn, user_in_out):
before = str(child.before.decode()) before = str(child.before.decode())
# make sure expected logging and error arrives # make sure expected logging and error arrives
assert "Attaching to pdb in crashed actor: ('arbiter'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
assert 'AssertionError' in before assert 'AssertionError' in before
# send user command # send user command
@ -170,7 +170,7 @@ def test_subactor_error(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('arbiter'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert "RemoteActorError: ('name_error'" in before
@ -276,7 +276,7 @@ def test_multi_subactors(spawn):
child.sendline('q') child.sendline('q')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('arbiter'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
assert "RemoteActorError: ('breakpoint_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before assert 'bdb.BdbQuit' in before
@ -323,7 +323,6 @@ def test_multi_daemon_subactors(spawn, loglevel):
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
def test_multi_subactors_root_errors(spawn): def test_multi_subactors_root_errors(spawn):
"""Multiple subactors, both erroring and breakpointing as well as """Multiple subactors, both erroring and breakpointing as well as
a nested subactor erroring. a nested subactor erroring.
@ -345,7 +344,7 @@ def test_multi_subactors_root_errors(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
# should have come just after priot prompt # should have come just after priot prompt
assert "Attaching to pdb in crashed actor: ('arbiter'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
assert "AssertionError" in before assert "AssertionError" in before
# warnings assert we probably don't need # warnings assert we probably don't need
@ -402,7 +401,10 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
assert "NameError" in before assert "NameError" in before
def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method): def test_root_nursery_cancels_before_child_releases_tty_lock(
spawn,
start_method
):
"""Test that when the root sends a cancel message before a nested """Test that when the root sends a cancel message before a nested
child has unblocked (which can happen when it has the tty lock and child has unblocked (which can happen when it has the tty lock and
is engaged in pdb) it is indeed cancelled after exiting the debugger. is engaged in pdb) it is indeed cancelled after exiting the debugger.
@ -420,7 +422,6 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method
child.sendline('c') child.sendline('c')
for i in range(4): for i in range(4):
time.sleep(0.5) time.sleep(0.5)
try: try:
@ -440,7 +441,6 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method
# race conditions on how fast the continue is sent? # race conditions on how fast the continue is sent?
break break
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before

View File

@ -161,7 +161,7 @@ def test_multi_actor_subs_arbiter_pub(
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
name = 'arbiter' name = 'root'
if pub_actor == 'streamer': if pub_actor == 'streamer':
# start the publisher as a daemon # start the publisher as a daemon

View File

@ -53,7 +53,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
exposed_mods, funcname, inside_err = to_call exposed_mods, funcname, inside_err = to_call
subactor_exposed_mods = [] subactor_exposed_mods = []
func_defined = globals().get(funcname, False) func_defined = globals().get(funcname, False)
subactor_requests_to = 'arbiter' subactor_requests_to = 'root'
remote_err = tractor.RemoteActorError remote_err = tractor.RemoteActorError
# remote module that fails at import time # remote module that fails at import time

View File

@ -164,7 +164,7 @@ async def open_root_actor(
) )
try: try:
yield actor yield actor
# result = await main()
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:") logger.exception("Actor crashed:")
await _debug._maybe_enter_pm(err) await _debug._maybe_enter_pm(err)
@ -185,7 +185,7 @@ def run(
*args, *args,
# runtime kwargs # runtime kwargs
name: Optional[str] = None, name: Optional[str] = 'root',
arbiter_addr: Tuple[str, int] = ( arbiter_addr: Tuple[str, int] = (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,

View File

@ -5,7 +5,6 @@ from functools import partial
import multiprocessing as mp import multiprocessing as mp
from typing import Tuple, List, Dict, Optional from typing import Tuple, List, Dict, Optional
import typing import typing
from contextlib import AsyncExitStack
import warnings import warnings
import trio import trio
@ -200,6 +199,128 @@ class ActorNursery:
self._join_procs.set() self._join_procs.set()
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
@asynccontextmanager @asynccontextmanager
async def open_nursery( async def open_nursery(
**kwargs, **kwargs,
@ -221,137 +342,35 @@ async def open_nursery(
actor = current_actor(err_on_no_runtime=False) actor = current_actor(err_on_no_runtime=False)
if actor is None and is_main_process():
# if we are the parent process start the actor runtime implicitly
log.info("Starting actor runtime!")
root_runtime_stack = AsyncExitStack()
actor = await root_runtime_stack.enter_async_context(
open_root_actor(**kwargs)
)
assert actor is current_actor()
# mark us for teardown on exit
implicit_runtime = True
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
try: try:
async with trio.open_nursery() as da_nursery: if actor is None and is_main_process():
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors # if we are the parent process start the actor runtime implicitly
await anursery.cancel() log.info("Starting actor runtime!")
except trio.MultiError as merr: # mark us for teardown on exit
# If we receive additional errors while waiting on implicit_runtime = True
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# Last bit before first nursery block ends in the case async with open_root_actor(**kwargs) as actor:
# where we didn't error in the caller's scope assert actor is current_actor()
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
# XXX: do we need a `trio.Cancelled` catch here as well? yield anursery
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed else: # sub-nursery case
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
# after nursery exit
finally: finally:
log.debug("Nursery teardown complete") log.debug("Nursery teardown complete")
# shutdown runtime if it was started # shutdown runtime if it was started
if implicit_runtime: if implicit_runtime:
log.info("Shutting down actor tree") log.info("Shutting down actor tree")
await root_runtime_stack.aclose()