WIP doing some attrib-ing
parent
1264cae218
commit
704c0a0cd5
2
setup.py
2
setup.py
|
@ -36,7 +36,7 @@ setup(
|
||||||
packages=[
|
packages=[
|
||||||
'tractor',
|
'tractor',
|
||||||
],
|
],
|
||||||
install_requires=['msgpack', 'trio', 'async_generator', 'colorlog'],
|
install_requires=['attrs', 'msgpack', 'trio', 'async_generator', 'colorlog'],
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
python_requires=">=3.6",
|
python_requires=">=3.6",
|
||||||
keywords=[
|
keywords=[
|
||||||
|
|
|
@ -8,7 +8,10 @@ from functools import partial
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
import typing
|
||||||
|
import types
|
||||||
|
|
||||||
|
import attr
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager, aclosing
|
from async_generator import asynccontextmanager, aclosing
|
||||||
|
|
||||||
|
@ -122,6 +125,7 @@ async def _invoke(
|
||||||
actor._no_more_rpc_tasks.set()
|
actor._no_more_rpc_tasks.set()
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(auto_attribs=True)
|
||||||
class Actor:
|
class Actor:
|
||||||
"""The fundamental concurrency primitive.
|
"""The fundamental concurrency primitive.
|
||||||
|
|
||||||
|
@ -130,7 +134,29 @@ class Actor:
|
||||||
with other actors through "portals" which provide a native async API
|
with other actors through "portals" which provide a native async API
|
||||||
around "channels".
|
around "channels".
|
||||||
"""
|
"""
|
||||||
is_arbiter = False
|
name: str
|
||||||
|
rpc_module_paths: typing.List[str] = attr.ib(list)
|
||||||
|
statespace: typing.Dict = attr.ib(factory=dict)
|
||||||
|
uid: str = attr.ib(factory=uuid.uuid1)
|
||||||
|
loglevel: str = None
|
||||||
|
arbiter_addr: typing.Tuple[str, int] = None
|
||||||
|
is_arbiter: typing.ClsVar[bool] = False
|
||||||
|
|
||||||
|
_mods: typing.Dict[str, types.ModuleType] = attr.ib(factory=dict)
|
||||||
|
_root_nursery: trio._core._run.Nursery = None
|
||||||
|
_server_nursery: trio._core._run.Nursery = None
|
||||||
|
_peers: defaultdict = defaultdict(list)
|
||||||
|
_peer_connected: dict = {}
|
||||||
|
_no_more_peers: trio.Event = attr.ib(factory=trio.Event)
|
||||||
|
_no_more_peers.set()
|
||||||
|
_no_more_rpc_tasks = trio.Event()
|
||||||
|
_no_more_rpc_tasks.set()
|
||||||
|
_rpc_tasks: dict = {}
|
||||||
|
_actors2calls: dict = {} # map {uids -> {callids -> waiter queues}}
|
||||||
|
_listeners: list = []
|
||||||
|
_parent_chan: Channel = None
|
||||||
|
_accept_host = None
|
||||||
|
_forkserver_info = None
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
@ -142,11 +168,9 @@ class Actor:
|
||||||
arbiter_addr: (str, int) = None,
|
arbiter_addr: (str, int) = None,
|
||||||
):
|
):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.uid = (name, uid or str(uuid.uuid1()))
|
# self.uid = (name, uid or str(uuid.uuid1()))
|
||||||
self.rpc_module_paths = rpc_module_paths
|
self.rpc_module_paths = rpc_module_paths
|
||||||
self._mods = {}
|
self._mods = {}
|
||||||
# TODO: consider making this a dynamically defined
|
|
||||||
# @dataclass once we get py3.7
|
|
||||||
self.statespace = statespace
|
self.statespace = statespace
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = arbiter_addr
|
self._arb_addr = arbiter_addr
|
||||||
|
@ -403,6 +427,10 @@ class Actor:
|
||||||
A "root-most" (or "top-level") nursery for this actor is opened here
|
A "root-most" (or "top-level") nursery for this actor is opened here
|
||||||
and when cancelled effectively cancels the actor.
|
and when cancelled effectively cancels the actor.
|
||||||
"""
|
"""
|
||||||
|
# we start with no tasks or peers
|
||||||
|
self._no_more_rpc_tasks.set()
|
||||||
|
self._no_more_peers.set()
|
||||||
|
|
||||||
# if this is the `MainProcess` then we get a ref to the main
|
# if this is the `MainProcess` then we get a ref to the main
|
||||||
# task's coroutine object for tossing errors into
|
# task's coroutine object for tossing errors into
|
||||||
self._main_coro = _main_coro
|
self._main_coro = _main_coro
|
||||||
|
@ -416,23 +444,17 @@ class Actor:
|
||||||
# load allowed RPC module
|
# load allowed RPC module
|
||||||
self.load_namespaces()
|
self.load_namespaces()
|
||||||
|
|
||||||
# Startup up channel server
|
# Startup channel server
|
||||||
host, port = accept_addr
|
host, port = accept_addr
|
||||||
await nursery.start(partial(
|
await nursery.start(partial(
|
||||||
self._serve_forever, accept_host=host, accept_port=port)
|
self._serve_forever, accept_host=host, accept_port=port)
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: I wonder if a better name is maybe "requester"
|
|
||||||
# since I don't think the notion of a "parent" actor
|
|
||||||
# necessarily sticks given that eventually we want
|
|
||||||
# ``'MainProcess'`` (the actor who initially starts the
|
|
||||||
# forkserver) to eventually be the only one who is
|
|
||||||
# allowed to spawn new processes per Python program.
|
|
||||||
if parent_addr is not None:
|
if parent_addr is not None:
|
||||||
try:
|
|
||||||
# Connect back to the parent actor and conduct initial
|
# Connect back to the parent actor and conduct initial
|
||||||
# handshake (From this point on if we error ship the
|
# handshake (From this point on if we error ship the
|
||||||
# exception back to the parent actor)
|
# exception back to the parent actor)
|
||||||
|
try:
|
||||||
chan = self._parent_chan = Channel(
|
chan = self._parent_chan = Channel(
|
||||||
destaddr=parent_addr,
|
destaddr=parent_addr,
|
||||||
)
|
)
|
||||||
|
@ -593,21 +615,25 @@ class Actor:
|
||||||
return self._peers[actorid]
|
return self._peers[actorid]
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who keeps track of all the other actors.
|
||||||
access to a top level nursery.
|
|
||||||
|
|
||||||
The arbiter is by default the first actor spawned on each host
|
The arbiter is by default the first actor spawned on each host
|
||||||
and is responsible for keeping track of all other actors for
|
and is responsible for keeping track of all other actors for
|
||||||
coordination purposes. If a new main process is launched and an
|
coordination purposes. If a new main process is launched and an
|
||||||
arbiter is already running that arbiter will be used.
|
arbiter is already running that arbiter will be used else one will
|
||||||
|
be created.
|
||||||
"""
|
"""
|
||||||
is_arbiter = True
|
is_arbiter = True
|
||||||
|
_registry = attr.ib(factory=defaultdict(list))
|
||||||
|
_waiters: dict = attr.ib()
|
||||||
|
# is_arbiter: typing.ClsVar[bool] = True
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
# def __init__(self, *args, **kwargs):
|
||||||
self._registry = defaultdict(list)
|
# self._registry = defaultdict(list)
|
||||||
self._waiters = {}
|
# self._waiters = {}
|
||||||
super().__init__(*args, **kwargs)
|
# super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def find_actor(self, name):
|
def find_actor(self, name):
|
||||||
for uid, sockaddr in self._registry.items():
|
for uid, sockaddr in self._registry.items():
|
||||||
|
|
Loading…
Reference in New Issue