From 4d5a5c147a05c42c18f19fcb52039273d22081f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 17:07:23 -0400 Subject: [PATCH] Move core actor runtime logging to, well, "runtime" --- tractor/_actor.py | 78 ++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2e470b4..bfe175e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1,5 +1,6 @@ """ Actor primitives and helpers + """ from collections import defaultdict from functools import partial @@ -52,7 +53,8 @@ async def _invoke( Union[trio.CancelScope, BaseException] ] = trio.TASK_STATUS_IGNORED, ): - '''Invoke local func and deliver result(s) over provided channel. + ''' + Invoke local func and deliver result(s) over provided channel. ''' __tracebackhide__ = True @@ -127,7 +129,7 @@ async def _invoke( # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) - log.debug(f"Finished iterating {coro}") + log.runtime(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired @@ -354,10 +356,10 @@ class Actor: """Wait for a connection back from a spawned actor with a given ``uid``. """ - log.debug(f"Waiting for peer {uid} to connect") + log.runtime(f"Waiting for peer {uid} to connect") event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() - log.debug(f"{uid} successfully connected back to us") + log.runtime(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] def load_modules(self) -> None: @@ -381,7 +383,7 @@ class Actor: # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) - log.debug(f"Attempting to import {modpath}@{filepath}") + log.runtime(f"Attempting to import {modpath}@{filepath}") mod = importlib.import_module(modpath) self._mods[modpath] = mod if modpath == '__main__': @@ -450,7 +452,7 @@ class Actor: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. - log.debug(f"Waking channel waiters {event.statistics()}") + log.runtime(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up event.set() @@ -487,19 +489,19 @@ class Actor: # await send_chan.aclose() # Drop ref to channel so it can be gc-ed and disconnected - log.debug(f"Releasing channel {chan} from {chan.uid}") + log.runtime(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) if not chans: - log.debug(f"No more channels for {chan.uid}") + log.runtime(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) - log.debug(f"Peers is {self._peers}") + log.runtime(f"Peers is {self._peers}") if not self._peers: # no more channels connected self._no_more_peers.set() - log.debug("Signalling no more peer channels") + log.runtime("Signalling no more peer channels") # # XXX: is this necessary (GC should do it?) if chan.connected(): @@ -508,7 +510,7 @@ class Actor: # an error and so we should at least try to terminate # the channel from this end gracefully. - log.debug(f"Disconnecting channel {chan}") + log.runtime(f"Disconnecting channel {chan}") try: # send a msg loop terminate sentinel await chan.send(None) @@ -538,12 +540,12 @@ class Actor: # if ctx: # ctx._error_from_remote_msg(msg) - # log.debug(f"{send_chan} was terminated at remote end") + # log.runtime(f"{send_chan} was terminated at remote end") # # indicate to consumer that far end has stopped # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") + log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) @@ -565,7 +567,7 @@ class Actor: ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: - log.debug(f"Getting result queue for {actorid} cid {cid}") + log.runtime(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: @@ -590,7 +592,7 @@ class Actor: cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) - log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") + log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, recv_chan @@ -607,7 +609,7 @@ class Actor: # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! msg = None - log.debug(f"Entering msg loop for {chan} from {chan.uid}") + log.runtime(f"Entering msg loop for {chan} from {chan.uid}") try: with trio.CancelScope(shield=shield) as loop_cs: # this internal scope allows for keeping this message @@ -620,14 +622,14 @@ class Actor: if msg is None: # loop terminate sentinel - log.debug( + log.runtime( f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks.copy(): if channel is chan: await self._cancel_task(cid, channel) - log.debug( + log.runtime( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -641,7 +643,7 @@ class Actor: # deliver response to local caller/waiter await self._push_result(chan, cid, msg) - log.debug( + log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -659,7 +661,7 @@ class Actor: chan._exc = exc raise exc - log.debug( + log.runtime( f"Processing request from {actorid}\n" f"{ns}.{funcname}({kwargs})") if ns == 'self': @@ -685,7 +687,7 @@ class Actor: continue # spin up a task for the requested function - log.debug(f"Spawning task for {func}") + log.runtime(f"Spawning task for {func}") assert self._service_n cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), @@ -716,11 +718,11 @@ class Actor: loop_cs.cancel() break - log.debug( + log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug( + log.runtime( f"{chan} for {chan.uid} disconnected, cancelling tasks" ) await self.cancel_rpc_tasks(chan) @@ -733,7 +735,7 @@ class Actor: # handshake for them (yet) and instead we simply bail out of # the message loop and expect the teardown sequence to clean # up. - log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}') + log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: @@ -749,12 +751,12 @@ class Actor: except trio.Cancelled: # debugging only - log.debug(f"Msg loop was cancelled for {chan}") + log.runtime(f"Msg loop was cancelled for {chan}") raise finally: # msg debugging for when he machinery is brokey - log.debug( + log.runtime( f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") @@ -780,7 +782,7 @@ class Actor: # Receive runtime state from our parent parent_data: dict[str, Any] parent_data = await chan.recv() - log.debug( + log.runtime( "Received state from parent:\n" f"{parent_data}" ) @@ -789,7 +791,7 @@ class Actor: parent_data.pop('bind_port'), ) rvs = parent_data.pop('_runtime_vars') - log.debug(f"Runtime vars are: {rvs}") + log.runtime(f"Runtime vars are: {rvs}") rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -893,7 +895,7 @@ class Actor: _state._runtime_vars['_root_mailbox'] = accept_addr # Register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") + log.runtime(f"Registering {self} for role `{self.name}`") assert isinstance(self._arb_addr, tuple) async with get_arbiter(*self._arb_addr) as arb_portal: @@ -991,13 +993,13 @@ class Actor: if any( chan.connected() for chan in chain(*self._peers.values()) ): - log.debug( + log.runtime( f"Waiting for remaining peers {self._peers} to clear") with trio.CancelScope(shield=True): await self._no_more_peers.wait() - log.debug("All peer channels are complete") + log.runtime("All peer channels are complete") - log.debug("Runtime completed") + log.runtime("Runtime completed") async def _serve_forever( self, @@ -1027,7 +1029,7 @@ class Actor: host=accept_host, ) ) - log.debug( + log.runtime( "Started tcp server(s) on" f" {[getattr(l, 'socket', 'unknown socket') for l in l]}") self._listeners.extend(l) @@ -1066,7 +1068,7 @@ class Actor: # with the root actor in this tree dbcs = _debug._debugger_request_cs if dbcs is not None: - log.debug("Cancelling active debugger request") + log.pdb("Cancelling active debugger request") dbcs.cancel() # kill all ongoing tasks @@ -1110,7 +1112,7 @@ class Actor: log.warning(f"{cid} has already completed/terminated?") return - log.debug( + log.runtime( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1122,12 +1124,12 @@ class Actor: scope.cancel() # wait for _invoke to mark the task complete - log.debug( + log.runtime( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() - log.debug( + log.runtime( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1157,7 +1159,7 @@ class Actor: preventing any new inbound connections from being established. """ if self._server_n: - log.debug("Shutting down channel server") + log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() @property