From 6b8393a4d68aaa105d140ab8f1e22fa4fd04ec7b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Sep 2018 09:39:53 -0400 Subject: [PATCH 1/6] Add `tractor.run_daemon()` for running a main rpc daemon --- tractor/__init__.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index d48e257..aac8036 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -2,7 +2,9 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ +import importlib from functools import partial +from typing import Tuple, Any import typing import trio # type: ignore @@ -35,10 +37,10 @@ _default_arbiter_port = 1616 async def _main( async_fn: typing.Callable[..., typing.Awaitable], - args: typing.Tuple, + args: Tuple, kwargs: typing.Dict[str, typing.Any], name: str, - arbiter_addr: typing.Tuple[str, int] + arbiter_addr: Tuple[str, int] ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -81,13 +83,28 @@ async def _main( def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: typing.Tuple, + *args: Tuple, name: str = None, - arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port), + arbiter_addr: Tuple[str, int] = ( + _default_arbiter_host, _default_arbiter_port), **kwargs: typing.Dict[str, typing.Any], -): +) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) + + +def run_daemon( + rpc_modules: Tuple[str] = (), + **kwargs +) -> None: + for path in rpc_modules: + importlib.import_module(path) + + return run( + partial(trio.sleep, float('inf')), + rpc_module_paths=rpc_modules, + **kwargs + ) From ee7959cb55eb527fb2ea2713de1663ae7c1af3d9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Sep 2018 09:40:35 -0400 Subject: [PATCH 2/6] Fix same named actor race When an actor has already been registered with the arbiter it should exist in the registry and thus the wait event should have been removed. Check that the registry indeed holds an event before clearing it. --- tractor/_actor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2e00e99..4278a0d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -668,7 +668,8 @@ class Arbiter(Actor): events = self._waiters.pop(name, ()) self._waiters.setdefault(name, []).append(uid) for event in events: - event.set() + if isinstance(event, trio.Event): + event.set() def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) From d12136d44d1142944f42de3b2f619bcca5a08166 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Sep 2018 09:44:29 -0400 Subject: [PATCH 3/6] Add some mult-program tests Run the arbiter-actor in a separate program and do some basic tests to make sure everything works - particularly, registration and cancellation. --- tests/conftest.py | 1 + tests/test_multi_program.py | 77 +++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 tests/test_multi_program.py diff --git a/tests/conftest.py b/tests/conftest.py index 9822a65..5226305 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import pytest import tractor +pytest_plugins = ['pytester'] _arb_addr = '127.0.0.1', random.randint(1000, 9999) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py new file mode 100644 index 0000000..6a3a347 --- /dev/null +++ b/tests/test_multi_program.py @@ -0,0 +1,77 @@ +""" +Multiple python programs invoking ``tractor.run()`` +""" +import sys +import time +import signal +import subprocess + +import pytest +import tractor + + +from conftest import tractor_test + + +def sig_prog(proc, sig): + "Kill the actor-process with ctr-c." + proc.send_signal(sig) + ret = proc.wait() + assert ret + + +@pytest.fixture +def daemon(loglevel, testdir, arb_addr): + cmdargs = [ + sys.executable, '-c', + "import tractor; tractor.run_daemon(arbiter_addr={}, loglevel={})" + .format( + arb_addr, + "'{}'".format(loglevel) if loglevel else None) + ] + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + assert not proc.returncode + time.sleep(0.2) + yield proc + # TODO: why sometimes does SIGINT not work on teardown? + sig_prog(proc, signal.SIGINT) + + +def test_abort_on_sigint(daemon): + assert daemon.returncode is None + time.sleep(0.1) + sig_prog(daemon, signal.SIGINT) + assert daemon.returncode == 1 + # XXX: oddly, couldn't get capfd.readouterr() to work here? + assert "KeyboardInterrupt" in str(daemon.stderr.read()) + + +@tractor_test +async def test_cancel_remote_arbiter(daemon, arb_addr): + async with tractor.get_arbiter(*arb_addr) as portal: + await portal.cancel_actor() + + time.sleep(0.1) + # the arbiter channel server is cancelled but not its main task + assert daemon.returncode is None + + # no arbiter socket should exist + with pytest.raises(OSError): + async with tractor.get_arbiter(*arb_addr) as portal: + pass + + +@tractor_test +async def test_register_duplicate_name(daemon): + async with tractor.open_nursery() as n: + p1 = await n.start_actor('doggy') + p2 = await n.start_actor('doggy') + + async with tractor.wait_for_actor('doggy') as portal: + assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) + + await n.cancel() From d808ffd8f342342600cbb0e1bb74474f5789c521 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Sep 2018 15:19:49 -0400 Subject: [PATCH 4/6] `Logger.warn()` is deprecated --- tractor/__init__.py | 16 +++++++--------- tractor/_actor.py | 10 +++++----- tractor/_ipc.py | 6 +++--- tractor/_portal.py | 6 +++--- tractor/_trionics.py | 14 +++++++------- 5 files changed, 25 insertions(+), 27 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index aac8036..9742b72 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any +from typing import Tuple, Any, Optional import typing import trio # type: ignore @@ -56,7 +56,7 @@ async def _main( async with _connect_chan(host, port): arbiter_found = True except OSError: - log.warn(f"No actor could be found @ {host}:{port}") + log.warning(f"No actor could be found @ {host}:{port}") # create a local actor and start up its main routine/task if arbiter_found: # we were able to connect to an arbiter @@ -97,14 +97,12 @@ def run( def run_daemon( - rpc_modules: Tuple[str] = (), + rpc_modules: Optional[Tuple[str]] = None, **kwargs ) -> None: - for path in rpc_modules: + for path in rpc_modules or (): importlib.import_module(path) - return run( - partial(trio.sleep, float('inf')), - rpc_module_paths=rpc_modules, - **kwargs - ) + kwargs['rpc_module_paths'] = rpc_modules + + return run(partial(trio.sleep, float('inf')), **kwargs) diff --git a/tractor/_actor.py b/tractor/_actor.py index 4278a0d..1962229 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -233,7 +233,7 @@ class Actor: try: uid = await _do_handshake(self, chan) except StopAsyncIteration: - log.warn(f"Channel {chan} failed to handshake") + log.warning(f"Channel {chan} failed to handshake") return # channel tracking @@ -248,7 +248,7 @@ class Actor: chans = self._peers[uid] if chans: - log.warn( + log.warning( f"already have channel(s) for {uid}:{chans}?" ) log.debug(f"Registered {chan} for {uid}") @@ -400,7 +400,7 @@ class Actor: parent_addr: Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` - # log.warn("Log level after fork is {self.loglevel}") + # log.warning("Log level after fork is {self.loglevel}") self._forkserver_info = forkserver_info from ._trionics import ctx if self.loglevel is not None: @@ -456,7 +456,7 @@ class Actor: # initial handshake, report who we are, who they are await _do_handshake(self, chan) except OSError: # failed to connect - log.warn( + log.warning( f"Failed to connect to parent @ {parent_addr}," " closing server") await self.cancel() @@ -555,7 +555,7 @@ class Actor: await arb_portal.run( 'self', 'unregister_actor', uid=self.uid) except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") + log.warning(f"Unable to unregister {self.name} from arbiter") async def cancel(self) -> None: """Cancel this actor. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6162150..6e2349e 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -158,11 +158,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warn( + log.warning( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warn("Stream connection re-established!") + log.warning("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -171,7 +171,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warn( + log.warning( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1) diff --git a/tractor/_portal.py b/tractor/_portal.py index ddbea76..f8a01ac 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -186,7 +186,7 @@ class Portal: async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ - log.warn( + log.warning( f"Sending cancel request to {self.channel.uid} on " f"{self.channel}") try: @@ -196,11 +196,11 @@ class Portal: await self.run('self', 'cancel') return True except trio.ClosedResourceError: - log.warn( + log.warning( f"{self.channel} for {self.channel.uid} was already closed?") return False else: - log.warn(f"May have failed to cancel {self.channel.uid}") + log.warning(f"May have failed to cancel {self.channel.uid}") return False diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 7c3e72c..dc607c7 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -153,7 +153,7 @@ class ActorNursery: # if it's an async-gen then we should alert the user # that we're cancelling it if inspect.isasyncgen(res): - log.warn( + log.warning( f"Blindly consuming asyncgen for {actor.uid}") with trio.fail_after(1): async with aclosing(res) as agen: @@ -177,7 +177,7 @@ class ActorNursery: self._children.pop(actor.uid) # proc terminated, cancel result waiter if cancel_scope: - log.warn( + log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -194,7 +194,7 @@ class ActorNursery: await portal.cancel_actor() if cs.cancelled_caught: - log.warn("Result waiter was cancelled") + log.warning("Result waiter was cancelled") # unblocks when all waiter tasks have completed children = self._children.copy() @@ -213,7 +213,7 @@ class ActorNursery: directly without any far end graceful ``trio`` cancellation. """ def do_hard_kill(proc): - log.warn(f"Hard killing subactors {self._children}") + log.warning(f"Hard killing subactors {self._children}") proc.terminate() # XXX: below doesn't seem to work? # send KeyBoardInterrupt (trio abort signal) to sub-actors @@ -228,7 +228,7 @@ class ActorNursery: else: if portal is None: # actor hasn't fully spawned yet event = self._actor._peer_connected[subactor.uid] - log.warn( + log.warning( f"{subactor.uid} wasn't finished spawning?") await event.wait() # channel/portal should now be up @@ -260,7 +260,7 @@ class ActorNursery: # else block here might not complete? Should both be shielded? with trio.open_cancel_scope(shield=True): if etype is trio.Cancelled: - log.warn( + log.warning( f"{current_actor().uid} was cancelled with {etype}" ", cancelling actor nursery") await self.cancel() @@ -276,7 +276,7 @@ class ActorNursery: try: await self.wait() except Exception as err: - log.warn(f"Nursery caught {err}, cancelling") + log.warning(f"Nursery caught {err}, cancelling") await self.cancel() raise log.debug(f"Nursery teardown complete") From 037c4c3797e95f4b156e443715e041b974bf686b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Sep 2018 15:28:19 -0400 Subject: [PATCH 5/6] Comment tweak --- tests/test_multi_program.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 6a3a347..dc3d7cf 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -14,7 +14,7 @@ from conftest import tractor_test def sig_prog(proc, sig): - "Kill the actor-process with ctr-c." + "Kill the actor-process with ``sig``." proc.send_signal(sig) ret = proc.wait() assert ret From 827a6c60147cf4decdd38a7192b302ba51f8e6ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Sep 2018 21:56:40 -0400 Subject: [PATCH 6/6] Make `rpc_modules` a positional arg to `tractor.run_daemon()` --- tests/test_multi_program.py | 2 +- tractor/__init__.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index dc3d7cf..b13b80b 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -24,7 +24,7 @@ def sig_prog(proc, sig): def daemon(loglevel, testdir, arb_addr): cmdargs = [ sys.executable, '-c', - "import tractor; tractor.run_daemon(arbiter_addr={}, loglevel={})" + "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" .format( arb_addr, "'{}'".format(loglevel) if loglevel else None) diff --git a/tractor/__init__.py b/tractor/__init__.py index 9742b72..f6100f7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -97,12 +97,14 @@ def run( def run_daemon( - rpc_modules: Optional[Tuple[str]] = None, + rpc_modules: Tuple[str], **kwargs ) -> None: - for path in rpc_modules or (): - importlib.import_module(path) - + """Spawn a single daemon-actor which will repond to RPC. + """ kwargs['rpc_module_paths'] = rpc_modules + for path in rpc_modules: + importlib.import_module(path) + return run(partial(trio.sleep, float('inf')), **kwargs)