diff --git a/setup.py b/setup.py index 4f56472..cca38cf 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ # # tractor: a trionic actor model built on `multiprocessing` and `trio` # -# Copyright (C) 2018 Tyler Goodlet +# Copyright (C) 2018-2020 Tyler Goodlet # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by diff --git a/tests/conftest.py b/tests/conftest.py index 07a455c..24b0f92 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -124,9 +124,13 @@ def sig_prog(proc, sig): def daemon(loglevel, testdir, arb_addr): """Run a daemon actor as a "remote arbiter". """ + if loglevel in ('trace', 'debug'): + # too much logging will lock up the subproc (smh) + loglevel = 'info' + cmdargs = [ sys.executable, '-c', - "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" + "import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})" .format( arb_addr, "'{}'".format(loglevel) if loglevel else None) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 6370fe2..52a1dc8 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -196,11 +196,13 @@ def test_subactors_unregister_on_cancel( """ with pytest.raises(KeyboardInterrupt): tractor.run( - spawn_and_check_registry, - arb_addr, - use_signal, - False, - with_streaming, + partial( + spawn_and_check_registry, + arb_addr, + use_signal, + remote_arbiter=False, + with_streaming=with_streaming, + ), arbiter_addr=arb_addr ) @@ -220,11 +222,118 @@ def test_subactors_unregister_on_cancel_remote_daemon( """ with pytest.raises(KeyboardInterrupt): tractor.run( - spawn_and_check_registry, - arb_addr, - use_signal, - True, - with_streaming, + partial( + spawn_and_check_registry, + arb_addr, + use_signal, + remote_arbiter=True, + with_streaming=with_streaming, + ), + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) + + +async def streamer(agen): + async for item in agen: + print(item) + + +async def close_chans_before_nursery( + arb_addr: tuple, + use_signal: bool, + remote_arbiter: bool = False, +) -> None: + + # logic for how many actors should still be + # in the registry at teardown. + if remote_arbiter: + entries_at_end = 2 + else: + entries_at_end = 1 + + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run, 'self', 'get_registry') + + async with tractor.open_nursery() as tn: + portal1 = await tn.run_in_actor('consumer1', stream_forever) + agen1 = await portal1.result() + + portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) + agen2 = await portal2.run(__name__, 'stream_forever') + + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(.5) + + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == entries_at_end + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit( + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly and killing the actor's + "root nursery" **before** the containing nursery tears down also + results in subactor(s) deregistering from the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + partial( + close_chans_before_nursery, + arb_addr, + use_signal, + remote_arbiter=False, + ), + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit_remote_arbiter( + daemon, + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly and killing the actor's + "root nursery" **before** the containing nursery tears down also + results in subactor(s) deregistering from the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + partial( + close_chans_before_nursery, + arb_addr, + use_signal, + remote_arbiter=True, + ), # XXX: required to use remote daemon! arbiter_addr=arb_addr ) diff --git a/tests/test_local.py b/tests/test_local.py index eb0c676..0a594d0 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -31,7 +31,7 @@ def test_no_main(): @tractor_test -async def test_self_is_registered(): +async def test_self_is_registered(arb_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 745e5ec..aaee831 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -204,18 +204,18 @@ def test_multi_actor_subs_arbiter_pub( await trio.sleep(0.5) await even_portal.cancel_actor() - await trio.sleep(0.5) + await trio.sleep(1) if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() - await trio.sleep(1) + await trio.sleep(2) if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) - if time.time() - start > 1: + if time.time() - start > 2: pytest.fail("odds subscription never dropped?") else: await master_portal.cancel_actor() diff --git a/tractor/__init__.py b/tractor/__init__.py index 80eaf05..ea61d88 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any, Optional +from typing import Tuple, Any, Optional, List import typing import trio # type: ignore @@ -115,7 +115,7 @@ def run( def run_daemon( - rpc_module_paths: Tuple[str], + rpc_module_paths: List[str], **kwargs ) -> None: """Spawn daemon actor which will respond to RPC. @@ -124,7 +124,7 @@ def run_daemon( ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned is meant to run forever responding to RPC requests. """ - kwargs['rpc_module_paths'] = rpc_module_paths + kwargs['rpc_module_paths'] = list(rpc_module_paths) for path in rpc_module_paths: importlib.import_module(path) diff --git a/tractor/_actor.py b/tractor/_actor.py index a0fb229..d59bd82 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -167,9 +167,10 @@ class Actor: """ is_arbiter: bool = False - # placeholders filled in by `_async_main` after fork - _root_nursery: trio.Nursery - _server_nursery: trio.Nursery + # nursery placeholders filled in by `_async_main()` after fork + _root_n: Optional[trio.Nursery] = None + _service_n: Optional[trio.Nursery] = None + _server_n: Optional[trio.Nursery] = None # Information about `__main__` from parent _parent_main_data: Dict[str, str] @@ -293,7 +294,7 @@ class Actor: ) -> None: """Entry point for new inbound connections to the channel server. """ - self._no_more_peers = trio.Event() + self._no_more_peers = trio.Event() # unset chan = Channel(stream=stream) log.info(f"New connection to us {chan}") @@ -427,13 +428,13 @@ class Actor: msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - with trio.CancelScope(shield=shield) as cs: + with trio.CancelScope(shield=shield) as loop_cs: # this internal scope allows for keeping this message # loop running despite the current task having been # cancelled (eg. `open_portal()` may call this method from # a locally spawned task) and recieve this scope using # ``scope = Nursery.start()`` - task_status.started(cs) + task_status.started(loop_cs) async for msg in chan: if msg is None: # loop terminate sentinel log.debug( @@ -496,7 +497,8 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") - cs = await self._root_nursery.start( + assert self._service_n + cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), name=funcname, ) @@ -514,6 +516,13 @@ class Actor: # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( cs, func, trio.Event()) + else: + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.warning(f"{self.uid} was remotely cancelled") + loop_cs.cancel() + break + log.debug( f"Waiting on next msg for {chan} from {chan.uid}") else: @@ -540,6 +549,47 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") + async def _chan_to_parent( + self, + parent_addr: Optional[Tuple[str, int]], + ) -> Tuple[Channel, Optional[Tuple[str, int]]]: + try: + # Connect back to the parent actor and conduct initial + # handshake. From this point on if we error, we + # attempt to ship the exception back to the parent. + chan = Channel( + destaddr=parent_addr, + ) + await chan.connect() + + # Initial handshake: swap names. + await self._do_handshake(chan) + + accept_addr: Optional[Tuple[str, int]] = None + + if self._spawn_method == "trio": + # Receive runtime state from our parent + parent_data = await chan.recv() + log.debug( + "Recieved state from parent:\n" + f"{parent_data}" + ) + accept_addr = ( + parent_data.pop('bind_host'), + parent_data.pop('bind_port'), + ) + for attr, value in parent_data.items(): + setattr(self, attr, value) + + return chan, accept_addr + + except OSError: # failed to connect + log.warning( + f"Failed to connect to parent @ {parent_addr}," + " closing server") + await self.cancel() + raise + async def _async_main( self, accept_addr: Optional[Tuple[str, int]] = None, @@ -561,88 +611,92 @@ class Actor: """ registered_with_arbiter = False try: - async with trio.open_nursery() as nursery: - self._root_nursery = nursery - # TODO: just make `parent_addr` a bool system (see above)? - if parent_addr is not None: - try: - # Connect back to the parent actor and conduct initial - # handshake. From this point on if we error, we - # attempt to ship the exception back to the parent. - chan = self._parent_chan = Channel( - destaddr=parent_addr, + # establish primary connection with immediate parent + self._parent_chan = None + if parent_addr is not None: + self._parent_chan, accept_addr_from_rent = await self._chan_to_parent( + parent_addr) + + # either it's passed in because we're not a child + # or because we're running in mp mode + if accept_addr_from_rent is not None: + accept_addr = accept_addr_from_rent + + # load exposed/allowed RPC modules + # XXX: do this **after** establishing a channel to the parent + # but **before** starting the message loop for that channel + # such that import errors are properly propagated upwards + self.load_modules() + + # The "root" nursery ensures the channel with the immediate + # parent is kept alive as a resilient service until + # cancellation steps have (mostly) occurred in + # a deterministic way. + async with trio.open_nursery() as root_nursery: + self._root_n = root_nursery + assert self._root_n + + async with trio.open_nursery() as service_nursery: + # This nursery is used to handle all inbound + # connections to us such that if the TCP server + # is killed, connections can continue to process + # in the background until this nursery is cancelled. + self._service_n = service_nursery + assert self._service_n + + # Startup up the channel server with, + # - subactor: the bind address is sent by our parent + # over our established channel + # - root actor: the ``accept_addr`` passed to this method + assert accept_addr + host, port = accept_addr + + self._server_n = await service_nursery.start( + partial( + self._serve_forever, + service_nursery, + accept_host=host, + accept_port=port ) - await chan.connect() - - # Initial handshake: swap names. - await self._do_handshake(chan) - - if self._spawn_method == "trio": - # Receive runtime state from our parent - parent_data = await chan.recv() - log.debug( - "Recieved state from parent:\n" - f"{parent_data}" - ) - accept_addr = ( - parent_data.pop('bind_host'), - parent_data.pop('bind_port'), - ) - for attr, value in parent_data.items(): - setattr(self, attr, value) - - except OSError: # failed to connect - log.warning( - f"Failed to connect to parent @ {parent_addr}," - " closing server") - await self.cancel() - self._parent_chan = None - raise - - # load exposed/allowed RPC modules - # XXX: do this **after** establishing a channel to the parent - # but **before** starting the message loop for that channel - # such that import errors are properly propagated upwards - self.load_modules() - - # Startup up channel server with, - # - subactor: the bind address sent to us by our parent - # over our established channel - # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr - await nursery.start( - partial( - self._serve_forever, - accept_host=host, - accept_port=port ) - ) - # Begin handling our new connection back to parent. - # This is done here since we don't want to start - # processing parent requests until our server is - # 100% up and running. - if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) + # Register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + assert isinstance(self._arb_addr, tuple) + + async with get_arbiter(*self._arb_addr) as arb_portal: + await arb_portal.run( + 'self', + 'register_actor', + uid=self.uid, + sockaddr=self.accept_addr, + ) - # Register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - assert isinstance(self._arb_addr, tuple) - async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - uid=self.uid, sockaddr=self.accept_addr) registered_with_arbiter = True - task_status.started() - log.debug("Waiting on root nursery to complete") + # init steps complete + task_status.started() - # Blocks here as expected until the channel server is + # Begin handling our new connection back to our + # parent. This is done last since we don't want to + # start processing parent requests until our channel + # server is 100% up and running. + if self._parent_chan: + await root_nursery.start( + partial( + self._process_messages, + self._parent_chan, + shield=True, + ) + ) + log.info("Waiting on service nursery to complete") + log.info("Service nursery complete") + log.info("Waiting on root nursery to complete") + + # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception as err: + except (trio.MultiError, Exception) as err: if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -658,100 +712,123 @@ class Actor: ) if self._parent_chan: - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await self._parent_chan.send(pack_error(err)) + except trio.ClosedResourceError: + log.error( + f"Failed to ship error to parent " + f"{self._parent_chan.uid}, channel was closed") - if isinstance(err, ModuleNotFoundError): - raise - else: - # XXX wait, why? - # causes a hang if I always raise.. - # A parent process does something weird here? - # i'm so lost now.. - raise + # always! + log.exception("Actor errored:") + raise finally: - if registered_with_arbiter: - # with trio.move_on_after(3) as cs: - # cs.shield = True - await self._do_unreg(self._arb_addr) + log.info("Root nursery complete") - # terminate actor once all it's peers (actors that connected - # to it as clients) have disappeared + # Unregister actor from the arbiter + if registered_with_arbiter and ( + self._arb_addr is not None + ): + failed = False + with trio.move_on_after(5) as cs: + cs.shield = True + try: + async with get_arbiter(*self._arb_addr) as arb_portal: + await arb_portal.run( + 'self', 'unregister_actor', uid=self.uid) + except OSError: + failed = True + if cs.cancelled_caught: + failed = True + if failed: + log.warning( + f"Failed to unregister {self.name} from arbiter") + + # Ensure all peers (actors connected to us as clients) are finished if not self._no_more_peers.is_set(): if any( chan.connected() for chan in chain(*self._peers.values()) ): log.debug( f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() + with trio.CancelScope(shield=True): + await self._no_more_peers.wait() log.debug("All peer channels are complete") - # tear down channel server no matter what since we errored - # or completed - self.cancel_server() + log.debug("Runtime completed") async def _serve_forever( self, + handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, accept_port: int = 0, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until ``cancel_server()`` is called. """ - async with trio.open_nursery() as nursery: - self._server_nursery = nursery - # TODO: might want to consider having a separate nursery - # for the stream handler such that the server can be cancelled - # whilst leaving existing channels up - listeners: List[trio.abc.Listener] = await nursery.start( - partial( - trio.serve_tcp, - self._stream_handler, - # new connections will stay alive even if this server - # is cancelled - handler_nursery=self._root_nursery, - port=accept_port, host=accept_host, - ) - ) - log.debug("Started tcp server(s) on" # type: ignore - f" {[l.socket for l in listeners]}") - self._listeners.extend(listeners) - task_status.started() - - async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None: - # UNregister actor from the arbiter + self._server_down = trio.Event() try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', uid=self.uid) - except OSError: - log.warning(f"Unable to unregister {self.name} from arbiter") + async with trio.open_nursery() as server_n: + listeners: List[trio.abc.Listener] = await server_n.start( + partial( + trio.serve_tcp, + self._stream_handler, + # new connections will stay alive even if this server + # is cancelled + handler_nursery=handler_nursery, + port=accept_port, + host=accept_host, + ) + ) + log.debug("Started tcp server(s) on" # type: ignore + f" {[l.socket for l in listeners]}") + self._listeners.extend(listeners) + task_status.started(server_n) + finally: + # signal the server is down since nursery above terminated + self._server_down.set() - async def cancel(self) -> None: + async def cancel(self) -> bool: """Cancel this actor. - The sequence in order is: - - cancelling all rpc tasks - - cancelling the channel server - - cancel the "root" nursery + The "deterministic" teardown sequence in order is: + - cancel all ongoing rpc tasks by cancel scope + - cancel the channel server to prevent new inbound + connections + - cancel the "service" nursery reponsible for + spawning new rpc tasks + - return control the parent channel message loop """ # cancel all ongoing rpc tasks - await self.cancel_rpc_tasks() - self.cancel_server() - self._root_nursery.cancel_scope.cancel() + with trio.CancelScope(shield=True): + # kill all ongoing tasks + await self.cancel_rpc_tasks() + + # stop channel server + self.cancel_server() + await self._server_down.wait() + + # rekt all channel loops + if self._service_n: + self._service_n.cancel_scope.cancel() + + return True + + # XXX: hard kill logic if needed? + # def _hard_mofo_kill(self): + # # If we're the root actor or zombied kill everything + # if self._parent_chan is None: # TODO: more robust check + # root = trio.lowlevel.current_root_task() + # for n in root.child_nurseries: + # n.cancel_scope.cancel() async def _cancel_task(self, cid, chan): """Cancel a local task by call-id / channel. @@ -804,11 +881,12 @@ class Actor: """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ - log.debug("Shutting down channel server") - self._server_nursery.cancel_scope.cancel() + if self._server_n: + log.debug("Shutting down channel server") + self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> Tuple[str, int]: + def accept_addr(self) -> Optional[Tuple[str, int]]: """Primary address to which the channel server is bound. """ # throws OSError on failure diff --git a/tractor/_portal.py b/tractor/_portal.py index 89cab39..e749ec6 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -22,7 +22,8 @@ log = get_logger('tractor') @asynccontextmanager async def maybe_open_nursery( - nursery: trio.Nursery = None + nursery: trio.Nursery = None, + shield: bool = False, ) -> typing.AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. @@ -32,6 +33,7 @@ async def maybe_open_nursery( yield nursery else: async with trio.open_nursery() as nursery: + nursery.cancel_scope.shield = shield yield nursery @@ -275,6 +277,8 @@ class Portal: f"{self.channel}") try: # send cancel cmd - might not get response + # XXX: sure would be nice to make this work with a proper shield + # with trio.CancelScope(shield=True): with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True await self.run('self', 'cancel') @@ -314,7 +318,9 @@ class LocalPortal: @asynccontextmanager async def open_portal( channel: Channel, - nursery: Optional[trio.Nursery] = None + nursery: Optional[trio.Nursery] = None, + start_msg_loop: bool = True, + shield: bool = False, ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -324,7 +330,7 @@ async def open_portal( assert actor was_connected = False - async with maybe_open_nursery(nursery) as nursery: + async with maybe_open_nursery(nursery, shield=shield) as nursery: if not channel.connected(): await channel.connect() was_connected = True @@ -332,15 +338,17 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs: trio.CancelScope = await nursery.start( - partial( - actor._process_messages, - channel, - # if the local task is cancelled we want to keep - # the msg loop running until our block ends - shield=True, + msg_loop_cs: Optional[trio.CancelScope] = None + if start_msg_loop: + msg_loop_cs = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) ) - ) portal = Portal(channel) try: yield portal @@ -352,6 +360,7 @@ async def open_portal( await channel.send(None) # cancel background msg loop task - msg_loop_cs.cancel() + if msg_loop_cs: + msg_loop_cs.cancel() nursery.cancel_scope.cancel() diff --git a/tractor/_spawn.py b/tractor/_spawn.py index b8620c2..f0eb012 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -148,7 +148,7 @@ async def cancel_on_completion( else: log.info( f"Cancelling {portal.channel.uid} gracefully " - "after result {result}") + f"after result {result}") # cancel the process now that we have a final result await portal.cancel_actor() @@ -159,7 +159,6 @@ async def spawn_subactor( subactor: 'Actor', parent_addr: Tuple[str, int], ): - spawn_cmd = [ sys.executable, "-m", @@ -184,13 +183,19 @@ async def spawn_subactor( ] proc = await trio.open_process(spawn_cmd) - yield proc + try: + yield proc + finally: + # XXX: do this **after** cancellation/tearfown + # to avoid killing the process too early + # since trio does this internally on ``__aexit__()`` - # XXX: do this **after** cancellation/tearfown - # to avoid killing the process too early - # since trio does this internally on ``__aexit__()`` - async with proc: - log.debug(f"Terminating {proc}") + # NOTE: we always "shield" join sub procs in + # the outer scope since no actor zombies are + # ever allowed. This ``__aexit__()`` also shields + # internally. + async with proc: + log.debug(f"Terminating {proc}") async def new_proc( @@ -243,16 +248,21 @@ async def new_proc( task_status.started(portal) # wait for ActorNursery.wait() to be called - await actor_nursery._join_procs.wait() + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() if portal in actor_nursery._cancel_after_result_on_exit: cancel_scope = await nursery.start( cancel_on_completion, portal, subactor, errors) - # Wait for proc termination but **dont'** yet call + # Wait for proc termination but **dont' yet** call # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). - await proc.wait() + + # always "hard" join sub procs: + # no actor zombies allowed + with trio.CancelScope(shield=True): + await proc.wait() else: # `multiprocessing` assert _ctx diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 746960c..e846144 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -18,6 +18,8 @@ from . import _spawn log = get_logger('tractor') +_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) + class ActorNursery: """Spawn scoped subprocess actors. @@ -48,7 +50,7 @@ class ActorNursery: self, name: str, *, - bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + bind_addr: Tuple[str, int] = _default_bind_addr, statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor @@ -89,7 +91,7 @@ class ActorNursery: name: str, fn: typing.Callable, *, - bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor diff --git a/tractor/msg.py b/tractor/msg.py index cd064a0..41d1d6b 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -143,7 +143,7 @@ def pub( - packetizer: ``Callable[[str, Any], Any]`` a callback who receives the topic and value from the publisher function each ``yield`` such that whatever is returned is sent as the published value to subscribers of - that topic. By default this is a dict ``{topic: value}``. + that topic. By default this is a dict ``{topic: str: value: Any}``. As an example, to make a subscriber call the above function: @@ -161,7 +161,7 @@ def pub( task_name='source1', ) ) - async for value in portal.result(): + async for value in await portal.result(): print(f"Subscriber received {value}") @@ -264,4 +264,5 @@ def pub( # ``wrapt.decorator`` doesn't seem to want to play nice with its # whole "adapter" thing... wrapped._tractor_stream_function = True # type: ignore + return wrapper(wrapped) diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 3db56e5..65f199f 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -2,7 +2,7 @@ import inspect import platform from functools import partial, wraps -from .. import run +from tractor import run __all__ = ['tractor_test'] @@ -38,6 +38,7 @@ def tractor_test(fn): # injects test suite fixture value to test as well # as `run()` kwargs['arb_addr'] = arb_addr + if 'loglevel' in inspect.signature(fn).parameters: # allows test suites to define a 'loglevel' fixture # that activates the internal logging @@ -52,6 +53,7 @@ def tractor_test(fn): if 'start_method' in inspect.signature(fn).parameters: # set of subprocess spawning backends kwargs['start_method'] = start_method + return run( partial(fn, *args, **kwargs), arbiter_addr=arb_addr,