diff --git a/.travis.yml b/.travis.yml index 8837171..6df2b7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: python matrix: include: - - python: 3.6 + # - python: 3.6 - python: 3.7 dist: xenial sudo: required diff --git a/README.rst b/README.rst index 8af6799..ad0f1e6 100644 --- a/README.rst +++ b/README.rst @@ -42,6 +42,9 @@ down. A great place to start is the `trio docs`_ and this `blog post`_. .. _modern async Python: https://www.python.org/dev/peps/pep-0525/ +.. contents:: + + Philosophy ---------- ``tractor``'s tenets non-comprehensively include: @@ -71,8 +74,12 @@ No PyPi release yet! pip install git+git://github.com/tgoodlet/tractor.git +Examples +-------- + + A trynamic first scene ----------------------- +********************** Let's direct a couple *actors* and have them run their lines for the hip new film we're shooting: @@ -104,6 +111,7 @@ the hip new film we're shooting: donny = await n.run_in_actor( 'donny', say_hello, + # arguments are always named other_actor='gretchen', ) gretchen = await n.run_in_actor( @@ -129,7 +137,7 @@ this case our "director" executing ``main()``). Actor spawning and causality ----------------------------- +**************************** ``tractor`` tries to take ``trio``'s concept of causal task lifetimes to multi-process land. Accordingly, ``tractor``'s *actor nursery* behaves similar to ``trio``'s nursery_. That is, ``tractor.open_nursery()`` @@ -155,7 +163,7 @@ and use the ``run_in_actor()`` method: """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('frank', movie_theatre_question) + portal = await n.run_in_actor('teacher', cellar_door) # The ``async with`` will unblock here since the 'frank' # actor has completed its main task ``movie_theatre_question()``. @@ -186,7 +194,7 @@ What's going on? much like you'd expect from a future_. This ``run_in_actor()`` API should look very familiar to users of -``asyncio``'s run_in_executor_ which uses a ``concurrent.futures`` Executor_. +``asyncio``'s `run_in_executor()`_ which uses a ``concurrent.futures`` Executor_. Since you might also want to spawn long running *worker* or *daemon* actors, each actor's *lifetime* can be determined based on the spawn @@ -199,7 +207,7 @@ method: - actors can be spawned to *live forever* using the ``start_actor()`` method and act like an RPC daemon that runs indefinitely (the - ``with tractor.open_nursery()`` wont' exit) until cancelled_ + ``with tractor.open_nursery()`` won't exit) until cancelled_ Had we wanted the latter form in our example it would have looked like: @@ -251,12 +259,12 @@ to all others with ease over standard network protocols). .. _nursery: https://trio.readthedocs.io/en/latest/reference-core.html#nurseries-and-spawning .. _causal: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#causality .. _cancelled: https://trio.readthedocs.io/en/latest/reference-core.html#child-tasks-and-cancellation -.. _run_in_executor: https://docs.python.org/3/library/asyncio-eventloop.html#executor +.. _run_in_executor(): https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor -Transparent remote function calling using *portals* ---------------------------------------------------- +Async IPC using *portals* +************************* ``tractor`` introduces the concept of a *portal* which is an API borrowed_ from ``trio``. 
A portal may seem similar to the idea of a RPC future_ except a *portal* allows invoking remote *async* functions and @@ -322,6 +330,9 @@ generator function running in a separate actor: tractor.run(main) + +A full fledged streaming service +******************************** Alright, let's get fancy. Say you wanted to spawn two actors which each pull data feeds from @@ -443,7 +454,7 @@ as ``multiprocessing`` calls it) which is running ``main()``. Cancellation ------------- +************ ``tractor`` supports ``trio``'s cancellation_ system verbatim. Cancelling a nursery block cancels all actors spawned by it. Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. @@ -452,7 +463,7 @@ Eventually ``tractor`` plans to support different `supervision strategies`_ like Remote error propagation ------------------------- +************************ Any task invoked in a remote actor should ship any error(s) back to the calling actor where it is raised and expected to be dealt with. This way remote actors are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. @@ -494,12 +505,12 @@ a ``Supervisor`` type. .. _erlang strategies: http://learnyousomeerlang.com/supervisors -Shared task state ------------------ +Actor local variables +********************* Although ``tractor`` uses a *shared-nothing* architecture between processes -you can of course share state within an actor. ``trio`` tasks spawned via -multiple RPC calls to an actor can access global data using the per actor -``statespace`` dictionary: +you can of course share state between tasks running *within* an actor. +``trio`` tasks spawned via multiple RPC calls to an actor can access global +state using the per actor ``statespace`` dictionary: .. code:: python @@ -528,7 +539,7 @@ out a state sharing system per-actor is totally up to you. How do actors find each other (a poor man's *service discovery*)? ------------------------------------------------------------------ +***************************************************************** Though it will be built out much more in the near future, ``tractor`` currently keeps track of actors by ``(name: str, id: str)`` using a special actor called the *arbiter*. Currently the *arbiter* must exist @@ -561,20 +572,67 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. -Using ``Channel`` directly (undocumented) ------------------------------------------ -You can use the ``Channel`` api if necessary by simply defining a -``chan`` and ``cid`` *kwarg* in your async function definition. -``tractor`` will treat such async functions like async generators on -the calling side (for now anyway) such that you can push stream values -a little more granularly if you find *yielding* values to be restrictive. -I am purposely not documenting this feature with code because I'm not yet -sure yet how it should be used correctly. If you'd like more details -please feel free to ask me on the `trio gitter channel`_. +Streaming using channels and contexts +************************************* +``Channel`` is the API which wraps an underlying *transport* and *interchange* +format to enable *inter-actor-communication*. In its present state ``tractor`` +uses TCP and msgpack_. + +If you aren't fond of having to write an async generator to stream data +between actors (or need something more flexible) you can instead use a +``Context``. 
A context wraps an actor-local spawned task and a ``Channel`` +so that tasks executing across multiple processes can stream data +to one another using a low level, request oriented API. + +As an example if you wanted to create a streaming server without writing +an async generator that *yields* values you instead define an async +function: + +.. code:: python + + async def streamer(ctx, rate=2): + """A simple web response streaming server. + """ + while True: + val = await web_request('http://data.feed.com') + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) -Running actors standalone (without spawning) --------------------------------------------- +All that's required is declaring a ``ctx`` argument name somewhere in +your function signature and ``tractor`` will treat the async function +like an async generator - as a streaming function from the client side. +This turns out to be handy particularly if you have +multiple tasks streaming responses concurrently: + +.. code:: python + + async def streamer(ctx, url, rate=2): + """A simple web response streaming server. + """ + while True: + val = await web_request(url) + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + + async def stream_multiple_sources(ctx, sources): + async with trio.open_nursery() as n: + for url in sources: + n.start_soon(streamer, ctx, url) + + +The context notion comes from the context_ in nanomsg_. + + +Running actors standalone +************************* You don't have to spawn any actors using ``open_nursery()`` if you just want to run a single actor that connects to an existing cluster. All the comms and arbiter registration stuff still works. This can @@ -588,7 +646,7 @@ need to hop into a debugger. You just need to pass the existing Enabling logging ----------------- +**************** Considering how complicated distributed software can become it helps to know what exactly it's doing (even at the lowest levels). Luckily ``tractor`` has tons of logging throughout the core. ``tractor`` isn't opinionated on @@ -616,12 +674,20 @@ Stuff I'd like to see ``tractor`` do real soon: - an extensive `chaos engineering`_ test suite - support for reactive programming primitives and native support for asyncitertools_ like libs -If you're interested in tackling any of these please do shout about it on the -`trio gitter channel`_! + +Feel like saying hi? +-------------------- +This project is very much coupled to the ongoing development of +``trio`` (i.e. ``tractor`` gets all its ideas from that brilliant +community). If you want to help, have suggestions or just want to +say hi, please feel free to ping me on the `trio gitter channel`_! + .. _supervisors: https://github.com/tgoodlet/tractor/issues/22 -.. _nanomsg: https://github.com/tgoodlet/tractor/issues/19 +.. _nanomsg: https://nanomsg.github.io/nng/index.html +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 .. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _trio gitter channel: https://gitter.im/python-trio/general .. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html .. _pdb++: https://github.com/antocuni/pdb +.. 
_msgpack: https://en.wikipedia.org/wiki/MessagePack diff --git a/setup.py b/setup.py index 9103ab7..885d992 100755 --- a/setup.py +++ b/setup.py @@ -37,7 +37,8 @@ setup( 'tractor', 'tractor.testing', ], - install_requires=['msgpack', 'trio>0.8', 'async_generator', 'colorlog'], + install_requires=[ + 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], tests_require=['pytest'], python_requires=">=3.6", keywords=[ diff --git a/tests/test_local.py b/tests/test_local.py index 2f9e5ee..eb0c676 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -7,7 +7,6 @@ import pytest import trio import tractor - from conftest import tractor_test diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py new file mode 100644 index 0000000..f0c4beb --- /dev/null +++ b/tests/test_pubsub.py @@ -0,0 +1,187 @@ +import time +from itertools import cycle + +import pytest +import trio +import tractor +from async_generator import aclosing +from tractor.testing import tractor_test + + +def test_type_checks(): + + with pytest.raises(TypeError) as err: + @tractor.msg.pub + async def no_get_topics(yo): + yield + + assert "must define a `get_topics`" in str(err.value) + + with pytest.raises(TypeError) as err: + @tractor.msg.pub + def not_async_gen(yo): + pass + + assert "must be an async generator function" in str(err.value) + + +def is_even(i): + return i % 2 == 0 + + +@tractor.msg.pub +async def pubber(get_topics): + ss = tractor.current_actor().statespace + + for i in cycle(range(10)): + + # ensure topic subscriptions are as expected + ss['get_topics'] = get_topics + + yield {'even' if is_even(i) else 'odd': i} + await trio.sleep(0.1) + + +async def subs(which, pub_actor_name): + if len(which) == 1: + if which[0] == 'even': + pred = is_even + else: + def pred(i): + return not is_even(i) + else: + def pred(i): + return isinstance(i, int) + + async with tractor.find_actor(pub_actor_name) as portal: + agen = await portal.run(__name__, 'pubber', topics=which) + async with aclosing(agen) as agen: + async for pkt in agen: + for topic, value in pkt.items(): + assert pred(value) + + +@tractor.msg.pub(tasks=['one', 'two']) +async def multilock_pubber(get_topics): + yield {'doggy': 10} + + +@pytest.mark.parametrize( + 'callwith_expecterror', + [ + (pubber, {}, TypeError), + # missing a `topics` + (multilock_pubber, {'ctx': None}, TypeError), + # missing a `task_name` + (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), + # should work + (multilock_pubber, + {'ctx': None, 'topics': ['topic1'], 'task_name': 'one'}, + None), + ], +) +@tractor_test +async def test_required_args(callwith_expecterror): + func, kwargs, err = callwith_expecterror + + if err is not None: + with pytest.raises(err): + await func(**kwargs) + else: + async with tractor.open_nursery() as n: + # await func(**kwargs) + portal = await n.run_in_actor( + 'sub', multilock_pubber, **kwargs) + + async for val in await portal.result(): + assert val == {'doggy': 10} + + +@pytest.mark.parametrize( + 'pub_actor', + ['streamer', 'arbiter'] +) +def test_pubsub_multi_actor_subs( + loglevel, + arb_addr, + pub_actor, +): + """Try out the neato @pub decorator system. 
+ """ + async def main(): + ss = tractor.current_actor().statespace + + async with tractor.open_nursery() as n: + + name = 'arbiter' + + if pub_actor is 'streamer': + # start the publisher as a daemon + master_portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + + even_portal = await n.run_in_actor( + 'evens', subs, which=['even'], pub_actor_name=name) + odd_portal = await n.run_in_actor( + 'odds', subs, which=['odd'], pub_actor_name=name) + + async with tractor.wait_for_actor('evens'): + # block until 2nd actor is initialized + pass + + if pub_actor is 'arbiter': + # wait for publisher task to be spawned in a local RPC task + while not ss.get('get_topics'): + await trio.sleep(0.1) + + get_topics = ss.get('get_topics') + + assert 'even' in get_topics() + + async with tractor.wait_for_actor('odds'): + # block until 2nd actor is initialized + pass + + if pub_actor is 'arbiter': + start = time.time() + while 'odd' not in get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never arrived?") + + # TODO: how to make this work when the arbiter gets + # a portal to itself? Currently this causes a hang + # when the channel server is torn down due to a lingering + # loopback channel + # with trio.move_on_after(1): + # await subs(['even', 'odd']) + + # XXX: this would cause infinite + # blocking due to actor never terminating loop + # await even_portal.result() + + await trio.sleep(0.5) + await even_portal.cancel_actor() + await trio.sleep(0.5) + + if pub_actor is 'arbiter': + assert 'even' not in get_topics() + + await odd_portal.cancel_actor() + await trio.sleep(1) + + if pub_actor is 'arbiter': + while get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never dropped?") + else: + await master_portal.cancel_actor() + + tractor.run( + main, + arbiter_addr=arb_addr, + rpc_module_paths=[__name__], + ) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index ec286f5..5dec131 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -1,6 +1,8 @@ """ RPC related """ +import itertools + import pytest import tractor import trio @@ -12,17 +14,20 @@ async def sleep_back_actor( func_defined, exposed_mods, ): - async with tractor.find_actor(actor_name) as portal: - try: - await portal.run(__name__, func_name) - except tractor.RemoteActorError as err: - if not func_defined: - expect = AttributeError - if not exposed_mods: - expect = tractor.ModuleNotExposed + if actor_name: + async with tractor.find_actor(actor_name) as portal: + try: + await portal.run(__name__, func_name) + except tractor.RemoteActorError as err: + if not func_defined: + expect = AttributeError + if not exposed_mods: + expect = tractor.ModuleNotExposed - assert err.type is expect - raise + assert err.type is expect + raise + else: + await trio.sleep(float('inf')) async def short_sleep(): @@ -31,19 +36,40 @@ async def short_sleep(): @pytest.mark.parametrize( 'to_call', [ - ([], 'short_sleep'), - ([__name__], 'short_sleep'), - ([__name__], 'fake_func'), + ([], 'short_sleep', tractor.RemoteActorError), + ([__name__], 'short_sleep', tractor.RemoteActorError), + ([__name__], 'fake_func', tractor.RemoteActorError), + (['tmp_mod'], 'import doggy', ModuleNotFoundError), + (['tmp_mod'], '4doggy', SyntaxError), ], - ids=['no_mods', 'this_mod', 'this_mod_bad_func'], + ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import', + 'fail_on_syntax'], ) -def test_rpc_errors(arb_addr, to_call): +def test_rpc_errors(arb_addr, 
to_call, testdir): """Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. """ - exposed_mods, funcname = to_call + exposed_mods, funcname, inside_err = to_call + subactor_exposed_mods = [] func_defined = globals().get(funcname, False) + subactor_requests_to = 'arbiter' + remote_err = tractor.RemoteActorError + + # remote module that fails at import time + if exposed_mods == ['tmp_mod']: + # create an importable module with a bad import + testdir.syspathinsert() + # module should cause raise a ModuleNotFoundError at import + testdir.makefile('.py', tmp_mod=funcname) + + # no need to exposed module to the subactor + subactor_exposed_mods = exposed_mods + exposed_mods = [] + func_defined = False + # subactor should not try to invoke anything + subactor_requests_to = None + remote_err = trio.MultiError async def main(): actor = tractor.current_actor() @@ -54,12 +80,13 @@ def test_rpc_errors(arb_addr, to_call): await n.run_in_actor( 'subactor', sleep_back_actor, - actor_name=actor.name, - # function from this module the subactor will invoke - # when it RPCs back to this actor + actor_name=subactor_requests_to, + # function from the local exposed module space + # the subactor will invoke when it RPCs back to this actor func_name=funcname, exposed_mods=exposed_mods, func_defined=True if func_defined else False, + rpc_module_paths=subactor_exposed_mods, ) def run(): @@ -73,8 +100,18 @@ def test_rpc_errors(arb_addr, to_call): if exposed_mods and func_defined: run() else: - # underlying errors are propogated upwards (yet) - with pytest.raises(tractor.RemoteActorError) as err: + # underlying errors are propagated upwards (yet) + with pytest.raises(remote_err) as err: run() - assert err.value.type is tractor.RemoteActorError + # get raw instance from pytest wrapper + value = err.value + + # might get multiple `trio.Cancelled`s as well inside an inception + if isinstance(value, trio.MultiError): + value = next(itertools.dropwhile( + lambda exc: not isinstance(exc, tractor.RemoteActorError), + value.exceptions + )) + + assert value.type is inside_err diff --git a/tractor/__init__.py b/tractor/__init__.py index d8d6489..4214c2d 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,13 +11,14 @@ import trio # type: ignore from trio import MultiError from .log import get_console_log, get_logger, get_loglevel -from ._ipc import _connect_chan, Channel +from ._ipc import _connect_chan, Channel, Context from ._actor import ( Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed +from . 
import msg __all__ = [ @@ -30,6 +31,7 @@ __all__ = [ 'MultiError', 'RemoteActorError', 'ModuleNotExposed', + 'msg' ] diff --git a/tractor/_actor.py b/tractor/_actor.py index d9a5ee1..ceee16b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -13,9 +13,13 @@ from typing import Dict, List, Tuple, Any, Optional, Union import trio # type: ignore from async_generator import asynccontextmanager, aclosing -from ._ipc import Channel, _connect_chan +from ._ipc import Channel, _connect_chan, Context from .log import get_console_log, get_logger -from ._exceptions import pack_error, InternalActorError, ModuleNotExposed +from ._exceptions import ( + pack_error, + unpack_error, + ModuleNotExposed +) from ._portal import ( Portal, open_portal, @@ -46,15 +50,13 @@ async def _invoke( sig = inspect.signature(func) treat_as_gen = False cs = None - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid + ctx = Context(chan, cid) + if 'ctx' in sig.parameters: + kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent # about what is considered a far-end async-generator. # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its + # function which declares a `ctx` kwarg in its # signature will be treated as one. treat_as_gen = True try: @@ -138,18 +140,14 @@ async def _invoke( task_status.started(err) finally: # RPC task bookeeping - tasks = actor._rpc_tasks.get(chan, None) - if tasks: - try: - scope, func = tasks.pop(cid) - except ValueError: - # If we're cancelled before the task returns then the - # cancel scope will not have been inserted yet - log.warn( - f"Task {func} was likely cancelled before it was started") - - if not tasks: - actor._rpc_tasks.pop(chan, None) + try: + scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) + is_complete.set() + except KeyError: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet + log.warn( + f"Task {func} was likely cancelled before it was started") if not actor._rpc_tasks: log.info(f"All RPC tasks have completed") @@ -195,9 +193,10 @@ class Actor: self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks.set() + # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ - Channel, - Dict[str, Tuple[trio._core._run.CancelScope, typing.Callable]] + Tuple[Channel, str], + Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} @@ -225,17 +224,9 @@ class Actor: code (if it exists). """ for path in self.rpc_module_paths: + log.debug(f"Attempting to import {path}") self._mods[path] = importlib.import_module(path) - # XXX: triggers an internal error which can cause a hanging - # problem (without the recently added .throw()) on teardown - # (root nursery tears down thus killing all channels before - # sending cancels to subactors during actor nursery teardown - # - has to do with await main() in MainProcess) - # if self.name == 'gretchen': - # self._mods.pop('test_discovery') - # TODO: how to test the above? - def _get_rpc_func(self, ns, funcname): try: return getattr(self._mods[ns], funcname) @@ -274,7 +265,7 @@ class Actor: log.warning( f"already have channel(s) for {uid}:{chans}?" 
) - log.debug(f"Registered {chan} for {uid}") + log.trace(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -301,10 +292,12 @@ class Actor: if chan.connected(): log.debug(f"Disconnecting channel {chan}") try: + # send our msg loop terminate sentinel await chan.send(None) - await chan.aclose() + # await chan.aclose() except trio.BrokenResourceError: - log.exception(f"Channel for {chan.uid} was already zonked..") + log.exception( + f"Channel for {chan.uid} was already zonked..") async def _push_result(self, actorid, cid: str, msg: dict) -> None: """Push an RPC result to the local consumer's queue. @@ -339,7 +332,10 @@ class Actor: return cid, q async def _process_messages( - self, chan: Channel, treat_as_gen: bool = False + self, chan: Channel, + treat_as_gen: bool = False, + shield: bool = False, + task_status=trio.TASK_STATUS_IGNORED, ) -> None: """Process messages for the channel async-RPC style. @@ -347,95 +343,90 @@ class Actor: """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! + msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan: - if msg is None: # terminate sentinel - log.debug( - f"Cancelling all tasks for {chan} from {chan.uid}") - for cid, (scope, func) in self._rpc_tasks.pop( - chan, {} - ).items(): - scope.cancel() - log.debug( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: - cancel = msg.get('cancel') - if cancel: - # right now this is only implicitly used by - # async generator IPC - scope, func = self._rpc_tasks[chan][cid] + # internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) + with trio.open_cancel_scope(shield=shield) as cs: + task_status.started(cs) + async for msg in chan: + if msg is None: # loop terminate sentinel log.debug( - f"Received cancel request for task {cid}" - f" from {chan.uid}") - scope.cancel() - else: + f"Cancelling all tasks for {chan} from {chan.uid}") + for (channel, cid) in self._rpc_tasks: + if channel is chan: + self.cancel_task(cid, Context(channel, cid)) + log.debug( + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + break + + log.debug(f"Received msg {msg} from {chan.uid}") + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter await self._push_result(chan.uid, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - continue - - # process command request - try: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - except KeyError: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). 
- # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored - tb_str = msg.get('tb_str') - assert chan.uid - exc = InternalActorError( - f"{chan.uid}\n" + tb_str, - **msg, - ) - chan._exc = exc - raise exc - - log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - if ns == 'self': - func = getattr(self, funcname) - else: - # complain to client about restricted modules - try: - func = self._get_rpc_func(ns, funcname) - except (ModuleNotExposed, AttributeError) as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - await chan.send(err_msg) continue - # spin up a task for the requested function - log.debug(f"Spawning task for {func}") - cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, - name=funcname - ) - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - if func != self.cancel: - if isinstance(cs, Exception): - log.warn(f"Task for RPC func {func} failed with {cs}") + # process command request + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). + # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + assert chan.uid + exc = unpack_error(msg, chan=chan) + chan._exc = exc + raise exc + + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) else: - # mark that we have ongoing rpc tasks - self._no_more_rpc_tasks.clear() - log.info(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks.setdefault(chan, {})[cid] = (cs, func) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") - else: - # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") + # complain to client about restricted modules + try: + func = self._get_rpc_func(ns, funcname) + except (ModuleNotExposed, AttributeError) as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + continue + + # spin up a task for the requested function + log.debug(f"Spawning task for {func}") + cs = await self._root_nursery.start( + _invoke, self, cid, chan, func, kwargs, + name=funcname + ) + # never allow cancelling cancel requests (results in + # deadlock and other weird behaviour) + if func != self.cancel: + if isinstance(cs, Exception): + log.warn(f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") + else: + # channel disconnect + log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -448,8 +439,14 @@ class Actor: raise # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" + except trio.Cancelled: + # debugging only + log.debug("Msg loop was cancelled") + raise finally: - log.debug(f"Exiting msg loop for {chan} from {chan.uid}") + log.debug( + f"Exiting msg loop for {chan} from 
{chan.uid} " + f"with last msg:\n{msg}") def _fork_main( self, @@ -462,6 +459,8 @@ class Actor: self._forkserver_info = forkserver_info from ._trionics import ctx if self.loglevel is not None: + log.info( + f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for {self.uid}") @@ -493,9 +492,6 @@ class Actor: async with trio.open_nursery() as nursery: self._root_nursery = nursery - # load allowed RPC module - self.load_modules() - # Startup up channel server host, port = accept_addr await nursery.start(partial( @@ -524,6 +520,11 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) + # load exposed/allowed RPC modules + # XXX: do this **after** establishing connection to parent + # so that import errors are properly propagated upwards + self.load_modules() + # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") async with get_arbiter(*arbiter_addr) as arb_portal: @@ -546,8 +547,7 @@ class Actor: if self._parent_chan: try: # internal error so ship to parent without cid - await self._parent_chan.send( - pack_error(err)) + await self._parent_chan.send(pack_error(err)) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " @@ -632,21 +632,49 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() + async def cancel_task(self, cid, ctx): + """Cancel a local task. + + Note this method will be treated as a streaming function + by remote actor-callers due to the declaration of ``ctx`` + in the signature (for now). + """ + # right now this is only implicitly called by + # streaming IPC but it should be called + # to cancel any remotely spawned task + chan = ctx.chan + # the keyed ``(chan, cid)`` lookup ensures the task requested to be cancelled + # was indeed spawned by a request from this channel + scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + log.debug( + f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") + + # don't allow cancelling this function mid-execution + # (is this necessary?) + if func is self.cancel_task: + return + + scope.cancel() + # wait for _invoke to mark the task complete + await is_complete.wait() + log.debug( + f"Successfully cancelled task:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") + async def cancel_rpc_tasks(self) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each.
""" tasks = self._rpc_tasks - log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") - for chan, cids2scopes in tasks.items(): - log.debug(f"Cancelling all tasks for {chan.uid}") - for cid, (scope, func) in cids2scopes.items(): - log.debug(f"Cancelling task for {func}") - scope.cancel() - if tasks: - log.info( - f"Waiting for remaining rpc tasks to complete {tasks}") - await self._no_more_rpc_tasks.wait() + log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + for (chan, cid) in tasks.copy(): + # TODO: this should really done in a nursery batch + await self.cancel_task(cid, Context(chan, cid)) + # if tasks: + log.info( + f"Waiting for remaining rpc tasks to complete {tasks}") + await self._no_more_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby @@ -815,7 +843,9 @@ async def find_actor( sockaddr = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered - if sockaddr: + if name == 'arbiter' and actor.is_arbiter: + raise RuntimeError("The current actor is the arbiter") + elif sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index efedcf0..0466a73 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -39,7 +39,7 @@ class NoResult(RuntimeError): "No final result is expected for this actor" -class ModuleNotExposed(RuntimeError): +class ModuleNotExposed(ModuleNotFoundError): "The requested module is not exposed for RPC" @@ -55,12 +55,12 @@ def pack_error(exc): } -def unpack_error(msg, chan=None): +def unpack_error(msg, chan=None, err_type=RemoteActorError): """Unpack an 'error' message from the wire into a local ``RemoteActorError``. """ tb_str = msg['error'].get('tb_str', '') - return RemoteActorError( + return err_type( f"{chan.uid}\n" + tb_str, **msg['error'], ) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index f7bebbe..0477740 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,6 +1,7 @@ """ Inter-process comms abstractions """ +from dataclasses import dataclass import typing from typing import Any, Tuple, Optional @@ -205,6 +206,28 @@ class Channel: return self.squeue.connected() if self.squeue else False +@dataclass(frozen=True) +class Context: + """An IAC (inter-actor communication) context. + + Allows maintaining task or protocol specific state between communicating + actors. A unique context is created on the receiving end for every request + to a remote actor. 
+ """ + chan: Channel + cid: str + + # TODO: we should probably attach the actor-task + # cancel scope here now that trio is exposing it + # as a public object + + async def send_yield(self, data: Any) -> None: + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self) -> None: + await self.chan.send({'stop': True, 'cid': self.cid}) + + @asynccontextmanager async def _connect_chan( host: str, port: int diff --git a/tractor/_portal.py b/tractor/_portal.py index fff39db..2683d88 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -5,9 +5,11 @@ import importlib import inspect import typing from typing import Tuple, Any, Dict, Optional, Set +from functools import partial +from dataclasses import dataclass import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from ._state import current_actor from ._ipc import Channel @@ -53,7 +55,7 @@ class Portal: underlying ``tractor.Channel`` as though the remote (async) function / generator was invoked locally. - Think of this like an native async IPC API. + Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -124,7 +126,7 @@ class Portal: # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': + if resptype == 'yield': # stream response async def yield_from_q(): try: @@ -140,12 +142,26 @@ class Portal: "Received internal error at portal?") raise unpack_error(msg, self.channel) - except GeneratorExit: - # for now this msg cancels an ongoing remote task - await self.channel.send({'cancel': True, 'cid': cid}) - log.warn( + except (GeneratorExit, trio.Cancelled): + log.warning( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") + with trio.move_on_after(0.5) as cs: + cs.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # sytsem. + agen = await self.run('self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass + if cs.cancelled_caught: + if not self.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self.channel.uid}") raise # TODO: use AsyncExitStack to aclose() all agens @@ -154,7 +170,7 @@ class Portal: self._agens.add(agen) return agen - elif resptype == 'return': + elif resptype == 'return': # single response msg = await q.get() try: return msg['return'] @@ -176,7 +192,7 @@ class Portal: # not expecting a "main" result if self._expect_result is None: - log.warn( + log.warning( f"Portal for {self.channel.uid} not expecting a final" " result?\nresult() should only be called if subactor" " was spawned with `ActorNursery.run_in_actor()`") @@ -198,44 +214,53 @@ class Portal: return self._result + async def _cancel_streams(self): + # terminate all locally running async generator + # IPC calls + if self._agens: + log.warning( + f"Cancelling all streams with {self.channel.uid}") + for agen in self._agens: + await agen.aclose() + async def close(self) -> None: - # trigger remote msg loop `break` - chan = self.channel - log.debug(f"Closing portal for {chan} to {chan.uid}") - await self.channel.send(None) + await self._cancel_streams() async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. 
""" + if not self.channel.connected(): + log.warning("This portal is already closed can't cancel") + return False + + await self._cancel_streams() + log.warning( - f"Sending cancel request to {self.channel.uid} on " + f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") try: - with trio.move_on_after(0.1) as cancel_scope: + # send cancel cmd - might not get response + with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True - # send cancel cmd - might not get response await self.run('self', 'cancel') return True + if cancel_scope.cancelled_caught: + log.warning(f"May have failed to cancel {self.channel.uid}") + return False except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") return False - else: - log.warning(f"May have failed to cancel {self.channel.uid}") - return False +@dataclass class LocalPortal: """A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__( - self, - actor: 'Actor' # type: ignore - ) -> None: - self.actor = actor + actor: 'Actor' # type: ignore async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. @@ -270,20 +295,26 @@ async def open_portal( if channel.uid is None: await _do_handshake(actor, channel) - nursery.start_soon(actor._process_messages, channel) + msg_loop_cs = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) + ) portal = Portal(channel) try: yield portal finally: - # tear down all async generators - for agen in portal._agens: - await agen.aclose() + await portal.close() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + if was_connected: + # cancel remote channel-msg loop + await channel.send(None) # cancel background msg loop task + msg_loop_cs.cancel() + nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose() diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 289e42b..44f8dcd 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -114,7 +114,7 @@ class ActorNursery: name: str, fn: typing.Callable, bind_addr: Tuple[str, int] = ('127.0.0.1', 0), - rpc_module_paths: List[str] = None, + rpc_module_paths: List[str] = [], statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -129,7 +129,7 @@ class ActorNursery: mod_path = fn.__module__ portal = await self.start_actor( name, - rpc_module_paths=[mod_path], + rpc_module_paths=[mod_path] + rpc_module_paths, bind_addr=bind_addr, statespace=statespace, ) @@ -317,7 +317,7 @@ class ActorNursery: # the `else:` block here might not complete? # For now, shield both. with trio.open_cancel_scope(shield=True): - if etype is trio.Cancelled: + if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " f"cancelled with {etype}") diff --git a/tractor/msg.py b/tractor/msg.py new file mode 100644 index 0000000..ebed198 --- /dev/null +++ b/tractor/msg.py @@ -0,0 +1,261 @@ +""" +Messaging pattern APIs and helpers. +""" +import inspect +import typing +from typing import Dict, Any, Set, Union, Callable +from functools import partial +from async_generator import aclosing + +import trio +import wrapt + +from .log import get_logger +from . 
import current_actor +from ._ipc import Context + +__all__ = ['pub'] + +log = get_logger('messaging') + + +async def fan_out_to_ctxs( + pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy + topics2ctxs: Dict[str, set], + packetizer: typing.Callable = None, +) -> None: + """Request and fan out quotes to each subscribed actor channel. + """ + def get_topics(): + return tuple(topics2ctxs.keys()) + + agen = pub_async_gen_func(get_topics=get_topics) + async with aclosing(agen) as pub_gen: + async for published in pub_gen: + ctx_payloads: Dict[str, Any] = {} + for topic, data in published.items(): + log.debug(f"publishing {topic, data}") + # build a new dict packet or invoke provided packetizer + if packetizer is None: + packet = {topic: data} + else: + packet = packetizer(topic, data) + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet), + + # deliver to each subscriber (fan out) + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): + try: + await ctx.send_yield(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warning(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) + + if not get_topics(): + log.warning(f"No subscribers left for {pub_gen}") + break + + +def modify_subs(topics2ctxs, topics, ctx): + """Absolute symbol subscription list for each quote stream. + + Effectively a symbol subscription api. + """ + log.info(f"{ctx.chan.uid} changed subscription to {topics}") + + # update map from each symbol to requesting client's chan + for topic in topics: + topics2ctxs.setdefault(topic, set()).add(ctx) + + # remove any existing symbol subscriptions if symbol is not + # found in ``symbols`` + # TODO: this can likely be factored out into the pub-sub api + for topic in filter( + lambda topic: topic not in topics, topics2ctxs.copy() + ): + ctx_set = topics2ctxs.get(topic) + ctx_set.discard(ctx) + + if not ctx_set: + # pop empty sets which will trigger bg quoter task termination + topics2ctxs.pop(topic) + + +def pub( + wrapped: typing.Callable = None, + *, + tasks: Set[str] = set(), +): + """Publisher async generator decorator. + + A publisher can be called multiple times from different actors + but will only spawn a finite set of internal tasks to stream values + to each caller. The ``tasks` argument to the decorator (``Set[str]``) + specifies the names of the mutex set of publisher tasks. + When the publisher function is called, an argument ``task_name`` must be + passed to specify which task (of the set named in ``tasks``) should be + used. This allows for using the same publisher with different input + (arguments) without allowing more concurrent tasks then necessary. + + Values yielded from the decorated async generator + must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the + topic string an determines which subscription the packet will be delivered + to and the value is a packet ``Dict[str, Any]`` by default of the form: + + .. ::python + + {topic: value} + + The caller can instead opt to pass a ``packetizer`` callback who's return + value will be delivered as the published response. + + The decorated function must *accept* an argument :func:`get_topics` which + dynamically returns the tuple of current subscriber topics: + + .. 
code:: python + + from tractor.msg import pub + + @pub(tasks={'source_1', 'source_2'}) + async def pub_service(get_topics): + data = await web_request(endpoints=get_topics()) + for item in data: + yield data['key'], data + + + The publisher must be called passing in the following arguments: + - ``topics: Set[str]`` the topic sequence or "subscriptions" + - ``task_name: str`` the task to use (if ``tasks`` was passed) + - ``ctx: Context`` the tractor context (only needed if calling the + pub func without a nursery, otherwise this is provided implicitly) + - packetizer: ``Callable[[str, Any], Any]`` a callback who receives + the topic and value from the publisher function each ``yield`` such that + whatever is returned is sent as the published value to subscribers of + that topic. By default this is a dict ``{topic: value}``. + + As an example, to make a subscriber call the above function: + + .. code:: python + + from functools import partial + import tractor + + async with tractor.open_nursery() as n: + portal = n.run_in_actor( + 'publisher', # actor name + partial( # func to execute in it + pub_service, + topics=('clicks', 'users'), + task_name='source1', + ) + ) + async for value in portal.result(): + print(f"Subscriber received {value}") + + + Here, you don't need to provide the ``ctx`` argument since the remote actor + provides it automatically to the spawned task. If you were to call + ``pub_service()`` directly from a wrapping function you would need to + provide this explicitly. + + Remember you only need this if you need *a finite set of tasks* running in + a single actor to stream data to an arbitrary number of subscribers. If you + are ok to have a new task running for every call to ``pub_service()`` then + probably don't need this. + """ + # handle the decorator not called with () case + if wrapped is None: + return partial(pub, tasks=tasks) + + task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { + None: trio.StrictFIFOLock()} + for name in tasks: + task2lock[name] = trio.StrictFIFOLock() + + async def takes_ctx(get_topics, ctx=None): + pass + + @wrapt.decorator(adapter=takes_ctx) + async def wrapper(agen, instance, args, kwargs): + # this is used to extract arguments properly as per + # the `wrapt` docs + async def _execute( + ctx: Context, + topics: Set[str], + *args, + # *, + task_name: str = None, + packetizer: Callable = None, + **kwargs, + ): + if tasks and task_name is None: + raise TypeError( + f"{agen} must be called with a `task_name` named " + f"argument with a falue from {tasks}") + + ss = current_actor().statespace + lockmap = ss.setdefault('_pubtask2lock', task2lock) + lock = lockmap[task_name] + + all_subs = ss.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_name, {}) + + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # the next waiting task will take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info( + f"Spawning data feed task for {funcname}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_async_gen_func=partial( + agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + packetizer=packetizer, + ) + log.info( + f"Terminating stream task {task_name or ''}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + 
respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, (), ctx) + + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info( + f"No more subscriptions for publisher {task_name}") + + # invoke it + await _execute(*args, **kwargs) + + + funcname = wrapped.__name__ + if not inspect.isasyncgenfunction(wrapped): + raise TypeError( + f"Publisher {funcname} must be an async generator function" + ) + if 'get_topics' not in inspect.signature(wrapped).parameters: + raise TypeError( + f"Publisher async gen {funcname} must define a " + "`get_topics` argument" + ) + + return wrapper(wrapped)
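
Usage sketch for the ``Context`` based streaming API introduced above: a minimal, hypothetical client for the ``streamer()`` example from the new "Streaming using channels and contexts" README section. The actor name ``'streamer_daemon'`` and the module path ``mylib.streaming`` are placeholders (they assume ``streamer(ctx, rate=2)`` lives in that importable module) and are not part of this patch.

.. code:: python

    import tractor
    from async_generator import aclosing


    async def main():
        async with tractor.open_nursery() as n:
            # spawn a daemon actor exposing the (hypothetical) module
            # which defines the ``streamer(ctx, rate=2)`` function
            portal = await n.start_actor(
                'streamer_daemon',
                rpc_module_paths=['mylib.streaming'],
            )

            # because ``streamer()`` declares a ``ctx`` argument the
            # calling side receives an async generator which yields
            # every value passed to ``ctx.send_yield()``
            stream = await portal.run('mylib.streaming', 'streamer', rate=2)
            async with aclosing(stream) as stream:
                count = 0
                async for value in stream:
                    print(value)
                    count += 1
                    if count == 5:
                        # the server loops forever; stop consuming here
                        break

            await portal.cancel_actor()


    tractor.run(main)

Closing the generator via ``aclosing()`` is what triggers the remote ``cancel_task()`` request added in this patch, so the daemon's streaming task is torn down once the consumer stops iterating.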