From a9932e6c01fa8b9ff23103e47e42911f4a24fead Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 13:26:02 -0500 Subject: [PATCH 01/31] Allow passing error type to `unpack_error()` --- tractor/_exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index efedcf0..7096912 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -55,12 +55,12 @@ def pack_error(exc): } -def unpack_error(msg, chan=None): +def unpack_error(msg, chan=None, err_type=RemoteActorError): """Unpack an 'error' message from the wire into a local ``RemoteActorError``. """ tb_str = msg['error'].get('tb_str', '') - return RemoteActorError( + return err_type( f"{chan.uid}\n" + tb_str, **msg['error'], ) From 87a6165430062104fe1dcf42040830af629b62c5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 14:31:17 -0500 Subject: [PATCH 02/31] Add a `Context` type for task/protocol RPC state This is loosely based off the nanomsg concept found here: https://nanomsg.github.io/nng/man/v1.1.0/nng_ctx.5 --- tractor/__init__.py | 2 +- tractor/_ipc.py | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index d8d6489..5ffcfd5 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,7 +11,7 @@ import trio # type: ignore from trio import MultiError from .log import get_console_log, get_logger, get_loglevel -from ._ipc import _connect_chan, Channel +from ._ipc import _connect_chan, Channel, Context from ._actor import ( Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index f7bebbe..41b8061 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -205,6 +205,28 @@ class Channel: return self.squeue.connected() if self.squeue else False +class Context: + """An IAC (inter-actor communication) context. + + Allows maintaining task or protocol specific state between communicating + actors. A unique context is created on the receiving end for every request + to a remote actor. + """ + def __init__( + self, + channel: Channel, + command_id: str, + ): + self.chan: Channel = channel + self.cid: str = command_id + + async def send_yield(self, data): + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self): + await self.chan.send({'stop': True, 'cid': self.cid}) + + @asynccontextmanager async def _connect_chan( host: str, port: int From 41f2096e864f17c552d0bd3f732bb006620d1bca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 15:27:38 -0500 Subject: [PATCH 03/31] Adopt `Context` in the RPC core Instead of chan/cid, whenever a remote function defines a `ctx` argument name deliver a `Context` instance to the function. This allows remote funcs to provide async generator like streaming replies (and maybe more later). 
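A rough sketch of the kind of function this enables (the name
`stream_ticks` and its payload are made up here purely for
illustration, they are not part of this change):

    import time
    import trio

    async def stream_ticks(ctx, rate=2):
        # `ctx` is injected because it is named in the signature;
        # each `send_yield()` shows up as a yielded value on the
        # calling side, just like a value from an async generator
        while True:
            await ctx.send_yield({'time': time.time()})
            await trio.sleep(1 / rate)
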
Additionally, - load actor modules *after* establishing a connection to the spawning parent to avoid crashing before the error can be reported upwards - fix a bug to do with unpacking and raising local internal actor errors from received messages --- tractor/_actor.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index d9a5ee1..ed2e031 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -13,9 +13,14 @@ from typing import Dict, List, Tuple, Any, Optional, Union import trio # type: ignore from async_generator import asynccontextmanager, aclosing -from ._ipc import Channel, _connect_chan +from ._ipc import Channel, _connect_chan, Context from .log import get_console_log, get_logger -from ._exceptions import pack_error, InternalActorError, ModuleNotExposed +from ._exceptions import ( + pack_error, + unpack_error, + InternalActorError, + ModuleNotExposed +) from ._portal import ( Portal, open_portal, @@ -46,15 +51,13 @@ async def _invoke( sig = inspect.signature(func) treat_as_gen = False cs = None - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid + ctx = Context(chan, cid) + if 'ctx' in sig.parameters: + kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent # about what is considered a far-end async-generator. # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its + # function which declares a `ctx` kwarg in its # signature will be treated as one. treat_as_gen = True try: @@ -304,7 +307,8 @@ class Actor: await chan.send(None) await chan.aclose() except trio.BrokenResourceError: - log.exception(f"Channel for {chan.uid} was already zonked..") + log.exception( + f"Channel for {chan.uid} was already zonked..") async def _push_result(self, actorid, cid: str, msg: dict) -> None: """Push an RPC result to the local consumer's queue. @@ -389,12 +393,9 @@ class Actor: # (i.e. no cid was provided in the msg - see above). 
# Push this error to all local channel consumers # (normally portals) by marking the channel as errored - tb_str = msg.get('tb_str') assert chan.uid - exc = InternalActorError( - f"{chan.uid}\n" + tb_str, - **msg, - ) + exc = unpack_error( + msg, chan=chan, err_type=InternalActorError) chan._exc = exc raise exc @@ -493,9 +494,6 @@ class Actor: async with trio.open_nursery() as nursery: self._root_nursery = nursery - # load allowed RPC module - self.load_modules() - # Startup up channel server host, port = accept_addr await nursery.start(partial( @@ -524,6 +522,11 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) + # load exposed/allowed RPC modules + # XXX: do this **after** establishing connection to parent + # so that import errors are properly propagated upwards + self.load_modules() + # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") async with get_arbiter(*arbiter_addr) as arb_portal: From be20e1488b1107cd4c217c5c7c408d49b71dfde8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 15:32:41 -0500 Subject: [PATCH 04/31] Fix type annotations --- tractor/_ipc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 41b8061..2da8e12 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -216,14 +216,14 @@ class Context: self, channel: Channel, command_id: str, - ): + ) -> None: self.chan: Channel = channel self.cid: str = command_id - async def send_yield(self, data): + async def send_yield(self, data: Any) -> None: await self.chan.send({'yield': data, 'cid': self.cid}) - async def send_stop(self): + async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) From 737759868389e0b4863554d94758f99a78c69580 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 17:55:08 -0500 Subject: [PATCH 05/31] Properly respect `rpc_module_paths` in `run_in_actor()` --- tractor/_trionics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 289e42b..d9a5ee0 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -114,7 +114,7 @@ class ActorNursery: name: str, fn: typing.Callable, bind_addr: Tuple[str, int] = ('127.0.0.1', 0), - rpc_module_paths: List[str] = None, + rpc_module_paths: List[str] = [], statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -129,7 +129,7 @@ class ActorNursery: mod_path = fn.__module__ portal = await self.start_actor( name, - rpc_module_paths=[mod_path], + rpc_module_paths=[mod_path] + rpc_module_paths, bind_addr=bind_addr, statespace=statespace, ) From fffddf88dddc2ca454d7827481be4715989178f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 17:55:28 -0500 Subject: [PATCH 06/31] Change parent type --- tractor/_exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 7096912..0466a73 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -39,7 +39,7 @@ class NoResult(RuntimeError): "No final result is expected for this actor" -class ModuleNotExposed(RuntimeError): +class ModuleNotExposed(ModuleNotFoundError): "The requested module is not exposed for RPC" From 06c908f285bfbb2b76b42e048514e778edb4bc8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 17:56:22 -0500 Subject: [PATCH 07/31] Wrap remote import-time 
errors just the same --- tractor/_actor.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index ed2e031..0cf6c30 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -18,7 +18,6 @@ from .log import get_console_log, get_logger from ._exceptions import ( pack_error, unpack_error, - InternalActorError, ModuleNotExposed ) from ._portal import ( @@ -228,17 +227,9 @@ class Actor: code (if it exists). """ for path in self.rpc_module_paths: + log.debug(f"Attempting to import {path}") self._mods[path] = importlib.import_module(path) - # XXX: triggers an internal error which can cause a hanging - # problem (without the recently added .throw()) on teardown - # (root nursery tears down thus killing all channels before - # sending cancels to subactors during actor nursery teardown - # - has to do with await main() in MainProcess) - # if self.name == 'gretchen': - # self._mods.pop('test_discovery') - # TODO: how to test the above? - def _get_rpc_func(self, ns, funcname): try: return getattr(self._mods[ns], funcname) @@ -394,8 +385,7 @@ class Actor: # Push this error to all local channel consumers # (normally portals) by marking the channel as errored assert chan.uid - exc = unpack_error( - msg, chan=chan, err_type=InternalActorError) + exc = unpack_error(msg, chan=chan) chan._exc = exc raise exc From d2f0537850f6983a33a354084c56aa75e818a579 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Jan 2019 17:56:39 -0500 Subject: [PATCH 08/31] Add tests for import-time failures --- tests/test_rpc.py | 81 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index ec286f5..5dec131 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -1,6 +1,8 @@ """ RPC related """ +import itertools + import pytest import tractor import trio @@ -12,17 +14,20 @@ async def sleep_back_actor( func_defined, exposed_mods, ): - async with tractor.find_actor(actor_name) as portal: - try: - await portal.run(__name__, func_name) - except tractor.RemoteActorError as err: - if not func_defined: - expect = AttributeError - if not exposed_mods: - expect = tractor.ModuleNotExposed + if actor_name: + async with tractor.find_actor(actor_name) as portal: + try: + await portal.run(__name__, func_name) + except tractor.RemoteActorError as err: + if not func_defined: + expect = AttributeError + if not exposed_mods: + expect = tractor.ModuleNotExposed - assert err.type is expect - raise + assert err.type is expect + raise + else: + await trio.sleep(float('inf')) async def short_sleep(): @@ -31,19 +36,40 @@ async def short_sleep(): @pytest.mark.parametrize( 'to_call', [ - ([], 'short_sleep'), - ([__name__], 'short_sleep'), - ([__name__], 'fake_func'), + ([], 'short_sleep', tractor.RemoteActorError), + ([__name__], 'short_sleep', tractor.RemoteActorError), + ([__name__], 'fake_func', tractor.RemoteActorError), + (['tmp_mod'], 'import doggy', ModuleNotFoundError), + (['tmp_mod'], '4doggy', SyntaxError), ], - ids=['no_mods', 'this_mod', 'this_mod_bad_func'], + ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import', + 'fail_on_syntax'], ) -def test_rpc_errors(arb_addr, to_call): +def test_rpc_errors(arb_addr, to_call, testdir): """Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. 
""" - exposed_mods, funcname = to_call + exposed_mods, funcname, inside_err = to_call + subactor_exposed_mods = [] func_defined = globals().get(funcname, False) + subactor_requests_to = 'arbiter' + remote_err = tractor.RemoteActorError + + # remote module that fails at import time + if exposed_mods == ['tmp_mod']: + # create an importable module with a bad import + testdir.syspathinsert() + # module should cause raise a ModuleNotFoundError at import + testdir.makefile('.py', tmp_mod=funcname) + + # no need to exposed module to the subactor + subactor_exposed_mods = exposed_mods + exposed_mods = [] + func_defined = False + # subactor should not try to invoke anything + subactor_requests_to = None + remote_err = trio.MultiError async def main(): actor = tractor.current_actor() @@ -54,12 +80,13 @@ def test_rpc_errors(arb_addr, to_call): await n.run_in_actor( 'subactor', sleep_back_actor, - actor_name=actor.name, - # function from this module the subactor will invoke - # when it RPCs back to this actor + actor_name=subactor_requests_to, + # function from the local exposed module space + # the subactor will invoke when it RPCs back to this actor func_name=funcname, exposed_mods=exposed_mods, func_defined=True if func_defined else False, + rpc_module_paths=subactor_exposed_mods, ) def run(): @@ -73,8 +100,18 @@ def test_rpc_errors(arb_addr, to_call): if exposed_mods and func_defined: run() else: - # underlying errors are propogated upwards (yet) - with pytest.raises(tractor.RemoteActorError) as err: + # underlying errors are propagated upwards (yet) + with pytest.raises(remote_err) as err: run() - assert err.value.type is tractor.RemoteActorError + # get raw instance from pytest wrapper + value = err.value + + # might get multiple `trio.Cancelled`s as well inside an inception + if isinstance(value, trio.MultiError): + value = next(itertools.dropwhile( + lambda exc: not isinstance(exc, tractor.RemoteActorError), + value.exceptions + )) + + assert value.type is inside_err From fbb6af47f8bec54e07fa7756f0c120f17ced2d66 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 12:19:01 -0500 Subject: [PATCH 09/31] Add a pub-sub messaging decorator API Add a draft pub-sub API `@tractor.msg.pub` which allows for decorating an asyn generator which can stream topic keyed dictionaries for delivery to multiple calling / consuming tasks. --- tractor/__init__.py | 2 + tractor/msg.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 tractor/msg.py diff --git a/tractor/__init__.py b/tractor/__init__.py index 5ffcfd5..4214c2d 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -18,6 +18,7 @@ from ._actor import ( from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed +from . import msg __all__ = [ @@ -30,6 +31,7 @@ __all__ = [ 'MultiError', 'RemoteActorError', 'ModuleNotExposed', + 'msg' ] diff --git a/tractor/msg.py b/tractor/msg.py new file mode 100644 index 0000000..16c3982 --- /dev/null +++ b/tractor/msg.py @@ -0,0 +1,154 @@ +""" +Messaging pattern APIs and helpers. +""" +import typing +from typing import Dict +from functools import partial + +import trio +import wrapt + +from ._ipc import Context +from .log import get_logger +from . 
import current_actor + +__all__ = ['pub'] + +log = get_logger('messaging') + + +async def fan_out_to_ctxs( + pub_gen: typing.AsyncGenerator, + topics2ctxs: Dict[str, Context], + topic_key: str = 'key', +) -> None: + """Request and fan out quotes to each subscribed actor channel. + """ + def get_topics(): + return tuple(topics2ctxs.keys()) + + async for published in pub_gen( + get_topics=get_topics, + ): + ctx_payloads = {} + for packet_key, data in published.items(): + # grab each suscription topic using provided key for lookup + topic = data[topic_key] + # build a new dict packet for passing to multiple channels + packet = {packet_key: data} + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet), + + # deliver to each subscriber (fan out) + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): + try: + await ctx.send_yield(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warn(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) + + if not any(topics2ctxs.values()): + log.warn(f"No subscribers left for {pub_gen.__name__}") + break + + +def modify_subs(topics2ctxs, topics, ctx): + """Absolute symbol subscription list for each quote stream. + + Effectively a symbol subscription api. + """ + log.info(f"{ctx.chan} changed subscription to {topics}") + + # update map from each symbol to requesting client's chan + for ticker in topics: + topics2ctxs.setdefault(ticker, set()).add(ctx) + + # remove any existing symbol subscriptions if symbol is not + # found in ``symbols`` + # TODO: this can likely be factored out into the pub-sub api + for ticker in filter( + lambda topic: topic not in topics, topics2ctxs.copy() + ): + ctx_set = topics2ctxs.get(ticker) + ctx_set.discard(ctx) + + if not ctx_set: + # pop empty sets which will trigger bg quoter task termination + topics2ctxs.pop(ticker) + + +def pub(*, tasks=()): + """Publisher async generator decorator. + + A publisher can be called many times from different actor's + remote tasks but will only spawn one internal task to deliver + values to all callers. Values yielded from the decorated + async generator are sent back to each calling task, filtered by + topic on the producer (server) side. + + Must be called with a topic as the first arg. 
+ """ + task2lock = {} + for name in tasks: + task2lock[name] = trio.StrictFIFOLock() + + @wrapt.decorator + async def wrapper(agen, instance, args, kwargs): + if tasks: + task_key = kwargs.pop('task_key') + if not task_key: + raise TypeError( + f"{agen} must be called with a `task_key` named argument " + f"with a falue from {tasks}") + + # pop required kwargs used internally + ctx = kwargs.pop('ctx') + topics = kwargs.pop('topics') + topic_key = kwargs.pop('topic_key') + + lock = task2lock[task_key] + ss = current_actor().statespace + all_subs = ss.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_key, {}) + + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + # update map from each symbol to requesting client's chan + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info(f"Spawning data feed task for {agen.__name__}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_gen=partial(agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + topic_key=topic_key, + ) + log.info(f"Terminating stream task for {task_key}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, (), ctx) + + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info(f"No more subscriptions for publisher {task_key}") + + return wrapper From 7cec62d585bb371547752742e1d13b61d34362bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 13:41:49 -0500 Subject: [PATCH 10/31] Add wrapt --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9103ab7..885d992 100755 --- a/setup.py +++ b/setup.py @@ -37,7 +37,8 @@ setup( 'tractor', 'tractor.testing', ], - install_requires=['msgpack', 'trio>0.8', 'async_generator', 'colorlog'], + install_requires=[ + 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], tests_require=['pytest'], python_requires=">=3.6", keywords=[ From c58a6ea80f6a8af6ffc14d9d29e506dfa351547a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 16:50:30 -0500 Subject: [PATCH 11/31] Fix type annots --- tractor/msg.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 16c3982..52d240e 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -2,13 +2,12 @@ Messaging pattern APIs and helpers. """ import typing -from typing import Dict +from typing import Dict, Any from functools import partial import trio import wrapt -from ._ipc import Context from .log import get_logger from . import current_actor @@ -18,8 +17,8 @@ log = get_logger('messaging') async def fan_out_to_ctxs( - pub_gen: typing.AsyncGenerator, - topics2ctxs: Dict[str, Context], + pub_gen: typing.Callable, # it's an async gen ... gd mypy + topics2ctxs: Dict[str, set], topic_key: str = 'key', ) -> None: """Request and fan out quotes to each subscribed actor channel. 
@@ -30,7 +29,7 @@ async def fan_out_to_ctxs( async for published in pub_gen( get_topics=get_topics, ): - ctx_payloads = {} + ctx_payloads: Dict[str, Any] = {} for packet_key, data in published.items(): # grab each suscription topic using provided key for lookup topic = data[topic_key] From 76f7ae5cf47c7134d1f0fc362a695d15ab914bf0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 17:09:30 -0500 Subject: [PATCH 12/31] Log about the loglevel --- tractor/_actor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_actor.py b/tractor/_actor.py index 0cf6c30..eb6002e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -453,6 +453,8 @@ class Actor: self._forkserver_info = forkserver_info from ._trionics import ctx if self.loglevel is not None: + log.info( + f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for {self.uid}") From b403a20f32776806abda87dca3ac8a6a89aef891 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 23:19:29 -0500 Subject: [PATCH 13/31] Document context api --- README.rst | 76 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 20 deletions(-) diff --git a/README.rst b/README.rst index 8af6799..5b3602b 100644 --- a/README.rst +++ b/README.rst @@ -61,7 +61,6 @@ Philosophy .. _pulsar: http://quantmind.github.io/pulsar/design.html .. _execnet: https://codespeak.net/execnet/ - Install ------- No PyPi release yet! @@ -199,7 +198,7 @@ method: - actors can be spawned to *live forever* using the ``start_actor()`` method and act like an RPC daemon that runs indefinitely (the - ``with tractor.open_nursery()`` wont' exit) until cancelled_ + ``with tractor.open_nursery()`` won't exit) until cancelled_ Had we wanted the latter form in our example it would have looked like: @@ -255,8 +254,8 @@ to all others with ease over standard network protocols). .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor -Transparent remote function calling using *portals* ---------------------------------------------------- +Async IPC using *portals* +------------------------- ``tractor`` introduces the concept of a *portal* which is an API borrowed_ from ``trio``. A portal may seem similar to the idea of a RPC future_ except a *portal* allows invoking remote *async* functions and @@ -494,12 +493,12 @@ a ``Supervisor`` type. .. _erlang strategies: http://learnyousomeerlang.com/supervisors -Shared task state ------------------ +Actor local variables +--------------------- Although ``tractor`` uses a *shared-nothing* architecture between processes -you can of course share state within an actor. ``trio`` tasks spawned via -multiple RPC calls to an actor can access global data using the per actor -``statespace`` dictionary: +you can of course share state between tasks running *within* an actor. +``trio`` tasks spawned via multiple RPC calls to an actor can access global +state using the per actor ``statespace`` dictionary: .. code:: python @@ -561,16 +560,45 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. -Using ``Channel`` directly (undocumented) +Streaming and using channels and contexts ----------------------------------------- -You can use the ``Channel`` api if necessary by simply defining a -``chan`` and ``cid`` *kwarg* in your async function definition. 
-``tractor`` will treat such async functions like async generators on -the calling side (for now anyway) such that you can push stream values -a little more granularly if you find *yielding* values to be restrictive. -I am purposely not documenting this feature with code because I'm not yet -sure yet how it should be used correctly. If you'd like more details -please feel free to ask me on the `trio gitter channel`_. +A ``Channel`` is the API which wraps an underlying *transport* and *interchange* +format to support *inter-actor-communication* (IAC). In its present state ``tractor`` +uses TCP and msgpack_ but hopefully more alternatives in the future. + +If you aren't fond of having to write an async generator to stream data +between actors (or need something more flexible) you can instead use a +``Context``. A context wraps an actor-local spawned task and a ``Channel`` +so that tasks executing across multiple processes can stream data +to one another using a low level, request oriented, API. + +As an example if you wanted to create a streaming server without writing +an async generator that *yields* values (which are implicitly +shipped over a channel to the caller) you instead define an async +function: + +.. code:: python + + async def streamer(ctx, rate=2): + """A simple web response streaming server. + """ + while True: + val = await web_request('http://data.feed.com') + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + +All that's required is declaring a ``ctx`` argument name somewhere in +your function signature and ``tractor`` will treat the async function +like async generator as a streaming function on the client task's side. +As you can imagine, this turns out to be handy particularly if you have +multiple tasks streaming responses concurrently. + +The context idea comes from the `protocol context`_ in nanomsg_. +It allows you to perfom IPC in a task or protocol specific way. Running actors standalone (without spawning) @@ -616,12 +644,20 @@ Stuff I'd like to see ``tractor`` do real soon: - an extensive `chaos engineering`_ test suite - support for reactive programming primitives and native support for asyncitertools_ like libs -If you're interested in tackling any of these please do shout about it on the -`trio gitter channel`_! + +Feel like saying hi? +-------------------- +This project is very much coupled to the ongoing development of +``trio`` (i.e. ``tractor`` gets all its ideas from that brilliant +community). If you want to help, have suggestions or just want to +say hi, please feel free to ping me on the `trio gitter channel`_! + .. _supervisors: https://github.com/tgoodlet/tractor/issues/22 .. _nanomsg: https://github.com/tgoodlet/tractor/issues/19 +.. _nng context: https://github.com/tgoodlet/tractor/issues/19 .. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _trio gitter channel: https://gitter.im/python-trio/general .. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html .. _pdb++: https://github.com/antocuni/pdb +.. 
_msgpack: https://en.wikipedia.org/wiki/MessagePack From 251ee177fae833e0963d7c55911a8d02874d9f53 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 20 Jan 2019 21:47:08 -0500 Subject: [PATCH 14/31] Make the `Context` a dataclass --- tractor/_ipc.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2da8e12..0477740 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,6 +1,7 @@ """ Inter-process comms abstractions """ +from dataclasses import dataclass import typing from typing import Any, Tuple, Optional @@ -205,6 +206,7 @@ class Channel: return self.squeue.connected() if self.squeue else False +@dataclass(frozen=True) class Context: """An IAC (inter-actor communication) context. @@ -212,13 +214,12 @@ class Context: actors. A unique context is created on the receiving end for every request to a remote actor. """ - def __init__( - self, - channel: Channel, - command_id: str, - ) -> None: - self.chan: Channel = channel - self.cid: str = command_id + chan: Channel + cid: str + + # TODO: we should probably attach the actor-task + # cancel scope here now that trio is exposing it + # as a public object async def send_yield(self, data: Any) -> None: await self.chan.send({'yield': data, 'cid': self.cid}) From 03e00886dad4cae854c2248f0deadd42d2ffb5fe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 00:16:20 -0500 Subject: [PATCH 15/31] Add `Actor.cancel_task()` Enable cancelling specific tasks from a peer actor such that when a actor task or the actor itself is cancelled, remotely spawned tasks can also be cancelled. In much that same way that you'd expect a node (task) in the `trio` task tree to cancel any subtasks, actors should be able to cancel any tasks they spawn in separate processes. 
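As a caller-side sketch (the portal wiring for this lands in a later
patch in this series; `cancel_remote_task` is only an illustrative
helper name and `portal`/`cid` are assumed to come from the calling
context), a peer can request cancellation of a single remote task
through the existing "self" namespace:

    from async_generator import aclosing

    async def cancel_remote_task(portal, cid):
        # ask the peer actor to cancel the rpc task identified by
        # `cid`; `cancel_task()` is invoked like any other streaming
        # func since it declares a `ctx` arg in its signature
        agen = await portal.run('self', 'cancel_task', cid=cid)
        async with aclosing(agen) as agen:
            async for _ in agen:
                pass
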
To enable this: - track rpc tasks in a flat dict keyed by (chan, cid) - store a `is_complete` event to enable waiting on specific tasks to complete - allow for shielding the msg loop inside an internal cancel scope if requested by the caller; there was an issue with `open_portal()` where the channel would be torn down because the current task was cancelled but we still need messaging to continue until the portal block is exited - throw an error if the arbiter tries to find itself for now --- tractor/_actor.py | 245 ++++++++++++++++++++++++++-------------------- 1 file changed, 139 insertions(+), 106 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index eb6002e..3d0a4f5 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -140,18 +140,14 @@ async def _invoke( task_status.started(err) finally: # RPC task bookeeping - tasks = actor._rpc_tasks.get(chan, None) - if tasks: - try: - scope, func = tasks.pop(cid) - except ValueError: - # If we're cancelled before the task returns then the - # cancel scope will not have been inserted yet - log.warn( - f"Task {func} was likely cancelled before it was started") - - if not tasks: - actor._rpc_tasks.pop(chan, None) + try: + scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) + is_complete.set() + except KeyError: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet + log.warn( + f"Task {func} was likely cancelled before it was started") if not actor._rpc_tasks: log.info(f"All RPC tasks have completed") @@ -197,9 +193,10 @@ class Actor: self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks.set() + # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ - Channel, - Dict[str, Tuple[trio._core._run.CancelScope, typing.Callable]] + Tuple[Channel, str], + Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} @@ -268,7 +265,7 @@ class Actor: log.warning( f"already have channel(s) for {uid}:{chans}?" ) - log.debug(f"Registered {chan} for {uid}") + log.trace(f"Registered {chan} for {uid}") # append new channel self._peers[uid].append(chan) @@ -295,8 +292,9 @@ class Actor: if chan.connected(): log.debug(f"Disconnecting channel {chan}") try: + # send our msg loop terminate sentinel await chan.send(None) - await chan.aclose() + # await chan.aclose() except trio.BrokenResourceError: log.exception( f"Channel for {chan.uid} was already zonked..") @@ -334,7 +332,10 @@ class Actor: return cid, q async def _process_messages( - self, chan: Channel, treat_as_gen: bool = False + self, chan: Channel, + treat_as_gen: bool = False, + shield: bool = False, + task_status=trio.TASK_STATUS_IGNORED, ) -> None: """Process messages for the channel async-RPC style. @@ -342,91 +343,90 @@ class Actor: """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! 
+ msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan: - if msg is None: # terminate sentinel - log.debug( - f"Cancelling all tasks for {chan} from {chan.uid}") - for cid, (scope, func) in self._rpc_tasks.pop( - chan, {} - ).items(): - scope.cancel() - log.debug( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: - cancel = msg.get('cancel') - if cancel: - # right now this is only implicitly used by - # async generator IPC - scope, func = self._rpc_tasks[chan][cid] + # internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) + with trio.open_cancel_scope(shield=shield) as cs: + task_status.started(cs) + async for msg in chan: + if msg is None: # loop terminate sentinel log.debug( - f"Received cancel request for task {cid}" - f" from {chan.uid}") - scope.cancel() - else: + f"Cancelling all tasks for {chan} from {chan.uid}") + for (channel, cid) in self._rpc_tasks: + if channel is chan: + self.cancel_task(cid, Context(channel, cid)) + log.debug( + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + break + + log.debug(f"Received msg {msg} from {chan.uid}") + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter await self._push_result(chan.uid, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - continue - - # process command request - try: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - except KeyError: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). - # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored - assert chan.uid - exc = unpack_error(msg, chan=chan) - chan._exc = exc - raise exc - - log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - if ns == 'self': - func = getattr(self, funcname) - else: - # complain to client about restricted modules - try: - func = self._get_rpc_func(ns, funcname) - except (ModuleNotExposed, AttributeError) as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - await chan.send(err_msg) continue - # spin up a task for the requested function - log.debug(f"Spawning task for {func}") - cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, - name=funcname - ) - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - if func != self.cancel: - if isinstance(cs, Exception): - log.warn(f"Task for RPC func {func} failed with {cs}") + # process command request + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). 
+ # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + assert chan.uid + exc = unpack_error(msg, chan=chan) + chan._exc = exc + raise exc + + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) else: - # mark that we have ongoing rpc tasks - self._no_more_rpc_tasks.clear() - log.info(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks.setdefault(chan, {})[cid] = (cs, func) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") - else: - # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") + # complain to client about restricted modules + try: + func = self._get_rpc_func(ns, funcname) + except (ModuleNotExposed, AttributeError) as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + continue + + # spin up a task for the requested function + log.debug(f"Spawning task for {func}") + cs = await self._root_nursery.start( + _invoke, self, cid, chan, func, kwargs, + name=funcname + ) + # never allow cancelling cancel requests (results in + # deadlock and other weird behaviour) + if func != self.cancel: + if isinstance(cs, Exception): + log.warn(f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") + else: + # channel disconnect + log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -439,8 +439,14 @@ class Actor: raise # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" + except trio.Cancelled: + # debugging only + log.debug("Msg loop was cancelled") + raise finally: - log.debug(f"Exiting msg loop for {chan} from {chan.uid}") + log.debug( + f"Exiting msg loop for {chan} from {chan.uid} " + f"with last msg:\n{msg}") def _fork_main( self, @@ -541,8 +547,7 @@ class Actor: if self._parent_chan: try: # internal error so ship to parent without cid - await self._parent_chan.send( - pack_error(err)) + await self._parent_chan.send(pack_error(err)) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " @@ -627,21 +632,47 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() + async def cancel_task(self, cid, ctx): + """Cancel a local task. + + Note this method will be treated as a streaming funciton + by remote actor-callers due to the declaration of ``ctx`` + in the signature (for now). 
+ """ + # right now this is only implicitly called by + # streaming IPC but it should be called + # to cancel any remotely spawned task + chan = ctx.chan + # the ``dict.get()`` ensures the requested task to be cancelled + # was indeed spawned by a request from this channel + scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + log.debug( + f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") + + # if func is self.cancel_task: + # return + + scope.cancel() + # wait for _invoke to mark the task complete + await is_complete.wait() + log.debug( + f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") + async def cancel_rpc_tasks(self) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ tasks = self._rpc_tasks - log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") - for chan, cids2scopes in tasks.items(): - log.debug(f"Cancelling all tasks for {chan.uid}") - for cid, (scope, func) in cids2scopes.items(): - log.debug(f"Cancelling task for {func}") - scope.cancel() - if tasks: - log.info( - f"Waiting for remaining rpc tasks to complete {tasks}") - await self._no_more_rpc_tasks.wait() + log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + for (chan, cid) in tasks.copy(): + # TODO: this should really done in a nursery batch + await self.cancel_task(cid, Context(chan, cid)) + # if tasks: + log.info( + f"Waiting for remaining rpc tasks to complete {tasks}") + await self._no_more_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby @@ -810,7 +841,9 @@ async def find_actor( sockaddr = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered - if sockaddr: + if name == 'arbiter' and actor.is_arbiter: + raise RuntimeError("The current actor is the arbiter") + elif sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal From 97f709cc143ebdc2cc5da08325d4c143b2c2e8d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 00:45:54 -0500 Subject: [PATCH 16/31] Cancel remote streaming tasks on a local cancel Use the new `Actor.cancel_task()` api to remotely cancel streaming tasks spawned by a portal. This guarantees that if an actor is cancelled all its (remote) portal spawned tasks will be as well. On portal teardown only cancel all async generator calls (though we should cancel all RPC requests in general eventually) and don't close the channel since it may have been passed in from some other context that wishes to keep it connected. In `open_portal()` run the message loop shielded so that if the local task is cancelled, messaging will continue until the internal scope is cancelled at end of block. 
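A sketch of the behaviour this is meant to guarantee (the names
`consume_for_a_bit`, `'mymod'`, `stream_forever` and the 1s timeout
are made up for illustration only):

    import trio

    async def consume_for_a_bit(portal):
        with trio.move_on_after(1):
            # consuming task gets cancelled after 1s ...
            async for value in await portal.run('mymod', 'stream_forever'):
                print(value)
        # ... and the remotely spawned streaming task is now cancelled
        # as well instead of being left running in the subactor
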
--- tractor/_portal.py | 86 ++++++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 30 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index fff39db..6a47330 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -5,9 +5,11 @@ import importlib import inspect import typing from typing import Tuple, Any, Dict, Optional, Set +from functools import partial +from dataclasses import dataclass import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from ._state import current_actor from ._ipc import Channel @@ -53,7 +55,7 @@ class Portal: underlying ``tractor.Channel`` as though the remote (async) function / generator was invoked locally. - Think of this like an native async IPC API. + Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -124,7 +126,7 @@ class Portal: # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': + if resptype == 'yield': # stream response async def yield_from_q(): try: @@ -140,12 +142,21 @@ class Portal: "Received internal error at portal?") raise unpack_error(msg, self.channel) - except GeneratorExit: - # for now this msg cancels an ongoing remote task - await self.channel.send({'cancel': True, 'cid': cid}) - log.warn( + except (GeneratorExit, trio.Cancelled): + log.warning( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") + with trio.open_cancel_scope() as cleanup_scope: + cleanup_scope.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # sytsem. + agen = await self.run('self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass raise # TODO: use AsyncExitStack to aclose() all agens @@ -154,7 +165,7 @@ class Portal: self._agens.add(agen) return agen - elif resptype == 'return': + elif resptype == 'return': # single response msg = await q.get() try: return msg['return'] @@ -176,7 +187,7 @@ class Portal: # not expecting a "main" result if self._expect_result is None: - log.warn( + log.warning( f"Portal for {self.channel.uid} not expecting a final" " result?\nresult() should only be called if subactor" " was spawned with `ActorNursery.run_in_actor()`") @@ -198,22 +209,34 @@ class Portal: return self._result + async def _cancel_streams(self): + # terminate all locally running async generator + # IPC calls + if self._agens: + log.warning( + f"Cancelling all streams with {self.channel.uid}") + for agen in self._agens: + await agen.aclose() + async def close(self) -> None: - # trigger remote msg loop `break` - chan = self.channel - log.debug(f"Closing portal for {chan} to {chan.uid}") - await self.channel.send(None) + await self._cancel_streams() async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. 
""" + if not self.channel.connected(): + log.warning("This portal is already closed can't cancel") + return False + + await self._cancel_streams() + log.warning( - f"Sending cancel request to {self.channel.uid} on " + f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") try: - with trio.move_on_after(0.1) as cancel_scope: + # send cancel cmd - might not get response + with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True - # send cancel cmd - might not get response await self.run('self', 'cancel') return True except trio.ClosedResourceError: @@ -225,17 +248,14 @@ class Portal: return False +@dataclass class LocalPortal: """A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__( - self, - actor: 'Actor' # type: ignore - ) -> None: - self.actor = actor + actor: 'Actor' # type: ignore async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. @@ -270,20 +290,26 @@ async def open_portal( if channel.uid is None: await _do_handshake(actor, channel) - nursery.start_soon(actor._process_messages, channel) + msg_loop_cs = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) + ) portal = Portal(channel) try: yield portal finally: - # tear down all async generators - for agen in portal._agens: - await agen.aclose() + await portal.close() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + if was_connected: + # cancel remote channel-msg loop + await channel.send(None) + await channel.aclose() # cancel background msg loop task + msg_loop_cs.cancel() nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose() From b6cc1e8c229b4e682f8a194d12e12d493fad3f47 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 08:35:43 -0500 Subject: [PATCH 17/31] More pub decorator improvements - when calling the async gen func provided by the user wrap it in `@async_generator.aclosing` to ensure correct teardown at cancel time - expect the gen to yield a dict with topic keys and data values - add a `packetizer` function argument to the api allowing a user to format the data to be published in whatever way desired - support using the decorator without the parentheses (using default arguments) - use a `wrapt` "adapter" to override the signature presented to the `_actor._invoke` inspection machinery - handle the default case where `tasks` isn't provided; allow only one concurrent publisher task - store task locks in an actor local variable - add a comprehensive doc string --- tractor/msg.py | 190 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 137 insertions(+), 53 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 52d240e..f34d32a 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -2,8 +2,9 @@ Messaging pattern APIs and helpers. """ import typing -from typing import Dict, Any +from typing import Dict, Any, Sequence from functools import partial +from async_generator import aclosing import trio import wrapt @@ -17,44 +18,46 @@ log = get_logger('messaging') async def fan_out_to_ctxs( - pub_gen: typing.Callable, # it's an async gen ... gd mypy + pub_async_gen_func: typing.Callable, # it's an async gen ... 
gd mypy topics2ctxs: Dict[str, set], - topic_key: str = 'key', + packetizer: typing.Callable = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ def get_topics(): return tuple(topics2ctxs.keys()) - async for published in pub_gen( - get_topics=get_topics, - ): - ctx_payloads: Dict[str, Any] = {} - for packet_key, data in published.items(): - # grab each suscription topic using provided key for lookup - topic = data[topic_key] - # build a new dict packet for passing to multiple channels - packet = {packet_key: data} - for ctx in topics2ctxs.get(topic, set()): - ctx_payloads.setdefault(ctx, {}).update(packet), + agen = pub_async_gen_func(get_topics=get_topics) + async with aclosing(agen) as pub_gen: + async for published in pub_gen: + ctx_payloads: Dict[str, Any] = {} + for topic, data in published.items(): + log.debug(f"publishing {topic, data}") + # build a new dict packet or invoke provided packetizer + if packetizer is None: + packet = {topic: data} + else: + packet = packetizer(topic, data) + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet), - # deliver to each subscriber (fan out) - if ctx_payloads: - for ctx, payload in ctx_payloads.items(): - try: - await ctx.send_yield(payload) - except ( - # That's right, anything you can think of... - trio.ClosedStreamError, ConnectionResetError, - ConnectionRefusedError, - ): - log.warn(f"{ctx.chan} went down?") - for ctx_set in topics2ctxs.values(): - ctx_set.discard(ctx) + # deliver to each subscriber (fan out) + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): + try: + await ctx.send_yield(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warning(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) - if not any(topics2ctxs.values()): - log.warn(f"No subscribers left for {pub_gen.__name__}") - break + if not get_topics(): + log.warning(f"No subscribers left for {pub_gen}") + break def modify_subs(topics2ctxs, topics, ctx): @@ -62,7 +65,7 @@ def modify_subs(topics2ctxs, topics, ctx): Effectively a symbol subscription api. """ - log.info(f"{ctx.chan} changed subscription to {topics}") + log.info(f"{ctx.chan.uid} changed subscription to {topics}") # update map from each symbol to requesting client's chan for ticker in topics: @@ -82,46 +85,127 @@ def modify_subs(topics2ctxs, topics, ctx): topics2ctxs.pop(ticker) -def pub(*, tasks=()): +def pub( + wrapped: typing.Callable = None, + *, + tasks: Sequence[str] = set(), +): """Publisher async generator decorator. - A publisher can be called many times from different actor's - remote tasks but will only spawn one internal task to deliver - values to all callers. Values yielded from the decorated - async generator are sent back to each calling task, filtered by - topic on the producer (server) side. + A publisher can be called multiple times from different actors + but will only spawn a finite set of internal tasks to stream values + to each caller. The ``tasks` argument to the decorator (``Set[str]``) + specifies the names of the mutex set of publisher tasks. + When the publisher function is called, an argument ``task_name`` must be + passed to specify which task (of the set named in ``tasks``) should be + used. This allows for using the same publisher with different input + (arguments) without allowing more concurrent tasks then necessary. 
- Must be called with a topic as the first arg. + Values yielded from the decorated async generator + must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the + topic string an determines which subscription the packet will be delivered + to and the value is a packet ``Dict[str, Any]`` by default of the form: + + .. ::python + + {topic: value} + + The caller can instead opt to pass a ``packetizer`` callback who's return + value will be delivered as the published response. + + The decorated function must *accept* an argument :func:`get_topics` which + dynamically returns the tuple of current subscriber topics: + + .. code:: python + + from tractor.msg import pub + + @pub(tasks={'source_1', 'source_2'}) + async def pub_service(get_topics): + data = await web_request(endpoints=get_topics()) + for item in data: + yield data['key'], data + + + The publisher must be called passing in the following arguments: + - ``topics: Sequence[str]`` the topic sequence or "subscriptions" + - ``task_name: str`` the task to use (if ``tasks`` was passed) + - ``ctx: Context`` the tractor context (only needed if calling the + pub func without a nursery, otherwise this is provided implicitly) + - packetizer: ``Callable[[str, Any], Any]`` a callback who receives + the topic and value from the publisher function each ``yield`` such that + whatever is returned is sent as the published value to subscribers of + that topic. By default this is a dict ``{topic: value}``. + + As an example, to make a subscriber call the above function: + + .. code:: python + + from functools import partial + import tractor + + async with tractor.open_nursery() as n: + portal = n.run_in_actor( + 'publisher', # actor name + partial( # func to execute in it + pub_service, + topics=('clicks', 'users'), + task_name='source1', + ) + ) + async for value in portal.result(): + print(f"Subscriber received {value}") + + + Here, you don't need to provide the ``ctx`` argument since the remote actor + provides it automatically to the spawned task. If you were to call + ``pub_service()`` directly from a wrapping function you would need to + provide this explicitly. + + Remember you only need this if you need *a finite set of tasks* running in + a single actor to stream data to an arbitrary number of subscribers. If you + are ok to have a new task running for every call to ``pub_service()`` then + probably don't need this. 
""" - task2lock = {} + # handle the decorator not called with () case + if wrapped is None: + return partial(pub, tasks=tasks) + + task2lock = {None: trio.StrictFIFOLock()} for name in tasks: task2lock[name] = trio.StrictFIFOLock() - @wrapt.decorator + async def takes_ctx(get_topics, ctx=None): + pass + + @wrapt.decorator(adapter=takes_ctx) async def wrapper(agen, instance, args, kwargs): + task_name = None if tasks: - task_key = kwargs.pop('task_key') - if not task_key: + try: + task_name = kwargs.pop('task_name') + except KeyError: raise TypeError( - f"{agen} must be called with a `task_key` named argument " + f"{agen} must be called with a `task_name` named argument " f"with a falue from {tasks}") # pop required kwargs used internally ctx = kwargs.pop('ctx') topics = kwargs.pop('topics') - topic_key = kwargs.pop('topic_key') + packetizer = kwargs.pop('packetizer', None) - lock = task2lock[task_key] ss = current_actor().statespace + lockmap = ss.setdefault('_pubtask2lock', task2lock) + lock = lockmap[task_name] + all_subs = ss.setdefault('_subs', {}) - topics2ctxs = all_subs.setdefault(task_key, {}) + topics2ctxs = all_subs.setdefault(task_name, {}) try: modify_subs(topics2ctxs, topics, ctx) # block and let existing feed task deliver # stream data until it is cancelled in which case - # we'll take over and spawn it again - # update map from each symbol to requesting client's chan + # the next waiting task will take over and spawn it again async with lock: # no data feeder task yet; so start one respawn = True @@ -132,11 +216,11 @@ def pub(*, tasks=()): # unblocks when no more symbols subscriptions exist # and the streamer task terminates await fan_out_to_ctxs( - pub_gen=partial(agen, *args, **kwargs), + pub_async_gen_func=partial(agen, *args, **kwargs), topics2ctxs=topics2ctxs, - topic_key=topic_key, + packetizer=packetizer, ) - log.info(f"Terminating stream task for {task_key}" + log.info(f"Terminating stream task {task_name or ''}" f" for {agen.__name__}") except trio.BrokenResourceError: log.exception("Respawning failed data feed task") @@ -148,6 +232,6 @@ def pub(*, tasks=()): # if there are truly no more subscriptions with this broker # drop from broker subs dict if not any(topics2ctxs.values()): - log.info(f"No more subscriptions for publisher {task_key}") + log.info(f"No more subscriptions for publisher {task_name}") - return wrapper + return wrapper(wrapped) From 276782eb4587688e1c79a1e198fa5b8f636d434f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 11:38:09 -0500 Subject: [PATCH 18/31] Add context examples --- README.rst | 55 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/README.rst b/README.rst index 5b3602b..1b5fc9c 100644 --- a/README.rst +++ b/README.rst @@ -42,6 +42,9 @@ down. A great place to start is the `trio docs`_ and this `blog post`_. .. _modern async Python: https://www.python.org/dev/peps/pep-0525/ +.. contents:: Table of Contents + + Philosophy ---------- ``tractor``'s tenets non-comprehensively include: @@ -560,21 +563,20 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. -Streaming and using channels and contexts ------------------------------------------ -A ``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to support *inter-actor-communication* (IAC). 
In its present state ``tractor`` -uses TCP and msgpack_ but hopefully more alternatives in the future. +Streaming using channels and contexts +------------------------------------- +``Channel`` is the API which wraps an underlying *transport* and *interchange* +format to enable *inter-actor-communication*. In its present state ``tractor`` +uses TCP and msgpack_. If you aren't fond of having to write an async generator to stream data between actors (or need something more flexible) you can instead use a ``Context``. A context wraps an actor-local spawned task and a ``Channel`` so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented, API. +to one another using a low level, request oriented API. As an example if you wanted to create a streaming server without writing -an async generator that *yields* values (which are implicitly -shipped over a channel to the caller) you instead define an async +an async generator that *yields* values you instead define an async function: .. code:: python @@ -593,16 +595,35 @@ function: All that's required is declaring a ``ctx`` argument name somewhere in your function signature and ``tractor`` will treat the async function -like async generator as a streaming function on the client task's side. -As you can imagine, this turns out to be handy particularly if you have -multiple tasks streaming responses concurrently. +like an async generator - as a streaming function from the client side. +This turns out to be handy particularly if you have +multiple tasks streaming responses concurrently: -The context idea comes from the `protocol context`_ in nanomsg_. -It allows you to perfom IPC in a task or protocol specific way. +.. code:: python + + async def streamer(ctx, url, rate=2): + """A simple web response streaming server. + """ + while True: + val = await web_request(url) + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) -Running actors standalone (without spawning) --------------------------------------------- + async def stream_multiple_sources(ctx, sources): + async with trio.open_nursery() as n: + for url in sources: + n.start_soon(streamer, ctx, url) + + +The context notion comes from the context_ in nanomsg_. + + +Running actors standalone +------------------------- You don't have to spawn any actors using ``open_nursery()`` if you just want to run a single actor that connects to an existing cluster. All the comms and arbiter registration stuff still works. This can @@ -654,8 +675,8 @@ say hi, please feel free to ping me on the `trio gitter channel`_! .. _supervisors: https://github.com/tgoodlet/tractor/issues/22 -.. _nanomsg: https://github.com/tgoodlet/tractor/issues/19 -.. _nng context: https://github.com/tgoodlet/tractor/issues/19 +.. _nanomsg: https://nanomsg.github.io/nng/index.html +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 .. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _trio gitter channel: https://gitter.im/python-trio/general .. 
_celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html From 19349f8cff689182de8d0880241155d0ff22091b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 11:56:33 -0500 Subject: [PATCH 19/31] Add TOC and examples subsections --- README.rst | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index 1b5fc9c..3e16684 100644 --- a/README.rst +++ b/README.rst @@ -42,7 +42,7 @@ down. A great place to start is the `trio docs`_ and this `blog post`_. .. _modern async Python: https://www.python.org/dev/peps/pep-0525/ -.. contents:: Table of Contents +.. contents:: Philosophy @@ -64,6 +64,7 @@ Philosophy .. _pulsar: http://quantmind.github.io/pulsar/design.html .. _execnet: https://codespeak.net/execnet/ + Install ------- No PyPi release yet! @@ -73,8 +74,12 @@ No PyPi release yet! pip install git+git://github.com/tgoodlet/tractor.git +Examples +-------- + + A trynamic first scene ----------------------- +********************** Let's direct a couple *actors* and have them run their lines for the hip new film we're shooting: @@ -131,7 +136,7 @@ this case our "director" executing ``main()``). Actor spawning and causality ----------------------------- +**************************** ``tractor`` tries to take ``trio``'s concept of causal task lifetimes to multi-process land. Accordingly, ``tractor``'s *actor nursery* behaves similar to ``trio``'s nursery_. That is, ``tractor.open_nursery()`` @@ -258,7 +263,7 @@ to all others with ease over standard network protocols). Async IPC using *portals* -------------------------- +************************* ``tractor`` introduces the concept of a *portal* which is an API borrowed_ from ``trio``. A portal may seem similar to the idea of a RPC future_ except a *portal* allows invoking remote *async* functions and @@ -324,6 +329,9 @@ generator function running in a separate actor: tractor.run(main) + +A full fledged streaming service +******************************** Alright, let's get fancy. Say you wanted to spawn two actors which each pull data feeds from @@ -445,7 +453,7 @@ as ``multiprocessing`` calls it) which is running ``main()``. Cancellation ------------- +************ ``tractor`` supports ``trio``'s cancellation_ system verbatim. Cancelling a nursery block cancels all actors spawned by it. Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. @@ -454,7 +462,7 @@ Eventually ``tractor`` plans to support different `supervision strategies`_ like Remote error propagation ------------------------- +************************ Any task invoked in a remote actor should ship any error(s) back to the calling actor where it is raised and expected to be dealt with. This way remote actors are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. @@ -497,7 +505,7 @@ a ``Supervisor`` type. Actor local variables ---------------------- +********************* Although ``tractor`` uses a *shared-nothing* architecture between processes you can of course share state between tasks running *within* an actor. ``trio`` tasks spawned via multiple RPC calls to an actor can access global @@ -530,7 +538,7 @@ out a state sharing system per-actor is totally up to you. How do actors find each other (a poor man's *service discovery*)? 
------------------------------------------------------------------ +***************************************************************** Though it will be built out much more in the near future, ``tractor`` currently keeps track of actors by ``(name: str, id: str)`` using a special actor called the *arbiter*. Currently the *arbiter* must exist @@ -564,7 +572,7 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as Streaming using channels and contexts -------------------------------------- +************************************* ``Channel`` is the API which wraps an underlying *transport* and *interchange* format to enable *inter-actor-communication*. In its present state ``tractor`` uses TCP and msgpack_. @@ -623,7 +631,7 @@ The context notion comes from the context_ in nanomsg_. Running actors standalone -------------------------- +************************* You don't have to spawn any actors using ``open_nursery()`` if you just want to run a single actor that connects to an existing cluster. All the comms and arbiter registration stuff still works. This can @@ -637,7 +645,7 @@ need to hop into a debugger. You just need to pass the existing Enabling logging ----------------- +**************** Considering how complicated distributed software can become it helps to know what exactly it's doing (even at the lowest levels). Luckily ``tractor`` has tons of logging throughout the core. ``tractor`` isn't opinionated on From 5e5c917081770fb5dbfdfc1f79c9f1d275d3bc28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 11:59:44 -0500 Subject: [PATCH 20/31] Fix run_in_excutor() link --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 3e16684..dfd7466 100644 --- a/README.rst +++ b/README.rst @@ -193,7 +193,7 @@ What's going on? much like you'd expect from a future_. This ``run_in_actor()`` API should look very familiar to users of -``asyncio``'s run_in_executor_ which uses a ``concurrent.futures`` Executor_. +``asyncio``'s `run_in_executor()`_ which uses a ``concurrent.futures`` Executor_. Since you might also want to spawn long running *worker* or *daemon* actors, each actor's *lifetime* can be determined based on the spawn @@ -258,7 +258,7 @@ to all others with ease over standard network protocols). .. _nursery: https://trio.readthedocs.io/en/latest/reference-core.html#nurseries-and-spawning .. _causal: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#causality .. _cancelled: https://trio.readthedocs.io/en/latest/reference-core.html#child-tasks-and-cancellation -.. _run_in_executor: https://docs.python.org/3/library/asyncio-eventloop.html#executor +.. _run_in_executor(): https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor .. 
_Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor From 1e18c70ad1bee8205427625d2f2ad22443a5d4c7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 12:07:58 -0500 Subject: [PATCH 21/31] Fix func name mismatch --- README.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index dfd7466..ad0f1e6 100644 --- a/README.rst +++ b/README.rst @@ -111,6 +111,7 @@ the hip new film we're shooting: donny = await n.run_in_actor( 'donny', say_hello, + # arguments are always named other_actor='gretchen', ) gretchen = await n.run_in_actor( @@ -162,7 +163,7 @@ and use the ``run_in_actor()`` method: """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('frank', movie_theatre_question) + portal = await n.run_in_actor('teacher', cellar_door) # The ``async with`` will unblock here since the 'frank' # actor has completed its main task ``movie_theatre_question()``. From 36ee6695fba758bacc91349e2198c55903d01c4a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 12:31:03 -0500 Subject: [PATCH 22/31] Add initial pubsub test --- tests/test_pubsub.py | 81 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 tests/test_pubsub.py diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py new file mode 100644 index 0000000..fef3c14 --- /dev/null +++ b/tests/test_pubsub.py @@ -0,0 +1,81 @@ +from functools import partial +from itertools import cycle + +import pytest +import trio +import tractor +from async_generator import aclosing +from tractor.testing import tractor_test + + +def is_even(i): + return i % 2 == 0 + + +@tractor.msg.pub +async def pubber(get_topics): + for i in cycle(range(10)): + topics = get_topics() + yield {'even' if is_even(i) else 'odd': i} + await trio.sleep(0.1) + + +async def subs(which): + if len(which) == 1: + if which[0] == 'even': + pred = is_even + else: + pred = lambda i: not is_even(i) + else: + pred = lambda i: isinstance(i, int) + + async with tractor.find_actor('streamer') as portal: + agen = await portal.run(__name__, 'pubber', topics=which) + async with aclosing(agen) as agen: + async for pkt in agen: + for topic, value in pkt.items(): + assert pred(value) + + +def test_pubsub_multi_actor_subs( + loglevel, + arb_addr, +): + async def main(): + async with tractor.open_nursery() as n: + # start the publisher as a daemon + master_portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + + even_portal = await n.run_in_actor('evens', subs, which=['even']) + odd_portal = await n.run_in_actor('odds', subs, which=['odd']) + + async with tractor.wait_for_actor('odds'): + # block until 2nd actor is initialized + pass + + # TODO: how to make this work when the arbiter gets + # a portal to itself? 
Currently this causes a hang + # when the channel server is torn down due to a lingering + # loopback channel + # with trio.move_on_after(1): + # await subs(['even', 'odd']) + + # XXX: this would cause infinite + # blocking due to actor never terminating loop + # await even_portal.result() + + await trio.sleep(1) + await even_portal.cancel_actor() + await trio.sleep(1) + await odd_portal.cancel_actor() + + await master_portal.cancel_actor() + + tractor.run( + main, + arbiter_addr=arb_addr, + rpc_module_paths=[__name__], + ) From 226312042a0b4f67eda616eb67f20373fbcefb91 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 00:41:45 -0500 Subject: [PATCH 23/31] Fix type annots --- tractor/msg.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index f34d32a..9c0bd2c 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -2,7 +2,7 @@ Messaging pattern APIs and helpers. """ import typing -from typing import Dict, Any, Sequence +from typing import Dict, Any, Set, Union from functools import partial from async_generator import aclosing @@ -88,7 +88,7 @@ def modify_subs(topics2ctxs, topics, ctx): def pub( wrapped: typing.Callable = None, *, - tasks: Sequence[str] = set(), + tasks: Set[str] = set(), ): """Publisher async generator decorator. @@ -128,7 +128,7 @@ def pub( The publisher must be called passing in the following arguments: - - ``topics: Sequence[str]`` the topic sequence or "subscriptions" + - ``topics: Set[str]`` the topic sequence or "subscriptions" - ``task_name: str`` the task to use (if ``tasks`` was passed) - ``ctx: Context`` the tractor context (only needed if calling the pub func without a nursery, otherwise this is provided implicitly) @@ -171,7 +171,8 @@ def pub( if wrapped is None: return partial(pub, tasks=tasks) - task2lock = {None: trio.StrictFIFOLock()} + task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { + None: trio.StrictFIFOLock()} for name in tasks: task2lock[name] = trio.StrictFIFOLock() From 9f41297298bf8f8c02c3237e3a174ed9cfd200d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 19:17:03 -0500 Subject: [PATCH 24/31] Timeout on remote task cancellation Turns out you get a bad situation if the target actor who's task you're trying to cancel has already died (eg. from an external `KeyboardInterrupt` or other error) and so we need to eventually bail on the RPC request. Also don't bother closing the channel created in `open_portal()` manually since the cancel scope should take care of all that. --- tractor/_portal.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 6a47330..2683d88 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -146,8 +146,8 @@ class Portal: log.warning( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") - with trio.open_cancel_scope() as cleanup_scope: - cleanup_scope.shield = True + with trio.move_on_after(0.5) as cs: + cs.shield = True # TODO: yeah.. it'd be nice if this was just an # async func on the far end. 
Gotta figure out a # better way then implicitly feeding the ctx @@ -157,6 +157,11 @@ class Portal: async with aclosing(agen) as agen: async for _ in agen: pass + if cs.cancelled_caught: + if not self.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self.channel.uid}") raise # TODO: use AsyncExitStack to aclose() all agens @@ -239,13 +244,13 @@ class Portal: cancel_scope.shield = True await self.run('self', 'cancel') return True + if cancel_scope.cancelled_caught: + log.warning(f"May have failed to cancel {self.channel.uid}") + return False except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") return False - else: - log.warning(f"May have failed to cancel {self.channel.uid}") - return False @dataclass @@ -308,8 +313,8 @@ async def open_portal( if was_connected: # cancel remote channel-msg loop await channel.send(None) - await channel.aclose() # cancel background msg loop task msg_loop_cs.cancel() + nursery.cancel_scope.cancel() From 855f959768dd50d3b2180135a8049886bd008844 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 20:00:57 -0500 Subject: [PATCH 25/31] Don't log traceback on kb interrupt --- tractor/_trionics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index d9a5ee0..44f8dcd 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -317,7 +317,7 @@ class ActorNursery: # the `else:` block here might not complete? # For now, shield both. with trio.open_cancel_scope(shield=True): - if etype is trio.Cancelled: + if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " f"cancelled with {etype}") From 3b19e1530615ff41252d3fc8527c258607e4e96a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 20:01:29 -0500 Subject: [PATCH 26/31] Don't allow cancelling a cancel_task() task --- tractor/_actor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3d0a4f5..ceee16b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -265,7 +265,7 @@ class Actor: log.warning( f"already have channel(s) for {uid}:{chans}?" ) - log.trace(f"Registered {chan} for {uid}") + log.trace(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -650,8 +650,10 @@ class Actor: f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") - # if func is self.cancel_task: - # return + # don't allow cancelling this function mid-execution + # (is this necessary?) 
+ if func is self.cancel_task: + return scope.cancel() # wait for _invoke to mark the task complete From 7675b01722f686932647d2f728fdf6f7722359f8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 20:02:51 -0500 Subject: [PATCH 27/31] Drop py3.6 since we're using @dataclass --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 8837171..6df2b7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: python matrix: include: - - python: 3.6 + # - python: 3.6 - python: 3.7 dist: xenial sudo: required From 2b1e8773bb4896b969e6e213dbf8fac54707ccee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 22:35:04 -0500 Subject: [PATCH 28/31] Verify subs topics at each step --- tests/test_local.py | 1 - tests/test_pubsub.py | 82 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/tests/test_local.py b/tests/test_local.py index 2f9e5ee..eb0c676 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -7,7 +7,6 @@ import pytest import trio import tractor - from conftest import tractor_test diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index fef3c14..e5f37ef 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -1,3 +1,4 @@ +import time from functools import partial from itertools import cycle @@ -14,13 +15,18 @@ def is_even(i): @tractor.msg.pub async def pubber(get_topics): + ss = tractor.current_actor().statespace + for i in cycle(range(10)): - topics = get_topics() + + # ensure topic subscriptions are as expected + ss['get_topics'] = get_topics + yield {'even' if is_even(i) else 'odd': i} await trio.sleep(0.1) -async def subs(which): +async def subs(which, pub_actor_name): if len(which) == 1: if which[0] == 'even': pred = is_even @@ -29,7 +35,7 @@ async def subs(which): else: pred = lambda i: isinstance(i, int) - async with tractor.find_actor('streamer') as portal: + async with tractor.find_actor(pub_actor_name) as portal: agen = await portal.run(__name__, 'pubber', topics=which) async with aclosing(agen) as agen: async for pkt in agen: @@ -37,25 +43,60 @@ async def subs(which): assert pred(value) +@pytest.mark.parametrize( + 'pub_actor', + ['streamer', 'arbiter'] +) def test_pubsub_multi_actor_subs( loglevel, arb_addr, + pub_actor, ): + """Try out the neato @pub decorator system. 
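+
+    Parametrized to publish either from a dedicated 'streamer' daemon
+    actor or from the arbiter (root) actor itself via a local RPC task.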
+ """ async def main(): - async with tractor.open_nursery() as n: - # start the publisher as a daemon - master_portal = await n.start_actor( - 'streamer', - rpc_module_paths=[__name__], - ) + ss = tractor.current_actor().statespace - even_portal = await n.run_in_actor('evens', subs, which=['even']) - odd_portal = await n.run_in_actor('odds', subs, which=['odd']) + async with tractor.open_nursery() as n: + + name = 'arbiter' + + if pub_actor is 'streamer': + # start the publisher as a daemon + master_portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + + even_portal = await n.run_in_actor( + 'evens', subs, which=['even'], pub_actor_name=name) + odd_portal = await n.run_in_actor( + 'odds', subs, which=['odd'], pub_actor_name=name) + + async with tractor.wait_for_actor('evens'): + # block until 2nd actor is initialized + pass + + if pub_actor is 'arbiter': + # wait for publisher task to be spawned in a local RPC task + while not ss.get('get_topics'): + await trio.sleep(0.1) + + get_topics = ss.get('get_topics') + + assert 'even' in get_topics() async with tractor.wait_for_actor('odds'): # block until 2nd actor is initialized pass + if pub_actor is 'arbiter': + start = time.time() + while 'odd' not in get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never arrived?") + # TODO: how to make this work when the arbiter gets # a portal to itself? Currently this causes a hang # when the channel server is torn down due to a lingering @@ -67,12 +108,23 @@ def test_pubsub_multi_actor_subs( # blocking due to actor never terminating loop # await even_portal.result() - await trio.sleep(1) + await trio.sleep(0.5) await even_portal.cancel_actor() - await trio.sleep(1) - await odd_portal.cancel_actor() + await trio.sleep(0.5) - await master_portal.cancel_actor() + if pub_actor is 'arbiter': + assert 'even' not in get_topics() + + await odd_portal.cancel_actor() + await trio.sleep(1) + + if pub_actor is 'arbiter': + while get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never dropped?") + else: + await master_portal.cancel_actor() tractor.run( main, From 1b405ab4fe4fcfb442369ae0bd6f3e74603457d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 22:35:59 -0500 Subject: [PATCH 29/31] s/tickers/topics --- tractor/msg.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 9c0bd2c..53c36fd 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -68,21 +68,21 @@ def modify_subs(topics2ctxs, topics, ctx): log.info(f"{ctx.chan.uid} changed subscription to {topics}") # update map from each symbol to requesting client's chan - for ticker in topics: - topics2ctxs.setdefault(ticker, set()).add(ctx) + for topic in topics: + topics2ctxs.setdefault(topic, set()).add(ctx) # remove any existing symbol subscriptions if symbol is not # found in ``symbols`` # TODO: this can likely be factored out into the pub-sub api - for ticker in filter( + for topic in filter( lambda topic: topic not in topics, topics2ctxs.copy() ): - ctx_set = topics2ctxs.get(ticker) + ctx_set = topics2ctxs.get(topic) ctx_set.discard(ctx) if not ctx_set: # pop empty sets which will trigger bg quoter task termination - topics2ctxs.pop(ticker) + topics2ctxs.pop(topic) def pub( From 3d0de25f9368ec030e619ded372b089e129b8970 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jan 2019 00:10:13 -0500 Subject: [PATCH 30/31] Do proper `wrapt` arg 
extraction for type checking Use an inner function / closure to properly process required arguments at call time as is recommended in the `wrap` docs. Do async gen and arg introspection at decorate time and raise appropriate type errors. --- tractor/msg.py | 119 +++++++++++++++++++++++++++++-------------------- 1 file changed, 71 insertions(+), 48 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 53c36fd..ebed198 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -1,8 +1,9 @@ """ Messaging pattern APIs and helpers. """ +import inspect import typing -from typing import Dict, Any, Set, Union +from typing import Dict, Any, Set, Union, Callable from functools import partial from async_generator import aclosing @@ -11,6 +12,7 @@ import wrapt from .log import get_logger from . import current_actor +from ._ipc import Context __all__ = ['pub'] @@ -181,58 +183,79 @@ def pub( @wrapt.decorator(adapter=takes_ctx) async def wrapper(agen, instance, args, kwargs): - task_name = None - if tasks: - try: - task_name = kwargs.pop('task_name') - except KeyError: + # this is used to extract arguments properly as per + # the `wrapt` docs + async def _execute( + ctx: Context, + topics: Set[str], + *args, + # *, + task_name: str = None, + packetizer: Callable = None, + **kwargs, + ): + if tasks and task_name is None: raise TypeError( - f"{agen} must be called with a `task_name` named argument " - f"with a falue from {tasks}") + f"{agen} must be called with a `task_name` named " + f"argument with a falue from {tasks}") - # pop required kwargs used internally - ctx = kwargs.pop('ctx') - topics = kwargs.pop('topics') - packetizer = kwargs.pop('packetizer', None) + ss = current_actor().statespace + lockmap = ss.setdefault('_pubtask2lock', task2lock) + lock = lockmap[task_name] - ss = current_actor().statespace - lockmap = ss.setdefault('_pubtask2lock', task2lock) - lock = lockmap[task_name] + all_subs = ss.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_name, {}) - all_subs = ss.setdefault('_subs', {}) - topics2ctxs = all_subs.setdefault(task_name, {}) + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # the next waiting task will take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info( + f"Spawning data feed task for {funcname}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_async_gen_func=partial( + agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + packetizer=packetizer, + ) + log.info( + f"Terminating stream task {task_name or ''}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, (), ctx) - try: - modify_subs(topics2ctxs, topics, ctx) - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # the next waiting task will take over and spawn it again - async with lock: - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {agen.__name__}") - try: - # unblocks when no more symbols subscriptions exist - # and the streamer task terminates - await fan_out_to_ctxs( - pub_async_gen_func=partial(agen, *args, **kwargs), - topics2ctxs=topics2ctxs, 
- packetizer=packetizer, - ) - log.info(f"Terminating stream task {task_name or ''}" - f" for {agen.__name__}") - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - finally: - # remove all subs for this context - modify_subs(topics2ctxs, (), ctx) + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info( + f"No more subscriptions for publisher {task_name}") - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(topics2ctxs.values()): - log.info(f"No more subscriptions for publisher {task_name}") + # invoke it + await _execute(*args, **kwargs) + + + funcname = wrapped.__name__ + if not inspect.isasyncgenfunction(wrapped): + raise TypeError( + f"Publisher {funcname} must be an async generator function" + ) + if 'get_topics' not in inspect.signature(wrapped).parameters: + raise TypeError( + f"Publisher async gen {funcname} must define a " + "`get_topics` argument" + ) return wrapper(wrapped) From b0b35284f4a7d962486cbb290a9ba4b26ffe8f54 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jan 2019 00:13:13 -0500 Subject: [PATCH 31/31] Add call/decorate time type checking tests --- tests/test_pubsub.py | 60 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index e5f37ef..f0c4beb 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -1,5 +1,4 @@ import time -from functools import partial from itertools import cycle import pytest @@ -9,6 +8,23 @@ from async_generator import aclosing from tractor.testing import tractor_test +def test_type_checks(): + + with pytest.raises(TypeError) as err: + @tractor.msg.pub + async def no_get_topics(yo): + yield + + assert "must define a `get_topics`" in str(err.value) + + with pytest.raises(TypeError) as err: + @tractor.msg.pub + def not_async_gen(yo): + pass + + assert "must be an async generator function" in str(err.value) + + def is_even(i): return i % 2 == 0 @@ -31,9 +47,11 @@ async def subs(which, pub_actor_name): if which[0] == 'even': pred = is_even else: - pred = lambda i: not is_even(i) + def pred(i): + return not is_even(i) else: - pred = lambda i: isinstance(i, int) + def pred(i): + return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: agen = await portal.run(__name__, 'pubber', topics=which) @@ -43,6 +61,42 @@ async def subs(which, pub_actor_name): assert pred(value) +@tractor.msg.pub(tasks=['one', 'two']) +async def multilock_pubber(get_topics): + yield {'doggy': 10} + + +@pytest.mark.parametrize( + 'callwith_expecterror', + [ + (pubber, {}, TypeError), + # missing a `topics` + (multilock_pubber, {'ctx': None}, TypeError), + # missing a `task_name` + (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), + # should work + (multilock_pubber, + {'ctx': None, 'topics': ['topic1'], 'task_name': 'one'}, + None), + ], +) +@tractor_test +async def test_required_args(callwith_expecterror): + func, kwargs, err = callwith_expecterror + + if err is not None: + with pytest.raises(err): + await func(**kwargs) + else: + async with tractor.open_nursery() as n: + # await func(**kwargs) + portal = await n.run_in_actor( + 'sub', multilock_pubber, **kwargs) + + async for val in await portal.result(): + assert val == {'doggy': 10} + + @pytest.mark.parametrize( 'pub_actor', ['streamer', 'arbiter']