forked from goodboy/tractor
1
0
Fork 0

Reorg streaming section

stream_functions
Tyler Goodlet 2019-03-24 14:55:13 -04:00
parent 4ee35038fb
commit 2f773fc883
1 changed files with 145 additions and 124 deletions

View File

@ -280,8 +280,60 @@ to all others with ease over standard network protocols).
.. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor .. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
Async IPC using *portals* Cancellation
************************* ************
``tractor`` supports ``trio``'s cancellation_ system verbatim.
Cancelling a nursery block cancels all actors spawned by it.
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
Remote error propagation
************************
Any task invoked in a remote actor should ship any error(s) back to the calling
actor where it is raised and expected to be dealt with. This way remote actors
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
.. code:: python
async def assert_err():
assert 0
async def main():
async with tractor.open_nursery() as n:
real_actors = []
for i in range(3):
real_actors.append(await n.start_actor(
f'actor_{i}',
rpc_module_paths=[__name__],
))
# start one actor that will fail immediately
await n.run_in_actor('extra', assert_err)
# should error here with a ``RemoteActorError`` containing
# an ``AssertionError`` and all the other actors have been cancelled
try:
# also raises
tractor.run(main)
except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehhh!")
You'll notice the nursery cancellation conducts a *one-cancels-all*
supervisory strategy `exactly like trio`_. The plan is to add more
`erlang strategies`_ in the near future by allowing nurseries to accept
a ``Supervisor`` type.
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
.. _erlang strategies: http://learnyousomeerlang.com/supervisors
IPC using *portals*
*******************
``tractor`` introduces the concept of a *portal* which is an API ``tractor`` introduces the concept of a *portal* which is an API
borrowed_ from ``trio``. A portal may seem similar to the idea of borrowed_ from ``trio``. A portal may seem similar to the idea of
a RPC future_ except a *portal* allows invoking remote *async* functions and a RPC future_ except a *portal* allows invoking remote *async* functions and
@ -305,10 +357,26 @@ channels_ system or shipping code over the network.
This *portal* approach turns out to be paricularly exciting with the This *portal* approach turns out to be paricularly exciting with the
introduction of `asynchronous generators`_ in Python 3.6! It means that introduction of `asynchronous generators`_ in Python 3.6! It means that
actors can compose nicely in a data processing pipeline. actors can compose nicely in a data streaming pipeline.
As an example here's an actor that streams for 1 second from a remote async
generator function running in a separate actor: Streaming
*********
By now you've figured out that ``tractor`` lets you spawn
process based actors that can invoke cross-process async functions
between each other and all with structured concurrency built in, but,
the **real power** is the ability to accomplish cross-process *streaming*.
Asynchronous generators
+++++++++++++++++++++++
The default streaming function is simply an async generator definition.
Every value *yielded* from the generator is delivered to the calling
portal exactly like if you had invoked the function in-process meaning
you can ``async for`` to receive each value on the calling side.
As an example here's a parent actor that streams for 1 second from a
spawned subactor:
.. code:: python .. code:: python
@ -346,10 +414,79 @@ generator function running in a separate actor:
tractor.run(main) tractor.run(main)
By default async generator functions are treated as inter-actor
*streams* when invoked via a portal (how else could you really interface
with them anyway) so no special syntax to denote the streaming *service*
is necessary.
Channels and Contexts
+++++++++++++++++++++
If you aren't fond of having to write an async generator to stream data
between actors (or need something more flexible) you can instead use a
``Context``. A context wraps an actor-local spawned task and a ``Channel``
so that tasks executing across multiple processes can stream data
to one another using a low level, request oriented API.
``Channel`` is the API which wraps an underlying *transport* and *interchange*
format to enable *inter-actor-communication*. In its present state ``tractor``
uses TCP and msgpack_.
As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define an async
function:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request('http://data.feed.com')
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
All that's required is declaring a ``ctx`` argument name somewhere in
your function signature and ``tractor`` will treat the async function
like an async generator - as a streaming function from the client side.
This turns out to be handy particularly if you have
multiple tasks streaming responses concurrently:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request(url)
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
async def stream_multiple_sources(
ctx: tractor.Context, sources: List[str]
) -> None:
async with trio.open_nursery() as n:
for url in sources:
n.start_soon(streamer, ctx, url)
The context notion comes from the context_ in nanomsg_.
.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
A full fledged streaming service A full fledged streaming service
******************************** ++++++++++++++++++++++++++++++++
Alright, let's get fancy. Alright, let's get fancy.
Say you wanted to spawn two actors which each pull data feeds from Say you wanted to spawn two actors which each pull data feeds from
@ -471,58 +608,6 @@ as ``multiprocessing`` calls it) which is running ``main()``.
.. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
Cancellation
************
``tractor`` supports ``trio``'s cancellation_ system verbatim.
Cancelling a nursery block cancels all actors spawned by it.
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
Remote error propagation
************************
Any task invoked in a remote actor should ship any error(s) back to the calling
actor where it is raised and expected to be dealt with. This way remote actors
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
.. code:: python
async def assert_err():
assert 0
async def main():
async with tractor.open_nursery() as n:
real_actors = []
for i in range(3):
real_actors.append(await n.start_actor(
f'actor_{i}',
rpc_module_paths=[__name__],
))
# start one actor that will fail immediately
await n.run_in_actor('extra', assert_err)
# should error here with a ``RemoteActorError`` containing
# an ``AssertionError`` and all the other actors have been cancelled
try:
# also raises
tractor.run(main)
except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehhh!")
You'll notice the nursery cancellation conducts a *one-cancels-all*
supervisory strategy `exactly like trio`_. The plan is to add more
`erlang strategies`_ in the near future by allowing nurseries to accept
a ``Supervisor`` type.
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
.. _erlang strategies: http://learnyousomeerlang.com/supervisors
Actor local variables Actor local variables
********************* *********************
Although ``tractor`` uses a *shared-nothing* architecture between processes Although ``tractor`` uses a *shared-nothing* architecture between processes
@ -556,8 +641,8 @@ a convenience for passing simple data to newly spawned actors); building
out a state sharing system per-actor is totally up to you. out a state sharing system per-actor is totally up to you.
How do actors find each other (a poor man's *service discovery*)? Service Discovery
***************************************************************** *****************
Though it will be built out much more in the near future, ``tractor`` Though it will be built out much more in the near future, ``tractor``
currently keeps track of actors by ``(name: str, id: str)`` using a currently keeps track of actors by ``(name: str, id: str)`` using a
special actor called the *arbiter*. Currently the *arbiter* must exist special actor called the *arbiter*. Currently the *arbiter* must exist
@ -590,70 +675,6 @@ The ``name`` value you should pass to ``find_actor()`` is the one you passed as
*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``.
Streaming using channels and contexts
*************************************
``Channel`` is the API which wraps an underlying *transport* and *interchange*
format to enable *inter-actor-communication*. In its present state ``tractor``
uses TCP and msgpack_.
If you aren't fond of having to write an async generator to stream data
between actors (or need something more flexible) you can instead use a
``Context``. A context wraps an actor-local spawned task and a ``Channel``
so that tasks executing across multiple processes can stream data
to one another using a low level, request oriented API.
As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define an async
function:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request('http://data.feed.com')
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
All that's required is declaring a ``ctx`` argument name somewhere in
your function signature and ``tractor`` will treat the async function
like an async generator - as a streaming function from the client side.
This turns out to be handy particularly if you have
multiple tasks streaming responses concurrently:
.. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server.
"""
while True:
val = await web_request(url)
# this is the same as ``yield`` in the async gen case
await ctx.send_yield(val)
await trio.sleep(1 / rate)
async def stream_multiple_sources(
ctx: tractor.Context, sources: List[str]
) -> None:
async with trio.open_nursery() as n:
for url in sources:
n.start_soon(streamer, ctx, url)
The context notion comes from the context_ in nanomsg_.
.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
Running actors standalone Running actors standalone
************************* *************************
You don't have to spawn any actors using ``open_nursery()`` if you just You don't have to spawn any actors using ``open_nursery()`` if you just