forked from goodboy/tractor
'structured concurrent `trio`-"actors"'
Tyler Goodlet 0e7db46631 Revert auto-gen readme and merge in auto-gen code blocks by hand for now 2021-02-25 09:10:18 -05:00

docs/README.rst

tractor

The Python async-native multi-core system you always wanted.


tractor is a structured concurrent "actor model" built on trio and multi-processing.

It is an attempt to pair trionic structured concurrency with distributed Python. You can think of it as trio-across-processes, or simply as an opinionated replacement for the stdlib's multiprocessing that is built on async programming primitives from the ground up.

Don't be scared off by this description. tractor is just trio but with nurseries for process management and cancellable IPC. If you understand how to work with trio, tractor will give you the parallelism you've been missing.

tractor's nurseries let you spawn trio "actors": new Python processes which each run a trio-scheduled task tree (also known as an "async sandwich": a call to trio.run()). That is, each "actor" is a new process plus a trio runtime.

"Actors" communicate by exchanging asynchronous messages and avoid sharing state. The intention of this model is to allow for highly distributed software that, through the adherence to structured concurrency, results in systems which fail in predictable and recoverable ways.

The first step to grokking tractor is to get the basics of trio down. Great places to start are the trio docs and this blog post.

Install

No PyPI release yet!

pip install git+https://github.com/goodboy/tractor.git

Alluring Features

  • It's just trio, but with structured concurrency (SC) applied to processes (aka "actors")
  • Infinitely nestable process trees
  • Built-in API for inter-process streaming
  • A (first ever?) "native" multi-core debugger for Python using pdb++
  • (Soon to land) asyncio support allowing for "infected" actors where trio drives the asyncio scheduler via the astounding "guest mode"

Example: self-destruct a process tree

"""
Run with a process monitor from a terminal using:
$TERM -e watch -n 0.1  "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!

"""
from multiprocessing import cpu_count
import os

import tractor
import trio


async def target():
    print(f"Yo, i'm '{tractor.current_actor().name}' "
          f"running in pid {os.getpid()}")
    await trio.sleep_forever()


async def main():

    async with tractor.open_nursery() as n:

        for i in range(cpu_count()):
            await n.run_in_actor(target, name=f'worker_{i}')

        print('This process tree will self-destruct in 1 sec...')
        await trio.sleep(1)

        # you could have done this yourself
        raise Exception('Self Destructed')


if __name__ == '__main__':
    try:
        trio.run(main)
    except Exception:
        print('Zombies Contained')

The example you're probably after...

The first question most new users ask seems to be "how do I make a worker pool thing?".

tractor is built to handle any SC process tree you can imagine; the "worker pool" pattern is a trivial special case:

"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need 
is ``tractor``'s channels.

"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time

import tractor
import trio
from async_generator import aclosing


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@asynccontextmanager
async def worker_pool(workers=4):
    """Though it's a trivial special case for ``tractor``, the well
    known "worker pool" seems to be the de facto "but, I want this
    process pattern!" for most parallelism pilgrims.

    Yes, the workers stay alive (and ready for work) until you close
    the context.
    """
    async with tractor.open_nursery() as tn:

        portals = []
        snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))

        for i in range(workers):

            # this starts a new sub-actor (process + trio runtime) and
            # stores its "portal" for later use to "submit jobs" (ugh).
            portals.append(
                await tn.start_actor(
                    f'worker_{i}',
                    enable_modules=[__name__],
                )
            )

        async def _map(
            worker_func: Callable[[int], bool],
            sequence: List[int]
        ) -> List[bool]:

            # define an async (local) task to collect results from workers
            async def send_result(func, value, portal):
                await snd_chan.send((value, await portal.run(func, n=value)))

            async with trio.open_nursery() as n:

                for value, portal in zip(sequence, itertools.cycle(portals)):
                    n.start_soon(
                        send_result,
                        worker_func,
                        value,
                        portal
                    )

                # deliver results as they arrive
                for _ in range(len(sequence)):
                    yield await recv_chan.receive()

        # deliver the parallel "worker mapper" to user code
        yield _map

        # tear down all "workers" on pool close
        await tn.cancel()


async def main():

    async with worker_pool() as actor_map:

        start = time.time()

        async with aclosing(actor_map(is_prime, PRIMES)) as results:
            async for number, prime in results:

                print(f'{number} is prime: {prime}')

        print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')
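The heart of `_map` in the worker pool example is two small ideas: jobs are assigned to workers round-robin with `itertools.cycle`, and results are pushed into a channel so the caller receives them in completion order rather than submission order. The sketch below reproduces just that dispatch logic in a single process with `asyncio`, using an `asyncio.Queue` in place of trio's memory channel and hypothetical worker coroutines (`make_worker`) in place of portals; it shows the pattern, not tractor's API.

```python
import asyncio
import itertools


async def round_robin_map(workers, func, sequence):
    """Hypothetical single-process analogue of `_map`: pair each value
    with the next worker in a cycle, start every job at once, and yield
    (value, result) pairs in the order the jobs finish."""
    results: asyncio.Queue = asyncio.Queue()  # stands in for trio's memory channel

    async def send_result(worker, value):
        await results.put((value, await worker(func, value)))

    tasks = [
        asyncio.create_task(send_result(worker, value))
        for value, worker in zip(sequence, itertools.cycle(workers))
    ]
    try:
        for _ in range(len(sequence)):
            yield await results.get()
    finally:
        for task in tasks:  # don't leave jobs running if the consumer stops early
            task.cancel()


def make_worker(name, log):
    """Hypothetical stand-in for a portal: logs the job, fakes some work."""
    async def worker(func, value):
        log.append((name, value))
        await asyncio.sleep(0.01 * value)  # larger inputs take longer
        return func(value)
    return worker


async def demo_round_robin():
    log = []
    workers = [make_worker(f"worker_{i}", log) for i in range(2)]
    out = [pair async for pair in round_robin_map(workers, lambda n: n * n, [5, 1, 3])]
    return out, log


if __name__ == "__main__":
    out, log = asyncio.run(demo_round_robin())
    print(out)  # [(1, 1), (3, 9), (5, 25)]: completion order, not input order
    print(log)  # [('worker_0', 5), ('worker_1', 1), ('worker_0', 3)]
```

One deliberate gap: there is no nursery around the jobs here, so if a worker raised, its result would never arrive and the consumer would wait forever. Supervising those jobs, and cancelling the rest on failure, is precisely what the trio nursery in the original `_map` provides.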

Feel like saying hi?

This project is very much coupled to the ongoing development of trio (i.e. tractor gets most of its ideas from that brilliant community). If you want to help, have suggestions or just want to say hi, please feel free to reach us in our matrix channel. If matrix seems too hip, we're also mostly all in the trio gitter channel!