forked from goodboy/tractor
1
0
Fork 0

Document `@tractor.stream`

stream_functions
Tyler Goodlet 2019-03-25 22:02:22 -04:00
parent e51f84af90
commit 096d211ed2
1 changed files with 27 additions and 19 deletions

View File

@ -362,10 +362,10 @@ actors can compose nicely in a data streaming pipeline.
Streaming Streaming
********* *********
By now you've figured out that ``tractor`` lets you spawn By now you've figured out that ``tractor`` lets you spawn process based
process based actors that can invoke cross-process async functions *actors* that can invoke cross-process (async) functions and all with
between each other and all with structured concurrency built in, but, structured concurrency built in. But the **real cool stuff** is the
the **real power** is the ability to accomplish cross-process *streaming*. native support for cross-process *streaming*.
Asynchronous generators Asynchronous generators
@ -423,21 +423,22 @@ is necessary.
Channels and Contexts Channels and Contexts
+++++++++++++++++++++ +++++++++++++++++++++
If you aren't fond of having to write an async generator to stream data If you aren't fond of having to write an async generator to stream data
between actors (or need something more flexible) you can instead use a between actors (or need something more flexible) you can instead use
``Context``. A context wraps an actor-local spawned task and a ``Channel`` a ``Context``. A context wraps an actor-local spawned task and
so that tasks executing across multiple processes can stream data a ``Channel`` so that tasks executing across multiple processes can
to one another using a low level, request oriented API. stream data to one another using a low level, request oriented API.
``Channel`` is the API which wraps an underlying *transport* and *interchange* A ``Channel`` wraps an underlying *transport* and *interchange* format
format to enable *inter-actor-communication*. In its present state ``tractor`` to enable *inter-actor-communication*. In its present state ``tractor``
uses TCP and msgpack_. uses TCP and msgpack_.
As an example if you wanted to create a streaming server without writing As an example if you wanted to create a streaming server without writing
an async generator that *yields* values you instead define an async an async generator that *yields* values you instead define a decorated
function: async function:
.. code:: python .. code:: python
@tractor.stream
async def streamer(ctx: tractor.Context, rate: int = 2) -> None: async def streamer(ctx: tractor.Context, rate: int = 2) -> None:
"""A simple web response streaming server. """A simple web response streaming server.
""" """
@ -450,15 +451,20 @@ function:
await trio.sleep(1 / rate) await trio.sleep(1 / rate)
All that's required is declaring a ``ctx`` argument name somewhere in You must decorate the function with ``@tractor.stream`` and declare
your function signature and ``tractor`` will treat the async function a ``ctx`` argument as the first in your function signature and then
like an async generator - as a streaming function from the client side. ``tractor`` will treat the async function like an async generator - as
This turns out to be handy particularly if you have a stream from the calling/client side.
multiple tasks streaming responses concurrently:
This turns out to be handy particularly if you have multiple tasks
pushing responses concurrently:
.. code:: python .. code:: python
async def streamer(ctx: tractor.Context, rate: int = 2) -> None: async def streamer(
ctx: tractor.Context,
rate: int = 2
) -> None:
"""A simple web response streaming server. """A simple web response streaming server.
""" """
while True: while True:
@ -470,8 +476,10 @@ multiple tasks streaming responses concurrently:
await trio.sleep(1 / rate) await trio.sleep(1 / rate)
@tractor.stream
async def stream_multiple_sources( async def stream_multiple_sources(
ctx: tractor.Context, sources: List[str] ctx: tractor.Context,
sources: List[str]
) -> None: ) -> None:
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for url in sources: for url in sources: