Factor OCA supervisor into new func

mp_hang_search
Tyler Goodlet 2021-02-24 12:59:43 -05:00
parent 35775c6763
commit b285db4c58
2 changed files with 130 additions and 112 deletions

View File

@ -164,7 +164,10 @@ async def open_root_actor(
) )
try: try:
yield actor yield actor
# result = await main()
# except BaseException as err:
# breakpoint()
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:") logger.exception("Actor crashed:")
await _debug._maybe_enter_pm(err) await _debug._maybe_enter_pm(err)

View File

@ -200,6 +200,128 @@ class ActorNursery:
self._join_procs.set() self._join_procs.set()
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
@asynccontextmanager @asynccontextmanager
async def open_nursery( async def open_nursery(
**kwargs, **kwargs,
@ -234,120 +356,13 @@ async def open_nursery(
# mark us for teardown on exit # mark us for teardown on exit
implicit_runtime = True implicit_runtime = True
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
try: try:
async with trio.open_nursery() as da_nursery: async with _open_and_supervise_one_cancels_all_nursery(
try: actor
# This is the inner level "run in actor" nursery. It is ) as anursery:
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors yield anursery
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
finally: finally:
log.debug("Nursery teardown complete") log.debug("Nursery teardown complete")