forked from goodboy/tractor
				
			Merge pull request #176 from goodboy/eg_worker_poolz
Add our version of the std lib's "worker pool"first_pypi_release
						commit
						35775c6763
					
				| 
						 | 
				
			
			@ -0,0 +1,40 @@
 | 
			
		|||
import time
 | 
			
		||||
import concurrent.futures
 | 
			
		||||
import math
 | 
			
		||||
 | 
			
		||||
PRIMES = [
 | 
			
		||||
    112272535095293,
 | 
			
		||||
    112582705942171,
 | 
			
		||||
    112272535095293,
 | 
			
		||||
    115280095190773,
 | 
			
		||||
    115797848077099,
 | 
			
		||||
    1099726899285419]
 | 
			
		||||
 | 
			
		||||
def is_prime(n):
 | 
			
		||||
    if n < 2:
 | 
			
		||||
        return False
 | 
			
		||||
    if n == 2:
 | 
			
		||||
        return True
 | 
			
		||||
    if n % 2 == 0:
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    sqrt_n = int(math.floor(math.sqrt(n)))
 | 
			
		||||
    for i in range(3, sqrt_n + 1, 2):
 | 
			
		||||
        if n % i == 0:
 | 
			
		||||
            return False
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    with concurrent.futures.ProcessPoolExecutor() as executor:
 | 
			
		||||
        start = time.time()
 | 
			
		||||
 | 
			
		||||
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
 | 
			
		||||
            print('%d is prime: %s' % (number, prime))
 | 
			
		||||
 | 
			
		||||
        print(f'processing took {time.time() - start} seconds')
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
 | 
			
		||||
    start = time.time()
 | 
			
		||||
    main()
 | 
			
		||||
    print(f'script took {time.time() - start} seconds')
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,119 @@
 | 
			
		|||
"""
 | 
			
		||||
Demonstration of the prime number detector example from the
 | 
			
		||||
``concurrent.futures`` docs:
 | 
			
		||||
 | 
			
		||||
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example
 | 
			
		||||
 | 
			
		||||
This uses no extra threads, fancy semaphores or futures; all we need 
 | 
			
		||||
is ``tractor``'s channels.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from typing import List, Callable
 | 
			
		||||
import itertools
 | 
			
		||||
import math
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import aclosing
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
PRIMES = [
 | 
			
		||||
    112272535095293,
 | 
			
		||||
    112582705942171,
 | 
			
		||||
    112272535095293,
 | 
			
		||||
    115280095190773,
 | 
			
		||||
    115797848077099,
 | 
			
		||||
    1099726899285419,
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_prime(n):
 | 
			
		||||
    if n < 2:
 | 
			
		||||
        return False
 | 
			
		||||
    if n == 2:
 | 
			
		||||
        return True
 | 
			
		||||
    if n % 2 == 0:
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    sqrt_n = int(math.floor(math.sqrt(n)))
 | 
			
		||||
    for i in range(3, sqrt_n + 1, 2):
 | 
			
		||||
        if n % i == 0:
 | 
			
		||||
            return False
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def worker_pool(workers=4):
 | 
			
		||||
    """Though it's a trivial special case for ``tractor``, the well
 | 
			
		||||
    known "worker pool" seems to be the defacto "but, I want this
 | 
			
		||||
    process pattern!" for most parallelism pilgrims.
 | 
			
		||||
 | 
			
		||||
    Yes, the workers stay alive (and ready for work) until you close
 | 
			
		||||
    the context.
 | 
			
		||||
    """
 | 
			
		||||
    async with tractor.open_nursery() as tn:
 | 
			
		||||
 | 
			
		||||
        portals = []
 | 
			
		||||
        snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))
 | 
			
		||||
 | 
			
		||||
        for i in range(workers):
 | 
			
		||||
 | 
			
		||||
            # this starts a new sub-actor (process + trio runtime) and
 | 
			
		||||
            # stores it's "portal" for later use to "submit jobs" (ugh).
 | 
			
		||||
            portals.append(
 | 
			
		||||
                await tn.start_actor(
 | 
			
		||||
                    f'worker_{i}',
 | 
			
		||||
                    enable_modules=[__name__],
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        async def _map(
 | 
			
		||||
            worker_func: Callable[[int], bool],
 | 
			
		||||
            sequence: List[int]
 | 
			
		||||
        ) -> List[bool]:
 | 
			
		||||
 | 
			
		||||
            # define an async (local) task to collect results from workers
 | 
			
		||||
            async def send_result(func, value, portal):
 | 
			
		||||
                await snd_chan.send((value, await portal.run(func, n=value)))
 | 
			
		||||
 | 
			
		||||
            async with trio.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
                for value, portal in zip(sequence, itertools.cycle(portals)):
 | 
			
		||||
                    n.start_soon(
 | 
			
		||||
                        send_result,
 | 
			
		||||
                        worker_func,
 | 
			
		||||
                        value,
 | 
			
		||||
                        portal
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                # deliver results as they arrive
 | 
			
		||||
                for _ in range(len(sequence)):
 | 
			
		||||
                    yield await recv_chan.receive()
 | 
			
		||||
 | 
			
		||||
        # deliver the parallel "worker mapper" to user code
 | 
			
		||||
        yield _map
 | 
			
		||||
 | 
			
		||||
        # tear down all "workers" on pool close
 | 
			
		||||
        await tn.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main():
 | 
			
		||||
 | 
			
		||||
    async with worker_pool() as actor_map:
 | 
			
		||||
 | 
			
		||||
        start = time.time()
 | 
			
		||||
 | 
			
		||||
        async with aclosing(actor_map(is_prime, PRIMES)) as results:
 | 
			
		||||
            async for number, prime in results:
 | 
			
		||||
 | 
			
		||||
                print(f'{number} is prime: {prime}')
 | 
			
		||||
 | 
			
		||||
        print(f'processing took {time.time() - start} seconds')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    start = time.time()
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
    print(f'script took {time.time() - start} seconds')
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,37 @@
 | 
			
		|||
"""
 | 
			
		||||
