From d2d8860dad779d639f7ea35b3555f92c548e9871 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 6 Aug 2020 23:51:23 -0400
Subject: [PATCH 01/19] Add test for dereg failure on manual stream close

There was code from the last de-registration fix PR that I had
commented out (to do with shielding arbiter dereg steps in
`Actor._async_main()`) because the block didn't seem to make a
difference under infinite streaming tests. Turns out it **for sure**
is needed under certain conditions (likely if the actor's root
nursery is cancelled prior to actor nursery exit).

This was an attempt to simulate the failure mode if you manually close
the stream **before** cancelling the containing **actor**.

More tests to come I guess.
---
 tests/test_discovery.py | 56 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 6370fe2..96a3908 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -228,3 +228,59 @@ def test_subactors_unregister_on_cancel_remote_daemon(
         # XXX: required to use remote daemon!
         arbiter_addr=arb_addr
     )
+
+
+@pytest.mark.parametrize('use_signal', [False, True])
+def test_close_channel_explicit(
+    daemon,
+    start_method,
+    use_signal,
+    arb_addr,
+):
+    """Verify that closing a stream explicitly **before** the containing
+    nursery tears down also results in subactor(s) deregistering from the
+    arbiter.
+    """
+    async def streamer(agen):
+        async for item in agen:
+            print(item)
+
+    async def main():
+        async with tractor.get_arbiter(*arb_addr) as aportal:
+            try:
+                get_reg = partial(aportal.run, 'self', 'get_registry')
+                async with tractor.open_nursery() as tn:
+                    portal1 = await tn.run_in_actor('consumer1', stream_forever)
+                    agen1 = await portal1.result()
+
+                    portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
+                    agen2 = await portal2.run(__name__, 'stream_forever')
+
+                    async with trio.open_nursery() as n:
+                        n.start_soon(streamer, agen1)
+                        n.start_soon(cancel, use_signal, .5)
+                        try:
+                            await streamer(agen2)
+                        finally:
+                            # XXX: THIS IS THE KEY THING that happens
+                            # **before** exiting the actor nursery block
+                            # (i think?).
+                            await agen1.aclose()
+                            await agen2.aclose()
+            finally:
+                with trio.CancelScope(shield=True):
+                    await trio.sleep(.5)
+
+                    # all subactors should have de-registered
+                    registry = await get_reg()
+                    assert portal1.channel.uid not in registry
+                    assert portal2.channel.uid not in registry
+                    assert len(registry) == 2
+
+
+    with pytest.raises(KeyboardInterrupt):
+        tractor.run(
+            main,
+            # XXX: required to use remote daemon!
+            arbiter_addr=arb_addr
+        )

From 3a868fec30fc592c6a4d60c021f44efd2e3fd253 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 7 Aug 2020 11:34:17 -0400
Subject: [PATCH 02/19] Cancel root nursery to trigger failure

The real issue is if the root nursery gets cancelled prior to
de-registration with the arbiter. This doesn't seem easy to reproduce
as a side effect of a KBI, however that is how it was discovered in
practice.
---
 tests/test_discovery.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 96a3908..d182f4c 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -237,9 +237,9 @@ def test_close_channel_explicit(
     use_signal,
     arb_addr,
 ):
-    """Verify that closing a stream explicitly **before** the containing
-    nursery tears down also results in subactor(s) deregistering from the
-    arbiter.
+ """Verify that closing a stream explicitly and killing the actor's + "root nursery" **before** the containing nursery tears down also + results in subactor(s) deregistering from the arbiter. """ async def streamer(agen): async for item in agen: @@ -264,7 +264,14 @@ def test_close_channel_explicit( finally: # XXX: THIS IS THE KEY THING that happens # **before** exiting the actor nursery block - # (i think?). + + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # also kill off channels cuz why not await agen1.aclose() await agen2.aclose() finally: From ae8488a57862549706812de0a345df074bb32af2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Aug 2020 15:33:15 -0400 Subject: [PATCH 03/19] Always shield de-register step with arbiter --- tractor/_actor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index a0fb229..f98c695 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -678,9 +678,9 @@ class Actor: finally: if registered_with_arbiter: - # with trio.move_on_after(3) as cs: - # cs.shield = True - await self._do_unreg(self._arb_addr) + with trio.move_on_after(3) as cs: + cs.shield = True + await self._do_unreg(self._arb_addr) # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared From fe45d99f65924347747119347d2c6fad2bdd6a9b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jul 2020 13:23:36 -0400 Subject: [PATCH 04/19] Allow opening a portal through an existing channel --- tractor/_portal.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 89cab39..c84a7fe 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -314,7 +314,8 @@ class LocalPortal: @asynccontextmanager async def open_portal( channel: Channel, - nursery: Optional[trio.Nursery] = None + nursery: Optional[trio.Nursery] = None, + start_msg_loop: bool = True, ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -332,15 +333,17 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs: trio.CancelScope = await nursery.start( - partial( - actor._process_messages, - channel, - # if the local task is cancelled we want to keep - # the msg loop running until our block ends - shield=True, + msg_loop_cs = None + if start_msg_loop: + msg_loop_cs: trio.CancelScope = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) ) - ) portal = Portal(channel) try: yield portal @@ -352,6 +355,7 @@ async def open_portal( await channel.send(None) # cancel background msg loop task - msg_loop_cs.cancel() + if msg_loop_cs: + msg_loop_cs.cancel() nursery.cancel_scope.cancel() From 532429aec9d722ae7feeeefdf01abf6d51acbc81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 14:43:25 -0400 Subject: [PATCH 05/19] Harden `trio` spawner process waiting Always shield waiting for he process and always run ``trio.Process.__aexit__()`` on teardown. This enforces that shutdown happens to due cancellation triggered inside the sub-actor instead of the process being killed externally by the parent. 
---
 tractor/_spawn.py | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index b8620c2..f0eb012 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -148,7 +148,7 @@ async def cancel_on_completion(
     else:
         log.info(
             f"Cancelling {portal.channel.uid} gracefully "
-            "after result {result}")
+            f"after result {result}")
 
     # cancel the process now that we have a final result
     await portal.cancel_actor()
