diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 01b3af2..a6e2813 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -14,26 +14,66 @@ async def assert_err(): assert 0 -def test_remote_error(arb_addr): - """Verify an error raises in a subactor is propagated to the parent. +@pytest.mark.parametrize( + 'args_err', + [ + # expected to be thrown in assert_err + ({}, AssertionError), + # argument mismatch raised in _invoke() + ({'unexpected': 10}, TypeError) + ], + ids=['no_args', 'unexpected_args'], +) +def test_remote_error(arb_addr, args_err): + """Verify an error raised in a subactor that is propagated + to the parent nursery, contains the underlying boxed builtin + error type info and causes cancellation and reraising all the + way up the stack. + """ + args, errtype = args_err + + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.run_in_actor('errorer', assert_err, **args) + + # get result(s) from main task + try: + await portal.result() + except tractor.RemoteActorError as err: + assert err.type == errtype + print("Look Maa that actor failed hard, hehh") + raise + + with pytest.raises(tractor.RemoteActorError) as excinfo: + tractor.run(main, arbiter_addr=arb_addr) + + # ensure boxed error is correct + assert excinfo.value.type == errtype + + +def test_multierror(arb_addr): + """Verify we raise a ``trio.MultiError`` out of a nursery where + more then one actor errors. """ async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor('errorer', assert_err) + await nursery.run_in_actor('errorer1', assert_err) + portal2 = await nursery.run_in_actor('errorer2', assert_err) # get result(s) from main task try: - return await portal.result() - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehh") + await portal2.result() + except tractor.RemoteActorError as err: + assert err.type == AssertionError + print("Look Maa that first actor failed hard, hehh") raise - except Exception: - pass - assert 0, "Remote error was not raised?" - with pytest.raises(tractor.RemoteActorError): - # also raises + # here we should get a `trio.MultiError` containing exceptions + # from both subactors + + with pytest.raises(trio.MultiError): tractor.run(main, arbiter_addr=arb_addr) @@ -42,9 +82,12 @@ def do_nothing(): def test_cancel_single_subactor(arb_addr): - - async def main(): - + """Ensure a ``ActorNursery.start_actor()`` spawned subactor + cancels when the nursery is cancelled. + """ + async def spawn_actor(): + """Spawn an actor that blocks indefinitely. + """ async with tractor.open_nursery() as nursery: portal = await nursery.start_actor( @@ -55,7 +98,7 @@ def test_cancel_single_subactor(arb_addr): # would hang otherwise await nursery.cancel() - tractor.run(main, arbiter_addr=arb_addr) + tractor.run(spawn_actor, arbiter_addr=arb_addr) async def stream_forever(): @@ -87,13 +130,22 @@ async def test_cancel_infinite_streamer(): assert n.cancelled +@pytest.mark.parametrize( + 'num_actors_and_errs', + [ + (1, tractor.RemoteActorError, AssertionError), + (2, tractor.MultiError, AssertionError) + ], + ids=['one_actor', 'two_actors'], +) @tractor_test -async def test_one_cancels_all(): - """Verify one failed actor causes all others in the nursery - to be cancelled just like in trio. +async def test_some_cancels_all(num_actors_and_errs): + """Verify a subset of failed subactors causes all others in + the nursery to be cancelled just like the strategy in trio. This is the first and only supervisory strategy at the moment. """ + num, first_err, err_type = num_actors_and_errs try: async with tractor.open_nursery() as n: real_actors = [] @@ -103,13 +155,23 @@ async def test_one_cancels_all(): rpc_module_paths=[__name__], )) - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) + for i in range(num): + # start actor(s) that will fail immediately + await n.run_in_actor(f'extra_{i}', assert_err) - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError` + # should error here with a ``RemoteActorError`` or ``MultiError`` + + except first_err as err: + if isinstance(err, tractor.MultiError): + assert len(err.exceptions) == num + for exc in err.exceptions: + if isinstance(exc, tractor.RemoteActorError): + assert exc.type == err_type + else: + assert isinstance(exc, trio.Cancelled) + elif isinstance(err, tractor.RemoteActorError): + assert err.type == err_type - except tractor.RemoteActorError: assert n.cancelled is True assert not n._children else: diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index b85ee98..1a71d6b 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -41,7 +41,7 @@ def daemon(loglevel, testdir, arb_addr): stderr=subprocess.PIPE, ) assert not proc.returncode - wait = 0.6 if sys.version_info < (3, 7) else 0.2 + wait = 0.6 if sys.version_info < (3, 7) else 0.4 time.sleep(wait) yield proc sig_prog(proc, signal.SIGINT) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 31f85a0..5691fbd 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -166,7 +166,7 @@ def test_a_quadruple_example(time_quad_ex): @pytest.mark.parametrize( 'cancel_delay', - list(map(lambda i: i/10, range(2, 8))) + list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad(arb_addr, time_quad_ex, cancel_delay): """Verify we can cancel midway through the quad example and all actors diff --git a/tractor/__init__.py b/tractor/__init__.py index ca2055f..0667bd5 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,10 +4,11 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any, Optional +from typing import Tuple, Any import typing import trio # type: ignore +from trio import MultiError from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel @@ -16,17 +17,18 @@ from ._actor import ( ) from ._trionics import open_nursery from ._state import current_actor -from ._portal import RemoteActorError +from ._exceptions import RemoteActorError __all__ = [ 'current_actor', 'find_actor', 'get_arbiter', - 'wait_for_actor', 'open_nursery', - 'RemoteActorError', + 'wait_for_actor', 'Channel', + 'MultiError', + 'RemoteActorError', ] diff --git a/tractor/_actor.py b/tractor/_actor.py index 8d686d5..1e7d173 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -16,6 +16,7 @@ from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan from .log import get_console_log, get_logger +from ._exceptions import pack_error, InternalActorError from ._portal import ( Portal, open_portal, @@ -33,10 +34,6 @@ class ActorFailure(Exception): "General actor failure" -class InternalActorError(RuntimeError): - "Actor primitive internals failure" - - async def _invoke( actor: 'Actor', cid: str, @@ -49,6 +46,7 @@ async def _invoke( """ sig = inspect.signature(func) treat_as_gen = False + cs = None if 'chan' in sig.parameters: assert 'cid' in sig.parameters, \ f"{func} must accept a `cid` (caller id) kwarg" @@ -122,10 +120,19 @@ async def _invoke( with trio.open_cancel_scope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except Exception: + except Exception as err: # always ship errors back to caller log.exception("Actor errored:") - await chan.send({'error': traceback.format_exc(), 'cid': cid}) + err_msg = pack_error(err) + err_msg['cid'] = cid + try: + await chan.send(err_msg) + except trio.ClosedResourceError: + log.exception( + f"Failed to ship error to caller @ {chan.uid}") + if cs is None: + # error is from above code not from rpc invocation + task_status.started(err) finally: # RPC task bookeeping tasks = actor._rpc_tasks.get(chan, None) @@ -348,13 +355,19 @@ class Actor: try: ns, funcname, kwargs, actorid, cid = msg['cmd'] except KeyError: - # push any non-rpc-response error to all local consumers - # and mark the channel as errored - chan._exc = err = msg['error'] + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). + # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + tb_str = msg.get('tb_str') assert chan.uid - for cid in self._actors2calls[chan.uid]: - await self._push_result(chan.uid, cid, msg) - raise InternalActorError(f"{chan.uid}\n" + err) + exc = InternalActorError( + f"{chan.uid}\n" + tb_str, + **msg, + ) + chan._exc = exc + raise exc log.debug( f"Processing request from {actorid}\n" @@ -373,22 +386,30 @@ class Actor: # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) if func != self.cancel: - self._no_more_rpc_tasks.clear() - log.info(f"RPC func is {func}") - self._rpc_tasks.setdefault(chan, []).append((cs, func)) + if isinstance(cs, Exception): + log.warn(f"Task for RPC func {func} failed with {cs}") + else: + # mark that we have ongoing rpc tasks + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks.setdefault(chan, []).append((cs, func)) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - else: # channel disconnect + else: + # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") - except Exception: - # ship exception (from above code) to parent + except Exception as err: + # ship any "internal" exception (i.e. one from internal machinery + # not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send({'error': traceback.format_exc()}) - raise + await self._parent_chan.send(pack_error(err)) + raise # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" finally: @@ -480,25 +501,30 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception: + except Exception as err: + if not registered_with_arbiter: + log.exception( + f"Actor errored and failed to register with arbiter " + f"@ {arbiter_addr}") + if self._parent_chan: try: + # internal error so ship to parent without cid await self._parent_chan.send( - {'error': traceback.format_exc()}) + pack_error(err)) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") - - if not registered_with_arbiter: - log.exception( - f"Actor errored and failed to register with arbiter " - f"@ {arbiter_addr}") else: + # XXX wait, why? + # causes a hang if I always raise.. raise + finally: - await self._do_unreg(arbiter_addr) + if registered_with_arbiter: + await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py new file mode 100644 index 0000000..7244396 --- /dev/null +++ b/tractor/_exceptions.py @@ -0,0 +1,50 @@ +""" +Our classy exception set. +""" +import builtins +import traceback + + +class RemoteActorError(Exception): + # TODO: local recontruction of remote exception deats + "Remote actor exception bundled locally" + def __init__(self, message, type_str, **msgdata): + super().__init__(message) + self.type = getattr(builtins, type_str, Exception) + self.msgdata = msgdata + + # TODO: a trio.MultiError.catch like context manager + # for catching underlying remote errors of a particular type + + +class InternalActorError(RemoteActorError): + """Remote internal ``tractor`` error indicating + failure of some primitive or machinery. + """ + + +class NoResult(RuntimeError): + "No final result is expected for this actor" + + +def pack_error(exc): + """Create an "error message" for tranmission over + a channel (aka the wire). + """ + return { + 'error': { + 'tb_str': traceback.format_exc(), + 'type_str': type(exc).__name__, + } + } + + +def unpack_error(msg, chan=None): + """Unpack an 'error' message from the wire + into a local ``RemoteActorError``. + """ + tb_str = msg['error'].get('tb_str', '') + return RemoteActorError( + f"{chan.uid}\n" + tb_str, + **msg['error'], + ) diff --git a/tractor/_portal.py b/tractor/_portal.py index 9dc0cba..28e28f4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -12,15 +12,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger +from ._exceptions import unpack_error, NoResult, RemoteActorError log = get_logger('tractor') -class RemoteActorError(RuntimeError): - "Remote actor exception bundled locally" - - @asynccontextmanager async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): """Create a new nursery if None provided. @@ -63,8 +60,8 @@ class Portal: # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result = None - self._exc: Optional[RemoteActorError] = None + self._result: Optional[Any] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -97,8 +94,7 @@ class Portal: elif functype == 'asyncgen': resp_type = 'yield' elif 'error' in first_msg: - raise RemoteActorError( - f"{self.channel.uid}\n" + first_msg['error']) + raise unpack_error(first_msg, self.channel) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -110,10 +106,11 @@ class Portal: self._expect_result = await self._submit(ns, func, **kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: - """Submit a function to be scheduled and run by actor, wrap and return - its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, + wrap and return its (stream of) result(s). - This is a blocking call. + This is a blocking call and returns either a value from the + remote rpc task or a local async generator instance. """ return await self._return_from_resptype( *(await self._submit(ns, func, **kwargs)) @@ -137,14 +134,19 @@ class Portal: if 'stop' in msg: break # far end async gen terminated else: - raise RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self.channel) + except StopAsyncIteration: log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") raise + # TODO: use AsyncExitStack to aclose() all agens + # on teardown return yield_from_q() elif resptype == 'return': @@ -152,30 +154,43 @@ class Portal: try: return msg['return'] except KeyError: - self._exc = RemoteActorError( - f"{self.channel.uid}\n" + msg['error']) - raise self._exc + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) else: raise ValueError(f"Unknown msg response type: {first_msg}") async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. """ - if self._expect_result is None: - # (remote) errors are slapped on the channel - # teardown can reraise them - exc = self.channel._exc - if exc: - raise RemoteActorError(f"{self.channel.uid}\n{exc}") - else: - raise RuntimeError( - f"Portal for {self.channel.uid} is not expecting a final" - "result?") + # Check for non-rpc errors slapped on the + # channel for which we always raise + exc = self.channel._exc + if exc: + raise exc + + # not expecting a "main" result + if self._expect_result is None: + log.warn( + f"Portal for {self.channel.uid} not expecting a final" + " result?\nresult() should only be called if subactor" + " was spawned with `ActorNursery.run_in_actor()`") + return NoResult + + # expecting a "main" result + assert self._expect_result + if self._result is None: + try: + self._result = await self._return_from_resptype( + *self._expect_result + ) + except RemoteActorError as err: + self._result = err + + # re-raise error on every call + if isinstance(self._result, RemoteActorError): + raise self._result - elif self._result is None: - self._result = await self._return_from_resptype( - *self._expect_result - ) return self._result async def close(self) -> None: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dc607c7..289e42b 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -32,7 +32,8 @@ class ActorNursery: Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] ] = {} - # portals spawned with ``run_in_actor()`` + # portals spawned with ``run_in_actor()`` are + # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False self._forkserver: forkserver.ForkServer = None @@ -132,6 +133,8 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, ) + # this marks the actor to be cancelled after its portal result + # is retreived, see ``wait()`` below. self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, @@ -142,29 +145,66 @@ class ActorNursery: async def wait(self) -> None: """Wait for all subactors to complete. + + This is probably the most complicated (and confusing, sorry) + function that does all the clever crap to deal with cancellation, + error propagation, and graceful subprocess tear down. """ - async def maybe_consume_result(portal, actor): - if ( - portal in self._cancel_after_result_on_exit and - (portal._result is None and portal._exc is None) - ): - log.debug(f"Waiting on final result from {subactor.uid}") - res = await portal.result() - # if it's an async-gen then we should alert the user - # that we're cancelling it + async def exhaust_portal(portal, actor): + """Pull final result from portal (assuming it has one). + + If the main task is an async generator do our best to consume + what's left of it. + """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it if inspect.isasyncgen(res): + final = [] log.warning( f"Blindly consuming asyncgen for {actor.uid}") with trio.fail_after(1): async with aclosing(res) as agen: async for item in agen: log.debug(f"Consuming item {item}") + final.append(item) + except (Exception, trio.MultiError) as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + async def cancel_on_completion( + portal: Portal, + actor: Actor, + task_status=trio.TASK_STATUS_IGNORED, + ) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. + """ + with trio.open_cancel_scope() as cs: + task_status.started(cs) + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors.append(result) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + if cs.cancelled_caught: + log.warning( + "Result waiter was cancelled, process may have died") async def wait_for_proc( proc: mp.Process, actor: Actor, portal: Portal, - cancel_scope: trio._core._run.CancelScope, + cancel_scope: Optional[trio._core._run.CancelScope] = None, ) -> None: # TODO: timeout block here? if proc.is_alive(): @@ -172,42 +212,59 @@ class ActorNursery: # please god don't hang proc.join() log.debug(f"Joined {proc}") - await maybe_consume_result(portal, actor) - self._children.pop(actor.uid) - # proc terminated, cancel result waiter + + # proc terminated, cancel result waiter that may have + # been spawned in tandem if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() - async def wait_for_actor( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - # cancel the actor gracefully - with trio.open_cancel_scope() as cs: - task_status.started(cs) - await maybe_consume_result(portal, actor) - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - if cs.cancelled_caught: - log.warning("Result waiter was cancelled") - - # unblocks when all waiter tasks have completed + log.debug(f"Waiting on all subactors to complete") + # since we pop each child subactor on termination, + # iterate a copy children = self._children.copy() + errors: List[Exception] = [] + # wait on run_in_actor() tasks, unblocks when all complete async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): cs = None + # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: - cs = await nursery.start(wait_for_actor, portal, subactor) + cs = await nursery.start( + cancel_on_completion, portal, subactor) + # TODO: how do we handle remote host spawned actors? + nursery.start_soon( + wait_for_proc, proc, subactor, portal, cs) + + if errors: + if not self.cancelled: + # halt here and expect to be called again once the nursery + # has been cancelled externally (ex. from within __aexit__() + # if an error is captured from ``wait()`` then ``cancel()`` + # is called immediately after which in turn calls ``wait()`` + # again.) + raise trio.MultiError(errors) + + # wait on all `start_actor()` subactors to complete + # if errors were captured above and we have not been cancelled + # then these ``start_actor()`` spawned actors will block until + # cancelled externally + children = self._children.copy() + async with trio.open_nursery() as nursery: + for subactor, proc, portal in children.values(): + # TODO: how do we handle remote host spawned actors? nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) + log.debug(f"All subactors for {self} have terminated") + if errors: + # always raise any error if we're also cancelled + raise trio.MultiError(errors) + async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel - iteslf and wait for all subprocesses to terminate. + itself and wait for all subactors to terminate. If ``hard_killl`` is set to ``True`` then kill the processes directly without any far end graceful ``trio`` cancellation. @@ -234,56 +291,57 @@ class ActorNursery: # channel/portal should now be up _, _, portal = self._children[subactor.uid] if portal is None: - # cancelled while waiting on the event? + # cancelled while waiting on the event + # to arrive chan = self._actor._peers[subactor.uid][-1] if chan: portal = Portal(chan) else: # there's no other choice left do_hard_kill(proc) - # spawn cancel tasks async + # spawn cancel tasks assert portal n.start_soon(portal.cancel_actor) - log.debug(f"Waiting on all subactors to complete") - await self.wait() + # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True - log.debug(f"All subactors for {self} have terminated") + await self.wait() async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - try: - if etype is not None: + if etype is not None: + try: # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case the - # else block here might not complete? Should both be shielded? + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. with trio.open_cancel_scope(shield=True): if etype is trio.Cancelled: log.warning( - f"{current_actor().uid} was cancelled with {etype}" - ", cancelling actor nursery") - await self.cancel() + f"Nursery for {current_actor().uid} was " + f"cancelled with {etype}") else: log.exception( - f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() - else: - # XXX: this is effectively the lone cancellation/supervisor - # strategy which exactly mimicks trio's behaviour - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except Exception as err: - log.warning(f"Nursery caught {err}, cancelling") + f"Nursery for {current_actor().uid} " + f"errored with {etype}, ") await self.cancel() - raise - log.debug(f"Nursery teardown complete") - except Exception: - log.exception("Error on nursery exit:") - await self.wait() - raise + except trio.MultiError as merr: + if value not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [value]) + raise + else: + # XXX: this is effectively the (for now) lone + # cancellation/supervisor strategy which exactly + # mimicks trio's behaviour + log.debug(f"Waiting on subactors {self._children} to complete") + try: + await self.wait() + except (Exception, trio.MultiError) as err: + log.warning(f"Nursery caught {err}, cancelling") + await self.cancel() + raise + log.debug(f"Nursery teardown complete") @asynccontextmanager @@ -297,3 +355,9 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # TODO: figure out supervisors from erlang async with ActorNursery(current_actor()) as nursery: yield nursery + + +def is_main_process(): + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess'