forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

1 Commits

Author SHA1 Message Date
Tyler Goodlet 704c0a0cd5 WIP doing some attrib-ing 2018-08-18 22:22:39 -04:00
2 changed files with 48 additions and 22 deletions

View File

@ -36,7 +36,7 @@ setup(
packages=[
'tractor',
],
install_requires=['msgpack', 'trio', 'async_generator', 'colorlog'],
install_requires=['attrs', 'msgpack', 'trio', 'async_generator', 'colorlog'],
tests_require=['pytest'],
python_requires=">=3.6",
keywords=[

View File

@ -8,7 +8,10 @@ from functools import partial
import traceback
import uuid
from itertools import chain
import typing
import types
import attr
import trio
from async_generator import asynccontextmanager, aclosing
@ -122,6 +125,7 @@ async def _invoke(
actor._no_more_rpc_tasks.set()
@attr.s(auto_attribs=True)
class Actor:
"""The fundamental concurrency primitive.
@ -130,7 +134,29 @@ class Actor:
with other actors through "portals" which provide a native async API
around "channels".
"""
is_arbiter = False
name: str
rpc_module_paths: typing.List[str] = attr.ib(list)
statespace: typing.Dict = attr.ib(factory=dict)
uid: str = attr.ib(factory=uuid.uuid1)
loglevel: str = None
arbiter_addr: typing.Tuple[str, int] = None
is_arbiter: typing.ClsVar[bool] = False
_mods: typing.Dict[str, types.ModuleType] = attr.ib(factory=dict)
_root_nursery: trio._core._run.Nursery = None
_server_nursery: trio._core._run.Nursery = None
_peers: defaultdict = defaultdict(list)
_peer_connected: dict = {}
_no_more_peers: trio.Event = attr.ib(factory=trio.Event)
_no_more_peers.set()
_no_more_rpc_tasks = trio.Event()
_no_more_rpc_tasks.set()
_rpc_tasks: dict = {}
_actors2calls: dict = {} # map {uids -> {callids -> waiter queues}}
_listeners: list = []
_parent_chan: Channel = None
_accept_host = None
_forkserver_info = None
def __init__(
self,
@ -142,11 +168,9 @@ class Actor:
arbiter_addr: (str, int) = None,
):
self.name = name
self.uid = (name, uid or str(uuid.uuid1()))
# self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths
self._mods = {}
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.statespace = statespace
self.loglevel = loglevel
self._arb_addr = arbiter_addr
@ -403,6 +427,10 @@ class Actor:
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
# we start with no tasks or peers
self._no_more_rpc_tasks.set()
self._no_more_peers.set()
# if this is the `MainProcess` then we get a ref to the main
# task's coroutine object for tossing errors into
self._main_coro = _main_coro
@ -416,23 +444,17 @@ class Actor:
# load allowed RPC module
self.load_namespaces()
# Startup up channel server
# Startup channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
try:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
chan = self._parent_chan = Channel(
destaddr=parent_addr,
)
@ -593,21 +615,25 @@ class Actor:
return self._peers[actorid]
@attr.s
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has
access to a top level nursery.
"""A special actor who keeps track of all the other actors.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
arbiter is already running that arbiter will be used else one will
be created.
"""
is_arbiter = True
_registry = attr.ib(factory=defaultdict(list))
_waiters: dict = attr.ib()
# is_arbiter: typing.ClsVar[bool] = True
def __init__(self, *args, **kwargs):
self._registry = defaultdict(list)
self._waiters = {}
super().__init__(*args, **kwargs)
# def __init__(self, *args, **kwargs):
# self._registry = defaultdict(list)
# self._waiters = {}
# super().__init__(*args, **kwargs)
def find_actor(self, name):
for uid, sockaddr in self._registry.items():