@@ -159,7 +159,6 @@ async def spawn_subactor(
     subactor: 'Actor',
     parent_addr: Tuple[str, int],
 ):
-
     spawn_cmd = [
         sys.executable,
         "-m",
@@ -184,13 +183,19 @@ async def spawn_subactor(
     ]
 
     proc = await trio.open_process(spawn_cmd)
-    yield proc
+    try:
+        yield proc
+    finally:
+        # XXX: do this **after** cancellation/teardown
+        # to avoid killing the process too early
+        # since trio does this internally on ``__aexit__()``
 
-    # XXX: do this **after** cancellation/tearfown
-    # to avoid killing the process too early
-    # since trio does this internally on ``__aexit__()``
-    async with proc:
-        log.debug(f"Terminating {proc}")
+        # NOTE: we always "shield" join sub procs in
+        # the outer scope since no actor zombies are
+        # ever allowed. This ``__aexit__()`` also shields
+        # internally.
+        async with proc:
+            log.debug(f"Terminating {proc}")
 
 
 async def new_proc(
@@ -243,16 +248,21 @@ async def new_proc(
                 task_status.started(portal)
 
                 # wait for ActorNursery.wait() to be called
-                await actor_nursery._join_procs.wait()
+                with trio.CancelScope(shield=True):
+                    await actor_nursery._join_procs.wait()
 
                 if portal in actor_nursery._cancel_after_result_on_exit:
                     cancel_scope = await nursery.start(
                         cancel_on_completion, portal, subactor, errors)
 
-                # Wait for proc termination but **dont'** yet call
+                # Wait for proc termination but **don't yet** call
                 # ``trio.Process.__aexit__()`` (it tears down stdio
                 # which will kill any waiting remote pdb trace).
-                await proc.wait()
+
+                # always "hard" join sub procs:
+                # no actor zombies allowed
+                with trio.CancelScope(shield=True):
+                    await proc.wait()
         else:
             # `multiprocessing`
             assert _ctx

From 90c7fa69631e7b4e0311b75bb4bda3a72b984156 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 8 Aug 2020 14:47:52 -0400
Subject: [PATCH 06/19] Allow shielding in `open_portal()`

---
 tractor/_portal.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/tractor/_portal.py b/tractor/_portal.py
index c84a7fe..5181af1 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -22,7 +22,8 @@ log = get_logger('tractor')
 
 @asynccontextmanager
 async def maybe_open_nursery(
-    nursery: trio.Nursery = None
+    nursery: trio.Nursery = None,
+    shield: bool = False,
 ) -> typing.AsyncGenerator[trio.Nursery, Any]:
     """Create a new nursery if None provided.
 
@@ -32,6 +33,7 @@ async def maybe_open_nursery(
     if nursery is not None:
         yield nursery
     else:
        async with trio.open_nursery() as nursery:
+            nursery.cancel_scope.shield = shield
            yield nursery
 
 
@@ -275,6 +277,8 @@ class Portal:
                   f"{self.channel}")
         try:
             # send cancel cmd - might not get response
+            # XXX: sure would be nice to make this work with a proper shield
+            # with trio.CancelScope(shield=True):
             with trio.move_on_after(0.5) as cancel_scope:
                 cancel_scope.shield = True
                 await self.run('self', 'cancel')
@@ -316,6 +320,7 @@ async def open_portal(
     channel: Channel,
     nursery: Optional[trio.Nursery] = None,
     start_msg_loop: bool = True,
+    shield: bool = False,
 ) -> typing.AsyncGenerator[Portal, None]:
     """Open a ``Portal`` through the provided ``channel``.
@@ -325,7 +330,7 @@ async def open_portal(
     assert actor
     was_connected = False
 
-    async with maybe_open_nursery(nursery) as nursery:
+    async with maybe_open_nursery(nursery, shield=shield) as nursery:
         if not channel.connected():
             await channel.connect()
             was_connected = True

From 8477d21499f5a4888c70de22beaffb6116bcebd0 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 8 Aug 2020 14:55:41 -0400
Subject: [PATCH 07/19] Restructure actor runtime nursery scoping

In an effort to acquire more deterministic actor cancellation, this
adds a clearer and more resilient (whilst possibly a bit slower)
internal nursery structure with explicit semantics for clarifying the
task-scope shutdown sequence.

Namely, on cancellation, the explicit steps are now:
- cancel all currently running rpc tasks and wait for them to complete
- cancel the channel server and wait for it to complete
- cancel the msg loop for the channel with the immediate parent
- de-register with arbiter if possible
- wait on remaining connections to release
- exit process

To accomplish this add a new nursery called the "service nursery" which
spawns all rpc tasks **instead of using** the "root nursery". The root
is now used solely for async launching the msg loop for the primary
channel with the parent such that it is (nearly) the last thing torn
down on cancellation.

In the future it should also be possible to have `self.cancel()` return
a result to the parent once the runtime is sure that the rest of the
shutdown is atomic; this would allow for a true unbounded shield in
`Portal.cancel_actor()`. This will likely require that the error
handling blocks in `Actor._async_main()` are moved "inside" the root
nursery block such that the msg loop with the parent truly is the last
thing to terminate.
---
 tractor/_actor.py | 350 +++++++++++++++++++++++++++-------------------
 1 file changed, 206 insertions(+), 144 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index f98c695..e426be0 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -167,9 +167,10 @@ class Actor:
     """
     is_arbiter: bool = False
 
-    # placeholders filled in by `_async_main` after fork
-    _root_nursery: trio.Nursery
-    _server_nursery: trio.Nursery
+    # nursery placeholders filled in by `_async_main()` after fork
+    _root_n: trio.Nursery = None
+    _service_n: trio.Nursery = None
+    _server_n: Optional[trio.Nursery] = None
 
     # Information about `__main__` from parent
     _parent_main_data: Dict[str, str]
@@ -293,7 +294,7 @@
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
-        self._no_more_peers = trio.Event()
+        self._no_more_peers = trio.Event()  # unset
         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")
 
@@ -427,13 +428,13 @@
         msg = None
         log.debug(f"Entering msg loop for {chan} from {chan.uid}")
         try:
-            with trio.CancelScope(shield=shield) as cs:
+            with trio.CancelScope(shield=shield) as loop_cs:
                 # this internal scope allows for keeping this message
                 # loop running despite the current task having been
                 # cancelled (eg. 
`open_portal()` may call this method from # a locally spawned task) and recieve this scope using # ``scope = Nursery.start()`` - task_status.started(cs) + task_status.started(loop_cs) async for msg in chan: if msg is None: # loop terminate sentinel log.debug( @@ -496,7 +497,7 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") - cs = await self._root_nursery.start( + cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), name=funcname, ) @@ -514,6 +515,13 @@ class Actor: # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( cs, func, trio.Event()) + else: + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.warning(f"{self.uid} was remotely cancelled") + loop_cs.cancel() + break + log.debug( f"Waiting on next msg for {chan} from {chan.uid}") else: @@ -540,6 +548,46 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") + async def _chan_to_parent( + self, + parent_addr: Optional[Tuple[str, int]], + ) -> Channel: + try: + # Connect back to the parent actor and conduct initial + # handshake. From this point on if we error, we + # attempt to ship the exception back to the parent. + chan = Channel( + destaddr=parent_addr, + ) + await chan.connect() + + # Initial handshake: swap names. + await self._do_handshake(chan) + + if self._spawn_method == "trio": + # Receive runtime state from our parent + parent_data = await chan.recv() + log.debug( + "Recieved state from parent:\n" + f"{parent_data}" + ) + accept_addr = ( + parent_data.pop('bind_host'), + parent_data.pop('bind_port'), + ) + for attr, value in parent_data.items(): + setattr(self, attr, value) + + return chan, accept_addr + + except OSError: # failed to connect + log.warning( + f"Failed to connect to parent @ {parent_addr}," + " closing server") + await self.cancel() + # self._parent_chan = None + raise + async def _async_main( self, accept_addr: Optional[Tuple[str, int]] = None, @@ -561,88 +609,84 @@ class Actor: """ registered_with_arbiter = False try: - async with trio.open_nursery() as nursery: - self._root_nursery = nursery - # TODO: just make `parent_addr` a bool system (see above)? - if parent_addr is not None: - try: - # Connect back to the parent actor and conduct initial - # handshake. From this point on if we error, we - # attempt to ship the exception back to the parent. - chan = self._parent_chan = Channel( - destaddr=parent_addr, + # establish primary connection with immediate parent + self._parent_chan = None + if parent_addr is not None: + self._parent_chan, accept_addr = await self._chan_to_parent( + parent_addr) + + # load exposed/allowed RPC modules + # XXX: do this **after** establishing a channel to the parent + # but **before** starting the message loop for that channel + # such that import errors are properly propagated upwards + self.load_modules() + + # The "root" nursery ensures the channel with the immediate + # parent is kept alive as a reslient service until + # cancellation steps have (mostly) ocurred in + # a deterministic way. + async with trio.open_nursery() as root_nursery: + self._root_n = root_nursery + + async with trio.open_nursery() as service_nursery: + # This nursery is used to handle all inbound + # connections to us such that if the TCP server + # is killed, connections can continue to process + # in the background until this nursery is cancelled. 
+ self._service_n = service_nursery + + # Startup up the channel server with, + # - subactor: the bind address sent to us by our parent + # over our established channel + # - root actor: the ``accept_addr`` passed to this method + assert accept_addr + host, port = accept_addr + self._server_n = await service_nursery.start( + partial( + self._serve_forever, + service_nursery, + accept_host=host, + accept_port=port ) - await chan.connect() - - # Initial handshake: swap names. - await self._do_handshake(chan) - - if self._spawn_method == "trio": - # Receive runtime state from our parent - parent_data = await chan.recv() - log.debug( - "Recieved state from parent:\n" - f"{parent_data}" - ) - accept_addr = ( - parent_data.pop('bind_host'), - parent_data.pop('bind_port'), - ) - for attr, value in parent_data.items(): - setattr(self, attr, value) - - except OSError: # failed to connect - log.warning( - f"Failed to connect to parent @ {parent_addr}," - " closing server") - await self.cancel() - self._parent_chan = None - raise - - # load exposed/allowed RPC modules - # XXX: do this **after** establishing a channel to the parent - # but **before** starting the message loop for that channel - # such that import errors are properly propagated upwards - self.load_modules() - - # Startup up channel server with, - # - subactor: the bind address sent to us by our parent - # over our established channel - # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr - await nursery.start( - partial( - self._serve_forever, - accept_host=host, - accept_port=port ) - ) - # Begin handling our new connection back to parent. - # This is done here since we don't want to start - # processing parent requests until our server is - # 100% up and running. - if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) + # Register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + assert isinstance(self._arb_addr, tuple) + + async with get_arbiter(*self._arb_addr) as arb_portal: + await arb_portal.run( + 'self', + 'register_actor', + uid=self.uid, + sockaddr=self.accept_addr, + ) - # Register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - assert isinstance(self._arb_addr, tuple) - async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - uid=self.uid, sockaddr=self.accept_addr) registered_with_arbiter = True - task_status.started() - log.debug("Waiting on root nursery to complete") + # init steps complete + task_status.started() - # Blocks here as expected until the channel server is + # Begin handling our new connection back to our + # parent. This is done last since we don't want to + # start processing parent requests until our channel + # server is 100% up and running. + if self._parent_chan: + await root_nursery.start( + partial( + self._process_messages, + self._parent_chan, + shield=True, + ) + ) + log.info("Waiting on service nursery to complete") + log.info("Service nursery complete") + log.info("Waiting on root nursery to complete") + + # Blocks here as expected until the root nursery is # killed (i.e. 
this actor is cancelled or signalled by the parent) - except Exception as err: + except (trio.MultiError, Exception) as err: if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -658,47 +702,57 @@ class Actor: ) if self._parent_chan: - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await self._parent_chan.send(pack_error(err)) + except trio.ClosedResourceError: + log.error( + f"Failed to ship error to parent " + f"{self._parent_chan.uid}, channel was closed") + log.exception("Actor errored:") - if isinstance(err, ModuleNotFoundError): - raise - else: - # XXX wait, why? - # causes a hang if I always raise.. - # A parent process does something weird here? - # i'm so lost now.. - raise + # always! + raise finally: - if registered_with_arbiter: - with trio.move_on_after(3) as cs: - cs.shield = True - await self._do_unreg(self._arb_addr) + log.info("Root nursery complete") - # terminate actor once all it's peers (actors that connected - # to it as clients) have disappeared + # UNregister actor from the arbiter + if registered_with_arbiter and ( + self._arb_addr is not None + ): + failed = False + with trio.move_on_after(5) as cs: + cs.shield = True + try: + async with get_arbiter(*self._arb_addr) as arb_portal: + await arb_portal.run( + 'self', 'unregister_actor', uid=self.uid) + except OSError: + failed = True + if cs.cancelled_caught: + failed = True + if failed: + log.warning( + f"Failed to unregister {self.name} from arbiter") + + # Ensure all peers (actors connected to us as clients) are finished if not self._no_more_peers.is_set(): if any( chan.connected() for chan in chain(*self._peers.values()) ): log.debug( f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() + with trio.CancelScope(shield=True): + await self._no_more_peers.wait() log.debug("All peer channels are complete") - # tear down channel server no matter what since we errored - # or completed - self.cancel_server() + log.debug("Runtime completed") async def _serve_forever( self, + handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, @@ -710,48 +764,55 @@ class Actor: This will cause an actor to continue living (blocking) until ``cancel_server()`` is called. 
""" - async with trio.open_nursery() as nursery: - self._server_nursery = nursery - # TODO: might want to consider having a separate nursery - # for the stream handler such that the server can be cancelled - # whilst leaving existing channels up - listeners: List[trio.abc.Listener] = await nursery.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=self._root_nursery, - port=accept_port, host=accept_host, - ) - ) - log.debug("Started tcp server(s) on" # type: ignore - f" {[l.socket for l in listeners]}") - self._listeners.extend(listeners) - task_status.started() - - async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None: - # UNregister actor from the arbiter + self._server_down = trio.Event() try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', uid=self.uid) - except OSError: - log.warning(f"Unable to unregister {self.name} from arbiter") + async with trio.open_nursery() as server_n: + listeners: List[trio.abc.Listener] = await server_n.start( + partial( + trio.serve_tcp, + self._stream_handler, + # new connections will stay alive even if this server + # is cancelled + handler_nursery=handler_nursery, + port=accept_port, + host=accept_host, + ) + ) + log.debug("Started tcp server(s) on" # type: ignore + f" {[l.socket for l in listeners]}") + self._listeners.extend(listeners) + task_status.started(server_n) + finally: + # signal the server is down since nursery above terminated + self._server_down.set() async def cancel(self) -> None: """Cancel this actor. - The sequence in order is: - - cancelling all rpc tasks - - cancelling the channel server - - cancel the "root" nursery + The "deterministic" teardown sequence in order is: + - cancel all ongoing rpc tasks by cancel scope + - cancel the channel server to prevent new inbound + connections + - cancel the "service" nursery reponsible for + spawning new rpc tasks + - return control the parent channel message loop """ # cancel all ongoing rpc tasks - await self.cancel_rpc_tasks() - self.cancel_server() - self._root_nursery.cancel_scope.cancel() + with trio.CancelScope(shield=True): + await self.cancel_rpc_tasks() + self.cancel_server() + await self._server_down.wait() + self._service_n.cancel_scope.cancel() + + return True + + # XXX: hard kill logic if needed? + # def _hard_mofo_kill(self): + # # If we're the root actor or zombied kill everything + # if self._parent_chan is None: # TODO: more robust check + # root = trio.lowlevel.current_root_task() + # for n in root.child_nurseries: + # n.cancel_scope.cancel() async def _cancel_task(self, cid, chan): """Cancel a local task by call-id / channel. @@ -804,11 +865,12 @@ class Actor: """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ - log.debug("Shutting down channel server") - self._server_nursery.cancel_scope.cancel() + if self._server_n: + log.debug("Shutting down channel server") + self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> Tuple[str, int]: + def accept_addr(self) -> Optional[Tuple[str, int]]: """Primary address to which the channel server is bound. 
""" # throws OSError on failure From 7f74182a8ab5b9d8c2b0f39f2d31796fb9b7b8cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 15:15:43 -0400 Subject: [PATCH 08/19] Never allow more then info logging in daemon; causes blocking --- tests/conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 07a455c..8525481 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,6 +124,10 @@ def sig_prog(proc, sig): def daemon(loglevel, testdir, arb_addr): """Run a daemon actor as a "remote arbiter". """ + if loglevel in ('trace', 'debug'): + # too much logging will lock up the subproc (smh) + loglevel = 'info' + cmdargs = [ sys.executable, '-c', "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" From c821690834753ecab7ffee165825cc3158a2d449 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 15:16:10 -0400 Subject: [PATCH 09/19] Actor cancellation is now more latent; loosen timeing --- tests/test_pubsub.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 745e5ec..aaee831 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -204,18 +204,18 @@ def test_multi_actor_subs_arbiter_pub( await trio.sleep(0.5) await even_portal.cancel_actor() - await trio.sleep(0.5) + await trio.sleep(1) if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() - await trio.sleep(1) + await trio.sleep(2) if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) - if time.time() - start > 1: + if time.time() - start > 2: pytest.fail("odds subscription never dropped?") else: await master_portal.cancel_actor() From acd5b80f4c36ceb819af13ae12744a892dbc38b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 15:17:04 -0400 Subject: [PATCH 10/19] Add close channel test with remote arbiter --- tests/test_discovery.py | 132 ++++++++++++++++++++++++++-------------- 1 file changed, 85 insertions(+), 47 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index d182f4c..f180b60 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -199,7 +199,7 @@ def test_subactors_unregister_on_cancel( spawn_and_check_registry, arb_addr, use_signal, - False, + False, # remote arbiter with_streaming, arbiter_addr=arb_addr ) @@ -223,15 +223,94 @@ def test_subactors_unregister_on_cancel_remote_daemon( spawn_and_check_registry, arb_addr, use_signal, - True, + True, # remote arbiter with_streaming, # XXX: required to use remote daemon! arbiter_addr=arb_addr ) +async def streamer(agen): + async for item in agen: + print(item) + + +async def close_chans_before_nursery( + arb_addr: tuple, + use_signal: bool, + remote_arbiter: bool = False, +) -> None: + + # logic for how many actors should still be + # in the registry at teardown. 
+ if remote_arbiter: + entries_at_end = 2 + else: + entries_at_end = 1 + + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run, 'self', 'get_registry') + + async with tractor.open_nursery() as tn: + portal1 = await tn.run_in_actor('consumer1', stream_forever) + agen1 = await portal1.result() + + portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) + agen2 = await portal2.run(__name__, 'stream_forever') + + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(.5) + + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == entries_at_end + + @pytest.mark.parametrize('use_signal', [False, True]) def test_close_channel_explicit( + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly and killing the actor's + "root nursery" **before** the containing nursery tears down also + results in subactor(s) deregistering from the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + close_chans_before_nursery, + arb_addr, + use_signal, + False, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit_remote_arbiter( daemon, start_method, use_signal, @@ -241,53 +320,12 @@ def test_close_channel_explicit( "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. """ - async def streamer(agen): - async for item in agen: - print(item) - - async def main(): - async with tractor.get_arbiter(*arb_addr) as aportal: - try: - get_reg = partial(aportal.run, 'self', 'get_registry') - async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor('consumer1', stream_forever) - agen1 = await portal1.result() - - portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(__name__, 'stream_forever') - - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block - - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. 
- tractor.current_actor()._root_nursery.cancel_scope.cancel() - - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() - finally: - with trio.CancelScope(shield=True): - await trio.sleep(.5) - - # all subactors should have de-registered - registry = await get_reg() - assert portal1.channel.uid not in registry - assert portal2.channel.uid not in registry - assert len(registry) == 2 - - with pytest.raises(KeyboardInterrupt): tractor.run( - main, + close_chans_before_nursery, + arb_addr, + use_signal, + True, # XXX: required to use remote daemon! arbiter_addr=arb_addr ) From 42be410076ad2566b7d85c1941ae76155fb72c0a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 20:27:43 -0400 Subject: [PATCH 11/19] Handle mp accept_addr --- tractor/_actor.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index e426be0..94eab7f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -578,6 +578,9 @@ class Actor: for attr, value in parent_data.items(): setattr(self, attr, value) + else: # mp + accept_addr = None + return chan, accept_addr except OSError: # failed to connect @@ -613,9 +616,14 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: - self._parent_chan, accept_addr = await self._chan_to_parent( + self._parent_chan, accept_addr_from_rent = await self._chan_to_parent( parent_addr) + # either it's passed in because we're not a child + # or because we're running in mp mode + if accept_addr_from_rent is not None: + accept_addr = accept_addr_from_rent + # load exposed/allowed RPC modules # XXX: do this **after** establishing a channel to the parent # but **before** starting the message loop for that channel From b3eba00c3acab7066598cc367eedf20c3f704c62 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 20:57:18 -0400 Subject: [PATCH 12/19] Appease the great mypy --- tractor/_actor.py | 27 ++++++++++++++++++--------- tractor/_portal.py | 4 ++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 94eab7f..d0c0ac2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -168,8 +168,8 @@ class Actor: is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork - _root_n: trio.Nursery = None - _service_n: trio.Nursery = None + _root_n: Optional[trio.Nursery] = None + _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None # Information about `__main__` from parent @@ -497,6 +497,7 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") + assert self._service_n cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), name=funcname, @@ -551,7 +552,7 @@ class Actor: async def _chan_to_parent( self, parent_addr: Optional[Tuple[str, int]], - ) -> Channel: + ) -> Tuple[Channel, Optional[Tuple[str, int]]]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -564,6 +565,8 @@ class Actor: # Initial handshake: swap names. 
             await self._do_handshake(chan)
 
+            accept_addr: Optional[Tuple[str, int]] = None
+
             if self._spawn_method == "trio":
                 # Receive runtime state from our parent
                 parent_data = await chan.recv()
@@ -578,9 +581,6 @@
                 for attr, value in parent_data.items():
                     setattr(self, attr, value)
 
-            else:  # mp
-                accept_addr = None
-
             return chan, accept_addr
 
         except OSError:  # failed to connect
@@ -636,6 +636,7 @@
             async with trio.open_nursery() as root_nursery:
                 self._root_n = root_nursery
+                assert self._root_n
 
                 async with trio.open_nursery() as service_nursery:
                     # This nursery is used to handle all inbound
                     # connections to us such that if the TCP server
                     # is killed, connections can continue to process
                     # in the background until this nursery is cancelled.
                     self._service_n = service_nursery
+                    assert self._service_n
 
                     # Startup up the channel server with,
                     # - subactor: the bind address sent to us by our parent
                     #   over our established channel
                     # - root actor: the ``accept_addr`` passed to this method
                     assert accept_addr
                     host, port = accept_addr
+
                     self._server_n = await service_nursery.start(
                         partial(
                             self._serve_forever,
                             service_nursery,
                             accept_host=host,
                             accept_port=port
                         )
                     )
@@ -765,7 +768,7 @@
         *,
         # (host, port) to bind for channel server
         accept_host: Tuple[str, int] = None,
         accept_port: int = 0,
-        task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+        task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         """Start the channel server, begin listening for new connections.
@@ -794,7 +797,7 @@
             # signal the server is down since nursery above terminated
             self._server_down.set()
 
-    async def cancel(self) -> None:
+    async def cancel(self) -> bool:
         """Cancel this actor.
 
         The "deterministic" teardown sequence in order is:
             - cancel all ongoing rpc tasks by cancel scope
             - cancel the channel server to prevent new inbound
               connections
             - cancel the "service" nursery responsible for
               spawning new rpc tasks
             - return control to the parent channel message loop
         """
         # cancel all ongoing rpc tasks
         with trio.CancelScope(shield=True):
+            # kill all ongoing tasks
             await self.cancel_rpc_tasks()
+
+            # stop channel server
             self.cancel_server()
             await self._server_down.wait()
-            self._service_n.cancel_scope.cancel()
+
+            # rekt all channel loops
+            if self._service_n:
+                self._service_n.cancel_scope.cancel()
 
         return True

diff --git a/tractor/_portal.py b/tractor/_portal.py
index 5181af1..e749ec6 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -338,9 +338,9 @@ async def open_portal(
         if channel.uid is None:
             await actor._do_handshake(channel)
 
-        msg_loop_cs = None
+        msg_loop_cs: Optional[trio.CancelScope] = None
         if start_msg_loop:
-            msg_loop_cs: trio.CancelScope = await nursery.start(
+            msg_loop_cs = await nursery.start(
                 partial(
                     actor._process_messages,
                     channel,

From 292513b3535fa04d72dd0dce17a26a833857f1e7 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 8 Aug 2020 20:58:04 -0400
Subject: [PATCH 13/19] Module define default accept addr

---
 tractor/_trionics.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index 746960c..e846144 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -18,6 +18,8 @@ from . import _spawn
 
 log = get_logger('tractor')
 
+_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
+
 
 class ActorNursery:
     """Spawn scoped subprocess actors.
@@ -48,7 +50,7 @@ class ActorNursery: self, name: str, *, - bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + bind_addr: Tuple[str, int] = _default_bind_addr, statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor @@ -89,7 +91,7 @@ class ActorNursery: name: str, fn: typing.Callable, *, - bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor From 8a995beb6a592990e6a077de65cb61f6e775cc47 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Aug 2020 22:29:57 -0400 Subject: [PATCH 14/19] Docs fixes --- tractor/msg.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index cd064a0..41d1d6b 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -143,7 +143,7 @@ def pub( - packetizer: ``Callable[[str, Any], Any]`` a callback who receives the topic and value from the publisher function each ``yield`` such that whatever is returned is sent as the published value to subscribers of - that topic. By default this is a dict ``{topic: value}``. + that topic. By default this is a dict ``{topic: str: value: Any}``. As an example, to make a subscriber call the above function: @@ -161,7 +161,7 @@ def pub( task_name='source1', ) ) - async for value in portal.result(): + async for value in await portal.result(): print(f"Subscriber received {value}") @@ -264,4 +264,5 @@ def pub( # ``wrapt.decorator`` doesn't seem to want to play nice with its # whole "adapter" thing... wrapped._tractor_stream_function = True # type: ignore + return wrapper(wrapped) From 1ae0efb03307b4f6ac7204ab76b71c0c570d3864 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Aug 2020 11:53:45 -0400 Subject: [PATCH 15/19] Make rpc_module_paths a list --- tests/conftest.py | 2 +- tractor/__init__.py | 6 +++--- tractor/testing/_tractor_test.py | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 8525481..24b0f92 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -130,7 +130,7 @@ def daemon(loglevel, testdir, arb_addr): cmdargs = [ sys.executable, '-c', - "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" + "import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})" .format( arb_addr, "'{}'".format(loglevel) if loglevel else None) diff --git a/tractor/__init__.py b/tractor/__init__.py index 80eaf05..ea61d88 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any, Optional +from typing import Tuple, Any, Optional, List import typing import trio # type: ignore @@ -115,7 +115,7 @@ def run( def run_daemon( - rpc_module_paths: Tuple[str], + rpc_module_paths: List[str], **kwargs ) -> None: """Spawn daemon actor which will respond to RPC. @@ -124,7 +124,7 @@ def run_daemon( ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned is meant to run forever responding to RPC requests. 
""" - kwargs['rpc_module_paths'] = rpc_module_paths + kwargs['rpc_module_paths'] = list(rpc_module_paths) for path in rpc_module_paths: importlib.import_module(path) diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 3db56e5..65f199f 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -2,7 +2,7 @@ import inspect import platform from functools import partial, wraps -from .. import run +from tractor import run __all__ = ['tractor_test'] @@ -38,6 +38,7 @@ def tractor_test(fn): # injects test suite fixture value to test as well # as `run()` kwargs['arb_addr'] = arb_addr + if 'loglevel' in inspect.signature(fn).parameters: # allows test suites to define a 'loglevel' fixture # that activates the internal logging @@ -52,6 +53,7 @@ def tractor_test(fn): if 'start_method' in inspect.signature(fn).parameters: # set of subprocess spawning backends kwargs['start_method'] = start_method + return run( partial(fn, *args, **kwargs), arbiter_addr=arb_addr, From 0c8dcd0ec5b6f4ebf67c129911a58dd5e53e3fce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Aug 2020 11:54:37 -0400 Subject: [PATCH 16/19] Use allocated arbiter port in local reg test --- tests/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_local.py b/tests/test_local.py index eb0c676..0a594d0 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -31,7 +31,7 @@ def test_no_main(): @tractor_test -async def test_self_is_registered(): +async def test_self_is_registered(arb_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter From 863a4b7933c2ef12979114856e47e71bf2bfe1a6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Aug 2020 11:55:03 -0400 Subject: [PATCH 17/19] Update copyright date --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4f56472..cca38cf 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ # # tractor: a trionic actor model built on `multiprocessing` and `trio` # -# Copyright (C) 2018 Tyler Goodlet +# Copyright (C) 2018-2020 Tyler Goodlet # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by From ec5d443ee55b5e097a74aab391097a31ae49a9d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Aug 2020 11:55:22 -0400 Subject: [PATCH 18/19] Always log actor errors --- tractor/_actor.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index d0c0ac2..d59bd82 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -588,7 +588,6 @@ class Actor: f"Failed to connect to parent @ {parent_addr}," " closing server") await self.cancel() - # self._parent_chan = None raise async def _async_main( @@ -631,8 +630,8 @@ class Actor: self.load_modules() # The "root" nursery ensures the channel with the immediate - # parent is kept alive as a reslient service until - # cancellation steps have (mostly) ocurred in + # parent is kept alive as a resilient service until + # cancellation steps have (mostly) occurred in # a deterministic way. 
async with trio.open_nursery() as root_nursery: self._root_n = root_nursery @@ -647,7 +646,7 @@ class Actor: assert self._service_n # Startup up the channel server with, - # - subactor: the bind address sent to us by our parent + # - subactor: the bind address is sent by our parent # over our established channel # - root actor: the ``accept_addr`` passed to this method assert accept_addr @@ -721,15 +720,15 @@ class Actor: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") # always! + log.exception("Actor errored:") raise finally: log.info("Root nursery complete") - # UNregister actor from the arbiter + # Unregister actor from the arbiter if registered_with_arbiter and ( self._arb_addr is not None ): From 451170bb637b51d4688770e3949cd00516065de0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Aug 2020 13:26:08 -0400 Subject: [PATCH 19/19] Pass explicit kwargs to new discovery test funcs --- tests/test_discovery.py | 44 ++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index f180b60..52a1dc8 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -196,11 +196,13 @@ def test_subactors_unregister_on_cancel( """ with pytest.raises(KeyboardInterrupt): tractor.run( - spawn_and_check_registry, - arb_addr, - use_signal, - False, # remote arbiter - with_streaming, + partial( + spawn_and_check_registry, + arb_addr, + use_signal, + remote_arbiter=False, + with_streaming=with_streaming, + ), arbiter_addr=arb_addr ) @@ -220,11 +222,13 @@ def test_subactors_unregister_on_cancel_remote_daemon( """ with pytest.raises(KeyboardInterrupt): tractor.run( - spawn_and_check_registry, - arb_addr, - use_signal, - True, # remote arbiter - with_streaming, + partial( + spawn_and_check_registry, + arb_addr, + use_signal, + remote_arbiter=True, + with_streaming=with_streaming, + ), # XXX: required to use remote daemon! arbiter_addr=arb_addr ) @@ -300,10 +304,12 @@ def test_close_channel_explicit( """ with pytest.raises(KeyboardInterrupt): tractor.run( - close_chans_before_nursery, - arb_addr, - use_signal, - False, + partial( + close_chans_before_nursery, + arb_addr, + use_signal, + remote_arbiter=False, + ), # XXX: required to use remote daemon! arbiter_addr=arb_addr ) @@ -322,10 +328,12 @@ def test_close_channel_explicit_remote_arbiter( """ with pytest.raises(KeyboardInterrupt): tractor.run( - close_chans_before_nursery, - arb_addr, - use_signal, - True, + partial( + close_chans_before_nursery, + arb_addr, + use_signal, + remote_arbiter=True, + ), # XXX: required to use remote daemon! arbiter_addr=arb_addr )