From 3f6d4d6af4d8923c97017f1ec4b98ce8cf306e32 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 17:03:55 -0400 Subject: [PATCH 1/6] Don't log.error if it was intentional --- tractor/_actor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index dcbe541..2e470b4 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -195,8 +195,6 @@ async def _invoke( if not is_multi_cancelled(err): - log.exception("Actor crashed:") - # TODO: maybe we'll want different "levels" of debugging # eventualy such as ('app', 'supervisory', 'runtime') ? @@ -218,8 +216,8 @@ async def _invoke( entered_debug = await _debug._maybe_enter_pm(err) - if not entered_debug: - log.exception("Actor crashed:") + if not entered_debug: + log.exception("Actor crashed:") # always ship errors back to caller err_msg = pack_error(err, tb=tb) From d2f084304169f8e8e3f06daa9c5aff709b60c236 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 17:04:11 -0400 Subject: [PATCH 2/6] Make custom log levels report the right stack frame The stdlib's `logging.LoggingAdapter` doesn't currently pass through `stacklevel: int` down to its wrapped logger instance. Hack it here and get our msgs looking like they would if using a built-in level. --- tractor/log.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/tractor/log.py b/tractor/log.py index 4bfc798..164e76b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -2,7 +2,6 @@ Log like a forester! """ import sys -from functools import partial import logging import colorlog # type: ignore from typing import Optional @@ -75,6 +74,42 @@ class StackLevelAdapter(logging.LoggerAdapter): ) -> None: return self.log(500, msg) + def log(self, level, msg, *args, **kwargs): + """ + Delegate a log call to the underlying logger, after adding + contextual information from this adapter instance. + """ + if self.isEnabledFor(level): + # msg, kwargs = self.process(msg, kwargs) + self._log(level, msg, args, **kwargs) + + # LOL, the stdlib doesn't allow passing through ``stacklevel``.. + def _log( + self, + level, + msg, + args, + exc_info=None, + extra=None, + stack_info=False, + + # XXX: bit we added to show fileinfo from actual caller. + # this level then ``.log()`` then finally the caller's level.. + stacklevel=3, + ): + """ + Low-level log implementation, proxied to allow nested logger adapters. + """ + return self.logger._log( + level, + msg, + args, + exc_info=exc_info, + extra=self.extra, + stack_info=stack_info, + stacklevel=stacklevel, + ) + def get_logger( From 4d5a5c147a05c42c18f19fcb52039273d22081f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 17:07:23 -0400 Subject: [PATCH 3/6] Move core actor runtime logging to, well, "runtime" --- tractor/_actor.py | 78 ++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2e470b4..bfe175e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1,5 +1,6 @@ """ Actor primitives and helpers + """ from collections import defaultdict from functools import partial @@ -52,7 +53,8 @@ async def _invoke( Union[trio.CancelScope, BaseException] ] = trio.TASK_STATUS_IGNORED, ): - '''Invoke local func and deliver result(s) over provided channel. + ''' + Invoke local func and deliver result(s) over provided channel. 
''' __tracebackhide__ = True @@ -127,7 +129,7 @@ async def _invoke( # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) - log.debug(f"Finished iterating {coro}") + log.runtime(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired @@ -354,10 +356,10 @@ class Actor: """Wait for a connection back from a spawned actor with a given ``uid``. """ - log.debug(f"Waiting for peer {uid} to connect") + log.runtime(f"Waiting for peer {uid} to connect") event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() - log.debug(f"{uid} successfully connected back to us") + log.runtime(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] def load_modules(self) -> None: @@ -381,7 +383,7 @@ class Actor: # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) - log.debug(f"Attempting to import {modpath}@{filepath}") + log.runtime(f"Attempting to import {modpath}@{filepath}") mod = importlib.import_module(modpath) self._mods[modpath] = mod if modpath == '__main__': @@ -450,7 +452,7 @@ class Actor: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. - log.debug(f"Waking channel waiters {event.statistics()}") + log.runtime(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up event.set() @@ -487,19 +489,19 @@ class Actor: # await send_chan.aclose() # Drop ref to channel so it can be gc-ed and disconnected - log.debug(f"Releasing channel {chan} from {chan.uid}") + log.runtime(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) if not chans: - log.debug(f"No more channels for {chan.uid}") + log.runtime(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) - log.debug(f"Peers is {self._peers}") + log.runtime(f"Peers is {self._peers}") if not self._peers: # no more channels connected self._no_more_peers.set() - log.debug("Signalling no more peer channels") + log.runtime("Signalling no more peer channels") # # XXX: is this necessary (GC should do it?) if chan.connected(): @@ -508,7 +510,7 @@ class Actor: # an error and so we should at least try to terminate # the channel from this end gracefully. 
- log.debug(f"Disconnecting channel {chan}") + log.runtime(f"Disconnecting channel {chan}") try: # send a msg loop terminate sentinel await chan.send(None) @@ -538,12 +540,12 @@ class Actor: # if ctx: # ctx._error_from_remote_msg(msg) - # log.debug(f"{send_chan} was terminated at remote end") + # log.runtime(f"{send_chan} was terminated at remote end") # # indicate to consumer that far end has stopped # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") + log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) @@ -565,7 +567,7 @@ class Actor: ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: - log.debug(f"Getting result queue for {actorid} cid {cid}") + log.runtime(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: @@ -590,7 +592,7 @@ class Actor: cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) - log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") + log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, recv_chan @@ -607,7 +609,7 @@ class Actor: # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! msg = None - log.debug(f"Entering msg loop for {chan} from {chan.uid}") + log.runtime(f"Entering msg loop for {chan} from {chan.uid}") try: with trio.CancelScope(shield=shield) as loop_cs: # this internal scope allows for keeping this message @@ -620,14 +622,14 @@ class Actor: if msg is None: # loop terminate sentinel - log.debug( + log.runtime( f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks.copy(): if channel is chan: await self._cancel_task(cid, channel) - log.debug( + log.runtime( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -641,7 +643,7 @@ class Actor: # deliver response to local caller/waiter await self._push_result(chan, cid, msg) - log.debug( + log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -659,7 +661,7 @@ class Actor: chan._exc = exc raise exc - log.debug( + log.runtime( f"Processing request from {actorid}\n" f"{ns}.{funcname}({kwargs})") if ns == 'self': @@ -685,7 +687,7 @@ class Actor: continue # spin up a task for the requested function - log.debug(f"Spawning task for {func}") + log.runtime(f"Spawning task for {func}") assert self._service_n cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), @@ -716,11 +718,11 @@ class Actor: loop_cs.cancel() break - log.debug( + log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug( + log.runtime( f"{chan} for {chan.uid} disconnected, cancelling tasks" ) await self.cancel_rpc_tasks(chan) @@ -733,7 +735,7 @@ class Actor: # handshake for them (yet) and instead we simply bail out of # the message loop and expect the teardown sequence to clean # up. 
- log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}') + log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: @@ -749,12 +751,12 @@ class Actor: except trio.Cancelled: # debugging only - log.debug(f"Msg loop was cancelled for {chan}") + log.runtime(f"Msg loop was cancelled for {chan}") raise finally: # msg debugging for when he machinery is brokey - log.debug( + log.runtime( f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") @@ -780,7 +782,7 @@ class Actor: # Receive runtime state from our parent parent_data: dict[str, Any] parent_data = await chan.recv() - log.debug( + log.runtime( "Received state from parent:\n" f"{parent_data}" ) @@ -789,7 +791,7 @@ class Actor: parent_data.pop('bind_port'), ) rvs = parent_data.pop('_runtime_vars') - log.debug(f"Runtime vars are: {rvs}") + log.runtime(f"Runtime vars are: {rvs}") rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -893,7 +895,7 @@ class Actor: _state._runtime_vars['_root_mailbox'] = accept_addr # Register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") + log.runtime(f"Registering {self} for role `{self.name}`") assert isinstance(self._arb_addr, tuple) async with get_arbiter(*self._arb_addr) as arb_portal: @@ -991,13 +993,13 @@ class Actor: if any( chan.connected() for chan in chain(*self._peers.values()) ): - log.debug( + log.runtime( f"Waiting for remaining peers {self._peers} to clear") with trio.CancelScope(shield=True): await self._no_more_peers.wait() - log.debug("All peer channels are complete") + log.runtime("All peer channels are complete") - log.debug("Runtime completed") + log.runtime("Runtime completed") async def _serve_forever( self, @@ -1027,7 +1029,7 @@ class Actor: host=accept_host, ) ) - log.debug( + log.runtime( "Started tcp server(s) on" f" {[getattr(l, 'socket', 'unknown socket') for l in l]}") self._listeners.extend(l) @@ -1066,7 +1068,7 @@ class Actor: # with the root actor in this tree dbcs = _debug._debugger_request_cs if dbcs is not None: - log.debug("Cancelling active debugger request") + log.pdb("Cancelling active debugger request") dbcs.cancel() # kill all ongoing tasks @@ -1110,7 +1112,7 @@ class Actor: log.warning(f"{cid} has already completed/terminated?") return - log.debug( + log.runtime( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1122,12 +1124,12 @@ class Actor: scope.cancel() # wait for _invoke to mark the task complete - log.debug( + log.runtime( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() - log.debug( + log.runtime( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1157,7 +1159,7 @@ class Actor: preventing any new inbound connections from being established. """ if self._server_n: - log.debug("Shutting down channel server") + log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() @property From 10f66e5141906823f7c3b10a699e5fe568bc9afe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 19:25:28 -0400 Subject: [PATCH 4/6] De-noise warnings, add a 'cancel' log level Now that we're on our way to a (somewhat) serious beta release I think it's about time to start de-noising the logging emissions. Since we're trying out this approach of "stack layer oriented" log levels, I figured this is a good time to move most of the "warnings" to what they should be: cancellation monitoring status messages. 
The level is set to 16 which is just above our "runtime" level but just below the traditional "info" level. I think this will be a decent approach since usually if you're confused about why your `tractor` app is behaving unlike you expect, it's 90% of the time going to be to do with cancellation or error propagation. This this setup a user can specify the 'cancel' level and see all the msgs pertaining to both actor and task-in-actor cancellation mechanics. --- tractor/_actor.py | 49 ++++++++++++++++++++++++++----------------- tractor/_portal.py | 23 ++++++++++---------- tractor/_root.py | 4 ++-- tractor/_streaming.py | 19 ++++++++--------- tractor/_trionics.py | 6 +++--- tractor/log.py | 8 +++++++ 6 files changed, 63 insertions(+), 46 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index bfe175e..793a678 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -318,7 +318,10 @@ class Actor: # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None + self._arb_addr = ( + str(arbiter_addr[0]), + int(arbiter_addr[1]) + ) if arbiter_addr else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -622,7 +625,7 @@ class Actor: if msg is None: # loop terminate sentinel - log.runtime( + log.cancel( f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks.copy(): @@ -738,12 +741,20 @@ class Actor: log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: + if ( + isinstance(err, RuntimeError) and + self._service_n.cancel_scope.cancel_called + ): + log.cancel( + f'Service nursery cancelled before it handled {funcname}' + ) - # ship any "internal" exception (i.e. one from internal machinery - # not from an rpc task) to parent - log.exception("Actor errored:") - if self._parent_chan: - await self._parent_chan.send(pack_error(err)) + else: + # ship any "internal" exception (i.e. one from internal + # machinery not from an rpc task) to parent + log.exception("Actor errored:") + if self._parent_chan: + await self._parent_chan.send(pack_error(err)) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" @@ -923,9 +934,9 @@ class Actor: shield=True, ) ) - log.info("Waiting on service nursery to complete") - log.info("Service nursery complete") - log.info("Waiting on root nursery to complete") + log.runtime("Waiting on service nursery to complete") + log.runtime("Service nursery complete") + log.runtime("Waiting on root nursery to complete") # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) @@ -959,11 +970,11 @@ class Actor: raise finally: - log.info("Root nursery complete") + log.runtime("Root nursery complete") # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? 
- log.warning("Closing all actor lifetime contexts") + log.cancel("Closing all actor lifetime contexts") _lifetime_stack.close() # Unregister actor from the arbiter @@ -1058,7 +1069,7 @@ class Actor: spawning new rpc tasks - return control the parent channel message loop """ - log.warning(f"{self.uid} is trying to cancel") + log.cancel(f"{self.uid} is trying to cancel") self._cancel_called = True # cancel all ongoing rpc tasks @@ -1068,7 +1079,7 @@ class Actor: # with the root actor in this tree dbcs = _debug._debugger_request_cs if dbcs is not None: - log.pdb("Cancelling active debugger request") + log.cancel("Cancelling active debugger request") dbcs.cancel() # kill all ongoing tasks @@ -1082,7 +1093,7 @@ class Actor: if self._service_n: self._service_n.cancel_scope.cancel() - log.warning(f"{self.uid} was sucessfullly cancelled") + log.cancel(f"{self.uid} was sucessfullly cancelled") self._cancel_complete.set() return True @@ -1109,10 +1120,10 @@ class Actor: # be cancelled was indeed spawned by a request from this channel scope, func, is_complete = self._rpc_tasks[(chan, cid)] except KeyError: - log.warning(f"{cid} has already completed/terminated?") + log.cancel(f"{cid} has already completed/terminated?") return - log.runtime( + log.cancel( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1141,7 +1152,7 @@ class Actor: registered for each. """ tasks = self._rpc_tasks - log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): if only_chan is not None: if only_chan != chan: @@ -1150,7 +1161,7 @@ class Actor: # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) - log.info( + log.cancel( f"Waiting for remaining rpc tasks to complete {tasks}") await self._ongoing_rpc_tasks.wait() diff --git a/tractor/_portal.py b/tractor/_portal.py index 137761e..382f8e4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -1,5 +1,6 @@ """ Portal api + """ import importlib import inspect @@ -173,7 +174,7 @@ class Portal: # terminate all locally running async generator # IPC calls if self._streams: - log.warning( + log.cancel( f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: @@ -196,19 +197,17 @@ class Portal: """Cancel the actor on the other end of this portal. 
""" if not self.channel.connected(): - log.warning("This portal is already closed can't cancel") + log.cancel("This portal is already closed can't cancel") return False await self._cancel_streams() - log.warning( + log.cancel( f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - # with trio.CancelScope() as cancel_scope: - # with trio.CancelScope(shield=True) as cancel_scope: with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True @@ -216,13 +215,13 @@ class Portal: return True if cancel_scope.cancelled_caught: - log.warning(f"May have failed to cancel {self.channel.uid}") + log.cancel(f"May have failed to cancel {self.channel.uid}") # if we get here some weird cancellation case happened return False except trio.ClosedResourceError: - log.warning( + log.cancel( f"{self.channel} for {self.channel.uid} was already closed?") return False @@ -347,7 +346,7 @@ class Portal: except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.warning(f'Context {ctx} was already closed?') + log.cancel(f'Context {ctx} was already closed?') # XXX: should this always be done? # await recv_chan.aclose() @@ -446,7 +445,7 @@ class Portal: _err = err # the context cancels itself on any cancel # causing error. - log.error(f'Context {ctx} sending cancel to far end') + log.cancel(f'Context {ctx} sending cancel to far end') with trio.CancelScope(shield=True): await ctx.cancel() raise @@ -468,15 +467,15 @@ class Portal: if _err: if ctx._cancel_called: - log.warning( + log.cancel( f'Context {fn_name} cancelled by caller with\n{_err}' ) elif _err is not None: - log.warning( + log.cancel( f'Context {fn_name} cancelled by callee with\n{_err}' ) else: - log.info( + log.runtime( f'Context {fn_name} returned ' f'value from callee `{result}`' ) diff --git a/tractor/_root.py b/tractor/_root.py index b153755..7731488 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -185,11 +185,11 @@ async def open_root_actor( raise finally: - logger.info("Shutting down root actor") + logger.cancel("Shutting down root actor") await actor.cancel() finally: _state._current_actor = None - logger.info("Root actor terminated") + logger.runtime("Root actor terminated") def run( diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 2477531..6ee264c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -4,10 +4,10 @@ Message stream types and APIs. """ from __future__ import annotations import inspect -from contextlib import contextmanager, asynccontextmanager +from contextlib import asynccontextmanager from dataclasses import dataclass from typing import ( - Any, Iterator, Optional, Callable, + Any, Optional, Callable, AsyncGenerator, Dict, AsyncIterator ) @@ -153,7 +153,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): rx_chan = self._rx_chan if rx_chan._closed: - log.warning(f"{self} is already closed") + log.cancel(f"{self} is already closed") # this stream has already been closed so silently succeed as # per ``trio.AsyncResource`` semantics. 
@@ -367,7 +367,7 @@ class Context: ''' side = 'caller' if self._portal else 'callee' - log.warning(f'Cancelling {side} side of context to {self.chan.uid}') + log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') self._cancel_called = True @@ -380,7 +380,7 @@ class Context: cid = self.cid with trio.move_on_after(0.5) as cs: cs.shield = True - log.warning( + log.cancel( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") @@ -395,11 +395,11 @@ class Context: # some other network error occurred. # if not self._portal.channel.connected(): if not self.chan.connected(): - log.warning( + log.cancel( "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") else: - log.warning( + log.cancel( "Timed out on cancelling remote task " f"{cid} for {self._portal.channel.uid}") else: @@ -521,9 +521,8 @@ class Context: except KeyError: if 'yield' in msg: - # far end task is still streaming to us.. - log.warning(f'Remote stream deliverd {msg}') - # do disard + # far end task is still streaming to us so discard + log.warning(f'Discarding stream delivered {msg}') continue elif 'stop' in msg: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 17e7838..e29bf5e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -168,7 +168,7 @@ class ActorNursery: """ self.cancelled = True - log.warning(f"Cancelling nursery in {self._actor.uid}") + log.cancel(f"Cancelling nursery in {self._actor.uid}") with trio.move_on_after(3) as cs: async with trio.open_nursery() as nursery: @@ -320,7 +320,7 @@ async def _open_and_supervise_one_cancels_all_nursery( ) or ( is_multi_cancelled(err) ): - log.warning( + log.cancel( f"Nursery for {current_actor().uid} " f"was cancelled with {etype}") else: @@ -357,7 +357,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # ".run_in_actor()" actors then we also want to cancel all # remaining sub-actors (due to our lone strategy: # one-cancels-all). - log.warning(f"Nursery cancelling due to {err}") + log.cancel(f"Nursery cancelling due to {err}") if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() diff --git a/tractor/log.py b/tractor/log.py index 164e76b..bcba228 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -31,6 +31,7 @@ DATE_FORMAT = '%b %d %H:%M:%S' LEVELS = { 'TRANSPORT': 5, 'RUNTIME': 15, + 'CANCEL': 16, 'PDB': 500, } @@ -40,6 +41,7 @@ STD_PALETTE = { 'PDB': 'white', 'WARNING': 'yellow', 'INFO': 'green', + 'CANCEL': 'yellow', 'RUNTIME': 'white', 'DEBUG': 'white', 'TRANSPORT': 'cyan', @@ -68,6 +70,12 @@ class StackLevelAdapter(logging.LoggerAdapter): ) -> None: return self.log(15, msg) + def cancel( + self, + msg: str, + ) -> None: + return self.log(16, msg) + def pdb( self, msg: str, From 1f0cc156753916b1245eba0a38fdd21a7b0a3e3a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Oct 2021 15:53:44 -0400 Subject: [PATCH 5/6] Just set flag for use-after-closed service nursery calls --- tractor/_actor.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 793a678..6aafa11 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -612,6 +612,8 @@ class Actor: # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! 
msg = None + nursery_cancelled_before_task: bool = False + log.runtime(f"Entering msg loop for {chan} from {chan.uid}") try: with trio.CancelScope(shield=shield) as loop_cs: @@ -692,10 +694,16 @@ class Actor: # spin up a task for the requested function log.runtime(f"Spawning task for {func}") assert self._service_n - cs = await self._service_n.start( - partial(_invoke, self, cid, chan, func, kwargs), - name=funcname, - ) + try: + cs = await self._service_n.start( + partial(_invoke, self, cid, chan, func, kwargs), + name=funcname, + ) + except RuntimeError: + # avoid reporting a benign race condition + # during actor runtime teardown. + nursery_cancelled_before_task = True + # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) if func != self.cancel: @@ -741,14 +749,12 @@ class Actor: log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: - if ( - isinstance(err, RuntimeError) and - self._service_n.cancel_scope.cancel_called - ): + if nursery_cancelled_before_task: + sn = self._service_n + assert sn and sn.cancel_scope.cancel_called log.cancel( f'Service nursery cancelled before it handled {funcname}' ) - else: # ship any "internal" exception (i.e. one from internal # machinery not from an rpc task) to parent From 6cda17436ad8ba36a34da745d92a22ccdde0b065 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 11:47:06 -0400 Subject: [PATCH 6/6] Add nooz --- newsfragments/243.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 newsfragments/243.rst diff --git a/newsfragments/243.rst b/newsfragments/243.rst new file mode 100644 index 0000000..498377b --- /dev/null +++ b/newsfragments/243.rst @@ -0,0 +1,9 @@ +Add a custom 'CANCEL' log level and use through runtime. + +In order to reduce log messages and also start toying with the idea of +"application layer" oriented tracing, we added this new level just above +'runtime' but just below 'info'. It is intended to be used solely for +cancellation and teardown related messages. Included are some small +overrides to the stdlib's ``logging.LoggerAdapter`` to passthrough the +correct stack frame to show when one of the custom level methods is +used.
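
For reference, a minimal usage sketch of the levels added in this series (not part of the patches themselves; the app logger name and the ``loglevel`` value handed to ``open_root_actor()`` are assumptions for illustration, while ``get_logger()`` and the ``.cancel()``/``.runtime()`` adapter methods come from the patched ``tractor/log.py``):

    import trio
    import tractor
    from tractor.log import get_logger

    # hypothetical project logger name, purely for the example
    log = get_logger('my_app')


    async def main():
        # assuming the new level name is accepted like any other
        # ``loglevel`` string passed to the runtime
        async with tractor.open_root_actor(loglevel='cancel'):

            # lands at the custom 'cancel' level (16): above 'runtime'
            # (15) but below 'info' (20), so cancellation/teardown
            # status survives the de-noising yet can still be filtered
            log.cancel("tearing down example task")

            # core runtime chatter now sits at the quieter 'runtime'
            # level; with the ``stacklevel`` passthrough from patch 2
            # the emitted record reports this caller's file and line,
            # not ``StackLevelAdapter.log()`` internals
            log.runtime("runtime-internal detail")


    if __name__ == '__main__':
        trio.run(main)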