diff --git a/tests/conftest.py b/tests/conftest.py index 9822a65..5226305 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import pytest import tractor +pytest_plugins = ['pytester'] _arb_addr = '127.0.0.1', random.randint(1000, 9999) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py new file mode 100644 index 0000000..b13b80b --- /dev/null +++ b/tests/test_multi_program.py @@ -0,0 +1,77 @@ +""" +Multiple python programs invoking ``tractor.run()`` +""" +import sys +import time +import signal +import subprocess + +import pytest +import tractor + + +from conftest import tractor_test + + +def sig_prog(proc, sig): + "Kill the actor-process with ``sig``." + proc.send_signal(sig) + ret = proc.wait() + assert ret + + +@pytest.fixture +def daemon(loglevel, testdir, arb_addr): + cmdargs = [ + sys.executable, '-c', + "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" + .format( + arb_addr, + "'{}'".format(loglevel) if loglevel else None) + ] + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + assert not proc.returncode + time.sleep(0.2) + yield proc + # TODO: why sometimes does SIGINT not work on teardown? + sig_prog(proc, signal.SIGINT) + + +def test_abort_on_sigint(daemon): + assert daemon.returncode is None + time.sleep(0.1) + sig_prog(daemon, signal.SIGINT) + assert daemon.returncode == 1 + # XXX: oddly, couldn't get capfd.readouterr() to work here? + assert "KeyboardInterrupt" in str(daemon.stderr.read()) + + +@tractor_test +async def test_cancel_remote_arbiter(daemon, arb_addr): + async with tractor.get_arbiter(*arb_addr) as portal: + await portal.cancel_actor() + + time.sleep(0.1) + # the arbiter channel server is cancelled but not its main task + assert daemon.returncode is None + + # no arbiter socket should exist + with pytest.raises(OSError): + async with tractor.get_arbiter(*arb_addr) as portal: + pass + + +@tractor_test +async def test_register_duplicate_name(daemon): + async with tractor.open_nursery() as n: + p1 = await n.start_actor('doggy') + p2 = await n.start_actor('doggy') + + async with tractor.wait_for_actor('doggy') as portal: + assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) + + await n.cancel() diff --git a/tractor/__init__.py b/tractor/__init__.py index d48e257..f6100f7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -2,7 +2,9 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ +import importlib from functools import partial +from typing import Tuple, Any, Optional import typing import trio # type: ignore @@ -35,10 +37,10 @@ _default_arbiter_port = 1616 async def _main( async_fn: typing.Callable[..., typing.Awaitable], - args: typing.Tuple, + args: Tuple, kwargs: typing.Dict[str, typing.Any], name: str, - arbiter_addr: typing.Tuple[str, int] + arbiter_addr: Tuple[str, int] ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -54,7 +56,7 @@ async def _main( async with _connect_chan(host, port): arbiter_found = True except OSError: - log.warn(f"No actor could be found @ {host}:{port}") + log.warning(f"No actor could be found @ {host}:{port}") # create a local actor and start up its main routine/task if arbiter_found: # we were able to connect to an arbiter @@ -81,13 +83,28 @@ async def _main( def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: typing.Tuple, + *args: Tuple, name: str = None, - arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port), + arbiter_addr: Tuple[str, int] = ( + _default_arbiter_host, _default_arbiter_port), **kwargs: typing.Dict[str, typing.Any], -): +) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) + + +def run_daemon( + rpc_modules: Tuple[str], + **kwargs +) -> None: + """Spawn a single daemon-actor which will repond to RPC. + """ + kwargs['rpc_module_paths'] = rpc_modules + + for path in rpc_modules: + importlib.import_module(path) + + return run(partial(trio.sleep, float('inf')), **kwargs) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2e00e99..1962229 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -233,7 +233,7 @@ class Actor: try: uid = await _do_handshake(self, chan) except StopAsyncIteration: - log.warn(f"Channel {chan} failed to handshake") + log.warning(f"Channel {chan} failed to handshake") return # channel tracking @@ -248,7 +248,7 @@ class Actor: chans = self._peers[uid] if chans: - log.warn( + log.warning( f"already have channel(s) for {uid}:{chans}?" ) log.debug(f"Registered {chan} for {uid}") @@ -400,7 +400,7 @@ class Actor: parent_addr: Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` - # log.warn("Log level after fork is {self.loglevel}") + # log.warning("Log level after fork is {self.loglevel}") self._forkserver_info = forkserver_info from ._trionics import ctx if self.loglevel is not None: @@ -456,7 +456,7 @@ class Actor: # initial handshake, report who we are, who they are await _do_handshake(self, chan) except OSError: # failed to connect - log.warn( + log.warning( f"Failed to connect to parent @ {parent_addr}," " closing server") await self.cancel() @@ -555,7 +555,7 @@ class Actor: await arb_portal.run( 'self', 'unregister_actor', uid=self.uid) except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") + log.warning(f"Unable to unregister {self.name} from arbiter") async def cancel(self) -> None: """Cancel this actor. @@ -668,7 +668,8 @@ class Arbiter(Actor): events = self._waiters.pop(name, ()) self._waiters.setdefault(name, []).append(uid) for event in events: - event.set() + if isinstance(event, trio.Event): + event.set() def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6162150..6e2349e 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -158,11 +158,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warn( + log.warning( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warn("Stream connection re-established!") + log.warning("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -171,7 +171,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warn( + log.warning( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1) diff --git a/tractor/_portal.py b/tractor/_portal.py index ddbea76..f8a01ac 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -186,7 +186,7 @@ class Portal: async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ - log.warn( + log.warning( f"Sending cancel request to {self.channel.uid} on " f"{self.channel}") try: @@ -196,11 +196,11 @@ class Portal: await self.run('self', 'cancel') return True except trio.ClosedResourceError: - log.warn( + log.warning( f"{self.channel} for {self.channel.uid} was already closed?") return False else: - log.warn(f"May have failed to cancel {self.channel.uid}") + log.warning(f"May have failed to cancel {self.channel.uid}") return False diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 7c3e72c..dc607c7 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -153,7 +153,7 @@ class ActorNursery: # if it's an async-gen then we should alert the user # that we're cancelling it if inspect.isasyncgen(res): - log.warn( + log.warning( f"Blindly consuming asyncgen for {actor.uid}") with trio.fail_after(1): async with aclosing(res) as agen: @@ -177,7 +177,7 @@ class ActorNursery: self._children.pop(actor.uid) # proc terminated, cancel result waiter if cancel_scope: - log.warn( + log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -194,7 +194,7 @@ class ActorNursery: await portal.cancel_actor() if cs.cancelled_caught: - log.warn("Result waiter was cancelled") + log.warning("Result waiter was cancelled") # unblocks when all waiter tasks have completed children = self._children.copy() @@ -213,7 +213,7 @@ class ActorNursery: directly without any far end graceful ``trio`` cancellation. """ def do_hard_kill(proc): - log.warn(f"Hard killing subactors {self._children}") + log.warning(f"Hard killing subactors {self._children}") proc.terminate() # XXX: below doesn't seem to work? # send KeyBoardInterrupt (trio abort signal) to sub-actors @@ -228,7 +228,7 @@ class ActorNursery: else: if portal is None: # actor hasn't fully spawned yet event = self._actor._peer_connected[subactor.uid] - log.warn( + log.warning( f"{subactor.uid} wasn't finished spawning?") await event.wait() # channel/portal should now be up @@ -260,7 +260,7 @@ class ActorNursery: # else block here might not complete? Should both be shielded? with trio.open_cancel_scope(shield=True): if etype is trio.Cancelled: - log.warn( + log.warning( f"{current_actor().uid} was cancelled with {etype}" ", cancelling actor nursery") await self.cancel() @@ -276,7 +276,7 @@ class ActorNursery: try: await self.wait() except Exception as err: - log.warn(f"Nursery caught {err}, cancelling") + log.warning(f"Nursery caught {err}, cancelling") await self.cancel() raise log.debug(f"Nursery teardown complete")