Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet f1acbd9b84 Stash the type string from remote errors 2021-01-27 14:41:17 -05:00
Tyler Goodlet 4a4a786763 Add a super basic supervisor/restart example 2021-01-27 14:40:55 -05:00
Tyler Goodlet 70c7e09831 Add class style "actors" example with client proxy API 2021-01-24 20:41:03 -05:00
Tyler Goodlet 47d7b603db Use a global dataclass instead, cuz we like "objects"? 2021-01-24 15:18:52 -05:00
Tyler Goodlet 7f8c5cdfe6 Add an actor "state mutation" via messages example 2021-01-24 14:54:46 -05:00
Tyler Goodlet ce61230815 Fix more stdlib typing issues with latest mypy 2021-01-24 14:53:58 -05:00
Tyler Goodlet 5da86a0e48 Ignore type checks on stdlib overrides 2021-01-24 14:53:58 -05:00
Tyler Goodlet 2ed071c903 Add `aclosing()` around asyn gen loop 2021-01-24 14:53:58 -05:00
Tyler Goodlet 5db737d368 Run parallel examples 2021-01-24 14:53:58 -05:00
Tyler Goodlet 47651eaf91 Contain the error 2021-01-24 14:53:58 -05:00
10 changed files with 328 additions and 16 deletions

View File

@ -0,0 +1,86 @@
import trio
import tractor
class Restart(Exception):
"""Restart signal"""
async def sleep_then_restart():
actor = tractor.current_actor()
print(f'{actor.uid} starting up!')
await trio.sleep(0.5)
raise Restart('This is a restart signal')
async def signal_restart_whole_actor():
actor = tractor.current_actor()
print(f'{actor.uid} starting up!')
await trio.sleep(0.5)
return 'restart_me'
async def respawn_remote_task(portal):
# start a task in the actor at the other end
# of the provided portal, when it signals a restart,
# restart it..
# This is much more efficient then restarting the undlerying
# process over and over since the python interpreter runtime
# stays up and we just submit a new task to run (which
# is just the original one we submitted repeatedly.
while True:
try:
await portal.run(sleep_then_restart)
except tractor.RemoteActorError as error:
if 'Restart' in str(error):
# respawn the actor task
continue
async def supervisor():
async with tractor.open_nursery() as tn:
p0 = await tn.start_actor('task_restarter', enable_modules=[__name__])
# Yes, you can do this from multiple tasks on one actor
# or mulitple lone tasks in multiple subactors.
# We'll show both.
async with trio.open_nursery() as n:
# we'll doe the first as a lone task restart in a daemon actor
for i in range(4):
n.start_soon(respawn_remote_task, p0)
# Open another nursery that will respawn sub-actors
# spawn a set of subactors that will signal restart
# of the group of processes on each failures
portals = []
# start initial subactor set
for i in range(4):
p = await tn.run_in_actor(signal_restart_whole_actor)
portals.append(p)
# now wait on results and respawn actors
# that request it
while True:
for p in portals:
result = await p.result()
if result == 'restart_me':
print(f'restarting {p.channel.uid}')
await p.cancel_actor()
await trio.sleep(0.5)
p = await tn.run_in_actor(signal_restart_whole_actor)
portals.append(p)
# this will block indefinitely so user must
# cancel with ctrl-c
if __name__ == '__main__':
trio.run(supervisor)

View File

