diff --git a/setup.py b/setup.py index c3001cf..f4a22c8 100755 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ setup( packages=[ 'tractor', ], - install_requires=['msgpack', 'trio', 'async_generator', 'colorlog'], + install_requires=['attrs', 'msgpack', 'trio', 'async_generator', 'colorlog'], tests_require=['pytest'], python_requires=">=3.6", keywords=[ diff --git a/tractor/_actor.py b/tractor/_actor.py index a5e833f..a154c85 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -8,7 +8,10 @@ from functools import partial import traceback import uuid from itertools import chain +import typing +import types +import attr import trio from async_generator import asynccontextmanager, aclosing @@ -122,6 +125,7 @@ async def _invoke( actor._no_more_rpc_tasks.set() +@attr.s(auto_attribs=True) class Actor: """The fundamental concurrency primitive. @@ -130,7 +134,29 @@ class Actor: with other actors through "portals" which provide a native async API around "channels". """ - is_arbiter = False + name: str + rpc_module_paths: typing.List[str] = attr.ib(list) + statespace: typing.Dict = attr.ib(factory=dict) + uid: str = attr.ib(factory=uuid.uuid1) + loglevel: str = None + arbiter_addr: typing.Tuple[str, int] = None + is_arbiter: typing.ClsVar[bool] = False + + _mods: typing.Dict[str, types.ModuleType] = attr.ib(factory=dict) + _root_nursery: trio._core._run.Nursery = None + _server_nursery: trio._core._run.Nursery = None + _peers: defaultdict = defaultdict(list) + _peer_connected: dict = {} + _no_more_peers: trio.Event = attr.ib(factory=trio.Event) + _no_more_peers.set() + _no_more_rpc_tasks = trio.Event() + _no_more_rpc_tasks.set() + _rpc_tasks: dict = {} + _actors2calls: dict = {} # map {uids -> {callids -> waiter queues}} + _listeners: list = [] + _parent_chan: Channel = None + _accept_host = None + _forkserver_info = None def __init__( self, @@ -142,11 +168,9 @@ class Actor: arbiter_addr: (str, int) = None, ): self.name = name - self.uid = (name, uid or str(uuid.uuid1())) + # self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths self._mods = {} - # TODO: consider making this a dynamically defined - # @dataclass once we get py3.7 self.statespace = statespace self.loglevel = loglevel self._arb_addr = arbiter_addr @@ -403,6 +427,10 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ + # we start with no tasks or peers + self._no_more_rpc_tasks.set() + self._no_more_peers.set() + # if this is the `MainProcess` then we get a ref to the main # task's coroutine object for tossing errors into self._main_coro = _main_coro @@ -416,23 +444,17 @@ class Actor: # load allowed RPC module self.load_namespaces() - # Startup up channel server + # Startup channel server host, port = accept_addr await nursery.start(partial( self._serve_forever, accept_host=host, accept_port=port) ) - # XXX: I wonder if a better name is maybe "requester" - # since I don't think the notion of a "parent" actor - # necessarily sticks given that eventually we want - # ``'MainProcess'`` (the actor who initially starts the - # forkserver) to eventually be the only one who is - # allowed to spawn new processes per Python program. if parent_addr is not None: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) try: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) chan = self._parent_chan = Channel( destaddr=parent_addr, ) @@ -593,21 +615,25 @@ class Actor: return self._peers[actorid] +@attr.s class Arbiter(Actor): - """A special actor who knows all the other actors and always has - access to a top level nursery. + """A special actor who keeps track of all the other actors. The arbiter is by default the first actor spawned on each host and is responsible for keeping track of all other actors for coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. + arbiter is already running that arbiter will be used else one will + be created. """ is_arbiter = True + _registry = attr.ib(factory=defaultdict(list)) + _waiters: dict = attr.ib() + # is_arbiter: typing.ClsVar[bool] = True - def __init__(self, *args, **kwargs): - self._registry = defaultdict(list) - self._waiters = {} - super().__init__(*args, **kwargs) + # def __init__(self, *args, **kwargs): + # self._registry = defaultdict(list) + # self._waiters = {} + # super().__init__(*args, **kwargs) def find_actor(self, name): for uid, sockaddr in self._registry.items():