Run with a process monitor from a terminal using:
 | 
			
		||||
$TERM -e watch -n 0.1  "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from multiprocessing import cpu_count
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def target():
 | 
			
		||||
    print(f"Yo, i'm '{tractor.current_actor().name}' "
 | 
			
		||||
          f"running in pid {os.getpid()}")
 | 
			
		||||
    await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main():
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
        for i in range(cpu_count()):
 | 
			
		||||
            await n.run_in_actor(target, name=f'worker_{i}')
 | 
			
		||||
 | 
			
		||||
        print('This process tree will self-destruct in 1 sec...')
 | 
			
		||||
        await trio.sleep(1)
 | 
			
		||||
 | 
			
		||||
        # you could have done this yourself
 | 
			
		||||
        raise Exception('Self Destructed')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    try:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
    except Exception:
 | 
			
		||||
        print('Zombies Contained')
 | 
			
		||||
							
								
								
									
										2
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										2
									
								
								setup.py
								
								
								
								
							| 
						 | 
				
			
			@ -18,7 +18,7 @@
 | 
			
		|||
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
from setuptools import setup
 | 
			
		||||
 | 
			
		||||
with open('README.rst', encoding='utf-8') as f:
 | 
			
		||||
with open('docs/README.rst', encoding='utf-8') as f:
 | 
			
		||||
    readme = f.read()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -78,13 +78,15 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
 | 
			
		|||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'example_script',
 | 
			
		||||
    [
 | 
			
		||||
        f for f in os.listdir(examples_dir())
 | 
			
		||||
        if (
 | 
			
		||||
            ('__' not in f) and
 | 
			
		||||
            ('debugging' not in f)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # walk yields: (dirpath, dirnames, filenames)
 | 
			
		||||
    [(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
 | 
			
		||||
 | 
			
		||||
        if '__' not in f
 | 
			
		||||
        and f[0] != '_'
 | 
			
		||||
        and 'debugging' not in p[0]
 | 
			
		||||
    ],
 | 
			
		||||
    ids=lambda t: t[1],
 | 
			
		||||
)
 | 
			
		||||
def test_example(run_example_in_subproc, example_script):
 | 
			
		||||
    """Load and run scripts from this repo's ``examples/`` dir as a user
 | 
			
		||||
| 
						 | 
				
			
			@ -95,7 +97,7 @@ def test_example(run_example_in_subproc, example_script):
 | 
			
		|||
    test directory and invoke the script as a module with ``python -m
 | 
			
		||||
    test_example``.
 | 
			
		||||
    """
 | 
			
		||||
    ex_file = os.path.join(examples_dir(), example_script)
 | 
			
		||||
    ex_file = os.path.join(*example_script)
 | 
			
		||||
    with open(ex_file, 'r') as ex:
 | 
			
		||||
        code = ex.read()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,6 +6,8 @@ semaphore tracker per ``MainProcess``.
 | 
			
		|||
.. note:: There is no type hinting in this code base (yet) to remain as
 | 
			
		||||
          a close as possible to upstream.
 | 
			
		||||
"""
 | 
			
		||||
# type: ignore
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import socket
 | 
			
		||||
import signal
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue