diff --git a/docs/index.rst b/docs/index.rst index e0e7523..e0a093f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -108,49 +108,7 @@ A trynamic first scene Let's direct a couple *actors* and have them run their lines for the hip new film we're shooting: -.. code:: python - - import tractor - - _this_module = __name__ - the_line = 'Hi my name is {}' - - - async def hi(): - return the_line.format(tractor.current_actor().name) - - - async def say_hello(other_actor): - async with tractor.wait_for_actor(other_actor) as portal: - return await portal.run(_this_module, 'hi') - - - async def main(): - """Main tractor entry point, the "master" process (for now - acts as the "director"). - """ - async with tractor.open_nursery() as n: - print("Alright... Action!") - - donny = await n.run_in_actor( - 'donny', - say_hello, - # arguments are always named - other_actor='gretchen', - ) - gretchen = await n.run_in_actor( - 'gretchen', - say_hello, - other_actor='donny', - ) - print(await gretchen.result()) - print(await donny.result()) - print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...") - - - if __name__ == '__main__': - tractor.run(main) - +.. literalinclude:: ../examples/a_trynamic_first_scene.py We spawn two *actors*, *donny* and *gretchen*. Each actor starts up and executes their *main task* defined by an @@ -174,31 +132,7 @@ spawned by the same nursery to be cancelled_. To spawn an actor and run a function in it, open a *nursery block* and use the ``run_in_actor()`` method: -.. code:: python - - import tractor - - - def cellar_door(): - return "Dang that's beautiful" - - - async def main(): - """The main ``tractor`` routine. - """ - async with tractor.open_nursery() as n: - - portal = await n.run_in_actor('some_linguist', cellar_door) - - # The ``async with`` will unblock here since the 'some_linguist' - # actor has completed its main task ``cellar_door``. - - print(await portal.result()) - - - if __name__ == '__main__': - tractor.run(main) - +.. literalinclude:: ../examples/actor_spawning_and_causality.py What's going on? @@ -388,41 +322,7 @@ you can ``async for`` to receive each value on the calling side. As an example here's a parent actor that streams for 1 second from a spawned subactor: -.. code:: python - - from itertools import repeat - import trio - import tractor - - - async def stream_forever(): - for i in repeat("I can see these little future bubble things"): - # each yielded value is sent over the ``Channel`` to the - # parent actor - yield i - await trio.sleep(0.01) - - - async def main(): - # stream for at most 1 seconds - with trio.move_on_after(1) as cancel_scope: - async with tractor.open_nursery() as n: - portal = await n.start_actor( - f'donny', - rpc_module_paths=[__name__], - ) - - # this async for loop streams values from the above - # async generator running in a separate process - async for letter in await portal.run(__name__, 'stream_forever'): - print(letter) - - # we support trio's cancellation system - assert cancel_scope.cancelled_caught - assert n.cancelled - - - tractor.run(main) +.. literalinclude:: ../examples/asynchronous_generators.py By default async generator functions are treated as inter-actor *streams* when invoked via a portal (how else could you really interface @@ -513,104 +413,7 @@ You also want to aggregate these feeds, do some processing on them and then deliver the final result stream to a client (or in this case parent) actor and print the results to your screen: -.. code:: python - - import time - import trio - import tractor - - - # this is the first 2 actors, streamer_1 and streamer_2 - async def stream_data(seed): - for i in range(seed): - yield i - await trio.sleep(0) # trigger scheduler - - - # this is the third actor; the aggregator - async def aggregate(seed): - """Ensure that the two streams we receive match but only stream - a single set of values to the parent. - """ - async with tractor.open_nursery() as nursery: - portals = [] - for i in range(1, 3): - # fork point - portal = await nursery.start_actor( - name=f'streamer_{i}', - rpc_module_paths=[__name__], - ) - - portals.append(portal) - - send_chan, recv_chan = trio.open_memory_channel(500) - - async def push_to_chan(portal, send_chan): - async with send_chan: - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) - - print(f"FINISHED ITERATING {portal.channel.uid}") - - # spawn 2 trio tasks to collect streams and push to a local queue - async with trio.open_nursery() as n: - - for portal in portals: - n.start_soon(push_to_chan, portal, send_chan.clone()) - - # close this local task's reference to send side - await send_chan.aclose() - - unique_vals = set() - async with recv_chan: - async for value in recv_chan: - if value not in unique_vals: - unique_vals.add(value) - # yield upwards to the spawning parent actor - yield value - - assert value in unique_vals - - print("FINISHED ITERATING in aggregator") - - await nursery.cancel() - print("WAITING on `ActorNursery` to finish") - print("AGGREGATOR COMPLETE!") - - - # this is the main actor and *arbiter* - async def main(): - # a nursery which spawns "actors" - async with tractor.open_nursery() as nursery: - - seed = int(1e3) - import time - pre_start = time.time() - - portal = await nursery.run_in_actor( - 'aggregator', - aggregate, - seed=seed, - ) - - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) - - print(f"STREAM TIME = {time.time() - start}") - print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") - assert result_stream == list(range(seed)) - return result_stream - - - final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) - +.. literalinclude:: ../examples/full_fledged_streaming_service.py Here there's four actors running in separate processes (using all the cores on you machine). Two are streaming by *yielding* values from the @@ -672,22 +475,7 @@ now it does the trick. To find the arbiter from the current actor use the ``get_arbiter()`` function and to find an actor's socket address by name use the ``find_actor()`` function: -.. code:: python - - import tractor - - - async def main(service_name): - - async with tractor.get_arbiter() as portal: - print(f"Arbiter is listening on {portal.channel}") - - async with tractor.find_actor(service_name) as sockaddr: - print(f"my_service is found at {my_service}") - - - tractor.run(main, 'some_actor_name') - +.. literalinclude:: ../examples/service_discovery.py The ``name`` value you should pass to ``find_actor()`` is the one you passed as the *first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``.