Compare commits

...

1 Commits

Author SHA1 Message Date
Tyler Goodlet 704c0a0cd5 WIP doing some attrib-ing 2018-08-18 22:22:39 -04:00
2 changed files with 48 additions and 22 deletions

View File

@ -36,7 +36,7 @@ setup(
packages=[ packages=[
'tractor', 'tractor',
], ],
install_requires=['msgpack', 'trio', 'async_generator', 'colorlog'], install_requires=['attrs', 'msgpack', 'trio', 'async_generator', 'colorlog'],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.6", python_requires=">=3.6",
keywords=[ keywords=[

View File

@ -8,7 +8,10 @@ from functools import partial
import traceback import traceback
import uuid import uuid
from itertools import chain from itertools import chain
import typing
import types
import attr
import trio import trio
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
@ -122,6 +125,7 @@ async def _invoke(
actor._no_more_rpc_tasks.set() actor._no_more_rpc_tasks.set()
@attr.s(auto_attribs=True)
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
@ -130,7 +134,29 @@ class Actor:
with other actors through "portals" which provide a native async API with other actors through "portals" which provide a native async API
around "channels". around "channels".
""" """
is_arbiter = False name: str
rpc_module_paths: typing.List[str] = attr.ib(list)
statespace: typing.Dict = attr.ib(factory=dict)
uid: str = attr.ib(factory=uuid.uuid1)
loglevel: str = None
arbiter_addr: typing.Tuple[str, int] = None
is_arbiter: typing.ClsVar[bool] = False
_mods: typing.Dict[str, types.ModuleType] = attr.ib(factory=dict)
_root_nursery: trio._core._run.Nursery = None
_server_nursery: trio._core._run.Nursery = None
_peers: defaultdict = defaultdict(list)
_peer_connected: dict = {}
_no_more_peers: trio.Event = attr.ib(factory=trio.Event)
_no_more_peers.set()
_no_more_rpc_tasks = trio.Event()
_no_more_rpc_tasks.set()
_rpc_tasks: dict = {}
_actors2calls: dict = {} # map {uids -> {callids -> waiter queues}}
_listeners: list = []
_parent_chan: Channel = None
_accept_host = None
_forkserver_info = None
def __init__( def __init__(
self, self,
@ -142,11 +168,9 @@ class Actor:
arbiter_addr: (str, int) = None, arbiter_addr: (str, int) = None,
): ):
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid1())) # self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths self.rpc_module_paths = rpc_module_paths
self._mods = {} self._mods = {}
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.statespace = statespace self.statespace = statespace
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = arbiter_addr
@ -403,6 +427,10 @@ class Actor:
A "root-most" (or "top-level") nursery for this actor is opened here A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
# we start with no tasks or peers
self._no_more_rpc_tasks.set()
self._no_more_peers.set()
# if this is the `MainProcess` then we get a ref to the main # if this is the `MainProcess` then we get a ref to the main
# task's coroutine object for tossing errors into # task's coroutine object for tossing errors into
self._main_coro = _main_coro self._main_coro = _main_coro
@ -416,23 +444,17 @@ class Actor:
# load allowed RPC module # load allowed RPC module
self.load_namespaces() self.load_namespaces()
# Startup up channel server # Startup channel server
host, port = accept_addr host, port = accept_addr
await nursery.start(partial( await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port) self._serve_forever, accept_host=host, accept_port=port)
) )
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None: if parent_addr is not None:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
try: try:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
chan = self._parent_chan = Channel( chan = self._parent_chan = Channel(
destaddr=parent_addr, destaddr=parent_addr,
) )
@ -593,21 +615,25 @@ class Actor:
return self._peers[actorid] return self._peers[actorid]
@attr.s
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who keeps track of all the other actors.
access to a top level nursery.
The arbiter is by default the first actor spawned on each host The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used. arbiter is already running that arbiter will be used else one will
be created.
""" """
is_arbiter = True is_arbiter = True
_registry = attr.ib(factory=defaultdict(list))
_waiters: dict = attr.ib()
# is_arbiter: typing.ClsVar[bool] = True
def __init__(self, *args, **kwargs): # def __init__(self, *args, **kwargs):
self._registry = defaultdict(list) # self._registry = defaultdict(list)
self._waiters = {} # self._waiters = {}
super().__init__(*args, **kwargs) # super().__init__(*args, **kwargs)
def find_actor(self, name): def find_actor(self, name):
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():