forked from goodboy/tractor
1
0
Fork 0

Merge pull request #243 from goodboy/less_logging

Less logging, add a `CANCEL` log level
pubsub_startup_response_msg
goodboy 2021-10-14 13:37:28 -04:00 committed by GitHub
commit dfeebd6382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 158 additions and 91 deletions

View File

@ -0,0 +1,9 @@
Add a custom 'CANCEL' log level and use through runtime.
In order to reduce log messages and also start toying with the idea of
"application layer" oriented tracing, we added this new level just above
'runtime' but just below 'info'. It is intended to be used solely for
cancellation and teardown related messages. Included are some small
overrides to the stdlib's ``logging.LoggerAdapter`` to passthrough the
correct stack frame to show when one of the custom level methods is
used.

View File

@ -1,5 +1,6 @@
"""
Actor primitives and helpers
"""
from collections import defaultdict
from functools import partial
@ -52,7 +53,8 @@ async def _invoke(
Union[trio.CancelScope, BaseException]
] = trio.TASK_STATUS_IGNORED,
):
'''Invoke local func and deliver result(s) over provided channel.
'''
Invoke local func and deliver result(s) over provided channel.
'''
__tracebackhide__ = True
@ -127,7 +129,7 @@ async def _invoke(
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.debug(f"Finished iterating {coro}")
log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
@ -195,8 +197,6 @@ async def _invoke(
if not is_multi_cancelled(err):
log.exception("Actor crashed:")
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
@ -218,8 +218,8 @@ async def _invoke(
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception("Actor crashed:")
if not entered_debug:
log.exception("Actor crashed:")
# always ship errors back to caller
err_msg = pack_error(err, tb=tb)
@ -318,7 +318,10 @@ class Actor:
# @dataclass once we get py3.7
self.loglevel = loglevel
self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
self._arb_addr = (
str(arbiter_addr[0]),
int(arbiter_addr[1])
) if arbiter_addr else None
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -356,10 +359,10 @@ class Actor:
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
log.debug(f"Waiting for peer {uid} to connect")
log.runtime(f"Waiting for peer {uid} to connect")
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f"{uid} successfully connected back to us")
log.runtime(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_modules(self) -> None:
@ -383,7 +386,7 @@ class Actor:
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
log.debug(f"Attempting to import {modpath}@{filepath}")
log.runtime(f"Attempting to import {modpath}@{filepath}")
mod = importlib.import_module(modpath)
self._mods[modpath] = mod
if modpath == '__main__':
@ -452,7 +455,7 @@ class Actor:
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
log.debug(f"Waking channel waiters {event.statistics()}")
log.runtime(f"Waking channel waiters {event.statistics()}")
# Alert any task waiting on this connection to come up
event.set()
@ -489,19 +492,19 @@ class Actor:
# await send_chan.aclose()
# Drop ref to channel so it can be gc-ed and disconnected
log.debug(f"Releasing channel {chan} from {chan.uid}")
log.runtime(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
log.debug(f"Peers is {self._peers}")
log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug("Signalling no more peer channels")
log.runtime("Signalling no more peer channels")
# # XXX: is this necessary (GC should do it?)
if chan.connected():
@ -510,7 +513,7 @@ class Actor:
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.debug(f"Disconnecting channel {chan}")
log.runtime(f"Disconnecting channel {chan}")
try:
# send a msg loop terminate sentinel
await chan.send(None)
@ -540,12 +543,12 @@ class Actor:
# if ctx:
# ctx._error_from_remote_msg(msg)
# log.debug(f"{send_chan} was terminated at remote end")
# log.runtime(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped
# return await send_chan.aclose()
try:
log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
# maintain backpressure
await send_chan.send(msg)
@ -567,7 +570,7 @@ class Actor:
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
log.debug(f"Getting result queue for {actorid} cid {cid}")
log.runtime(f"Getting result queue for {actorid} cid {cid}")
try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError:
@ -592,7 +595,7 @@ class Actor:
cid = str(uuid.uuid4())
assert chan.uid
send_chan, recv_chan = self.get_memchans(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, recv_chan
@ -609,7 +612,9 @@ class Actor:
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg = None
log.debug(f"Entering msg loop for {chan} from {chan.uid}")
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with trio.CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
@ -622,14 +627,14 @@ class Actor:
if msg is None: # loop terminate sentinel
log.debug(
log.cancel(
f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan:
await self._cancel_task(cid, channel)
log.debug(
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
@ -643,7 +648,7 @@ class Actor:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)
log.debug(
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
@ -661,7 +666,7 @@ class Actor:
chan._exc = exc
raise exc
log.debug(
log.runtime(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
@ -687,12 +692,18 @@ class Actor:
continue
# spin up a task for the requested function
log.debug(f"Spawning task for {func}")
log.runtime(f"Spawning task for {func}")
assert self._service_n
cs = await self._service_n.start(
partial(_invoke, self, cid, chan, func, kwargs),
name=funcname,
)
try:
cs = await self._service_n.start(
partial(_invoke, self, cid, chan, func, kwargs),
name=funcname,
)
except RuntimeError:
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
@ -718,11 +729,11 @@ class Actor:
loop_cs.cancel()
break
log.debug(
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
else:
# channel disconnect
log.debug(
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
@ -735,15 +746,21 @@ class Actor:
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}')
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
if nursery_cancelled_before_task:
sn = self._service_n
assert sn and sn.cancel_scope.cancel_called
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
@ -751,12 +768,12 @@ class Actor:
except trio.Cancelled:
# debugging only
log.debug(f"Msg loop was cancelled for {chan}")
log.runtime(f"Msg loop was cancelled for {chan}")
raise
finally:
# msg debugging for when he machinery is brokey
log.debug(
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
@ -782,7 +799,7 @@ class Actor:
# Receive runtime state from our parent
parent_data: dict[str, Any]
parent_data = await chan.recv()
log.debug(
log.runtime(
"Received state from parent:\n"
f"{parent_data}"
)
@ -791,7 +808,7 @@ class Actor:
parent_data.pop('bind_port'),
)
rvs = parent_data.pop('_runtime_vars')
log.debug(f"Runtime vars are: {rvs}")
log.runtime(f"Runtime vars are: {rvs}")
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -895,7 +912,7 @@ class Actor:
_state._runtime_vars['_root_mailbox'] = accept_addr
# Register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
log.runtime(f"Registering {self} for role `{self.name}`")
assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*self._arb_addr) as arb_portal:
@ -923,9 +940,9 @@ class Actor:
shield=True,
)
)
log.info("Waiting on service nursery to complete")
log.info("Service nursery complete")
log.info("Waiting on root nursery to complete")
log.runtime("Waiting on service nursery to complete")
log.runtime("Service nursery complete")
log.runtime("Waiting on root nursery to complete")
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
@ -959,11 +976,11 @@ class Actor:
raise
finally:
log.info("Root nursery complete")
log.runtime("Root nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.warning("Closing all actor lifetime contexts")
log.cancel("Closing all actor lifetime contexts")
_lifetime_stack.close()
# Unregister actor from the arbiter
@ -993,13 +1010,13 @@ class Actor:
if any(
chan.connected() for chan in chain(*self._peers.values())
):
log.debug(
log.runtime(
f"Waiting for remaining peers {self._peers} to clear")
with trio.CancelScope(shield=True):
await self._no_more_peers.wait()
log.debug("All peer channels are complete")
log.runtime("All peer channels are complete")
log.debug("Runtime completed")
log.runtime("Runtime completed")
async def _serve_forever(
self,
@ -1029,7 +1046,7 @@ class Actor:
host=accept_host,
)
)
log.debug(
log.runtime(
"Started tcp server(s) on"
f" {[getattr(l, 'socket', 'unknown socket') for l in l]}")
self._listeners.extend(l)
@ -1058,7 +1075,7 @@ class Actor:
spawning new rpc tasks
- return control the parent channel message loop
"""
log.warning(f"{self.uid} is trying to cancel")
log.cancel(f"{self.uid} is trying to cancel")
self._cancel_called = True
# cancel all ongoing rpc tasks
@ -1068,7 +1085,7 @@ class Actor:
# with the root actor in this tree
dbcs = _debug._debugger_request_cs
if dbcs is not None:
log.debug("Cancelling active debugger request")
log.cancel("Cancelling active debugger request")
dbcs.cancel()
# kill all ongoing tasks
@ -1082,7 +1099,7 @@ class Actor:
if self._service_n:
self._service_n.cancel_scope.cancel()
log.warning(f"{self.uid} was sucessfullly cancelled")
log.cancel(f"{self.uid} was sucessfullly cancelled")
self._cancel_complete.set()
return True
@ -1109,10 +1126,10 @@ class Actor:
# be cancelled was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(chan, cid)]
except KeyError:
log.warning(f"{cid} has already completed/terminated?")
log.cancel(f"{cid} has already completed/terminated?")
return
log.debug(
log.cancel(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
@ -1124,12 +1141,12 @@ class Actor:
scope.cancel()
# wait for _invoke to mark the task complete
log.debug(
log.runtime(
f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
await is_complete.wait()
log.debug(
log.runtime(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
@ -1141,7 +1158,7 @@ class Actor:
registered for each.
"""
tasks = self._rpc_tasks
log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid) in tasks.copy():
if only_chan is not None:
if only_chan != chan:
@ -1150,7 +1167,7 @@ class Actor:
# TODO: this should really done in a nursery batch
await self._cancel_task(cid, chan)
log.info(
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait()
@ -1159,7 +1176,7 @@ class Actor:
preventing any new inbound connections from being established.
"""
if self._server_n:
log.debug("Shutting down channel server")
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
@property

View File

@ -1,5 +1,6 @@
"""
Portal api
"""
import importlib
import inspect
@ -173,7 +174,7 @@ class Portal:
# terminate all locally running async generator
# IPC calls
if self._streams:
log.warning(
log.cancel(
f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy():
try:
@ -196,19 +197,17 @@ class Portal:
"""Cancel the actor on the other end of this portal.
"""
if not self.channel.connected():
log.warning("This portal is already closed can't cancel")
log.cancel("This portal is already closed can't cancel")
return False
await self._cancel_streams()
log.warning(
log.cancel(
f"Sending actor cancel request to {self.channel.uid} on "
f"{self.channel}")
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield
# with trio.CancelScope() as cancel_scope:
# with trio.CancelScope(shield=True) as cancel_scope:
with trio.move_on_after(0.5) as cancel_scope:
cancel_scope.shield = True
@ -216,13 +215,13 @@ class Portal:
return True
if cancel_scope.cancelled_caught:
log.warning(f"May have failed to cancel {self.channel.uid}")
log.cancel(f"May have failed to cancel {self.channel.uid}")
# if we get here some weird cancellation case happened
return False
except trio.ClosedResourceError:
log.warning(
log.cancel(
f"{self.channel} for {self.channel.uid} was already closed?")
return False
@ -347,7 +346,7 @@ class Portal:
except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
log.warning(f'Context {ctx} was already closed?')
log.cancel(f'Context {ctx} was already closed?')
# XXX: should this always be done?
# await recv_chan.aclose()
@ -446,7 +445,7 @@ class Portal:
_err = err
# the context cancels itself on any cancel
# causing error.
log.error(f'Context {ctx} sending cancel to far end')
log.cancel(f'Context {ctx} sending cancel to far end')
with trio.CancelScope(shield=True):
await ctx.cancel()
raise
@ -468,15 +467,15 @@ class Portal:
if _err:
if ctx._cancel_called:
log.warning(
log.cancel(
f'Context {fn_name} cancelled by caller with\n{_err}'
)
elif _err is not None:
log.warning(
log.cancel(
f'Context {fn_name} cancelled by callee with\n{_err}'
)
else:
log.info(
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)

View File

@ -185,11 +185,11 @@ async def open_root_actor(
raise
finally:
logger.info("Shutting down root actor")
logger.cancel("Shutting down root actor")
await actor.cancel()
finally:
_state._current_actor = None
logger.info("Root actor terminated")
logger.runtime("Root actor terminated")
def run(

View File

@ -4,10 +4,10 @@ Message stream types and APIs.
"""
from __future__ import annotations
import inspect
from contextlib import contextmanager, asynccontextmanager
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
Any, Iterator, Optional, Callable,
Any, Optional, Callable,
AsyncGenerator, Dict,
AsyncIterator
)
@ -153,7 +153,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
rx_chan = self._rx_chan
if rx_chan._closed:
log.warning(f"{self} is already closed")
log.cancel(f"{self} is already closed")
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
@ -367,7 +367,7 @@ class Context:
'''
side = 'caller' if self._portal else 'callee'
log.warning(f'Cancelling {side} side of context to {self.chan.uid}')
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
@ -380,7 +380,7 @@ class Context:
cid = self.cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.warning(
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
@ -395,11 +395,11 @@ class Context:
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.warning(
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
log.warning(
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
else:
@ -521,9 +521,8 @@ class Context:
except KeyError:
if 'yield' in msg:
# far end task is still streaming to us..
log.warning(f'Remote stream deliverd {msg}')
# do disard
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:

View File

@ -168,7 +168,7 @@ class ActorNursery:
"""
self.cancelled = True
log.warning(f"Cancelling nursery in {self._actor.uid}")
log.cancel(f"Cancelling nursery in {self._actor.uid}")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery:
@ -320,7 +320,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) or (
is_multi_cancelled(err)
):
log.warning(
log.cancel(
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
@ -357,7 +357,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
log.cancel(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()

View File

@ -2,7 +2,6 @@
Log like a forester!
"""
import sys
from functools import partial
import logging
import colorlog # type: ignore
from typing import Optional
@ -32,6 +31,7 @@ DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
}
@ -41,6 +41,7 @@ STD_PALETTE = {
'PDB': 'white',
'WARNING': 'yellow',
'INFO': 'green',
'CANCEL': 'yellow',
'RUNTIME': 'white',
'DEBUG': 'white',
'TRANSPORT': 'cyan',
@ -69,12 +70,54 @@ class StackLevelAdapter(logging.LoggerAdapter):
) -> None:
return self.log(15, msg)
def cancel(
self,
msg: str,
) -> None:
return self.log(16, msg)
def pdb(
self,
msg: str,
) -> None:
return self.log(500, msg)
def log(self, level, msg, *args, **kwargs):
"""
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
"""
if self.isEnabledFor(level):
# msg, kwargs = self.process(msg, kwargs)
self._log(level, msg, args, **kwargs)
# LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log(
self,
level,
msg,
args,
exc_info=None,
extra=None,
stack_info=False,
# XXX: bit we added to show fileinfo from actual caller.
# this level then ``.log()`` then finally the caller's level..
stacklevel=3,
):
"""
Low-level log implementation, proxied to allow nested logger adapters.
"""
return self.logger._log(
level,
msg,
args,
exc_info=exc_info,
extra=self.extra,
stack_info=stack_info,
stacklevel=stacklevel,
)
def get_logger(