From 2f773fc8839a7bfe1dbc42b61368ff5430558607 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 24 Mar 2019 14:55:13 -0400
Subject: [PATCH] Reorg streaming section

---
 README.rst | 269 +++++++++++++++++++++++++++++------------------------
 1 file changed, 145 insertions(+), 124 deletions(-)

diff --git a/README.rst b/README.rst
index f8bf9d3..4f7a057 100644
--- a/README.rst
+++ b/README.rst
@@ -280,8 +280,60 @@ to all others with ease over standard network protocols).
 
 .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
 
-Async IPC using *portals*
-*************************
+Cancellation
+************
+``tractor`` supports ``trio``'s cancellation_ system verbatim.
+Cancelling a nursery block cancels all actors spawned by it.
+Eventually ``tractor`` plans to support different `supervision strategies`_ such as those found in ``erlang``.
+
+.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
+
+
+Remote error propagation
+************************
+Any task invoked in a remote actor should ship any error(s) back to the
+calling actor where it is raised and expected to be dealt with. This way
+remote actors are never cancelled unless explicitly asked to, or there's
+a bug in ``tractor`` itself.
+
+.. code:: python
+
+    async def assert_err():
+        assert 0
+
+
+    async def main():
+        async with tractor.open_nursery() as n:
+            real_actors = []
+            for i in range(3):
+                real_actors.append(await n.start_actor(
+                    f'actor_{i}',
+                    rpc_module_paths=[__name__],
+                ))
+
+            # start one actor that will fail immediately
+            await n.run_in_actor('extra', assert_err)
+
+        # should error here with a ``RemoteActorError`` containing
+        # an ``AssertionError``; all the other actors will have been cancelled
+
+    try:
+        # also raises
+        tractor.run(main)
+    except tractor.RemoteActorError:
+        print("Look Maa that actor failed hard, hehhh!")
+
+
+You'll notice the nursery cancellation conducts a *one-cancels-all*
+supervisory strategy `exactly like trio`_. 
The plan is to add more
+`erlang strategies`_ in the near future by allowing nurseries to accept
+a ``Supervisor`` type.
+
+.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
+.. _erlang strategies: http://learnyousomeerlang.com/supervisors
+
+
+IPC using *portals*
+*******************
 ``tractor`` introduces the concept of a *portal* which is an API
 borrowed_ from ``trio``. A portal may seem similar to the idea of an RPC
 future_ except a *portal* allows invoking remote *async* functions and
@@ -305,10 +357,26 @@ channels_ system or shipping code over the network.
 
 This *portal* approach turns out to be particularly exciting with the
 introduction of `asynchronous generators`_ in Python 3.6! It means that
-actors can compose nicely in a data processing pipeline.
+actors can compose nicely in a data streaming pipeline.
+
+
+Streaming
+*********
+By now you've figured out that ``tractor`` lets you spawn
+process-based actors that can invoke cross-process async functions
+between each other, all with structured concurrency built in. But the
+**real power** is the ability to accomplish cross-process *streaming*.
+
+
+Asynchronous generators
++++++++++++++++++++++++
+The default streaming function is simply an async generator definition.
+Every value *yielded* from the generator is delivered to the calling
+portal exactly as if you had invoked the function in-process, meaning
+you can use ``async for`` to receive each value on the calling side.
+
+As an example, here's a parent actor that streams for 1 second from a
+spawned subactor:
 
 .. code:: python
@@ -346,10 +414,79 @@ generator function running in a separate actor:
 
     tractor.run(main)
 
 
+By default async generator functions are treated as inter-actor
+*streams* when invoked via a portal (how else could you really interface
+with them anyway?), so no special syntax to denote the streaming *service*
+is necessary.
+
+
+Channels and Contexts
++++++++++++++++++++++
+If you aren't fond of having to write an async generator to stream data
+between actors (or need something more flexible) you can instead use a
+``Context``. A context wraps an actor-local spawned task and a ``Channel``
+so that tasks executing across multiple processes can stream data
+to one another using a low-level, request-oriented API.
+
+``Channel`` is the API which wraps an underlying *transport* and *interchange*
+format to enable *inter-actor communication*. In its present state ``tractor``
+uses TCP and msgpack_.
+
+As an example, if you want to create a streaming server without writing
+an async generator that *yields* values, you can instead define an async
+function:
+
+.. code:: python
+
+    async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
+        """A simple web response streaming server.
+        """
+        while True:
+            val = await web_request('http://data.feed.com')
+
+            # this is the same as ``yield`` in the async gen case
+            await ctx.send_yield(val)
+
+            await trio.sleep(1 / rate)
+
+
+All that's required is declaring a ``ctx`` argument name somewhere in
+your function signature and ``tractor`` will treat the async function
+like an async generator (i.e. a streaming function) from the client side.
+This turns out to be particularly handy if you have
+multiple tasks streaming responses concurrently:
+
+.. code:: python
+
+    async def streamer(ctx: tractor.Context, url: str, rate: int = 2) -> None:
+        """A simple web response streaming server.
+ """ + while True: + val = await web_request(url) + + # this is the same as ``yield`` in the async gen case + await ctx.send_yield(val) + + await trio.sleep(1 / rate) + + + async def stream_multiple_sources( + ctx: tractor.Context, sources: List[str] + ) -> None: + async with trio.open_nursery() as n: + for url in sources: + n.start_soon(streamer, ctx, url) + + +The context notion comes from the context_ in nanomsg_. + +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 +.. _msgpack: https://en.wikipedia.org/wiki/MessagePack + A full fledged streaming service -******************************** +++++++++++++++++++++++++++++++++ Alright, let's get fancy. Say you wanted to spawn two actors which each pull data feeds from @@ -471,58 +608,6 @@ as ``multiprocessing`` calls it) which is running ``main()``. .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i -Cancellation -************ -``tractor`` supports ``trio``'s cancellation_ system verbatim. -Cancelling a nursery block cancels all actors spawned by it. -Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``. - -.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags - - -Remote error propagation -************************ -Any task invoked in a remote actor should ship any error(s) back to the calling -actor where it is raised and expected to be dealt with. This way remote actors -are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself. - -.. 
code:: python - - async def assert_err(): - assert 0 - - - async def main(): - async with tractor.open_nursery() as n: - real_actors = [] - for i in range(3): - real_actors.append(await n.start_actor( - f'actor_{i}', - rpc_module_paths=[__name__], - )) - - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) - - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError`` and all the other actors have been cancelled - - try: - # also raises - tractor.run(main) - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehhh!") - - -You'll notice the nursery cancellation conducts a *one-cancels-all* -supervisory strategy `exactly like trio`_. The plan is to add more -`erlang strategies`_ in the near future by allowing nurseries to accept -a ``Supervisor`` type. - -.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics -.. _erlang strategies: http://learnyousomeerlang.com/supervisors - - Actor local variables ********************* Although ``tractor`` uses a *shared-nothing* architecture between processes @@ -556,8 +641,8 @@ a convenience for passing simple data to newly spawned actors); building out a state sharing system per-actor is totally up to you. -How do actors find each other (a poor man's *service discovery*)? -***************************************************************** +Service Discovery +***************** Though it will be built out much more in the near future, ``tractor`` currently keeps track of actors by ``(name: str, id: str)`` using a special actor called the *arbiter*. Currently the *arbiter* must exist @@ -590,70 +675,6 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. 
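
Conceptually, the *arbiter* is little more than a registry mapping actor
names to transport addresses. The following toy sketch illustrates only
the lookup idea; the class, method names, and addresses here are
hypothetical and are *not* ``tractor``'s actual implementation or API:

```python
from typing import Dict, Optional, Tuple

Addr = Tuple[str, int]  # a (host, port) transport endpoint


class ToyArbiter:
    """A toy name -> address registry illustrating the arbiter concept.

    The real arbiter tracks actors by ``(name: str, id: str)`` pairs and
    itself runs as an actor at a known address.
    """

    def __init__(self) -> None:
        self._registry: Dict[str, Addr] = {}

    def register_actor(self, name: str, addr: Addr) -> None:
        # record where an actor with this name can be reached
        self._registry[name] = addr

    def find_actor(self, name: str) -> Optional[Addr]:
        # ``name`` is the value passed as first argument to
        # ``tractor.run()`` or ``ActorNursery.start_actor()``
        return self._registry.get(name)


arbiter = ToyArbiter()
arbiter.register_actor('streamer', ('127.0.0.1', 1616))
print(arbiter.find_actor('streamer'))  # -> ('127.0.0.1', 1616)
print(arbiter.find_actor('nonexistent'))  # -> None
```

Any actor that can reach the arbiter can therefore resolve a peer's
channel endpoint knowing only its name.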
-Streaming using channels and contexts -************************************* -``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to enable *inter-actor-communication*. In its present state ``tractor`` -uses TCP and msgpack_. - -If you aren't fond of having to write an async generator to stream data -between actors (or need something more flexible) you can instead use a -``Context``. A context wraps an actor-local spawned task and a ``Channel`` -so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented API. - -As an example if you wanted to create a streaming server without writing -an async generator that *yields* values you instead define an async -function: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. - """ - while True: - val = await web_request('http://data.feed.com') - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - -All that's required is declaring a ``ctx`` argument name somewhere in -your function signature and ``tractor`` will treat the async function -like an async generator - as a streaming function from the client side. -This turns out to be handy particularly if you have -multiple tasks streaming responses concurrently: - -.. code:: python - - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: - """A simple web response streaming server. - """ - while True: - val = await web_request(url) - - # this is the same as ``yield`` in the async gen case - await ctx.send_yield(val) - - await trio.sleep(1 / rate) - - - async def stream_multiple_sources( - ctx: tractor.Context, sources: List[str] - ) -> None: - async with trio.open_nursery() as n: - for url in sources: - n.start_soon(streamer, ctx, url) - - -The context notion comes from the context_ in nanomsg_. - -.. 
_context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 -.. _msgpack: https://en.wikipedia.org/wiki/MessagePack - - Running actors standalone ************************* You don't have to spawn any actors using ``open_nursery()`` if you just