From 2aa6ffce60f58511979dbf22b5266a64c357c80b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 23 Mar 2019 13:50:23 -0400 Subject: [PATCH 1/8] Provide each task's cancel scope to every `Context` This begins moving toward explicitly decorated "streaming functions" instead of checking for a `ctx` arg in the signature. - provide each context with its task's top level `trio.CancelScope` such that tasks can cancel themselves explictly if needed via calling `Context.cancel_scope()` - make `Actor.cancel_task()` a private method (`_cancel_task()`) and handle remote rpc calls specially such that the caller does not need to provide the `chan` argument; non-primitive types can't be passed on the wire and we don't want the client actor be require knowledge of the channel instance the request is associated with. This also ties into how we're tracking tasks right now (`Actor._rpc_tasks` is keyed by the call id, a UUID, *plus* the channel). - make `_do_handshake` a private actor method - use UUID version 4 --- tractor/_actor.py | 73 ++++++++++++++++++++++++++++++++-------------- tractor/_ipc.py | 5 +--- tractor/_portal.py | 37 ++++++----------------- 3 files changed, 61 insertions(+), 54 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1b38d88..4fa3808 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -23,7 +23,6 @@ from ._exceptions import ( from ._portal import ( Portal, open_portal, - _do_handshake, LocalPortal, ) from . import _state @@ -50,7 +49,8 @@ async def _invoke( sig = inspect.signature(func) treat_as_gen = False cs = None - ctx = Context(chan, cid) + cancel_scope = trio.CancelScope() + ctx = Context(chan, cid, cancel_scope) if 'ctx' in sig.parameters: kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent @@ -73,7 +73,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +88,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! 
- with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +113,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +122,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -174,7 +174,7 @@ class Actor: arbiter_addr: Optional[Tuple[str, int]] = None, ) -> None: self.name = name - self.uid = (name, uid or str(uuid.uuid1())) + self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} # TODO: consider making this a dynamically defined @@ -247,7 +247,7 @@ class Actor: # send/receive initial handshake response try: - uid = await _do_handshake(self, chan) + uid = await self._do_handshake(chan) except StopAsyncIteration: log.warning(f"Channel {chan} failed to handshake") return @@ -351,7 +351,7 @@ class Actor: caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ - cid = str(uuid.uuid1()) + cid = str(uuid.uuid4()) assert chan.uid recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") @@ -373,11 +373,12 @@ class Actor: msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - # internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. `open_portal()` may call this method from - # a locally spawned task) with trio.CancelScope(shield=shield) as cs: + # this internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) and recieve this scope using + # ``scope = Nursery.start()`` task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel @@ -385,7 +386,7 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks: if channel is chan: - self.cancel_task(cid, Context(channel, cid)) + self._cancel_task(cid, channel) log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -419,6 +420,16 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) + if funcname == '_cancel_task': + # XXX: a special case is made here for + # remote calls since we don't want the + # remote actor have to know which channel + # the task is associated with and we can't + # pass non-primitive types between actors. + # This means you can use: + # Portal.run('self', '_cancel_task, cid=did) + # without passing the `chan` arg. 
+ kwargs['chan'] = chan else: # complain to client about restricted modules try: @@ -537,7 +548,7 @@ class Actor: ) await chan.connect() # initial handshake, report who we are, who they are - await _do_handshake(self, chan) + await self._do_handshake(chan) except OSError: # failed to connect log.warning( f"Failed to connect to parent @ {parent_addr}," @@ -661,21 +672,20 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_task(self, cid, ctx): - """Cancel a local task. + async def _cancel_task(self, cid, chan): + """Cancel a local task by call-id / channel. - Note this method will be treated as a streaming funciton + Note this method will be treated as a streaming function by remote actor-callers due to the declaration of ``ctx`` in the signature (for now). """ # right now this is only implicitly called by # streaming IPC but it should be called # to cancel any remotely spawned task - chan = ctx.chan try: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + scope, func, is_complete = self._rpc_tasks[(chan, cid)] except KeyError: log.warning(f"{cid} has already completed/terminated?") return @@ -686,7 +696,7 @@ class Actor: # don't allow cancelling this function mid-execution # (is this necessary?) - if func is self.cancel_task: + if func is self._cancel_task: return scope.cancel() @@ -704,7 +714,7 @@ class Actor: log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): # TODO: this should really done in a nursery batch - await self.cancel_task(cid, Context(chan, cid)) + await self._cancel_task(cid, chan) # if tasks: log.info( f"Waiting for remaining rpc tasks to complete {tasks}") @@ -735,6 +745,25 @@ class Actor: """Return all channels to the actor with provided uid.""" return self._peers[uid] + async def _do_handshake( + self, + chan: Channel + ) -> Tuple[str, str]: + """Exchange (name, UUIDs) identifiers as the first communication step. + + These are essentially the "mailbox addresses" found in actor model + parlance. 
+ """ + await chan.send(self.uid) + uid: Tuple[str, str] = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + class Arbiter(Actor): """A special actor who knows all the other actors and always has diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 1cddeca..5acb79a 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -215,10 +215,7 @@ class Context: """ chan: Channel cid: str - - # TODO: we should probably attach the actor-task - # cancel scope here now that trio is exposing it - # as a public object + cancel_scope: trio.CancelScope async def send_yield(self, data: Any) -> None: await self.chan.send({'yield': data, 'cid': self.cid}) diff --git a/tractor/_portal.py b/tractor/_portal.py index 160db19..925e3ad 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -33,21 +33,6 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): yield nursery -async def _do_handshake( - actor: 'Actor', # type: ignore - chan: Channel -) -> Any: - await chan.send(actor.uid) - uid: Tuple[str, str] = await chan.recv() - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - return uid - - class StreamReceiveChannel(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with special behaviour for signalling stream termination across an @@ -95,8 +80,8 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): raise unpack_error(msg, self._portal.channel) async def aclose(self): - """Cancel associate remote actor task on close - as well as the local memory channel. + """Cancel associated remote actor task and local memory channel + on close. """ if self._rx_chan._closed: log.warning(f"{self} is already closed") @@ -107,15 +92,10 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): log.warning( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # system. - rchan = await self._portal.run( - 'self', 'cancel_task', cid=cid) - async for _ in rchan: - pass + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run('self', '_cancel_task', cid=cid) if cs.cancelled_caught: # XXX: there's no way to know if the remote task was indeed @@ -153,6 +133,7 @@ class Portal: Tuple[str, Any, str, Dict[str, Any]] ] = None self._streams: Set[StreamReceiveChannel] = set() + self.actor = current_actor() async def _submit( self, @@ -167,7 +148,7 @@ class Portal: This is an async call. 
""" # ship a function call request to the remote actor - cid, recv_chan = await current_actor().send_cmd( + cid, recv_chan = await self.actor.send_cmd( self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be @@ -345,7 +326,7 @@ async def open_portal( was_connected = True if channel.uid is None: - await _do_handshake(actor, channel) + await actor._do_handshake(channel) msg_loop_cs = await nursery.start( partial( From 4ee35038fb3eedba22f2da49f03f50124a4945f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Mar 2019 11:37:11 -0400 Subject: [PATCH 2/8] Move discovery functions to their own module --- tractor/__init__.py | 5 ++- tractor/_actor.py | 77 +++---------------------------------------- tractor/_discovery.py | 77 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 75 deletions(-) create mode 100644 tractor/_discovery.py diff --git a/tractor/__init__.py b/tractor/__init__.py index 2f6e7d7..d14717b 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -12,9 +12,8 @@ from trio import MultiError from . import log from ._ipc import _connect_chan, Channel, Context -from ._actor import ( - Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor -) +from ._discovery import get_arbiter, find_actor, wait_for_actor +from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed diff --git a/tractor/_actor.py b/tractor/_actor.py index 4fa3808..496c68a 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -8,25 +8,21 @@ import importlib import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional, Union +from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore -from async_generator import asynccontextmanager, aclosing +from async_generator import aclosing -from ._ipc import Channel, _connect_chan, Context +from ._ipc import Channel, Context from .log import get_console_log, get_logger from ._exceptions import ( pack_error, unpack_error, ModuleNotExposed ) -from ._portal import ( - Portal, - open_portal, - LocalPortal, -) +from ._discovery import get_arbiter +from ._portal import Portal from . import _state -from ._state import current_actor log = get_logger('tractor') @@ -869,66 +865,3 @@ async def _start_actor( log.info("Completed async main") return result - - -@asynccontextmanager -async def get_arbiter( - host: str, port: int -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: - """Return a portal instance connected to a local or remote - arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - if actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - async with _connect_chan(host, port) as chan: - async with open_portal(chan) as arb_portal: - yield arb_portal - - -@asynccontextmanager -async def find_actor( - name: str, arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Optional[Portal], None]: - """Ask the arbiter to find actor(s) by name. - - Returns a connected portal to the last registered matching actor - known to the arbiter. 
- """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddr = await arb_portal.run('self', 'find_actor', name=name) - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - elif sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None - - -@asynccontextmanager -async def wait_for_actor( - name: str, - arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Portal, None]: - """Wait on an actor to register with the arbiter. - - A portal to the first registered actor is returned. - """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) - sockaddr = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal diff --git a/tractor/_discovery.py b/tractor/_discovery.py new file mode 100644 index 0000000..ae2e3be --- /dev/null +++ b/tractor/_discovery.py @@ -0,0 +1,77 @@ +""" +Actor discovery API. +""" +import typing +from typing import Tuple, Optional, Union +from async_generator import asynccontextmanager + +from ._ipc import _connect_chan +from ._portal import ( + Portal, + open_portal, + LocalPortal, +) +from ._state import current_actor + + +@asynccontextmanager +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + """Return a portal instance connected to a local or remote + arbiter. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal + + +@asynccontextmanager +async def find_actor( + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: + """Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddr = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the last one that registered + if name == 'arbiter' and actor.is_arbiter: + raise RuntimeError("The current actor is the arbiter") + elif sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + +@asynccontextmanager +async def wait_for_actor( + name: str, + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: + """Wait on an actor to register with the arbiter. + + A portal to the first registered actor is returned. 
+ """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal From 2f773fc8839a7bfe1dbc42b61368ff5430558607 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Mar 2019 14:55:13 -0400 Subject: [PATCH 3/8] Reorg streaming section --- README.rst | 269 +++++++++++++++++++++++++++++------------------------ 1 file changed, 145 insertions(+), 124 deletions(-) diff --git a/README.rst b/README.rst index f8bf9d3..4f7a057 100644 --- a/README.rst +++ b/README.rst @@ -280,8 +280,60 @@ to all others with ease over standard network protocols). .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor -Async IPC using *portals* -************************* +Cancellation +************ +``tractor`` supports ``trio``'s cancellation_ system verbatim. +Cancelling a nursery block cancels all actors spawned by it. +Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. + +.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags + + +Remote error propagation +************************ +Any task invoked in a remote actor should ship any error(s) back to the calling +actor where it is raised and expected to be dealt with. This way remote actors +are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. + +.. code:: python + + async def assert_err(): + assert 0 + + + async def main(): + async with tractor.open_nursery() as n: + real_actors = [] + for i in range(3): + real_actors.append(await n.start_actor( + f'actor_{i}', + rpc_module_paths=[__name__], + )) + + # start one actor that will fail immediately + await n.run_in_actor('extra', assert_err) + + # should error here with a ``RemoteActorError`` containing + # an ``AssertionError`` and all the other actors have been cancelled + + try: + # also raises + tractor.run(main) + except tractor.RemoteActorError: + print("Look Maa that actor failed hard, hehhh!") + + +You'll notice the nursery cancellation conducts a *one-cancels-all* +supervisory strategy `exactly like trio`_. The plan is to add more +`erlang strategies`_ in the near future by allowing nurseries to accept +a ``Supervisor`` type. + +.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics +.. _erlang strategies: http://learnyousomeerlang.com/supervisors + + +IPC using *portals* +******************* ``tractor`` introduces the concept of a *portal* which is an API borrowed_ from ``trio``. A portal may seem similar to the idea of a RPC future_ except a *portal* allows invoking remote *async* functions and @@ -305,10 +357,26 @@ channels_ system or shipping code over the network. This *portal* approach turns out to be paricularly exciting with the introduction of `asynchronous generators`_ in Python 3.6! It means that -actors can compose nicely in a data processing pipeline. +actors can compose nicely in a data streaming pipeline. 
-As an example here's an actor that streams for 1 second from a remote async -generator function running in a separate actor: + +Streaming +********* +By now you've figured out that ``tractor`` lets you spawn +process based actors that can invoke cross-process async functions +between each other and all with structured concurrency built in, but, +the **real power** is the ability to accomplish cross-process *streaming*. + + +Asynchronous generators ++++++++++++++++++++++++ +The default streaming function is simply an async generator definition. +Every value *yielded* from the generator is delivered to the calling +portal exactly like if you had invoked the function in-process meaning +you can ``async for`` to receive each value on the calling side. + +As an example here's a parent actor that streams for 1 second from a +spawned subactor: .. code:: python @@ -346,10 +414,79 @@ generator function running in a separate actor: tractor.run(main) +By default async generator functions are treated as inter-actor +*streams* when invoked via a portal (how else could you really interface +with them anyway) so no special syntax to denote the streaming *service* +is necessary. + + +Channels and Contexts ++++++++++++++++++++++ +If you aren't fond of having to write an async generator to stream data +between actors (or need something more flexible) you can instead use a +``Context``. A context wraps an actor-local spawned task and a ``Channel`` +so that tasks executing across multiple processes can stream data +to one another using a low level, request oriented API. + +``Channel`` is the API which wraps an underlying *transport* and *interchange* +format to enable *inter-actor-communication*. In its present state ``tractor`` +uses TCP and msgpack_. + +As an example if you wanted to create a streaming server without writing +an async generator that *yields* values you instead define an async +function: + +.. code:: python + + async def streamer(ctx: tractor.Context, rate: int = 2) -> None: + """A simple web response streaming server. + """ + while True: + val = await web_request('http://data.feed.com') + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + +All that's required is declaring a ``ctx`` argument name somewhere in +your function signature and ``tractor`` will treat the async function +like an async generator - as a streaming function from the client side. +This turns out to be handy particularly if you have +multiple tasks streaming responses concurrently: + +.. code:: python + + async def streamer(ctx: tractor.Context, rate: int = 2) -> None: + """A simple web response streaming server. + """ + while True: + val = await web_request(url) + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + + async def stream_multiple_sources( + ctx: tractor.Context, sources: List[str] + ) -> None: + async with trio.open_nursery() as n: + for url in sources: + n.start_soon(streamer, ctx, url) + + +The context notion comes from the context_ in nanomsg_. + +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 +.. _msgpack: https://en.wikipedia.org/wiki/MessagePack + A full fledged streaming service -******************************** +++++++++++++++++++++++++++++++++ Alright, let's get fancy. Say you wanted to spawn two actors which each pull data feeds from @@ -471,58 +608,6 @@ as ``multiprocessing`` calls it) which is running ``main()``. .. 
_remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i -Cancellation -************ -``tractor`` supports ``trio``'s cancellation_ system verbatim. -Cancelling a nursery block cancels all actors spawned by it. -Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. - -.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags - - -Remote error propagation -************************ -Any task invoked in a remote actor should ship any error(s) back to the calling -actor where it is raised and expected to be dealt with. This way remote actors -are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. - -.. code:: python - - async def assert_err(): - assert 0 - - - async def main(): - async with tractor.open_nursery() as n: - real_actors = [] - for i in range(3): - real_actors.append(await n.start_actor( - f'actor_{i}', - rpc_module_paths=[__name__], - )) - - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) - - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError`` and all the other actors have been cancelled - - try: - # also raises - tractor.run(main) - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehhh!") - - -You'll notice the nursery cancellation conducts a *one-cancels-all* -supervisory strategy `exactly like trio`_. The plan is to add more -`erlang strategies`_ in the near future by allowing nurseries to accept -a ``Supervisor`` type. - -.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics -.. _erlang strategies: http://learnyousomeerlang.com/supervisors - - Actor local variables ********************* Although ``tractor`` uses a *shared-nothing* architecture between processes @@ -556,8 +641,8 @@ a convenience for passing simple data to newly spawned actors); building out a state sharing system per-actor is totally up to you. -How do actors find each other (a poor man's *service discovery*)? -***************************************************************** +Service Discovery +***************** Though it will be built out much more in the near future, ``tractor`` currently keeps track of actors by ``(name: str, id: str)`` using a special actor called the *arbiter*. Currently the *arbiter* must exist @@ -590,70 +675,6 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. -Streaming using channels and contexts -************************************* -``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to enable *inter-actor-communication*. In its present state ``tractor`` -uses TCP and msgpack_. - -If you aren't fond of having to write an async generator to stream data -between actors (or need something more flexible) you can instead use a -``Context``. A context wraps an actor-local spawned task and a ``Channel`` -so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented API. - -As an example if you wanted to create a streaming server without writing -an async generator that *yields* values you instead define an async -function: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. 
- """ - while True: - val = await web_request('http://data.feed.com') - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - -All that's required is declaring a ``ctx`` argument name somewhere in -your function signature and ``tractor`` will treat the async function -like an async generator - as a streaming function from the client side. -This turns out to be handy particularly if you have -multiple tasks streaming responses concurrently: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. - """ - while True: - val = await web_request(url) - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - - async def stream_multiple_sources( - ctx: tractor.Context, sources: List[str] - ) -> None: - async with trio.open_nursery() as n: - for url in sources: - n.start_soon(streamer, ctx, url) - - -The context notion comes from the context_ in nanomsg_. - -.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 -.. _msgpack: https://en.wikipedia.org/wiki/MessagePack - - Running actors standalone ************************* You don't have to spawn any actors using ``open_nursery()`` if you just From e51f84af9004c1a5bbd550ef47179f17844b79a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 25 Mar 2019 21:36:13 -0400 Subject: [PATCH 4/8] Require explicit marking of non async gen streaming funcs Add `@tractor.stream` which must be used to denote non async generator streaming functions which use the `tractor.Context` API to push values. This enforces a more explicit denotation as well as allows enforcing the declaration of the `ctx` argument in definitions. --- tractor/__init__.py | 4 +++- tractor/_actor.py | 11 +++++++++-- tractor/_ipc.py | 20 -------------------- tractor/_streaming.py | 43 +++++++++++++++++++++++++++++++++++++++++++ tractor/msg.py | 4 ++-- 5 files changed, 57 insertions(+), 25 deletions(-) create mode 100644 tractor/_streaming.py diff --git a/tractor/__init__.py b/tractor/__init__.py index d14717b..fb9b095 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,7 +11,8 @@ import trio # type: ignore from trio import MultiError from . 
import log -from ._ipc import _connect_chan, Channel, Context +from ._ipc import _connect_chan, Channel +from ._streaming import Context, stream from ._discovery import get_arbiter, find_actor, wait_for_actor from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery @@ -29,6 +30,7 @@ __all__ = [ 'wait_for_actor', 'Channel', 'Context', + 'stream', 'MultiError', 'RemoteActorError', 'ModuleNotExposed', diff --git a/tractor/_actor.py b/tractor/_actor.py index 496c68a..52b301f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -13,7 +13,8 @@ from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore from async_generator import aclosing -from ._ipc import Channel, Context +from ._ipc import Channel +from ._streaming import Context, _context from .log import get_console_log, get_logger from ._exceptions import ( pack_error, @@ -47,7 +48,13 @@ async def _invoke( cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) - if 'ctx' in sig.parameters: + _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): + if 'ctx' not in sig.parameters: + raise TypeError( + "The first argument to the stream function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent # about what is considered a far-end async-generator. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5acb79a..94f978f 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,7 +1,6 @@ """ Inter-process comms abstractions """ -from dataclasses import dataclass import typing from typing import Any, Tuple, Optional @@ -205,25 +204,6 @@ class Channel: return self.msgstream.connected() if self.msgstream else False -@dataclass(frozen=True) -class Context: - """An IAC (inter-actor communication) context. - - Allows maintaining task or protocol specific state between communicating - actors. A unique context is created on the receiving end for every request - to a remote actor. - """ - chan: Channel - cid: str - cancel_scope: trio.CancelScope - - async def send_yield(self, data: Any) -> None: - await self.chan.send({'yield': data, 'cid': self.cid}) - - async def send_stop(self) -> None: - await self.chan.send({'stop': True, 'cid': self.cid}) - - @asynccontextmanager async def _connect_chan( host: str, port: int diff --git a/tractor/_streaming.py b/tractor/_streaming.py new file mode 100644 index 0000000..0ccf5fc --- /dev/null +++ b/tractor/_streaming.py @@ -0,0 +1,43 @@ +import contextvars +from dataclasses import dataclass +from typing import Any + +import trio + +from ._ipc import Channel + + +_context = contextvars.ContextVar('context') + + +@dataclass(frozen=True) +class Context: + """An IAC (inter-actor communication) context. + + Allows maintaining task or protocol specific state between communicating + actors. A unique context is created on the receiving end for every request + to a remote actor. + """ + chan: Channel + cid: str + cancel_scope: trio.CancelScope + + async def send_yield(self, data: Any) -> None: + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self) -> None: + await self.chan.send({'stop': True, 'cid': self.cid}) + + +def current_context(): + """Get the current streaming task's context instance. + + """ + return _context.get() + + +def stream(func): + """Mark an async function as a streaming routine. 
+ """ + func._tractor_stream_function = True + return func diff --git a/tractor/msg.py b/tractor/msg.py index 73e39d5..59842ef 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -12,7 +12,7 @@ import wrapt from .log import get_logger from . import current_actor -from ._ipc import Context +from ._streaming import Context, stream __all__ = ['pub'] @@ -261,4 +261,4 @@ def pub( "`get_topics` argument" ) - return wrapper(wrapped) + return wrapper(stream(wrapped)) From 096d211ed29dc006a23826862d1425a78fd46c7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 25 Mar 2019 22:02:22 -0400 Subject: [PATCH 5/8] Document `@tractor.stream` --- README.rst | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/README.rst b/README.rst index 4f7a057..b3a5181 100644 --- a/README.rst +++ b/README.rst @@ -362,10 +362,10 @@ actors can compose nicely in a data streaming pipeline. Streaming ********* -By now you've figured out that ``tractor`` lets you spawn -process based actors that can invoke cross-process async functions -between each other and all with structured concurrency built in, but, -the **real power** is the ability to accomplish cross-process *streaming*. +By now you've figured out that ``tractor`` lets you spawn process based +*actors* that can invoke cross-process (async) functions and all with +structured concurrency built in. But the **real cool stuff** is the +native support for cross-process *streaming*. Asynchronous generators @@ -423,21 +423,22 @@ is necessary. Channels and Contexts +++++++++++++++++++++ If you aren't fond of having to write an async generator to stream data -between actors (or need something more flexible) you can instead use a -``Context``. A context wraps an actor-local spawned task and a ``Channel`` -so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented API. +between actors (or need something more flexible) you can instead use +a ``Context``. A context wraps an actor-local spawned task and +a ``Channel`` so that tasks executing across multiple processes can +stream data to one another using a low level, request oriented API. -``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to enable *inter-actor-communication*. In its present state ``tractor`` +A ``Channel`` wraps an underlying *transport* and *interchange* format +to enable *inter-actor-communication*. In its present state ``tractor`` uses TCP and msgpack_. As an example if you wanted to create a streaming server without writing -an async generator that *yields* values you instead define an async -function: +an async generator that *yields* values you instead define a decorated +async function: .. code:: python + @tractor.stream async def streamer(ctx: tractor.Context, rate: int = 2) -> None: """A simple web response streaming server. """ @@ -450,15 +451,20 @@ function: await trio.sleep(1 / rate) -All that's required is declaring a ``ctx`` argument name somewhere in -your function signature and ``tractor`` will treat the async function -like an async generator - as a streaming function from the client side. -This turns out to be handy particularly if you have -multiple tasks streaming responses concurrently: +You must decorate the function with ``@tractor.stream`` and declare +a ``ctx`` argument as the first in your function signature and then +``tractor`` will treat the async function like an async generator - as +a stream from the calling/client side. 
+ +This turns out to be handy particularly if you have multiple tasks +pushing responses concurrently: .. code:: python - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: + async def streamer( + ctx: tractor.Context, + rate: int = 2 + ) -> None: """A simple web response streaming server. """ while True: @@ -470,8 +476,10 @@ multiple tasks streaming responses concurrently: await trio.sleep(1 / rate) + @tractor.stream async def stream_multiple_sources( - ctx: tractor.Context, sources: List[str] + ctx: tractor.Context, + sources: List[str] ) -> None: async with trio.open_nursery() as n: for url in sources: From 5c0ae47cf5adc86a969097a09161e7ad569e04fe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 25 Mar 2019 22:16:40 -0400 Subject: [PATCH 6/8] Fix type annotation --- tractor/_streaming.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 0ccf5fc..df7a783 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,4 +1,4 @@ -import contextvars +from contextvars import ContextVar from dataclasses import dataclass from typing import Any @@ -7,7 +7,7 @@ import trio from ._ipc import Channel -_context = contextvars.ContextVar('context') +_context: ContextVar['Context'] = ContextVar('context') @dataclass(frozen=True) @@ -30,8 +30,7 @@ class Context: def current_context(): - """Get the current streaming task's context instance. - + """Get the current task's context instance. """ return _context.get() From f885b02c730fa6e3aff5ef7ce462fe719d30cff9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Mar 2019 19:10:32 -0400 Subject: [PATCH 7/8] Validate stream functions at decorate time --- tractor/_actor.py | 14 ++------- tractor/_streaming.py | 7 +++++ tractor/msg.py | 67 ++++++++++++++++++++++--------------------- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 52b301f..3f2a89c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -41,26 +41,16 @@ async def _invoke( kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): - """Invoke local func and return results over provided channel. + """Invoke local func and deliver result(s) over provided channel. """ - sig = inspect.signature(func) treat_as_gen = False cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) _context.set(ctx) if getattr(func, '_tractor_stream_function', False): - if 'ctx' not in sig.parameters: - raise TypeError( - "The first argument to the stream function " - f"{func.__name__} must be `ctx: tractor.Context`" - ) + # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `ctx` kwarg in its - # signature will be treated as one. treat_as_gen = True try: is_async_partial = False diff --git a/tractor/_streaming.py b/tractor/_streaming.py index df7a783..9ed0f14 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,3 +1,4 @@ +import inspect from contextvars import ContextVar from dataclasses import dataclass from typing import Any @@ -39,4 +40,10 @@ def stream(func): """Mark an async function as a streaming routine. 
""" func._tractor_stream_function = True + sig = inspect.signature(func) + if 'ctx' not in sig.parameters: + raise TypeError( + "The first argument to the stream function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) return func diff --git a/tractor/msg.py b/tractor/msg.py index 59842ef..cd064a0 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -12,7 +12,7 @@ import wrapt from .log import get_logger from . import current_actor -from ._streaming import Context, stream +from ._streaming import Context __all__ = ['pub'] @@ -97,29 +97,32 @@ def pub( ): """Publisher async generator decorator. - A publisher can be called multiple times from different actors - but will only spawn a finite set of internal tasks to stream values - to each caller. The ``tasks` argument to the decorator (``Set[str]``) - specifies the names of the mutex set of publisher tasks. - When the publisher function is called, an argument ``task_name`` must be - passed to specify which task (of the set named in ``tasks``) should be - used. This allows for using the same publisher with different input - (arguments) without allowing more concurrent tasks then necessary. + A publisher can be called multiple times from different actors but + will only spawn a finite set of internal tasks to stream values to + each caller. The ``tasks: Set[str]`` argument to the decorator + specifies the names of the mutex set of publisher tasks. When the + publisher function is called, an argument ``task_name`` must be + passed to specify which task (of the set named in ``tasks``) should + be used. This allows for using the same publisher with different + input (arguments) without allowing more concurrent tasks then + necessary. - Values yielded from the decorated async generator - must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the - topic string an determines which subscription the packet will be delivered - to and the value is a packet ``Dict[str, Any]`` by default of the form: + Values yielded from the decorated async generator must be + ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic + string and determines which subscription the packet will be + delivered to and the value is a packet ``Dict[str, Any]`` by default + of the form: .. ::python - {topic: value} + {topic: str: value: Any} - The caller can instead opt to pass a ``packetizer`` callback who's return - value will be delivered as the published response. + The caller can instead opt to pass a ``packetizer`` callback who's + return value will be delivered as the published response. - The decorated function must *accept* an argument :func:`get_topics` which - dynamically returns the tuple of current subscriber topics: + The decorated async generator function must accept an argument + :func:`get_topics` which dynamically returns the tuple of current + subscriber topics: .. code:: python @@ -162,15 +165,15 @@ def pub( print(f"Subscriber received {value}") - Here, you don't need to provide the ``ctx`` argument since the remote actor - provides it automatically to the spawned task. If you were to call - ``pub_service()`` directly from a wrapping function you would need to - provide this explicitly. + Here, you don't need to provide the ``ctx`` argument since the + remote actor provides it automatically to the spawned task. If you + were to call ``pub_service()`` directly from a wrapping function you + would need to provide this explicitly. 
- Remember you only need this if you need *a finite set of tasks* running in - a single actor to stream data to an arbitrary number of subscribers. If you - are ok to have a new task running for every call to ``pub_service()`` then - probably don't need this. + Remember you only need this if you need *a finite set of tasks* + running in a single actor to stream data to an arbitrary number of + subscribers. If you are ok to have a new task running for every call + to ``pub_service()`` then probably don't need this. """ # handle the decorator not called with () case if wrapped is None: @@ -181,10 +184,7 @@ def pub( for name in tasks: task2lock[name] = trio.StrictFIFOLock() - async def takes_ctx(get_topics, ctx=None): - pass - - @wrapt.decorator(adapter=takes_ctx) + @wrapt.decorator async def wrapper(agen, instance, args, kwargs): # this is used to extract arguments properly as per # the `wrapt` docs @@ -249,7 +249,6 @@ def pub( # invoke it await _execute(*args, **kwargs) - funcname = wrapped.__name__ if not inspect.isasyncgenfunction(wrapped): raise TypeError( @@ -261,4 +260,8 @@ def pub( "`get_topics` argument" ) - return wrapper(stream(wrapped)) + # XXX: manually monkey the wrapped function since + # ``wrapt.decorator`` doesn't seem to want to play nice with its + # whole "adapter" thing... + wrapped._tractor_stream_function = True # type: ignore + return wrapper(wrapped) From b965d20cba464b468d6228a6d11053b9bb427d2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Mar 2019 19:10:56 -0400 Subject: [PATCH 8/8] Add stream func tests --- tests/test_streaming.py | 55 +++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 1ada466..4fc7e37 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -2,13 +2,29 @@ Streaming via async gen api """ import time +from functools import partial import trio import tractor import pytest -async def stream_seq(sequence): + +def test_must_define_ctx(): + + with pytest.raises(TypeError) as err: + @tractor.stream + async def no_ctx(): + pass + + assert "no_ctx must be `ctx: tractor.Context" in str(err.value) + + @tractor.stream + async def no_ctx(ctx): + pass + + +async def async_gen_stream(sequence): for i in sequence: yield i await trio.sleep(0.1) @@ -20,10 +36,23 @@ async def stream_seq(sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(): +@tractor.stream +async def context_stream(ctx, sequence): + for i in sequence: + await ctx.send_yield(i) + await trio.sleep(0.1) + + # block indefinitely waiting to be cancelled by ``aclose()`` call + with trio.CancelScope() as cs: + await trio.sleep(float('inf')) + assert 0 + assert cs.cancelled_caught + + +async def stream_from_single_subactor(stream_func_name): """Verify we can spawn a daemon actor and retrieve streamed data. """ - async with tractor.find_actor('brokerd') as portals: + async with tractor.find_actor('streamerd') as portals: if not portals: # only one per host address, spawns an actor if None async with tractor.open_nursery() as nursery: @@ -36,37 +65,43 @@ async def stream_from_single_subactor(): seq = range(10) - agen = await portal.run( + stream = await portal.run( __name__, - 'stream_seq', # the func above + stream_func_name, # one of the funcs above sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... 
iseq = iter(seq) ival = next(iseq) - async for val in agen: + async for val in stream: assert val == ival try: ival = next(iseq) except StopIteration: # should cancel far end task which will be # caught and no error is raised - await agen.aclose() + await stream.aclose() await trio.sleep(0.3) try: - await agen.__anext__() + await stream.__anext__() except StopAsyncIteration: # stop all spawned subactors await portal.cancel_actor() # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr, start_method): +@pytest.mark.parametrize( + 'stream_func', ['async_gen_stream', 'context_stream'] +) +def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ tractor.run( - stream_from_single_subactor, + partial( + stream_from_single_subactor, + stream_func_name=stream_func, + ), arbiter_addr=arb_addr, start_method=start_method, )
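
For reference, below is a minimal usage sketch (not part of the patch set
itself) showing how the pieces introduced in this series fit together:
a ``@tractor.stream`` decorated function receives the ``Context`` which now
carries its task's ``cancel_scope`` (patch 1), pushes values with
``Context.send_yield()``, and can cancel its own task explicitly. The actor
and function names are illustrative only, and the consumer closes the stream
explicitly in the same way the new tests do.

.. code:: python

    import trio
    import tractor


    @tractor.stream
    async def stream_until_value(
        ctx: tractor.Context,
        sequence: list,  # has to be msgpack serializable
        stop_at: int = 3,
    ) -> None:
        """Stream ``sequence`` to the caller, explicitly cancelling
        this task's own scope once ``stop_at`` has been sent.
        """
        for value in sequence:
            # same as ``yield value`` in the async generator case
            await ctx.send_yield(value)

            if value == stop_at:
                # let the far end know the stream is done ...
                await ctx.send_stop()
                # ... then cancel this task through the scope that
                # ``_invoke()`` attached to the context
                ctx.cancel_scope.cancel()

            await trio.sleep(0.1)


    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'streamer',
                rpc_module_paths=[__name__],
            )
            stream = await portal.run(
                __name__, 'stream_until_value', sequence=list(range(10)))

            received = []
            async for value in stream:
                received.append(value)
                if value == 3:  # matches the ``stop_at`` default above
                    break

            # cancels the far end task if it is somehow still running
            await stream.aclose()
            assert received == [0, 1, 2, 3]

            await portal.cancel_actor()


    if __name__ == '__main__':
        tractor.run(main)

Patch 4 also stashes each invocation's ``Context`` in a
``contextvars.ContextVar`` retrievable via ``current_context()``, which is
not exercised anywhere in the series yet. A hedged sketch of what that
enables - a plain RPC function introspecting its own invocation without
declaring a ``ctx`` argument (``current_context`` is assumed to be imported
from the private ``tractor._streaming`` module since it is not re-exported
at the package top level):

.. code:: python

    from tractor._streaming import current_context


    async def who_am_i() -> dict:
        """Return this invocation's call id and the calling actor's uid,
        looked up from the task-local context set by ``_invoke()``.
        """
        ctx = current_context()
        return {'cid': ctx.cid, 'peer': list(ctx.chan.uid)}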