@ -0,0 +1,64 @@
from itertools import cycle
from pprint import pformat
from dataclasses import dataclass, field
import trio
import tractor
@dataclass
class MyProcessStateThing:
state: dict = field(default_factory=dict)
def update(self, msg: dict):
self.state.update(msg)
_actor_state = MyProcessStateThing()
async def update_local_state(msg: dict):
"""Update process-local state from sent message and exit.
"""
actor = tractor.current_actor()
global _actor_state
print(f'Yo we got a message {msg}')
# update the "actor state"
_actor_state.update(msg)
print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}')
# we're done so exit this task running in the subactor
async def main():
# Main process/thread that spawns one sub-actor and sends messages
# to it to update it's state.
actor_portals = []
# XXX: that subactor can **not** outlive it's parent, this is SC.
async with tractor.open_nursery() as tn:
portal = await tn.start_actor('even_boy', enable_modules=[__name__])
actor_portals.append(portal)
portal = await tn.start_actor('odd_boy', enable_modules=[__name__])
actor_portals.append(portal)
for i, (count, portal) in enumerate(
zip(range(100), cycle(actor_portals))
):
await portal.run(update_local_state, msg={f'msg_{i}': count})
# blocks here indefinitely synce we spawned "daemon actors" using
# .start_actor()`, you'll need to control-c to cancel.
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,153 @@
import inspect
from typing import Any
from functools import partial
from contextlib import asynccontextmanager, AsyncExitStack
from itertools import cycle
from pprint import pformat
import trio
import tractor
log = tractor.log.get_logger(__name__)
class ActorState:
"""Singlteton actor per process.
"""
# this is a class defined variable and is thus both
# singleton across object instances and task safe.
state: dict = {}
def update(self, msg: dict) -> None:
_actor = tractor.current_actor()
print(f'Yo we got a message {msg}')
self.state.update(msg)
print(f'New local "state" for {_actor.uid} is {pformat(self.state)}')
def close(self):
# gives headers showing which process and task is active
log.info('Actor state is closing')
# if we wanted to support spawning or talking to other
# actors we can do that using a portal map collection?
# _portals: dict = {}
async def _run_proxy_method(
meth: str,
msg: dict,
) -> Any:
"""Update process-local state from sent message and exit.
"""
# Create a new actor instance per call.
# We can make this persistent by storing it either
# in a global var or are another clas scoped variable?
# If you want it somehow persisted in another namespace
# I'd be interested to know "where".
actor = ActorState()
if meth != 'close':
return getattr(actor, meth)(msg)
else:
actor.close()
# we're done so exit this task running in the subactor
class MethodProxy:
def __init__(
self,
portal: tractor._portal.Portal
) -> None:
self._portal = portal
async def _run_method(
self,
*,
meth: str,
msg: dict,
) -> Any:
return await self._portal.run(
_run_proxy_method,
meth=meth,
msg=msg
)
def get_method_proxy(portal, target=ActorState) -> MethodProxy:
proxy = MethodProxy(portal)
# mock all remote methods
for name, method in inspect.getmembers(
target, predicate=inspect.isfunction
):
if '_' == name[0]:
# skip private methods
continue
else:
setattr(proxy, name, partial(proxy._run_method, meth=name))
return proxy
@asynccontextmanager
async def spawn_proxy_actor(name):
# XXX: that subactor can **not** outlive it's parent, this is SC.
async with tractor.open_nursery(
debug_mode=True,
# loglevel='info',
) as tn:
portal = await tn.start_actor(name, enable_modules=[__name__])
proxy = get_method_proxy(portal)
yield proxy
await proxy.close(msg=None)
async def main():
# Main process/thread that spawns one sub-actor and sends messages
# to it to update it's state.
try:
stack = AsyncExitStack()
actors = []
for name in ['even', 'odd']:
actor_proxy = await stack.enter_async_context(
spawn_proxy_actor(name + '_boy')
)
actors.append(actor_proxy)
# spin through the actors and update their states
for i, (count, actor) in enumerate(
zip(range(100), cycle(actors))
):
# Here we call the locally patched `.update()` method of the
# remote instance
# NOTE: the instance created each call here is currently
# a new object - to persist it across `portal.run()` calls
# we need to store it somewhere in memory for access by
# a new task spawned in the remote actor process.
await actor.update(msg={f'msg_{i}': count})
# blocks here indefinitely synce we spawned "daemon actors" using
# .start_actor()`, you'll need to control-c to cancel.
finally:
await stack.aclose()
if __name__ == '__main__':
trio.run(main)

View File

@ -16,6 +16,7 @@ import time
import tractor
import trio
from async_generator import aclosing
PRIMES = [
@ -103,10 +104,11 @@ async def main():
async with worker_pool() as actor_map:
start = time.time()
# for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
async for number, prime in actor_map(is_prime, PRIMES):
print(f'{number} is prime: {prime}')
async with aclosing(actor_map(is_prime, PRIMES)) as results:
async for number, prime in results:
print(f'{number} is prime: {prime}')
print(f'processing took {time.time() - start} seconds')

View File

@ -1,6 +1,6 @@
"""
Run with a process monitor from a terminal using:
$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py || kill $!
$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!
"""
from multiprocessing import cpu_count
@ -11,7 +11,7 @@ import trio
async def target():
print(f"Yo, i'm {tractor.current_actor().name} "
print(f"Yo, i'm '{tractor.current_actor().name}' "
f"running in pid {os.getpid()}")
await trio.sleep_forever()
@ -31,4 +31,7 @@ async def main():
if __name__ == '__main__':
trio.run(main)
try:
trio.run(main)
except Exception:
print('Zombies Contained')

View File

@ -78,13 +78,13 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
@pytest.mark.parametrize(
'example_script',
[
f for f in os.listdir(examples_dir())
if (
('__' not in f) and
('debugging' not in f)
)
],
# walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f
and 'debugging' not in p[0]
]
)
def test_example(run_example_in_subproc, example_script):
"""Load and run scripts from this repo's ``examples/`` dir as a user
@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script):
test directory and invoke the script as a module with ``python -m
test_example``.
"""
ex_file = os.path.join(examples_dir(), example_script)
ex_file = os.path.join(*example_script)
with open(ex_file, 'r') as ex:
code = ex.read()

View File

@ -17,6 +17,8 @@ class RemoteActorError(Exception):
"Remote actor exception bundled locally"
def __init__(self, message, type_str, **msgdata) -> None:
super().__init__(message)
self.type_str = type_str
for ns in [builtins, _this_mod, trio]:
try:
self.type = getattr(ns, type_str)

View File

@ -6,6 +6,8 @@ semaphore tracker per ``MainProcess``.
.. note:: There is no type hinting in this code base (yet) to remain as
a close as possible to upstream.
"""
# type: ignore
import os
import socket
import signal

View File

@ -67,7 +67,7 @@ def _fixup_main_from_name(mod_name: str) -> None:
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_module(mod_name,
run_name="__mp_main__",
alter_sys=True)
alter_sys=True) # type: ignore
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
@ -95,6 +95,6 @@ def _fixup_main_from_path(main_path: str) -> None:
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_path(main_path,
run_name="__mp_main__")
run_name="__mp_main__") # type: ignore
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module