forked from goodboy/tractor
Compare commits
1 Commits
master
...
attrs_it_u
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 704c0a0cd5 |
2
setup.py
2
setup.py
|
@ -36,7 +36,7 @@ setup(
|
|||
packages=[
|
||||
'tractor',
|
||||
],
|
||||
install_requires=['msgpack', 'trio', 'async_generator', 'colorlog'],
|
||||
install_requires=['attrs', 'msgpack', 'trio', 'async_generator', 'colorlog'],
|
||||
tests_require=['pytest'],
|
||||
python_requires=">=3.6",
|
||||
keywords=[
|
||||
|
|
|
@ -8,7 +8,10 @@ from functools import partial
|
|||
import traceback
|
||||
import uuid
|
||||
from itertools import chain
|
||||
import typing
|
||||
import types
|
||||
|
||||
import attr
|
||||
import trio
|
||||
from async_generator import asynccontextmanager, aclosing
|
||||
|
||||
|
@ -122,6 +125,7 @@ async def _invoke(
|
|||
actor._no_more_rpc_tasks.set()
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class Actor:
|
||||
"""The fundamental concurrency primitive.
|
||||
|
||||
|
@ -130,7 +134,29 @@ class Actor:
|
|||
with other actors through "portals" which provide a native async API
|
||||
around "channels".
|
||||
"""
|
||||
is_arbiter = False
|
||||
name: str
|
||||
rpc_module_paths: typing.List[str] = attr.ib(list)
|
||||
statespace: typing.Dict = attr.ib(factory=dict)
|
||||
uid: str = attr.ib(factory=uuid.uuid1)
|
||||
loglevel: str = None
|
||||
arbiter_addr: typing.Tuple[str, int] = None
|
||||
is_arbiter: typing.ClsVar[bool] = False
|
||||
|
||||
_mods: typing.Dict[str, types.ModuleType] = attr.ib(factory=dict)
|
||||
_root_nursery: trio._core._run.Nursery = None
|
||||
_server_nursery: trio._core._run.Nursery = None
|
||||
_peers: defaultdict = defaultdict(list)
|
||||
_peer_connected: dict = {}
|
||||
_no_more_peers: trio.Event = attr.ib(factory=trio.Event)
|
||||
_no_more_peers.set()
|
||||
_no_more_rpc_tasks = trio.Event()
|
||||
_no_more_rpc_tasks.set()
|
||||
_rpc_tasks: dict = {}
|
||||
_actors2calls: dict = {} # map {uids -> {callids -> waiter queues}}
|
||||
_listeners: list = []
|
||||
_parent_chan: Channel = None
|
||||
_accept_host = None
|
||||
_forkserver_info = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -142,11 +168,9 @@ class Actor:
|
|||
arbiter_addr: (str, int) = None,
|
||||
):
|
||||
self.name = name
|
||||
self.uid = (name, uid or str(uuid.uuid1()))
|
||||
# self.uid = (name, uid or str(uuid.uuid1()))
|
||||
self.rpc_module_paths = rpc_module_paths
|
||||
self._mods = {}
|
||||
# TODO: consider making this a dynamically defined
|
||||
# @dataclass once we get py3.7
|
||||
self.statespace = statespace
|
||||
self.loglevel = loglevel
|
||||
self._arb_addr = arbiter_addr
|
||||
|
@ -403,6 +427,10 @@ class Actor:
|
|||
A "root-most" (or "top-level") nursery for this actor is opened here
|
||||
and when cancelled effectively cancels the actor.
|
||||
"""
|
||||
# we start with no tasks or peers
|
||||
self._no_more_rpc_tasks.set()
|
||||
self._no_more_peers.set()
|
||||
|
||||
# if this is the `MainProcess` then we get a ref to the main
|
||||
# task's coroutine object for tossing errors into
|
||||
self._main_coro = _main_coro
|
||||
|
@ -416,23 +444,17 @@ class Actor:
|
|||
# load allowed RPC module
|
||||
self.load_namespaces()
|
||||
|
||||
# Startup up channel server
|
||||
# Startup channel server
|
||||
host, port = accept_addr
|
||||
await nursery.start(partial(
|
||||
self._serve_forever, accept_host=host, accept_port=port)
|
||||
)
|
||||
|
||||
# XXX: I wonder if a better name is maybe "requester"
|
||||
# since I don't think the notion of a "parent" actor
|
||||
# necessarily sticks given that eventually we want
|
||||
# ``'MainProcess'`` (the actor who initially starts the
|
||||
# forkserver) to eventually be the only one who is
|
||||
# allowed to spawn new processes per Python program.
|
||||
if parent_addr is not None:
|
||||
# Connect back to the parent actor and conduct initial
|
||||
# handshake (From this point on if we error ship the
|
||||
# exception back to the parent actor)
|
||||
try:
|
||||
# Connect back to the parent actor and conduct initial
|
||||
# handshake (From this point on if we error ship the
|
||||
# exception back to the parent actor)
|
||||
chan = self._parent_chan = Channel(
|
||||
destaddr=parent_addr,
|
||||
)
|
||||
|
@ -593,21 +615,25 @@ class Actor:
|
|||
return self._peers[actorid]
|
||||
|
||||
|
||||
@attr.s
|
||||
class Arbiter(Actor):
|
||||
"""A special actor who knows all the other actors and always has
|
||||
access to a top level nursery.
|
||||
"""A special actor who keeps track of all the other actors.
|
||||
|
||||
The arbiter is by default the first actor spawned on each host
|
||||
and is responsible for keeping track of all other actors for
|
||||
coordination purposes. If a new main process is launched and an
|
||||
arbiter is already running that arbiter will be used.
|
||||
arbiter is already running that arbiter will be used else one will
|
||||
be created.
|
||||
"""
|
||||
is_arbiter = True
|
||||
_registry = attr.ib(factory=defaultdict(list))
|
||||
_waiters: dict = attr.ib()
|
||||
# is_arbiter: typing.ClsVar[bool] = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._registry = defaultdict(list)
|
||||
self._waiters = {}
|
||||
super().__init__(*args, **kwargs)
|
||||
# def __init__(self, *args, **kwargs):
|
||||
# self._registry = defaultdict(list)
|
||||
# self._waiters = {}
|
||||
# super().__init__(*args, **kwargs)
|
||||
|
||||
def find_actor(self, name):
|
||||
for uid, sockaddr in self._registry.items():
|
||||
|
|
Loading…
Reference in New Issue