Compare commits

...

16 Commits

Author SHA1 Message Date
Tyler Goodlet f1acbd9b84 Stash the type string from remote errors 2021-01-27 14:41:17 -05:00
Tyler Goodlet 4a4a786763 Add a super basic supervisor/restart example 2021-01-27 14:40:55 -05:00
Tyler Goodlet 70c7e09831 Add class style "actors" example with client proxy API 2021-01-24 20:41:03 -05:00
Tyler Goodlet 47d7b603db Use a global dataclass instead, cuz we like "objects"? 2021-01-24 15:18:52 -05:00
Tyler Goodlet 7f8c5cdfe6 Add an actor "state mutation" via messages example 2021-01-24 14:54:46 -05:00
Tyler Goodlet ce61230815 Fix more stdlib typing issues with latest mypy 2021-01-24 14:53:58 -05:00
Tyler Goodlet 5da86a0e48 Ignore type checks on stdlib overrides 2021-01-24 14:53:58 -05:00
Tyler Goodlet 2ed071c903 Add `aclosing()` around async gen loop 2021-01-24 14:53:58 -05:00
Tyler Goodlet 5db737d368 Run parallel examples 2021-01-24 14:53:58 -05:00
Tyler Goodlet 47651eaf91 Contain the error 2021-01-24 14:53:58 -05:00
Tyler Goodlet 582eda4afd Add concise readme example 2021-01-24 14:53:58 -05:00
Tyler Goodlet dae154e470 More comments 2021-01-24 14:53:58 -05:00
Tyler Goodlet a10c4b172a Yield results on demand using a mem chan 2021-01-24 14:53:58 -05:00
Tyler Goodlet f715a0cae8 Remove use of tractor.run() 2021-01-24 14:53:58 -05:00
Tyler Goodlet da27d96682 Make new parallelism example space 2021-01-24 14:53:58 -05:00
Tyler Goodlet 9fae34a190 Add our version of the std lib's "worker pool"
This is a draft of the `tractor` way to implement the example from the
"process pool" in the stdlib's `concurrent.futures` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

Our runtime is of course slower to start up, but once up we get the
same performance; this confirms that we should focus some effort on
warm-up and teardown times.  The mp forkserver method definitely
improves startup delay; rolling our own will likely be a good hot spot
to play with.

What's really nice is that our implementation is done in roughly a
10th of the code ;)

Also, do we want to offer an interface that yields results as they
arrive?

Relates to #175
2021-01-24 14:53:58 -05:00
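
An aside on the forkserver note in the message above: forkserver is the
stdlib start method that boots one long-lived template process and forks
each new worker from it, skipping a fresh interpreter boot per worker.
A minimal, hypothetical sketch of opting into it with plain
``multiprocessing`` (not part of this changeset):

import multiprocessing as mp


def square(n: int) -> int:
    return n * n


if __name__ == '__main__':
    # boot one "server" process up front (unix-only); each pool worker
    # is then forked from it instead of cold-starting an interpreter
    mp.set_start_method('forkserver')
    with mp.Pool(4) as pool:
        print(pool.map(square, range(8)))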
10 changed files with 513 additions and 10 deletions

View File

@@ -0,0 +1,86 @@
import trio
import tractor


class Restart(Exception):
    """Restart signal"""


async def sleep_then_restart():
    actor = tractor.current_actor()
    print(f'{actor.uid} starting up!')
    await trio.sleep(0.5)
    raise Restart('This is a restart signal')


async def signal_restart_whole_actor():
    actor = tractor.current_actor()
    print(f'{actor.uid} starting up!')
    await trio.sleep(0.5)
    return 'restart_me'


async def respawn_remote_task(portal):
    # Start a task in the actor at the other end of the provided
    # portal; when it signals a restart, restart it.

    # This is much more efficient than restarting the underlying
    # process over and over since the python interpreter runtime
    # stays up and we just submit a new task to run (which is just
    # the original one we submitted) repeatedly.
    while True:
        try:
            await portal.run(sleep_then_restart)
        except tractor.RemoteActorError as error:
            if 'Restart' in str(error):
                # respawn the actor task
                continue


async def supervisor():
    async with tractor.open_nursery() as tn:
        p0 = await tn.start_actor('task_restarter', enable_modules=[__name__])

        # Yes, you can do this from multiple tasks on one actor
        # or multiple lone tasks in multiple subactors.
        # We'll show both.
        async with trio.open_nursery() as n:
            # we'll do the first as a lone task restart in a daemon actor
            for i in range(4):
                n.start_soon(respawn_remote_task, p0)

            # Open another nursery that will respawn sub-actors:
            # spawn a set of subactors that will signal restart
            # of the group of processes on each failure.
            portals = []

            # start initial subactor set
            for i in range(4):
                p = await tn.run_in_actor(signal_restart_whole_actor)
                portals.append(p)

            # now wait on results and respawn actors that request it
            while True:
                for p in portals:
                    result = await p.result()
                    if result == 'restart_me':
                        print(f'restarting {p.channel.uid}')
                        await p.cancel_actor()
                        await trio.sleep(0.5)
                        p = await tn.run_in_actor(signal_restart_whole_actor)
                        portals.append(p)

            # this will block indefinitely so user must cancel with ctrl-c


if __name__ == '__main__':
    trio.run(supervisor)
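
Note the `except` clause above substring-matches the stringified remote
error, since only the message text is available locally. A hedged
variant, assuming the `type_str` attribute stashed on
`RemoteActorError` by the first commit in this set (shown in the diff
near the bottom):

async def respawn_remote_task_by_type(portal):
    # same restart loop as above, but matching the remote exception's
    # type name exactly instead of substring-matching its message
    while True:
        try:
            await portal.run(sleep_then_restart)
        except tractor.RemoteActorError as error:
            if error.type_str == 'Restart':
                continue
            raise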

View File

@@ -0,0 +1,64 @@
from itertools import cycle
from pprint import pformat
from dataclasses import dataclass, field

import trio
import tractor


@dataclass
class MyProcessStateThing:
    state: dict = field(default_factory=dict)

    def update(self, msg: dict):
        self.state.update(msg)


_actor_state = MyProcessStateThing()


async def update_local_state(msg: dict):
    """Update process-local state from sent message and exit.
    """
    actor = tractor.current_actor()
    global _actor_state

    print(f'Yo we got a message {msg}')

    # update the "actor state"
    _actor_state.update(msg)

    print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}')

    # we're done so exit this task running in the subactor


async def main():
    # Main process/thread that spawns one sub-actor and sends messages
    # to it to update its state.
    actor_portals = []

    # XXX: that subactor can **not** outlive its parent, this is SC.
    async with tractor.open_nursery() as tn:
        portal = await tn.start_actor('even_boy', enable_modules=[__name__])
        actor_portals.append(portal)

        portal = await tn.start_actor('odd_boy', enable_modules=[__name__])
        actor_portals.append(portal)

        for i, (count, portal) in enumerate(
            zip(range(100), cycle(actor_portals))
        ):
            await portal.run(update_local_state, msg={f'msg_{i}': count})

        # blocks here indefinitely since we spawned "daemon actors" using
        # `.start_actor()`; you'll need to control-c to cancel.


if __name__ == '__main__':
    trio.run(main)

View File

@@ -0,0 +1,153 @@
import inspect
from typing import Any
from functools import partial
from contextlib import asynccontextmanager, AsyncExitStack
from itertools import cycle
from pprint import pformat

import trio
import tractor

log = tractor.log.get_logger(__name__)


class ActorState:
    """Singleton actor per process.
    """
    # this is a class defined variable and is thus both a
    # singleton across object instances and task safe.
    state: dict = {}

    def update(self, msg: dict) -> None:
        _actor = tractor.current_actor()
        print(f'Yo we got a message {msg}')
        self.state.update(msg)
        print(f'New local "state" for {_actor.uid} is {pformat(self.state)}')

    def close(self):
        # gives headers showing which process and task is active
        log.info('Actor state is closing')

    # if we wanted to support spawning or talking to other
    # actors we can do that using a portal map collection?
    # _portals: dict = {}


async def _run_proxy_method(
    meth: str,
    msg: dict,
) -> Any:
    """Update process-local state from sent message and exit.
    """
    # Create a new actor instance per call.
    # We could make this persistent by storing it either
    # in a global var or in another class-scoped variable?
    # If you want it somehow persisted in another namespace
    # I'd be interested to know "where".
    actor = ActorState()

    if meth != 'close':
        return getattr(actor, meth)(msg)
    else:
        actor.close()

    # we're done so exit this task running in the subactor


class MethodProxy:

    def __init__(
        self,
        portal: tractor._portal.Portal
    ) -> None:
        self._portal = portal

    async def _run_method(
        self,
        *,
        meth: str,
        msg: dict,
    ) -> Any:
        return await self._portal.run(
            _run_proxy_method,
            meth=meth,
            msg=msg
        )


def get_method_proxy(portal, target=ActorState) -> MethodProxy:

    proxy = MethodProxy(portal)

    # mock all remote methods
    for name, method in inspect.getmembers(
        target, predicate=inspect.isfunction
    ):
        if '_' == name[0]:
            # skip private methods
            continue
        else:
            setattr(proxy, name, partial(proxy._run_method, meth=name))

    return proxy


@asynccontextmanager
async def spawn_proxy_actor(name):

    # XXX: that subactor can **not** outlive its parent, this is SC.
    async with tractor.open_nursery(
        debug_mode=True,
        # loglevel='info',
    ) as tn:
        portal = await tn.start_actor(name, enable_modules=[__name__])
        proxy = get_method_proxy(portal)
        yield proxy
        await proxy.close(msg=None)


async def main():
    # Main process/thread that spawns one sub-actor and sends messages
    # to it to update its state.
    try:
        stack = AsyncExitStack()

        actors = []
        for name in ['even', 'odd']:
            actor_proxy = await stack.enter_async_context(
                spawn_proxy_actor(name + '_boy')
            )
            actors.append(actor_proxy)

        # spin through the actors and update their states
        for i, (count, actor) in enumerate(
            zip(range(100), cycle(actors))
        ):
            # Here we call the locally patched `.update()` method of the
            # remote instance.
            # NOTE: the instance created on each call here is currently
            # a new object - to persist it across `portal.run()` calls
            # we need to store it somewhere in memory for access by
            # a new task spawned in the remote actor process.
            await actor.update(msg={f'msg_{i}': count})

        # blocks here indefinitely since we spawned "daemon actors" using
        # `.start_actor()`; you'll need to control-c to cancel.

    finally:
        await stack.aclose()


if __name__ == '__main__':
    trio.run(main)

View File

@@ -0,0 +1,119 @@
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.
"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time

import tractor
import trio
from async_generator import aclosing


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@asynccontextmanager
async def worker_pool(workers=4):
    """Though it's a trivial special case for ``tractor``, the well
    known "worker pool" seems to be the de facto "but, I want this
    process pattern!" for most parallelism pilgrims.

    Yes, the workers stay alive (and ready for work) until you close
    the context.
    """
    async with tractor.open_nursery() as tn:

        portals = []
        snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))

        for i in range(workers):

            # this starts a new sub-actor (process + trio runtime) and
            # stores its "portal" for later use to "submit jobs" (ugh).
            portals.append(
                await tn.start_actor(
                    f'worker_{i}',
                    enable_modules=[__name__],
                )
            )

        async def _map(
            worker_func: Callable[[int], bool],
            sequence: List[int]
        ) -> List[bool]:

            # define an async (local) task to collect results from workers
            async def send_result(func, value, portal):
                await snd_chan.send((value, await portal.run(func, n=value)))

            async with trio.open_nursery() as n:

                for value, portal in zip(sequence, itertools.cycle(portals)):
                    n.start_soon(
                        send_result,
                        worker_func,
                        value,
                        portal
                    )

                # deliver results as they arrive
                for _ in range(len(sequence)):
                    yield await recv_chan.receive()

        # deliver the parallel "worker mapper" to user code
        yield _map

        # tear down all "workers" on pool close
        await tn.cancel()


async def main():
    async with worker_pool() as actor_map:

        start = time.time()

        async with aclosing(actor_map(is_prime, PRIMES)) as results:
            async for number, prime in results:
                print(f'{number} is prime: {prime}')

        print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')
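
A note on the channel sizing above: `trio.open_memory_channel(len(PRIMES))`
buffers every possible result, so the `send_result` tasks never block on a
slow consumer. A tiny standalone sketch of that buffering behavior:

import trio


async def demo():
    # capacity 2: both sends complete without a receiver waiting
    snd, rcv = trio.open_memory_channel(2)
    await snd.send('a')
    await snd.send('b')
    print(await rcv.receive(), await rcv.receive())


trio.run(demo)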

View File

@@ -0,0 +1,40 @@
import time
import concurrent.futures
import math


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        start = time.time()

        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

        print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
    start = time.time()
    main()
    print(f'script took {time.time() - start} seconds')

View File

@@ -0,0 +1,37 @@
"""
Run with a process monitor from a terminal using:

$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!
"""
from multiprocessing import cpu_count
import os

import tractor
import trio


async def target():
    print(f"Yo, i'm '{tractor.current_actor().name}' "
          f"running in pid {os.getpid()}")

    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as n:

        for i in range(cpu_count()):
            await n.run_in_actor(target, name=f'worker_{i}')

        print('This process tree will self-destruct in 1 sec...')
        await trio.sleep(1)

        # you could have done this yourself
        raise Exception('Self Destructed')


if __name__ == '__main__':
    try:
        trio.run(main)
    except Exception:
        print('Zombies Contained')

View File

@@ -78,13 +78,13 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
 @pytest.mark.parametrize(
     'example_script',
-    [
-        f for f in os.listdir(examples_dir())
-        if (
-            ('__' not in f) and
-            ('debugging' not in f)
-        )
-    ],
+    # walk yields: (dirpath, dirnames, filenames)
+    [(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
+     if '__' not in f
+     and 'debugging' not in p[0]
+     ]
 )
 def test_example(run_example_in_subproc, example_script):
     """Load and run scripts from this repo's ``examples/`` dir as a user
@@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script):
     test directory and invoke the script as a module with ``python -m
     test_example``.
     """
-    ex_file = os.path.join(examples_dir(), example_script)
+    ex_file = os.path.join(*example_script)
     with open(ex_file, 'r') as ex:
         code = ex.read()
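
Since `example_script` is now a `(dirpath, filename)` pair,
`os.path.join(*example_script)` rebuilds the full path. A standalone
sketch of the new collection logic (the helper name is hypothetical),
assuming an `examples/` tree with subdirectories:

import os


def collect_examples(root: str) -> list:
    # os.walk yields (dirpath, dirnames, filenames) triples; keep
    # (dirpath, filename) pairs so the test can rejoin them later
    return [
        (dirpath, fname)
        for dirpath, _dirs, fnames in os.walk(root)
        for fname in fnames
        if '__' not in fname and 'debugging' not in dirpath
    ]


for pair in collect_examples('examples'):
    print(os.path.join(*pair))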

View File

@@ -17,6 +17,8 @@ class RemoteActorError(Exception):
     "Remote actor exception bundled locally"
     def __init__(self, message, type_str, **msgdata) -> None:
         super().__init__(message)
+        self.type_str = type_str
+
         for ns in [builtins, _this_mod, trio]:
             try:
                 self.type = getattr(ns, type_str)
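
The loop following the new attribute resolves the remote type name
against a few known namespaces so a matching local class can be
attached. A minimal standalone sketch of that lookup pattern (the
helper name is hypothetical):

import builtins

import trio


def resolve_exc_type(type_str: str):
    # try each namespace in order; fall through on missing names
    for ns in (builtins, trio):
        try:
            return getattr(ns, type_str)
        except AttributeError:
            continue
    return None


assert resolve_exc_type('ValueError') is ValueError
assert resolve_exc_type('Cancelled') is trio.Cancelled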

View File

@@ -6,6 +6,8 @@ semaphore tracker per ``MainProcess``.
 .. note:: There is no type hinting in this code base (yet) to remain
     as close as possible to upstream.
 """
+# type: ignore
+
 import os
 import socket
 import signal

View File

@@ -67,7 +67,7 @@ def _fixup_main_from_name(mod_name: str) -> None:
     main_module = types.ModuleType("__mp_main__")
     main_content = runpy.run_module(mod_name,
                                     run_name="__mp_main__",
-                                    alter_sys=True)
+                                    alter_sys=True)  # type: ignore
     main_module.__dict__.update(main_content)
     sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module

@@ -95,6 +95,6 @@ def _fixup_main_from_path(main_path: str) -> None:
     # old_main_modules.append(current_main)
     main_module = types.ModuleType("__mp_main__")
     main_content = runpy.run_path(main_path,
-                                  run_name="__mp_main__")
+                                  run_name="__mp_main__")  # type: ignore
     main_module.__dict__.update(main_content)
     sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module