forked from goodboy/tractor
1
0
Fork 0

Change to new doc string style

we_bein_all_matchy
Tyler Goodlet 2022-08-03 16:09:16 -04:00
parent ba4d4e9af3
commit 7548dba8f2
1 changed files with 48 additions and 25 deletions

View File

@ -416,9 +416,11 @@ class Actor:
arbiter_addr: Optional[tuple[str, int]] = None, arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None spawn_method: Optional[str] = None
) -> None: ) -> None:
"""This constructor is called in the parent actor **before** the spawning '''
This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed). phase (aka before a new process is executed).
"""
'''
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid4())) self.uid = (name, uid or str(uuid.uuid4()))
@ -439,9 +441,6 @@ class Actor:
self.enable_modules = mods self.enable_modules = mods
self._mods: dict[str, ModuleType] = {} self._mods: dict[str, ModuleType] = {}
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = ( self._arb_addr = (
@ -482,9 +481,11 @@ class Actor:
async def wait_for_peer( async def wait_for_peer(
self, uid: tuple[str, str] self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]: ) -> tuple[trio.Event, Channel]:
"""Wait for a connection back from a spawned actor with a given '''
Wait for a connection back from a spawned actor with a given
``uid``. ``uid``.
"""
'''
log.runtime(f"Waiting for peer {uid} to connect") log.runtime(f"Waiting for peer {uid} to connect")
event = self._peer_connected.setdefault(uid, trio.Event()) event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait() await event.wait()
@ -492,12 +493,14 @@ class Actor:
return event, self._peers[uid][-1] return event, self._peers[uid][-1]
def load_modules(self) -> None: def load_modules(self) -> None:
"""Load allowed RPC modules locally (after fork). '''
Load allowed RPC modules locally (after fork).
Since this actor may be spawned on a different machine from Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module the original nursery we need to try and load the local module
code (if it exists). code (if it exists).
"""
'''
try: try:
if self._spawn_method == 'trio': if self._spawn_method == 'trio':
parent_data = self._parent_main_data parent_data = self._parent_main_data
@ -949,11 +952,13 @@ class Actor:
accept_port: int = 0, accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Start the channel server, begin listening for new connections. '''
Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until This will cause an actor to continue living (blocking) until
``cancel_server()`` is called. ``cancel_server()`` is called.
"""
'''
self._server_down = trio.Event() self._server_down = trio.Event()
try: try:
async with trio.open_nursery() as server_n: async with trio.open_nursery() as server_n:
@ -978,16 +983,19 @@ class Actor:
self._server_down.set() self._server_down.set()
def cancel_soon(self) -> None: def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context. '''
Cancel this actor asap; can be called from a sync context.
Schedules `.cancel()` to be run immediately just like when Schedules `.cancel()` to be run immediately just like when
cancelled by the parent. cancelled by the parent.
"""
'''
assert self._service_n assert self._service_n
self._service_n.start_soon(self.cancel) self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool: async def cancel(self) -> bool:
"""Cancel this actor's runtime. '''
Cancel this actor's runtime.
The "deterministic" teardown sequence in order is: The "deterministic" teardown sequence in order is:
- cancel all ongoing rpc tasks by cancel scope - cancel all ongoing rpc tasks by cancel scope
@ -996,7 +1004,8 @@ class Actor:
- cancel the "service" nursery reponsible for - cancel the "service" nursery reponsible for
spawning new rpc tasks spawning new rpc tasks
- return control the parent channel message loop - return control the parent channel message loop
"""
'''
log.cancel(f"{self.uid} is trying to cancel") log.cancel(f"{self.uid} is trying to cancel")
self._cancel_called = True self._cancel_called = True
@ -1082,9 +1091,11 @@ class Actor:
self, self,
only_chan: Optional[Channel] = None, only_chan: Optional[Channel] = None,
) -> None: ) -> None:
"""Cancel all existing RPC responder tasks using the cancel scope '''
Cancel all existing RPC responder tasks using the cancel scope
registered for each. registered for each.
"""
'''
tasks = self._rpc_tasks tasks = self._rpc_tasks
if tasks: if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
@ -1105,27 +1116,37 @@ class Actor:
await self._ongoing_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None: def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby '''
Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established. preventing any new inbound connections from being established.
"""
'''
if self._server_n: if self._server_n:
log.runtime("Shutting down channel server") log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel() self._server_n.cancel_scope.cancel()
@property @property
def accept_addr(self) -> Optional[tuple[str, int]]: def accept_addr(self) -> Optional[tuple[str, int]]:
"""Primary address to which the channel server is bound. '''
""" Primary address to which the channel server is bound.
'''
# throws OSError on failure # throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore return self._listeners[0].socket.getsockname() # type: ignore
def get_parent(self) -> Portal: def get_parent(self) -> Portal:
"""Return a portal to our parent actor.""" '''
Return a portal to our parent actor.
'''
assert self._parent_chan, "No parent channel for this actor?" assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chans(self, uid: tuple[str, str]) -> list[Channel]: def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
"""Return all channels to the actor with provided uid.""" '''
Return all channels to the actor with provided uid.
'''
return self._peers[uid] return self._peers[uid]
async def _do_handshake( async def _do_handshake(
@ -1133,11 +1154,13 @@ class Actor:
chan: Channel chan: Channel
) -> tuple[str, str]: ) -> tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step. '''
Exchange (name, UUIDs) identifiers as the first communication step.
These are essentially the "mailbox addresses" found in actor model These are essentially the "mailbox addresses" found in actor model
parlance. parlance.
"""
'''
await chan.send(self.uid) await chan.send(self.uid)
value = await chan.recv() value = await chan.recv()
uid: tuple[str, str] = (str(value[0]), str(value[1])) uid: tuple[str, str] = (str(value[0]), str(value[1]))