diff --git a/README.rst b/README.rst index 4f7a057..b3a5181 100644 --- a/README.rst +++ b/README.rst @@ -362,10 +362,10 @@ actors can compose nicely in a data streaming pipeline. Streaming ********* -By now you've figured out that ``tractor`` lets you spawn -process based actors that can invoke cross-process async functions -between each other and all with structured concurrency built in, but, -the **real power** is the ability to accomplish cross-process *streaming*. +By now you've figured out that ``tractor`` lets you spawn process based +*actors* that can invoke cross-process (async) functions and all with +structured concurrency built in. But the **real cool stuff** is the +native support for cross-process *streaming*. Asynchronous generators @@ -423,21 +423,22 @@ is necessary. Channels and Contexts +++++++++++++++++++++ If you aren't fond of having to write an async generator to stream data -between actors (or need something more flexible) you can instead use a -``Context``. A context wraps an actor-local spawned task and a ``Channel`` -so that tasks executing across multiple processes can stream data -to one another using a low level, request oriented API. +between actors (or need something more flexible) you can instead use +a ``Context``. A context wraps an actor-local spawned task and +a ``Channel`` so that tasks executing across multiple processes can +stream data to one another using a low level, request oriented API. -``Channel`` is the API which wraps an underlying *transport* and *interchange* -format to enable *inter-actor-communication*. In its present state ``tractor`` +A ``Channel`` wraps an underlying *transport* and *interchange* format +to enable *inter-actor-communication*. In its present state ``tractor`` uses TCP and msgpack_. As an example if you wanted to create a streaming server without writing -an async generator that *yields* values you instead define an async -function: +an async generator that *yields* values you instead define a decorated +async function: .. code:: python + @tractor.stream async def streamer(ctx: tractor.Context, rate: int = 2) -> None: """A simple web response streaming server. """ @@ -450,15 +451,20 @@ function: await trio.sleep(1 / rate) -All that's required is declaring a ``ctx`` argument name somewhere in -your function signature and ``tractor`` will treat the async function -like an async generator - as a streaming function from the client side. -This turns out to be handy particularly if you have -multiple tasks streaming responses concurrently: +You must decorate the function with ``@tractor.stream`` and declare +a ``ctx`` argument as the first in your function signature and then +``tractor`` will treat the async function like an async generator - as +a stream from the calling/client side. + +This turns out to be handy particularly if you have multiple tasks +pushing responses concurrently: .. code:: python - async def streamer(ctx: tractor.Context, rate: int = 2) -> None: + async def streamer( + ctx: tractor.Context, + rate: int = 2 + ) -> None: """A simple web response streaming server. """ while True: @@ -470,8 +476,10 @@ multiple tasks streaming responses concurrently: await trio.sleep(1 / rate) + @tractor.stream async def stream_multiple_sources( - ctx: tractor.Context, sources: List[str] + ctx: tractor.Context, + sources: List[str] ) -> None: async with trio.open_nursery() as n: for url in sources: