2018-07-06 06:34:11 +00:00
|
|
|
tractor
|
|
|
|
=======
|
2018-08-03 04:55:50 +00:00
|
|
|
An async-native `actor model`_ built on trio_ and multiprocessing_.
|
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
|
2018-07-13 15:34:43 +00:00
|
|
|
|travis|
|
|
|
|
|
|
|
|
.. |travis| image:: https://img.shields.io/travis/tgoodlet/tractor/master.svg
|
|
|
|
:target: https://travis-ci.org/tgoodlet/tractor
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
|
|
|
|
.. _trio: https://github.com/python-trio/trio
|
|
|
|
.. _multiprocessing: https://docs.python.org/3/library/multiprocessing.html
|
|
|
|
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
|
|
|
|
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
|
|
|
.. _always propagate: https://trio.readthedocs.io/en/latest/design.html#exceptions-always-propagate
|
|
|
|
.. _causality: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#c-c-c-c-causality-breaker
|
|
|
|
.. _shared nothing architecture: https://en.wikipedia.org/wiki/Shared-nothing_architecture
|
|
|
|
.. _cancellation: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-and-timeouts
|
|
|
|
.. _channels: https://en.wikipedia.org/wiki/Channel_(programming)
|
|
|
|
.. _chaos engineering: http://principlesofchaos.org/
|
|
|
|
|
|
|
|
|
2018-08-08 17:25:04 +00:00
|
|
|
What's this? Spawning event loops in processes?
|
|
|
|
-----------------------------------------------
|
|
|
|
Heh, yeah pretty much.
|
2018-08-03 04:55:50 +00:00
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
``tractor`` is an attempt to take trionic_ concurrency concepts and apply
|
2018-08-03 04:55:50 +00:00
|
|
|
them to distributed multi-core Python.
|
|
|
|
|
|
|
|
``tractor`` lets you run and spawn *actors*: separate processes which run a ``trio``
|
|
|
|
scheduler and task tree (also known as an `async sandwich`_).
|
|
|
|
*Actors* communicate by sending messages_ over channels_ and avoid sharing any state.
|
|
|
|
This `actor model`_ allows for highly distributed software architecture which works just as
|
|
|
|
well on multiple cores as it does over many hosts.
|
|
|
|
``tractor`` takes much inspiration from pulsar_ and execnet_ but attempts to be much more
|
2018-08-08 17:25:04 +00:00
|
|
|
focussed on sophistication of the lower level distributed architecture as well as have first
|
|
|
|
class support for `modern async Python`_.
|
2018-08-03 04:55:50 +00:00
|
|
|
|
|
|
|
The first step to grok ``tractor`` is to get the basics of ``trio``
|
2018-08-08 17:25:04 +00:00
|
|
|
down. A great place to start is the `trio docs`_ and this `blog post`_.
|
2018-07-06 06:34:11 +00:00
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
|
|
|
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
|
|
|
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
2018-08-08 17:25:04 +00:00
|
|
|
.. _modern async Python: https://www.python.org/dev/peps/pep-0525/
|
2018-07-12 19:18:02 +00:00
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
Philosophy
|
|
|
|
----------
|
2018-07-06 06:34:11 +00:00
|
|
|
``tractor``'s tenets non-comprehensively include:
|
|
|
|
|
|
|
|
- no spawning of processes *willy-nilly*; causality_ is paramount!
|
|
|
|
- `shared nothing architecture`_
|
|
|
|
- remote errors `always propagate`_ back to the caller
|
|
|
|
- verbatim support for ``trio``'s cancellation_ system
|
|
|
|
- no use of *proxy* objects to wrap RPC calls
|
|
|
|
- an immersive debugging experience
|
2018-08-03 04:55:50 +00:00
|
|
|
- anti-fragility through `chaos engineering`_
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
.. warning:: ``tractor`` is in alpha-alpha and is expected to change rapidly!
|
2018-08-03 04:55:50 +00:00
|
|
|
Expect nothing to be set in stone. Your ideas about where it should go
|
|
|
|
are greatly appreciated!
|
2018-07-06 06:34:11 +00:00
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
.. _pulsar: http://quantmind.github.io/pulsar/design.html
|
|
|
|
.. _execnet: https://codespeak.net/execnet/
|
2018-07-12 19:18:02 +00:00
|
|
|
|
|
|
|
|
|
|
|
Install
|
|
|
|
-------
|
|
|
|
No PyPi release yet!
|
|
|
|
|
|
|
|
::
|
|
|
|
|
|
|
|
pip install git+git://github.com/tgoodlet/tractor.git
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
A trynamic first scene
|
|
|
|
----------------------
|
2018-08-03 04:55:50 +00:00
|
|
|
Let's direct a couple *actors* and have them run their lines for
|
|
|
|
the hip new film we're shooting:
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
import tractor
|
|
|
|
from functools import partial
|
|
|
|
|
|
|
|
_this_module = __name__
|
|
|
|
the_line = 'Hi my name is {}'
|
|
|
|
|
|
|
|
|
|
|
|
async def hi():
|
|
|
|
return the_line.format(tractor.current_actor().name)
|
|
|
|
|
|
|
|
|
|
|
|
async def say_hello(other_actor):
|
|
|
|
await trio.sleep(0.4) # wait for other actor to spawn
|
|
|
|
async with tractor.find_actor(other_actor) as portal:
|
|
|
|
return await portal.run(_this_module, 'hi')
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
"""Main tractor entry point, the "master" process (for now
|
|
|
|
acts as the "director").
|
|
|
|
"""
|
|
|
|
async with tractor.open_nursery() as n:
|
|
|
|
print("Alright... Action!")
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
donny = await n.run_in_actor(
|
2018-07-06 06:34:11 +00:00
|
|
|
'donny',
|
2018-08-03 04:55:50 +00:00
|
|
|
say_hello,
|
|
|
|
other_actor='gretchen',
|
2018-07-06 06:34:11 +00:00
|
|
|
)
|
2018-08-03 04:55:50 +00:00
|
|
|
gretchen = await n.run_in_actor(
|
2018-07-06 06:34:11 +00:00
|
|
|
'gretchen',
|
2018-08-03 04:55:50 +00:00
|
|
|
say_hello,
|
|
|
|
other_actor='donny',
|
2018-07-06 06:34:11 +00:00
|
|
|
)
|
|
|
|
print(await gretchen.result())
|
|
|
|
print(await donny.result())
|
|
|
|
await donny.cancel_actor()
|
2018-08-03 04:55:50 +00:00
|
|
|
print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
tractor.run(main)
|
|
|
|
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
We spawn two *actors*, *donny* and *gretchen*.
|
|
|
|
Each actor starts up and executes their *main task* defined by an
|
|
|
|
async function, ``say_hello()``. The function instructs each actor
|
|
|
|
to find their partner and say hello by calling their partner's
|
|
|
|
``hi()`` function using something called a *portal*. Each actor
|
|
|
|
receives a response and relays that back to the parent actor (in
|
|
|
|
this case our "director" executing ``main()``).
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
Actor spawning and causality
|
|
|
|
----------------------------
|
|
|
|
``tractor`` tries to take ``trio``'s concept of causal task lifetimes
|
2018-08-03 04:55:50 +00:00
|
|
|
to multi-process land. Accordingly, ``tractor``'s *actor nursery* behaves
|
|
|
|
similar to ``trio``'s nursery_. That is, ``tractor.open_nursery()``
|
|
|
|
opens an ``ActorNursery`` which waits on spawned *actors* to complete
|
|
|
|
(or error) in the same causal_ way ``trio`` waits on spawned subtasks.
|
|
|
|
This includes errors from any one actor causing all other actors
|
|
|
|
spawned by the same nursery to be cancelled_.
|
|
|
|
|
|
|
|
To spawn an actor and run a function in it, open a *nursery block*
|
|
|
|
and use the ``run_in_actor()`` method:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
import tractor
|
|
|
|
|
|
|
|
|
|
|
|
def cellar_door():
|
|
|
|
return "Dang that's beautiful"
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
"""The main ``tractor`` routine.
|
|
|
|
"""
|
|
|
|
async with tractor.open_nursery() as n:
|
|
|
|
|
|
|
|
portal = await n.run_in_actor('frank', movie_theatre_question)
|
|
|
|
|
|
|
|
# The ``async with`` will unblock here since the 'frank'
|
|
|
|
# actor has completed its main task ``movie_theatre_question()``.
|
|
|
|
|
|
|
|
print(await portal.result())
|
|
|
|
|
|
|
|
|
|
|
|
tractor.run(main)
|
|
|
|
|
|
|
|
|
|
|
|
What's going on?
|
|
|
|
|
|
|
|
- an initial *actor* is started with ``tractor.run()`` and told to execute
|
|
|
|
its main task_: ``main()``
|
|
|
|
|
|
|
|
- inside ``main()`` an actor is *spawned* using an ``ActorNusery`` and is told
|
|
|
|
to run a single function: ``cellar_door()``
|
|
|
|
|
|
|
|
- a ``portal`` instance (we'll get to what it is shortly)
|
|
|
|
returned from ``nursery.run_in_actor()`` is used to communicate with
|
|
|
|
the newly spawned *sub-actor*
|
|
|
|
|
|
|
|
- the second actor, *frank*, in a new *process* running a new ``trio`` task_
|
|
|
|
then executes ``cellar_door()`` and returns its result over a *channel* back
|
|
|
|
to the parent actor
|
|
|
|
|
|
|
|
- the parent actor retrieves the subactor's (*frank*) *final result* using ``portal.result()``
|
|
|
|
much like you'd expect from a future_.
|
|
|
|
|
|
|
|
This ``run_in_actor()`` API should look very familiar to users of
|
|
|
|
``asyncio``'s run_in_executor_ which uses a ``concurrent.futures`` Executor_.
|
|
|
|
|
|
|
|
Since you might also want to spawn long running *worker* or *daemon*
|
|
|
|
actors, each actor's *lifetime* can be determined based on the spawn
|
2018-07-06 06:34:11 +00:00
|
|
|
method:
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
- if the actor is spawned using ``run_in_actor()`` it terminates when
|
|
|
|
its *main* task completes (i.e. when the (async) function submitted
|
|
|
|
to it *returns*). The ``with tractor.open_nursery()`` exits only once
|
|
|
|
all actors' main function/task complete (just like the nursery_ in ``trio``)
|
|
|
|
|
|
|
|
- actors can be spawned to *live forever* using the ``start_actor()``
|
|
|
|
method and act like an RPC daemon that runs indefinitely (the
|
|
|
|
``with tractor.open_nursery()`` wont' exit) until cancelled_
|
|
|
|
|
|
|
|
Had we wanted the latter form in our example it would have looked like:
|
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
def movie_theatre_question():
|
|
|
|
"""A question asked in a dark theatre, in a tangent
|
|
|
|
(errr, I mean different) process.
|
|
|
|
"""
|
|
|
|
return 'have you ever seen a portal?'
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
"""The main ``tractor`` routine.
|
|
|
|
"""
|
|
|
|
async with tractor.open_nursery() as n:
|
2018-08-03 04:55:50 +00:00
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
portal = await n.start_actor(
|
|
|
|
'frank',
|
|
|
|
# enable the actor to run funcs from this current module
|
|
|
|
rpc_module_paths=[__name__],
|
|
|
|
)
|
|
|
|
|
|
|
|
print(await portal.run(__name__, 'movie_theatre_question'))
|
2018-08-03 04:55:50 +00:00
|
|
|
# call the subactor a 2nd time
|
2018-07-06 06:34:11 +00:00
|
|
|
print(await portal.run(__name__, 'movie_theatre_question'))
|
|
|
|
|
|
|
|
# the async with will block here indefinitely waiting
|
|
|
|
# for our actor "frank" to complete, but since it's an
|
|
|
|
# "outlive_main" actor it will never end until cancelled
|
|
|
|
await portal.cancel_actor()
|
|
|
|
|
2018-07-12 19:18:02 +00:00
|
|
|
|
|
|
|
The ``rpc_module_paths`` `kwarg` above is a list of module path
|
|
|
|
strings that will be loaded and made accessible for execution in the
|
|
|
|
remote actor through a call to ``Portal.run()``. For now this is
|
|
|
|
a simple mechanism to restrict the functionality of the remote
|
2018-08-03 04:55:50 +00:00
|
|
|
(and possibly daemonized) actor and uses Python's module system to
|
|
|
|
limit the allowed remote function namespace(s).
|
2018-07-12 19:18:02 +00:00
|
|
|
|
|
|
|
``tractor`` is opinionated about the underlying threading model used for
|
2018-07-13 15:34:43 +00:00
|
|
|
each *actor*. Since Python has a GIL and an actor model by definition
|
|
|
|
shares no state between actors, it fits naturally to use a multiprocessing_
|
|
|
|
``Process``. This allows ``tractor`` programs to leverage not only multi-core
|
|
|
|
hardware but also distribute over many hardware hosts (each *actor* can talk
|
|
|
|
to all others with ease over standard network protocols).
|
2018-07-06 06:34:11 +00:00
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
.. _task: https://trio.readthedocs.io/en/latest/reference-core.html#tasks-let-you-do-multiple-things-at-once
|
2018-07-06 06:34:11 +00:00
|
|
|
.. _nursery: https://trio.readthedocs.io/en/latest/reference-core.html#nurseries-and-spawning
|
|
|
|
.. _causal: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#causality
|
|
|
|
.. _cancelled: https://trio.readthedocs.io/en/latest/reference-core.html#child-tasks-and-cancellation
|
2018-08-03 04:55:50 +00:00
|
|
|
.. _run_in_executor: https://docs.python.org/3/library/asyncio-eventloop.html#executor
|
|
|
|
.. _Executor: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
Transparent remote function calling using *portals*
|
|
|
|
---------------------------------------------------
|
|
|
|
``tractor`` introduces the concept of a *portal* which is an API
|
|
|
|
borrowed_ from ``trio``. A portal may seem similar to the idea of
|
2018-07-12 19:18:02 +00:00
|
|
|
a RPC future_ except a *portal* allows invoking remote *async* functions and
|
|
|
|
generators and intermittently blocking to receive responses. This allows
|
|
|
|
for fully async-native IPC between actors.
|
|
|
|
|
|
|
|
When you invoke another actor's routines using a *portal* it looks as though
|
|
|
|
it was called locally in the current actor. So when you see a call to
|
|
|
|
``await portal.run()`` what you get back is what you'd expect
|
2018-07-06 06:34:11 +00:00
|
|
|
to if you'd called the function directly in-process. This approach avoids
|
|
|
|
the need to add any special RPC *proxy* objects to the library by instead just
|
|
|
|
relying on the built-in (async) function calling semantics and protocols of Python.
|
|
|
|
|
|
|
|
Depending on the function type ``Portal.run()`` tries to
|
|
|
|
correctly interface exactly like a local version of the remote
|
2018-07-12 19:18:02 +00:00
|
|
|
built-in Python *function type*. Currently async functions, generators,
|
2018-07-06 06:34:11 +00:00
|
|
|
and regular functions are supported. Inspiration for this API comes
|
|
|
|
from the way execnet_ does `remote function execution`_ but without
|
|
|
|
the client code (necessarily) having to worry about the underlying
|
2018-07-12 19:18:02 +00:00
|
|
|
channels_ system or shipping code over the network.
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
This *portal* approach turns out to be paricularly exciting with the
|
|
|
|
introduction of `asynchronous generators`_ in Python 3.6! It means that
|
|
|
|
actors can compose nicely in a data processing pipeline.
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
As an example here's an actor that streams for 1 second from a remote async
|
|
|
|
generator function running in a separate actor:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
from itertools import repeat
|
|
|
|
import trio
|
|
|
|
import tractor
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_forever():
|
|
|
|
for i in repeat("I can see these little future bubble things"):
|
|
|
|
# each yielded value is sent over the ``Channel`` to the
|
|
|
|
# parent actor
|
|
|
|
yield i
|
|
|
|
await trio.sleep(0.01)
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
# stream for at most 1 seconds
|
|
|
|
with trio.move_on_after(1) as cancel_scope:
|
|
|
|
async with tractor.open_nursery() as n:
|
|
|
|
portal = await n.start_actor(
|
|
|
|
f'donny',
|
|
|
|
rpc_module_paths=[__name__],
|
|
|
|
)
|
|
|
|
|
|
|
|
# this async for loop streams values from the above
|
|
|
|
# async generator running in a separate process
|
|
|
|
async for letter in await portal.run(__name__, 'stream_forever'):
|
|
|
|
print(letter)
|
|
|
|
|
|
|
|
# we support trio's cancellation system
|
|
|
|
assert cancel_scope.cancelled_caught
|
|
|
|
assert n.cancelled
|
|
|
|
|
|
|
|
|
|
|
|
tractor.run(main)
|
|
|
|
|
|
|
|
|
|
|
|
Alright, let's get fancy.
|
|
|
|
|
2018-07-12 19:18:02 +00:00
|
|
|
Say you wanted to spawn two actors which each pulling data feeds from
|
2018-07-06 06:34:11 +00:00
|
|
|
two different sources (and wanted this work spread across 2 cpus).
|
|
|
|
You also want to aggregate these feeds, do some processing on them and then
|
2018-08-03 04:55:50 +00:00
|
|
|
deliver the final result stream to a client (or in this case parent) actor
|
|
|
|
and print the results to your screen:
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
import time
|
|
|
|
import trio
|
|
|
|
import tractor
|
|
|
|
|
|
|
|
|
2018-07-12 20:20:38 +00:00
|
|
|
# this is the first 2 actors, streamer_1 and streamer_2
|
2018-07-06 06:34:11 +00:00
|
|
|
async def stream_data(seed):
|
|
|
|
for i in range(seed):
|
|
|
|
yield i
|
|
|
|
await trio.sleep(0) # trigger scheduler
|
|
|
|
|
|
|
|
|
2018-07-12 20:20:38 +00:00
|
|
|
# this is the third actor; the aggregator
|
2018-07-06 06:34:11 +00:00
|
|
|
async def aggregate(seed):
|
|
|
|
"""Ensure that the two streams we receive match but only stream
|
|
|
|
a single set of values to the parent.
|
|
|
|
"""
|
|
|
|
async with tractor.open_nursery() as nursery:
|
|
|
|
portals = []
|
|
|
|
for i in range(1, 3):
|
|
|
|
# fork point
|
|
|
|
portal = await nursery.start_actor(
|
|
|
|
name=f'streamer_{i}',
|
|
|
|
rpc_module_paths=[__name__],
|
|
|
|
)
|
|
|
|
|
|
|
|
portals.append(portal)
|
|
|
|
|
|
|
|
q = trio.Queue(500)
|
|
|
|
|
|
|
|
async def push_to_q(portal):
|
|
|
|
async for value in await portal.run(
|
|
|
|
__name__, 'stream_data', seed=seed
|
|
|
|
):
|
|
|
|
# leverage trio's built-in backpressure
|
|
|
|
await q.put(value)
|
|
|
|
|
|
|
|
await q.put(None)
|
|
|
|
print(f"FINISHED ITERATING {portal.channel.uid}")
|
|
|
|
|
|
|
|
# spawn 2 trio tasks to collect streams and push to a local queue
|
|
|
|
async with trio.open_nursery() as n:
|
|
|
|
for portal in portals:
|
|
|
|
n.start_soon(push_to_q, portal)
|
|
|
|
|
|
|
|
unique_vals = set()
|
|
|
|
async for value in q:
|
|
|
|
if value not in unique_vals:
|
|
|
|
unique_vals.add(value)
|
|
|
|
# yield upwards to the spawning parent actor
|
|
|
|
yield value
|
|
|
|
|
|
|
|
if value is None:
|
|
|
|
break
|
|
|
|
|
|
|
|
assert value in unique_vals
|
|
|
|
|
|
|
|
print("FINISHED ITERATING in aggregator")
|
|
|
|
|
|
|
|
await nursery.cancel()
|
|
|
|
print("WAITING on `ActorNursery` to finish")
|
|
|
|
print("AGGREGATOR COMPLETE!")
|
|
|
|
|
|
|
|
|
2018-07-12 20:20:38 +00:00
|
|
|
# this is the main actor and *arbiter*
|
2018-07-06 06:34:11 +00:00
|
|
|
async def main():
|
|
|
|
# a nursery which spawns "actors"
|
|
|
|
async with tractor.open_nursery() as nursery:
|
|
|
|
|
|
|
|
seed = int(1e3)
|
|
|
|
import time
|
|
|
|
pre_start = time.time()
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
portal = await nursery.run_in_actor(
|
|
|
|
'aggregator',
|
|
|
|
aggregate,
|
|
|
|
seed=seed,
|
2018-07-06 06:34:11 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
start = time.time()
|
|
|
|
# the portal call returns exactly what you'd expect
|
2018-08-03 04:55:50 +00:00
|
|
|
# as if the remote "aggregate" function was called locally
|
2018-07-06 06:34:11 +00:00
|
|
|
result_stream = []
|
|
|
|
async for value in await portal.result():
|
|
|
|
result_stream.append(value)
|
|
|
|
|
|
|
|
print(f"STREAM TIME = {time.time() - start}")
|
|
|
|
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
|
|
|
assert result_stream == list(range(seed)) + [None]
|
|
|
|
return result_stream
|
|
|
|
|
|
|
|
|
|
|
|
final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))
|
|
|
|
|
|
|
|
|
|
|
|
Here there's four actors running in separate processes (using all the
|
2018-08-03 04:55:50 +00:00
|
|
|
cores on you machine). Two are streaming by *yielding* values from the
|
2018-07-13 15:34:43 +00:00
|
|
|
``stream_data()`` async generator, one is aggregating values from
|
|
|
|
those two in ``aggregate()`` (also an async generator) and shipping the
|
2018-07-06 06:34:11 +00:00
|
|
|
single stream of unique values up the parent actor (the ``'MainProcess'``
|
|
|
|
as ``multiprocessing`` calls it) which is running ``main()``.
|
|
|
|
|
2018-07-12 19:18:02 +00:00
|
|
|
.. _future: https://en.wikipedia.org/wiki/Futures_and_promises
|
|
|
|
.. _borrowed:
|
|
|
|
https://trio.readthedocs.io/en/latest/reference-core.html#getting-back-into-the-trio-thread-from-another-thread
|
2018-07-06 06:34:11 +00:00
|
|
|
.. _asynchronous generators: https://www.python.org/dev/peps/pep-0525/
|
|
|
|
.. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
|
|
|
|
.. _asyncitertools: https://github.com/vodik/asyncitertools
|
|
|
|
|
|
|
|
|
|
|
|
Cancellation
|
|
|
|
------------
|
2018-08-03 04:55:50 +00:00
|
|
|
``tractor`` supports ``trio``'s cancellation_ system verbatim.
|
2018-07-12 19:18:02 +00:00
|
|
|
Cancelling a nursery block cancels all actors spawned by it.
|
2018-07-13 15:34:43 +00:00
|
|
|
Eventually ``tractor`` plans to support different `supervision strategies`_ like ``erlang``.
|
|
|
|
|
|
|
|
.. _supervision strategies: http://erlang.org/doc/man/supervisor.html#sup_flags
|
2018-07-12 19:18:02 +00:00
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
Remote error propagation
|
|
|
|
------------------------
|
|
|
|
Any task invoked in a remote actor should ship any error(s) back to the calling
|
2018-08-03 04:55:50 +00:00
|
|
|
actor where it is raised and expected to be dealt with. This way remote actors
|
2018-07-06 06:34:11 +00:00
|
|
|
are never cancelled unless explicitly asked or there's a bug in ``tractor`` itself.
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
async def assert_err():
|
|
|
|
assert 0
|
|
|
|
|
2018-08-03 04:55:50 +00:00
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
async def main():
|
|
|
|
async with tractor.open_nursery() as n:
|
|
|
|
real_actors = []
|
|
|
|
for i in range(3):
|
|
|
|
real_actors.append(await n.start_actor(
|
|
|
|
f'actor_{i}',
|
|
|
|
rpc_module_paths=[__name__],
|
|
|
|
))
|
|
|
|
|
|
|
|
# start one actor that will fail immediately
|
2018-08-03 04:55:50 +00:00
|
|
|
await n.run_in_actor('extra', assert_err)
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
# should error here with a ``RemoteActorError`` containing
|
|
|
|
# an ``AssertionError`` and all the other actors have been cancelled
|
|
|
|
|
|
|
|
try:
|
|
|
|
# also raises
|
|
|
|
tractor.run(main)
|
|
|
|
except tractor.RemoteActorError:
|
|
|
|
print("Look Maa that actor failed hard, hehhh!")
|
|
|
|
|
|
|
|
|
|
|
|
You'll notice the nursery cancellation conducts a *one-cancels-all*
|
|
|
|
supervisory strategy `exactly like trio`_. The plan is to add more
|
|
|
|
`erlang strategies`_ in the near future by allowing nurseries to accept
|
|
|
|
a ``Supervisor`` type.
|
|
|
|
|
|
|
|
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
|
|
|
|
.. _erlang strategies: http://learnyousomeerlang.com/supervisors
|
|
|
|
|
|
|
|
|
|
|
|
Shared task state
|
|
|
|
-----------------
|
|
|
|
Although ``tractor`` uses a *shared-nothing* architecture between processes
|
|
|
|
you can of course share state within an actor. ``trio`` tasks spawned via
|
|
|
|
multiple RPC calls to an actor can access global data using the per actor
|
|
|
|
``statespace`` dictionary:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
|
|
|
|
statespace = {'doggy': 10}
|
|
|
|
|
|
|
|
|
|
|
|
def check_statespace():
|
|
|
|
# Remember this runs in a new process so no changes
|
|
|
|
# will propagate back to the parent actor
|
|
|
|
assert tractor.current_actor().statespace == statespace
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
async with tractor.open_nursery() as n:
|
2018-08-03 04:55:50 +00:00
|
|
|
await n.run_in_actor(
|
|
|
|
'checker',
|
|
|
|
check_statespace,
|
2018-07-06 06:34:11 +00:00
|
|
|
statespace=statespace
|
|
|
|
)
|
|
|
|
|
|
|
|
|
2018-07-13 15:34:43 +00:00
|
|
|
Of course you don't have to use the ``statespace`` variable (it's mostly
|
|
|
|
a convenience for passing simple data to newly spawned actors); building
|
|
|
|
out a state sharing system per-actor is totally up to you.
|
|
|
|
|
|
|
|
|
2018-07-06 06:34:11 +00:00
|
|
|
How do actors find each other (a poor man's *service discovery*)?
|
|
|
|
-----------------------------------------------------------------
|
|
|
|
Though it will be built out much more in the near future, ``tractor``
|
|
|
|
currently keeps track of actors by ``(name: str, id: str)`` using a
|
|
|
|
special actor called the *arbiter*. Currently the *arbiter* must exist
|
|
|
|
on a host (or it will be created if one can't be found) and keeps a
|
|
|
|
simple ``dict`` of actor names to sockets for discovery by other actors.
|
|
|
|
Obviously this can be made more sophisticated (help me with it!) but for
|
|
|
|
now it does the trick.
|
|
|
|
|
|
|
|
To find the arbiter from the current actor use the ``get_arbiter()`` function and to
|
|
|
|
find an actor's socket address by name use the ``find_actor()`` function:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
import tractor
|
|
|
|
|
|
|
|
|
|
|
|
async def main(service_name):
|
|
|
|
|
|
|
|
async with tractor.get_arbiter() as portal:
|
|
|
|
print(f"Arbiter is listening on {portal.channel}")
|
|
|
|
|
|
|
|
async with tractor.find_actor(service_name) as sockaddr:
|
|
|
|
print(f"my_service is found at {my_service}")
|
|
|
|
|
|
|
|
|
|
|
|
tractor.run(main, service_name)
|
|
|
|
|
|
|
|
|
|
|
|
The ``name`` value you should pass to ``find_actor()`` is the one you passed as the
|
|
|
|
*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``.
|
|
|
|
|
|
|
|
|
|
|
|
Using ``Channel`` directly (undocumented)
|
|
|
|
-----------------------------------------
|
|
|
|
You can use the ``Channel`` api if necessary by simply defining a
|
|
|
|
``chan`` and ``cid`` *kwarg* in your async function definition.
|
|
|
|
``tractor`` will treat such async functions like async generators on
|
|
|
|
the calling side (for now anyway) such that you can push stream values
|
|
|
|
a little more granularly if you find *yielding* values to be restrictive.
|
|
|
|
I am purposely not documenting this feature with code because I'm not yet
|
|
|
|
sure yet how it should be used correctly. If you'd like more details
|
|
|
|
please feel free to ask me on the `trio gitter channel`_.
|
|
|
|
|
|
|
|
|
|
|
|
Running actors standalone (without spawning)
|
|
|
|
--------------------------------------------
|
|
|
|
You don't have to spawn any actors using ``open_nursery()`` if you just
|
|
|
|
want to run a single actor that connects to an existing cluster.
|
|
|
|
All the comms and arbiter registration stuff still works. This can
|
|
|
|
somtimes turn out being handy when debugging mult-process apps when you
|
|
|
|
need to hop into a debugger. You just need to pass the existing
|
|
|
|
*arbiter*'s socket address you'd like to connect to:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
tractor.run(main, arbiter_addr=('192.168.0.10', 1616))
|
|
|
|
|
|
|
|
|
|
|
|
Enabling logging
|
|
|
|
----------------
|
|
|
|
Considering how complicated distributed software can become it helps to know
|
|
|
|
what exactly it's doing (even at the lowest levels). Luckily ``tractor`` has
|
|
|
|
tons of logging throughout the core. ``tractor`` isn't opinionated on
|
|
|
|
how you use this information and users are expected to consume log messages in
|
|
|
|
whichever way is appropriate for the system at hand. That being said, when hacking
|
2018-07-12 19:18:02 +00:00
|
|
|
on ``tractor`` there is a prettified console formatter which you can enable to
|
2018-07-06 06:34:11 +00:00
|
|
|
see what the heck is going on. Just put the following somewhere in your code:
|
|
|
|
|
|
|
|
.. code:: python
|
|
|
|
|
|
|
|
from tractor.log import get_console_log
|
|
|
|
log = get_console_log('trace')
|
|
|
|
|
|
|
|
|
|
|
|
What the future holds
|
|
|
|
---------------------
|
|
|
|
Stuff I'd like to see ``tractor`` do one day:
|
|
|
|
|
|
|
|
- erlang-like supervisors_
|
|
|
|
- native support for zeromq_ as a channel transport
|
|
|
|
- native `gossip protocol`_ support for service discovery and arbiter election
|
|
|
|
- a distributed log ledger for tracking cluster behaviour
|
|
|
|
- a slick multi-process aware debugger much like in celery_
|
|
|
|
but with better `pdb++`_ support
|
2018-08-03 04:55:50 +00:00
|
|
|
- an extensive `chaos engineering`_ test suite
|
|
|
|
- support for reactive programming primitives and native support for asyncitertools_ like libs
|
2018-07-06 06:34:11 +00:00
|
|
|
|
|
|
|
If you're interested in tackling any of these please do shout about it on the
|
|
|
|
`trio gitter channel`_!
|
|
|
|
|
|
|
|
.. _supervisors: http://learnyousomeerlang.com/supervisors
|
|
|
|
.. _zeromq: https://en.wikipedia.org/wiki/ZeroMQ
|
|
|
|
.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol
|
|
|
|
.. _trio gitter channel: https://gitter.im/python-trio/general
|
|
|
|
.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html
|
|
|
|
.. _pdb++: https://github.com/antocuni/pdb
|