De-noise warnings, add a 'cancel' log level
Now that we're on our way to a (somewhat) serious beta release, I think it's about time to start de-noising the logging emissions. Since we're trying out this approach of "stack layer oriented" log levels, I figured this is a good time to move most of the "warnings" to what they should be: cancellation monitoring status messages. The level is set to 16, just above our "runtime" level (15) but below the traditional "info" level (20). I think this will be a decent approach since, if you're confused about why your `tractor` app is behaving unlike you expect, 90% of the time it's to do with cancellation or error propagation. With this setup a user can specify the 'cancel' level and see all the messages pertaining to both actor and task-in-actor cancellation mechanics.
parent 4d5a5c147a
commit 10f66e5141
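Before the diff itself, a minimal sketch of what this buys a user (not code from this commit; `sleepy_jane` is a made-up child task, while the `loglevel` argument to `tractor.run()` is the pre-existing entrypoint knob the new level plugs into):

import trio
import tractor


async def sleepy_jane():
    # a child task that just waits around to be cancelled
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as n:
        await n.run_in_actor(sleepy_jane)

        # with loglevel='cancel' set below, the teardown status
        # messages moved off 'warning' in this commit now show up
        await n.cancel()


if __name__ == '__main__':
    # 'cancel' == 16: above 'runtime' (15), below stdlib INFO (20)
    tractor.run(main, loglevel='cancel')
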
@@ -318,7 +318,10 @@ class Actor:
         # @dataclass once we get py3.7
         self.loglevel = loglevel

-        self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
+        self._arb_addr = (
+            str(arbiter_addr[0]),
+            int(arbiter_addr[1])
+        ) if arbiter_addr else None

         # marked by the process spawning backend at startup
         # will be None for the parent most process started manually
@@ -622,7 +625,7 @@ class Actor:

             if msg is None:  # loop terminate sentinel

-                log.runtime(
+                log.cancel(
                     f"Cancelling all tasks for {chan} from {chan.uid}")

                 for (channel, cid) in self._rpc_tasks.copy():
@@ -738,12 +741,20 @@ class Actor:
             log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')

         except (Exception, trio.MultiError) as err:
+            if (
+                isinstance(err, RuntimeError) and
+                self._service_n.cancel_scope.cancel_called
+            ):
+                log.cancel(
+                    f'Service nursery cancelled before it handled {funcname}'
+                )

-            # ship any "internal" exception (i.e. one from internal machinery
-            # not from an rpc task) to parent
-            log.exception("Actor errored:")
-            if self._parent_chan:
-                await self._parent_chan.send(pack_error(err))
+            else:
+                # ship any "internal" exception (i.e. one from internal
+                # machinery not from an rpc task) to parent
+                log.exception("Actor errored:")
+                if self._parent_chan:
+                    await self._parent_chan.send(pack_error(err))

             # if this is the `MainProcess` we expect the error broadcasting
             # above to trigger an error at consuming portal "checkpoints"
@@ -923,9 +934,9 @@ class Actor:
                         shield=True,
                     )
                 )
-                log.info("Waiting on service nursery to complete")
-                log.info("Service nursery complete")
-                log.info("Waiting on root nursery to complete")
+                log.runtime("Waiting on service nursery to complete")
+                log.runtime("Service nursery complete")
+                log.runtime("Waiting on root nursery to complete")

             # Blocks here as expected until the root nursery is
             # killed (i.e. this actor is cancelled or signalled by the parent)
@@ -959,11 +970,11 @@ class Actor:
             raise

         finally:
-            log.info("Root nursery complete")
+            log.runtime("Root nursery complete")

             # tear down all lifetime contexts if not in guest mode
             # XXX: should this just be in the entrypoint?
-            log.warning("Closing all actor lifetime contexts")
+            log.cancel("Closing all actor lifetime contexts")
             _lifetime_stack.close()

             # Unregister actor from the arbiter
@@ -1058,7 +1069,7 @@ class Actor:
         spawning new rpc tasks
         - return control the parent channel message loop
         """
-        log.warning(f"{self.uid} is trying to cancel")
+        log.cancel(f"{self.uid} is trying to cancel")
         self._cancel_called = True

         # cancel all ongoing rpc tasks
@@ -1068,7 +1079,7 @@ class Actor:
         # with the root actor in this tree
         dbcs = _debug._debugger_request_cs
         if dbcs is not None:
-            log.pdb("Cancelling active debugger request")
+            log.cancel("Cancelling active debugger request")
             dbcs.cancel()

         # kill all ongoing tasks
@@ -1082,7 +1093,7 @@ class Actor:
         if self._service_n:
             self._service_n.cancel_scope.cancel()

-        log.warning(f"{self.uid} was sucessfullly cancelled")
+        log.cancel(f"{self.uid} was sucessfullly cancelled")
         self._cancel_complete.set()
         return True
@@ -1109,10 +1120,10 @@ class Actor:
             # be cancelled was indeed spawned by a request from this channel
             scope, func, is_complete = self._rpc_tasks[(chan, cid)]
         except KeyError:
-            log.warning(f"{cid} has already completed/terminated?")
+            log.cancel(f"{cid} has already completed/terminated?")
             return

-        log.runtime(
+        log.cancel(
             f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
             f"peer: {chan.uid}\n")
@@ -1141,7 +1152,7 @@ class Actor:
         registered for each.
         """
         tasks = self._rpc_tasks
-        log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
+        log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
         for (chan, cid) in tasks.copy():
             if only_chan is not None:
                 if only_chan != chan:
@@ -1150,7 +1161,7 @@ class Actor:
             # TODO: this should really done in a nursery batch
             await self._cancel_task(cid, chan)

-        log.info(
+        log.cancel(
             f"Waiting for remaining rpc tasks to complete {tasks}")
         await self._ongoing_rpc_tasks.wait()
@@ -1,5 +1,6 @@
 """
 Portal api
+
 """
 import importlib
 import inspect
@@ -173,7 +174,7 @@ class Portal:
         # terminate all locally running async generator
         # IPC calls
         if self._streams:
-            log.warning(
+            log.cancel(
                 f"Cancelling all streams with {self.channel.uid}")
             for stream in self._streams.copy():
                 try:
@@ -196,19 +197,17 @@ class Portal:
         """Cancel the actor on the other end of this portal.
         """
         if not self.channel.connected():
-            log.warning("This portal is already closed can't cancel")
+            log.cancel("This portal is already closed can't cancel")
             return False

         await self._cancel_streams()

-        log.warning(
+        log.cancel(
             f"Sending actor cancel request to {self.channel.uid} on "
             f"{self.channel}")
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with a proper shield
-            # with trio.CancelScope() as cancel_scope:
-            # with trio.CancelScope(shield=True) as cancel_scope:
             with trio.move_on_after(0.5) as cancel_scope:
                 cancel_scope.shield = True
@@ -216,13 +215,13 @@ class Portal:
                 return True

             if cancel_scope.cancelled_caught:
-                log.warning(f"May have failed to cancel {self.channel.uid}")
+                log.cancel(f"May have failed to cancel {self.channel.uid}")

                 # if we get here some weird cancellation case happened
                 return False

         except trio.ClosedResourceError:
-            log.warning(
+            log.cancel(
                 f"{self.channel} for {self.channel.uid} was already closed?")
             return False
|
@ -347,7 +346,7 @@ class Portal:
|
|||
except trio.ClosedResourceError:
|
||||
# if the far end terminates before we send a cancel the
|
||||
# underlying transport-channel may already be closed.
|
||||
log.warning(f'Context {ctx} was already closed?')
|
||||
log.cancel(f'Context {ctx} was already closed?')
|
||||
|
||||
# XXX: should this always be done?
|
||||
# await recv_chan.aclose()
|
||||
|
@@ -446,7 +445,7 @@ class Portal:
             _err = err
             # the context cancels itself on any cancel
             # causing error.
-            log.error(f'Context {ctx} sending cancel to far end')
+            log.cancel(f'Context {ctx} sending cancel to far end')
             with trio.CancelScope(shield=True):
                 await ctx.cancel()
             raise
@@ -468,15 +467,15 @@ class Portal:

         if _err:
             if ctx._cancel_called:
-                log.warning(
+                log.cancel(
                     f'Context {fn_name} cancelled by caller with\n{_err}'
                 )
             elif _err is not None:
-                log.warning(
+                log.cancel(
                     f'Context {fn_name} cancelled by callee with\n{_err}'
                 )
         else:
-            log.info(
+            log.runtime(
                 f'Context {fn_name} returned '
                 f'value from callee `{result}`'
             )
@@ -185,11 +185,11 @@ async def open_root_actor(
             raise

         finally:
-            logger.info("Shutting down root actor")
+            logger.cancel("Shutting down root actor")
             await actor.cancel()
     finally:
         _state._current_actor = None
-        logger.info("Root actor terminated")
+        logger.runtime("Root actor terminated")


 def run(
@@ -4,10 +4,10 @@ Message stream types and APIs.
 """
 from __future__ import annotations
 import inspect
-from contextlib import contextmanager, asynccontextmanager
+from contextlib import asynccontextmanager
 from dataclasses import dataclass
 from typing import (
-    Any, Iterator, Optional, Callable,
+    Any, Optional, Callable,
     AsyncGenerator, Dict,
     AsyncIterator
 )
@@ -153,7 +153,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         rx_chan = self._rx_chan

         if rx_chan._closed:
-            log.warning(f"{self} is already closed")
+            log.cancel(f"{self} is already closed")

             # this stream has already been closed so silently succeed as
             # per ``trio.AsyncResource`` semantics.
@@ -367,7 +367,7 @@ class Context:
         '''
         side = 'caller' if self._portal else 'callee'

-        log.warning(f'Cancelling {side} side of context to {self.chan.uid}')
+        log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')

         self._cancel_called = True
@@ -380,7 +380,7 @@ class Context:
             cid = self.cid
             with trio.move_on_after(0.5) as cs:
                 cs.shield = True
-                log.warning(
+                log.cancel(
                     f"Cancelling stream {cid} to "
                     f"{self._portal.channel.uid}")
@@ -395,11 +395,11 @@ class Context:
             # some other network error occurred.
             # if not self._portal.channel.connected():
             if not self.chan.connected():
-                log.warning(
+                log.cancel(
                     "May have failed to cancel remote task "
                     f"{cid} for {self._portal.channel.uid}")
             else:
-                log.warning(
+                log.cancel(
                     "Timed out on cancelling remote task "
                     f"{cid} for {self._portal.channel.uid}")
         else:
@@ -521,9 +521,8 @@ class Context:
         except KeyError:

             if 'yield' in msg:
-                # far end task is still streaming to us..
-                log.warning(f'Remote stream deliverd {msg}')
-                # do disard
+                # far end task is still streaming to us so discard
+                log.warning(f'Discarding stream delivered {msg}')
                 continue

             elif 'stop' in msg:
@@ -168,7 +168,7 @@ class ActorNursery:
         """
         self.cancelled = True

-        log.warning(f"Cancelling nursery in {self._actor.uid}")
+        log.cancel(f"Cancelling nursery in {self._actor.uid}")
         with trio.move_on_after(3) as cs:

             async with trio.open_nursery() as nursery:
@@ -320,7 +320,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
         ) or (
             is_multi_cancelled(err)
         ):
-            log.warning(
+            log.cancel(
                 f"Nursery for {current_actor().uid} "
                 f"was cancelled with {etype}")
         else:
@@ -357,7 +357,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
             # ".run_in_actor()" actors then we also want to cancel all
             # remaining sub-actors (due to our lone strategy:
             # one-cancels-all).
-            log.warning(f"Nursery cancelling due to {err}")
+            log.cancel(f"Nursery cancelling due to {err}")
             if anursery._children:
                 with trio.CancelScope(shield=True):
                     await anursery.cancel()
@@ -31,6 +31,7 @@ DATE_FORMAT = '%b %d %H:%M:%S'
 LEVELS = {
     'TRANSPORT': 5,
     'RUNTIME': 15,
+    'CANCEL': 16,
     'PDB': 500,
 }
@@ -40,6 +41,7 @@ STD_PALETTE = {
     'PDB': 'white',
     'WARNING': 'yellow',
     'INFO': 'green',
+    'CANCEL': 'yellow',
     'RUNTIME': 'white',
     'DEBUG': 'white',
     'TRANSPORT': 'cyan',
@@ -68,6 +70,12 @@ class StackLevelAdapter(logging.LoggerAdapter):
     ) -> None:
         return self.log(15, msg)

+    def cancel(
+        self,
+        msg: str,
+    ) -> None:
+        return self.log(16, msg)
+
     def pdb(
         self,
         msg: str,
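To round out the three logging hunks above, a quick illustration of how the new level behaves (a sketch, not code from this commit: it assumes `tractor.log.get_logger()`, the existing logger factory, and that `LEVELS` entries are registered as stdlib custom levels the same way 'runtime' and 'transport' already are):

import logging

from tractor.log import get_logger

# entries in LEVELS get registered with the stdlib, so the new
# name resolves in both directions
logging.addLevelName(16, 'CANCEL')
assert logging.getLevelName('CANCEL') == 16

log = get_logger('my_app')

# emits at 16: visible when the 'cancel' level (or lower) is
# enabled, silent at the stdlib default of WARNING (30)
log.cancel("Cancelling all tasks for <chan> from ('name', 'uuid')")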