From 7548dba8f240a170aca15ea03c445ca335ab6064 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 3 Aug 2022 16:09:16 -0400 Subject: [PATCH] Change to new doc string style --- tractor/_runtime.py | 73 +++++++++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 5fddfb1..4634375 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -416,9 +416,11 @@ class Actor: arbiter_addr: Optional[tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: - """This constructor is called in the parent actor **before** the spawning + ''' + This constructor is called in the parent actor **before** the spawning phase (aka before a new process is executed). - """ + + ''' self.name = name self.uid = (name, uid or str(uuid.uuid4())) @@ -439,9 +441,6 @@ class Actor: self.enable_modules = mods self._mods: dict[str, ModuleType] = {} - - # TODO: consider making this a dynamically defined - # @dataclass once we get py3.7 self.loglevel = loglevel self._arb_addr = ( @@ -482,9 +481,11 @@ class Actor: async def wait_for_peer( self, uid: tuple[str, str] ) -> tuple[trio.Event, Channel]: - """Wait for a connection back from a spawned actor with a given + ''' + Wait for a connection back from a spawned actor with a given ``uid``. - """ + + ''' log.runtime(f"Waiting for peer {uid} to connect") event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() @@ -492,12 +493,14 @@ class Actor: return event, self._peers[uid][-1] def load_modules(self) -> None: - """Load allowed RPC modules locally (after fork). + ''' + Load allowed RPC modules locally (after fork). Since this actor may be spawned on a different machine from the original nursery we need to try and load the local module code (if it exists). - """ + + ''' try: if self._spawn_method == 'trio': parent_data = self._parent_main_data @@ -949,11 +952,13 @@ class Actor: accept_port: int = 0, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: - """Start the channel server, begin listening for new connections. + ''' + Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until ``cancel_server()`` is called. - """ + + ''' self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: @@ -978,16 +983,19 @@ class Actor: self._server_down.set() def cancel_soon(self) -> None: - """Cancel this actor asap; can be called from a sync context. + ''' + Cancel this actor asap; can be called from a sync context. Schedules `.cancel()` to be run immediately just like when cancelled by the parent. - """ + + ''' assert self._service_n self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: - """Cancel this actor's runtime. + ''' + Cancel this actor's runtime. The "deterministic" teardown sequence in order is: - cancel all ongoing rpc tasks by cancel scope @@ -996,7 +1004,8 @@ class Actor: - cancel the "service" nursery reponsible for spawning new rpc tasks - return control the parent channel message loop - """ + + ''' log.cancel(f"{self.uid} is trying to cancel") self._cancel_called = True @@ -1082,9 +1091,11 @@ class Actor: self, only_chan: Optional[Channel] = None, ) -> None: - """Cancel all existing RPC responder tasks using the cancel scope + ''' + Cancel all existing RPC responder tasks using the cancel scope registered for each. - """ + + ''' tasks = self._rpc_tasks if tasks: log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") @@ -1105,27 +1116,37 @@ class Actor: await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: - """Cancel the internal channel server nursery thereby + ''' + Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. - """ + + ''' if self._server_n: log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() @property def accept_addr(self) -> Optional[tuple[str, int]]: - """Primary address to which the channel server is bound. - """ + ''' + Primary address to which the channel server is bound. + + ''' # throws OSError on failure return self._listeners[0].socket.getsockname() # type: ignore def get_parent(self) -> Portal: - """Return a portal to our parent actor.""" + ''' + Return a portal to our parent actor. + + ''' assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) def get_chans(self, uid: tuple[str, str]) -> list[Channel]: - """Return all channels to the actor with provided uid.""" + ''' + Return all channels to the actor with provided uid. + + ''' return self._peers[uid] async def _do_handshake( @@ -1133,11 +1154,13 @@ class Actor: chan: Channel ) -> tuple[str, str]: - """Exchange (name, UUIDs) identifiers as the first communication step. + ''' + Exchange (name, UUIDs) identifiers as the first communication step. These are essentially the "mailbox addresses" found in actor model parlance. - """ + + ''' await chan.send(self.uid) value = await chan.recv() uid: tuple[str, str] = (str(value[0]), str(value[1]))