diff --git a/README.rst b/README.rst index f8bf9d3..b3a5181 100644 --- a/README.rst +++ b/README.rst @@ -280,8 +280,60 @@ to all others with ease over standard network protocols). .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor -Async IPC using *portals* -************************* +Cancellation +************ +``tractor`` supports ``trio``'s cancellation_ system verbatim. +Cancelling a nursery block cancels all actors spawned by it. +Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. + +.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags + + +Remote error propagation +************************ +Any task invoked in a remote actor should ship any error(s) back to the calling +actor where it is raised and expected to be dealt with. This way remote actors +are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. + +.. code:: python + + async def assert_err(): + assert 0 + + + async def main(): + async with tractor.open_nursery() as n: + real_actors = [] + for i in range(3): + real_actors.append(await n.start_actor( + f'actor_{i}', + rpc_module_paths=[__name__], + )) + + # start one actor that will fail immediately + await n.run_in_actor('extra', assert_err) + + # should error here with a ``RemoteActorError`` containing + # an ``AssertionError`` and all the other actors have been cancelled + + try: + # also raises + tractor.run(main) + except tractor.RemoteActorError: + print("Look Maa that actor failed hard, hehhh!") + + +You'll notice the nursery cancellation conducts a *one-cancels-all* +supervisory strategy `exactly like trio`_. The plan is to add more +`erlang strategies`_ in the near future by allowing nurseries to accept +a ``Supervisor`` type. + +.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics +.. 
_erlang strategies: http://learnyousomeerlang.com/supervisors + + +IPC using *portals* +******************* ``tractor`` introduces the concept of a *portal* which is an API borrowed_ from ``trio``. A portal may seem similar to the idea of a RPC future_ except a *portal* allows invoking remote *async* functions and @@ -305,10 +357,26 @@ channels_ system or shipping code over the network. This *portal* approach turns out to be paricularly exciting with the introduction of `asynchronous generators`_ in Python 3.6! It means that -actors can compose nicely in a data processing pipeline. +actors can compose nicely in a data streaming pipeline. -As an example here's an actor that streams for 1 second from a remote async -generator function running in a separate actor: + +Streaming +********* +By now you've figured out that ``tractor`` lets you spawn process-based +*actors* that can invoke cross-process (async) functions, all with +structured concurrency built in. But the **real cool stuff** is the +native support for cross-process *streaming*. + + +Asynchronous generators ++++++++++++++++++++++++ +The default streaming function is simply an async generator definition. +Every value *yielded* from the generator is delivered to the calling +portal exactly as if you had invoked the function in-process, meaning +you can ``async for`` to receive each value on the calling side. + +As an example, here's a parent actor that streams for 1 second from a +spawned subactor: .. code:: python @@ -346,10 +414,87 @@ generator function running in a separate actor: tractor.run(main) +By default async generator functions are treated as inter-actor +*streams* when invoked via a portal (how else could you really interface +with them anyway) so no special syntax to denote the streaming *service* +is necessary. 
+ + +Channels and Contexts ++++++++++++++++++++++ +If you aren't fond of having to write an async generator to stream data +between actors (or need something more flexible) you can instead use +a ``Context``. A context wraps an actor-local spawned task and +a ``Channel`` so that tasks executing across multiple processes can +stream data to one another using a low-level, request-oriented API. + +A ``Channel`` wraps an underlying *transport* and *interchange* format +to enable *inter-actor-communication*. In its present state ``tractor`` +uses TCP and msgpack_. + +As an example, if you wanted to create a streaming server without writing +an async generator that *yields* values, you would instead define a decorated +async function: + +.. code:: python + + @tractor.stream + async def streamer(ctx: tractor.Context, rate: int = 2) -> None: + """A simple web response streaming server. + """ + while True: + val = await web_request('http://data.feed.com') + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + +You must decorate the function with ``@tractor.stream`` and declare +``ctx`` as the first argument in your function signature; ``tractor`` +will then treat the async function like an async generator - as +a stream from the calling/client side. + +This turns out to be particularly handy if you have multiple tasks +pushing responses concurrently: + +.. code:: python + + async def streamer( + ctx: tractor.Context, url: str, + rate: int = 2 + ) -> None: + """A simple web response streaming server. 
+ """ + while True: + val = await web_request(url) + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + + @tractor.stream + async def stream_multiple_sources( + ctx: tractor.Context, + sources: List[str] + ) -> None: + async with trio.open_nursery() as n: + for url in sources: + n.start_soon(streamer, ctx, url) + + +The context notion comes from the context_ in nanomsg_. + +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 +.. _msgpack: https://en.wikipedia.org/wiki/MessagePack + A full fledged streaming service -******************************** +++++++++++++++++++++++++++++++++ Alright, let's get fancy. Say you wanted to spawn two actors which each pull data feeds from @@ -471,58 +616,6 @@ as ``multiprocessing`` calls it) which is running ``main()``. .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i -Cancellation -************ -``tractor`` supports ``trio``'s cancellation_ system verbatim. -Cancelling a nursery block cancels all actors spawned by it. -Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. - -.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags - - -Remote error propagation -************************ -Any task invoked in a remote actor should ship any error(s) back to the calling -actor where it is raised and expected to be dealt with. This way remote actors -are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. - -.. 
code:: python - - async def assert_err(): - assert 0 - - - async def main(): - async with tractor.open_nursery() as n: - real_actors = [] - for i in range(3): - real_actors.append(await n.start_actor( - f'actor_{i}', - rpc_module_paths=[__name__], - )) - - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) - - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError`` and all the other actors have been cancelled - - try: - # also raises - tractor.run(main) - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehhh!") - - -You'll notice the nursery cancellation conducts a *one-cancels-all* -supervisory strategy `exactly like trio`_. The plan is to add more -`erlang strategies`_ in the near future by allowing nurseries to accept -a ``Supervisor`` type. - -.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics -.. _erlang strategies: http://learnyousomeerlang.com/supervisors - - Actor local variables ********************* Although ``tractor`` uses a *shared-nothing* architecture between processes @@ -556,8 +649,8 @@ a convenience for passing simple data to newly spawned actors); building out a state sharing system per-actor is totally up to you. -How do actors find each other (a poor man's *service discovery*)? -***************************************************************** +Service Discovery +***************** Though it will be built out much more in the near future, ``tractor`` currently keeps track of actors by ``(name: str, id: str)`` using a special actor called the *arbiter*. Currently the *arbiter* must exist @@ -590,70 +683,6 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. 
-Streaming using channels and contexts -************************************* -``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to enable *inter-actor-communication*. In its present state ``tractor`` -uses TCP and msgpack_. - -If you aren't fond of having to write an async generator to stream data -between actors (or need something more flexible) you can instead use a -``Context``. A context wraps an actor-local spawned task and a ``Channel`` -so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented API. - -As an example if you wanted to create a streaming server without writing -an async generator that *yields* values you instead define an async -function: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. - """ - while True: - val = await web_request('http://data.feed.com') - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - -All that's required is declaring a ``ctx`` argument name somewhere in -your function signature and ``tractor`` will treat the async function -like an async generator - as a streaming function from the client side. -This turns out to be handy particularly if you have -multiple tasks streaming responses concurrently: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. - """ - while True: - val = await web_request(url) - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - - async def stream_multiple_sources( - ctx: tractor.Context, sources: List[str] - ) -> None: - async with trio.open_nursery() as n: - for url in sources: - n.start_soon(streamer, ctx, url) - - -The context notion comes from the context_ in nanomsg_. - -.. 
_context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 -.. _msgpack: https://en.wikipedia.org/wiki/MessagePack - - Running actors standalone ************************* You don't have to spawn any actors using ``open_nursery()`` if you just diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 1ada466..4fc7e37 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -2,13 +2,29 @@ Streaming via async gen api """ import time +from functools import partial import trio import tractor import pytest -async def stream_seq(sequence): + +def test_must_define_ctx(): + + with pytest.raises(TypeError) as err: + @tractor.stream + async def no_ctx(): + pass + + assert "no_ctx must be `ctx: tractor.Context" in str(err.value) + + @tractor.stream + async def no_ctx(ctx): + pass + + +async def async_gen_stream(sequence): for i in sequence: yield i await trio.sleep(0.1) @@ -20,10 +36,23 @@ async def stream_seq(sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(): +@tractor.stream +async def context_stream(ctx, sequence): + for i in sequence: + await ctx.send_yield(i) + await trio.sleep(0.1) + + # block indefinitely waiting to be cancelled by ``aclose()`` call + with trio.CancelScope() as cs: + await trio.sleep(float('inf')) + assert 0 + assert cs.cancelled_caught + + +async def stream_from_single_subactor(stream_func_name): """Verify we can spawn a daemon actor and retrieve streamed data. 
""" - async with tractor.find_actor('brokerd') as portals: + async with tractor.find_actor('streamerd') as portals: if not portals: # only one per host address, spawns an actor if None async with tractor.open_nursery() as nursery: @@ -36,37 +65,43 @@ async def stream_from_single_subactor(): seq = range(10) - agen = await portal.run( + stream = await portal.run( __name__, - 'stream_seq', # the func above + stream_func_name, # one of the funcs above sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... iseq = iter(seq) ival = next(iseq) - async for val in agen: + async for val in stream: assert val == ival try: ival = next(iseq) except StopIteration: # should cancel far end task which will be # caught and no error is raised - await agen.aclose() + await stream.aclose() await trio.sleep(0.3) try: - await agen.__anext__() + await stream.__anext__() except StopAsyncIteration: # stop all spawned subactors await portal.cancel_actor() # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr, start_method): +@pytest.mark.parametrize( + 'stream_func', ['async_gen_stream', 'context_stream'] +) +def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ tractor.run( - stream_from_single_subactor, + partial( + stream_from_single_subactor, + stream_func_name=stream_func, + ), arbiter_addr=arb_addr, start_method=start_method, ) diff --git a/tractor/__init__.py b/tractor/__init__.py index 2f6e7d7..fb9b095 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,10 +11,10 @@ import trio # type: ignore from trio import MultiError from . 
import log -from ._ipc import _connect_chan, Channel, Context -from ._actor import ( - Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor -) +from ._ipc import _connect_chan, Channel +from ._streaming import Context, stream +from ._discovery import get_arbiter, find_actor, wait_for_actor +from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed @@ -30,6 +30,7 @@ __all__ = [ 'wait_for_actor', 'Channel', 'Context', + 'stream', 'MultiError', 'RemoteActorError', 'ModuleNotExposed', diff --git a/tractor/_actor.py b/tractor/_actor.py index 1b38d88..3f2a89c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -8,26 +8,22 @@ import importlib import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional, Union +from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore -from async_generator import asynccontextmanager, aclosing +from async_generator import aclosing -from ._ipc import Channel, _connect_chan, Context +from ._ipc import Channel +from ._streaming import Context, _context from .log import get_console_log, get_logger from ._exceptions import ( pack_error, unpack_error, ModuleNotExposed ) -from ._portal import ( - Portal, - open_portal, - _do_handshake, - LocalPortal, -) +from ._discovery import get_arbiter +from ._portal import Portal from . import _state -from ._state import current_actor log = get_logger('tractor') @@ -45,19 +41,16 @@ async def _invoke( kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): - """Invoke local func and return results over provided channel. + """Invoke local func and deliver result(s) over provided channel. 
""" - sig = inspect.signature(func) treat_as_gen = False cs = None - ctx = Context(chan, cid) - if 'ctx' in sig.parameters: + cancel_scope = trio.CancelScope() + ctx = Context(chan, cid, cancel_scope) + _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): + # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `ctx` kwarg in its - # signature will be treated as one. treat_as_gen = True try: is_async_partial = False @@ -73,7 +66,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +81,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! 
- with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +106,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +115,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -174,7 +167,7 @@ class Actor: arbiter_addr: Optional[Tuple[str, int]] = None, ) -> None: self.name = name - self.uid = (name, uid or str(uuid.uuid1())) + self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} # TODO: consider making this a dynamically defined @@ -247,7 +240,7 @@ class Actor: # send/receive initial handshake response try: - uid = await _do_handshake(self, chan) + uid = await self._do_handshake(chan) except StopAsyncIteration: log.warning(f"Channel {chan} failed to handshake") return @@ -351,7 +344,7 @@ class Actor: caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ - cid = str(uuid.uuid1()) + cid = str(uuid.uuid4()) assert chan.uid recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") @@ -373,11 +366,12 @@ class Actor: msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - # internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. 
`open_portal()` may call this method from - # a locally spawned task) with trio.CancelScope(shield=shield) as cs: + # this internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) and receive this scope using + # ``scope = Nursery.start()`` task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel @@ -385,7 +379,7 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks: if channel is chan: - self.cancel_task(cid, Context(channel, cid)) + self._cancel_task(cid, channel) log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -419,6 +413,16 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) + if funcname == '_cancel_task': + # XXX: a special case is made here for + # remote calls since we don't want the + # remote actor to have to know which channel + # the task is associated with and we can't + # pass non-primitive types between actors. + # This means you can use: + # Portal.run('self', '_cancel_task', cid=cid) + # without passing the `chan` arg. + kwargs['chan'] = chan else: # complain to client about restricted modules try: @@ -537,7 +541,7 @@ class Actor: ) await chan.connect() # initial handshake, report who we are, who they are - await _do_handshake(self, chan) + await self._do_handshake(chan) except OSError: # failed to connect log.warning( f"Failed to connect to parent @ {parent_addr}," @@ -661,21 +665,20 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_task(self, cid, ctx): - """Cancel a local task. + async def _cancel_task(self, cid, chan): + """Cancel a local task by call-id / channel. 
- Note this method will be treated as a streaming funciton + Note this method will be treated as a streaming function by remote actor-callers due to the declaration of ``ctx`` in the signature (for now). """ # right now this is only implicitly called by # streaming IPC but it should be called # to cancel any remotely spawned task - chan = ctx.chan try: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + scope, func, is_complete = self._rpc_tasks[(chan, cid)] except KeyError: log.warning(f"{cid} has already completed/terminated?") return @@ -686,7 +689,7 @@ class Actor: # don't allow cancelling this function mid-execution # (is this necessary?) - if func is self.cancel_task: + if func is self._cancel_task: return scope.cancel() @@ -704,7 +707,7 @@ class Actor: log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): # TODO: this should really done in a nursery batch - await self.cancel_task(cid, Context(chan, cid)) + await self._cancel_task(cid, chan) # if tasks: log.info( f"Waiting for remaining rpc tasks to complete {tasks}") @@ -735,6 +738,25 @@ class Actor: """Return all channels to the actor with provided uid.""" return self._peers[uid] + async def _do_handshake( + self, + chan: Channel + ) -> Tuple[str, str]: + """Exchange (name, UUIDs) identifiers as the first communication step. + + These are essentially the "mailbox addresses" found in actor model + parlance. 
+ """ + await chan.send(self.uid) + uid: Tuple[str, str] = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + class Arbiter(Actor): """A special actor who knows all the other actors and always has @@ -840,66 +862,3 @@ async def _start_actor( log.info("Completed async main") return result - - -@asynccontextmanager -async def get_arbiter( - host: str, port: int -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: - """Return a portal instance connected to a local or remote - arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - if actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - async with _connect_chan(host, port) as chan: - async with open_portal(chan) as arb_portal: - yield arb_portal - - -@asynccontextmanager -async def find_actor( - name: str, arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Optional[Portal], None]: - """Ask the arbiter to find actor(s) by name. - - Returns a connected portal to the last registered matching actor - known to the arbiter. 
- """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddr = await arb_portal.run('self', 'find_actor', name=name) - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - elif sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None - - -@asynccontextmanager -async def wait_for_actor( - name: str, - arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Portal, None]: - """Wait on an actor to register with the arbiter. - - A portal to the first registered actor is returned. - """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) - sockaddr = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal diff --git a/tractor/_discovery.py b/tractor/_discovery.py new file mode 100644 index 0000000..ae2e3be --- /dev/null +++ b/tractor/_discovery.py @@ -0,0 +1,77 @@ +""" +Actor discovery API. +""" +import typing +from typing import Tuple, Optional, Union +from async_generator import asynccontextmanager + +from ._ipc import _connect_chan +from ._portal import ( + Portal, + open_portal, + LocalPortal, +) +from ._state import current_actor + + +@asynccontextmanager +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + """Return a portal instance connected to a local or remote + arbiter. 
+ """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal + + +@asynccontextmanager +async def find_actor( + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: + """Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddr = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the last one that registered + if name == 'arbiter' and actor.is_arbiter: + raise RuntimeError("The current actor is the arbiter") + elif sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + +@asynccontextmanager +async def wait_for_actor( + name: str, + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: + """Wait on an actor to register with the arbiter. + + A portal to the first registered actor is returned. 
+ """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 1cddeca..94f978f 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,7 +1,6 @@ """ Inter-process comms abstractions """ -from dataclasses import dataclass import typing from typing import Any, Tuple, Optional @@ -205,28 +204,6 @@ class Channel: return self.msgstream.connected() if self.msgstream else False -@dataclass(frozen=True) -class Context: - """An IAC (inter-actor communication) context. - - Allows maintaining task or protocol specific state between communicating - actors. A unique context is created on the receiving end for every request - to a remote actor. - """ - chan: Channel - cid: str - - # TODO: we should probably attach the actor-task - # cancel scope here now that trio is exposing it - # as a public object - - async def send_yield(self, data: Any) -> None: - await self.chan.send({'yield': data, 'cid': self.cid}) - - async def send_stop(self) -> None: - await self.chan.send({'stop': True, 'cid': self.cid}) - - @asynccontextmanager async def _connect_chan( host: str, port: int diff --git a/tractor/_portal.py b/tractor/_portal.py index 160db19..925e3ad 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -33,21 +33,6 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): yield nursery -async def _do_handshake( - actor: 'Actor', # type: ignore - chan: Channel -) -> Any: - await chan.send(actor.uid) - uid: Tuple[str, str] = await chan.recv() - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - return uid - - class 
StreamReceiveChannel(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with special behaviour for signalling stream termination across an @@ -95,8 +80,8 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): raise unpack_error(msg, self._portal.channel) async def aclose(self): - """Cancel associate remote actor task on close - as well as the local memory channel. + """Cancel associated remote actor task and local memory channel + on close. """ if self._rx_chan._closed: log.warning(f"{self} is already closed") @@ -107,15 +92,10 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): log.warning( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # system. - rchan = await self._portal.run( - 'self', 'cancel_task', cid=cid) - async for _ in rchan: - pass + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run('self', '_cancel_task', cid=cid) if cs.cancelled_caught: # XXX: there's no way to know if the remote task was indeed @@ -153,6 +133,7 @@ class Portal: Tuple[str, Any, str, Dict[str, Any]] ] = None self._streams: Set[StreamReceiveChannel] = set() + self.actor = current_actor() async def _submit( self, @@ -167,7 +148,7 @@ class Portal: This is an async call. 
""" # ship a function call request to the remote actor - cid, recv_chan = await current_actor().send_cmd( + cid, recv_chan = await self.actor.send_cmd( self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be @@ -345,7 +326,7 @@ async def open_portal( was_connected = True if channel.uid is None: - await _do_handshake(actor, channel) + await actor._do_handshake(channel) msg_loop_cs = await nursery.start( partial( diff --git a/tractor/_streaming.py b/tractor/_streaming.py new file mode 100644 index 0000000..9ed0f14 --- /dev/null +++ b/tractor/_streaming.py @@ -0,0 +1,49 @@ +import inspect +from contextvars import ContextVar +from dataclasses import dataclass +from typing import Any + +import trio + +from ._ipc import Channel + + +_context: ContextVar['Context'] = ContextVar('context') + + +@dataclass(frozen=True) +class Context: + """An IAC (inter-actor communication) context. + + Allows maintaining task or protocol specific state between communicating + actors. A unique context is created on the receiving end for every request + to a remote actor. + """ + chan: Channel + cid: str + cancel_scope: trio.CancelScope + + async def send_yield(self, data: Any) -> None: + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self) -> None: + await self.chan.send({'stop': True, 'cid': self.cid}) + + +def current_context(): + """Get the current task's context instance. + """ + return _context.get() + + +def stream(func): + """Mark an async function as a streaming routine. + """ + func._tractor_stream_function = True + sig = inspect.signature(func) + if 'ctx' not in sig.parameters: + raise TypeError( + "The first argument to the stream function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) + return func diff --git a/tractor/msg.py b/tractor/msg.py index 73e39d5..cd064a0 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -12,7 +12,7 @@ import wrapt from .log import get_logger from . 
import current_actor
-from ._ipc import Context
+from ._streaming import Context
 
 
 __all__ = ['pub']
@@ -97,29 +97,32 @@ def pub(
 ):
     """Publisher async generator decorator.
 
-    A publisher can be called multiple times from different actors
-    but will only spawn a finite set of internal tasks to stream values
-    to each caller. The ``tasks` argument to the decorator (``Set[str]``)
-    specifies the names of the mutex set of publisher tasks.
-    When the publisher function is called, an argument ``task_name`` must be
-    passed to specify which task (of the set named in ``tasks``) should be
-    used. This allows for using the same publisher with different input
-    (arguments) without allowing more concurrent tasks then necessary.
+    A publisher can be called multiple times from different actors but
+    will only spawn a finite set of internal tasks to stream values to
+    each caller. The ``tasks: Set[str]`` argument to the decorator
+    specifies the names of the mutex set of publisher tasks. When the
+    publisher function is called, an argument ``task_name`` must be
+    passed to specify which task (of the set named in ``tasks``) should
+    be used. This allows for using the same publisher with different
+    input (arguments) without allowing more concurrent tasks than
+    necessary.
 
-    Values yielded from the decorated async generator
-    must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the
-    topic string an determines which subscription the packet will be delivered
-    to and the value is a packet ``Dict[str, Any]`` by default of the form:
+    Values yielded from the decorated async generator must be
+    ``Dict[str, Dict[str, Any]]`` where the first level key is the topic
+    string and determines which subscription the packet will be
+    delivered to and the value is a packet ``Dict[str, Any]`` by default
+    of the form:
 
     ..
::python
 
-        {topic: value}
+        {topic: str: value: Any}
 
-    The caller can instead opt to pass a ``packetizer`` callback who's return
-    value will be delivered as the published response.
+    The caller can instead opt to pass a ``packetizer`` callback whose
+    return value will be delivered as the published response.
 
-    The decorated function must *accept* an argument :func:`get_topics` which
-    dynamically returns the tuple of current subscriber topics:
+    The decorated async generator function must accept an argument
+    :func:`get_topics` which dynamically returns the tuple of current
+    subscriber topics:
 
     .. code:: python
 
@@ -162,15 +165,15 @@ def pub(
                 print(f"Subscriber received {value}")
 
 
-    Here, you don't need to provide the ``ctx`` argument since the remote actor
-    provides it automatically to the spawned task. If you were to call
-    ``pub_service()`` directly from a wrapping function you would need to
-    provide this explicitly.
+    Here, you don't need to provide the ``ctx`` argument since the
+    remote actor provides it automatically to the spawned task. If you
+    were to call ``pub_service()`` directly from a wrapping function you
+    would need to provide this explicitly.
 
-    Remember you only need this if you need *a finite set of tasks* running in
-    a single actor to stream data to an arbitrary number of subscribers. If you
-    are ok to have a new task running for every call to ``pub_service()`` then
-    probably don't need this.
+    Remember you only need this if you need *a finite set of tasks*
+    running in a single actor to stream data to an arbitrary number of
+    subscribers. If you are ok to have a new task running for every call
+    to ``pub_service()`` then you probably don't need this.
""" # handle the decorator not called with () case if wrapped is None: @@ -181,10 +184,7 @@ def pub( for name in tasks: task2lock[name] = trio.StrictFIFOLock() - async def takes_ctx(get_topics, ctx=None): - pass - - @wrapt.decorator(adapter=takes_ctx) + @wrapt.decorator async def wrapper(agen, instance, args, kwargs): # this is used to extract arguments properly as per # the `wrapt` docs @@ -249,7 +249,6 @@ def pub( # invoke it await _execute(*args, **kwargs) - funcname = wrapped.__name__ if not inspect.isasyncgenfunction(wrapped): raise TypeError( @@ -261,4 +260,8 @@ def pub( "`get_topics` argument" ) + # XXX: manually monkey the wrapped function since + # ``wrapt.decorator`` doesn't seem to want to play nice with its + # whole "adapter" thing... + wrapped._tractor_stream_function = True # type: ignore return wrapper(wrapped)