Compare commits

..

13 Commits

Author SHA1 Message Date
Tyler Goodlet 0bb66f5755 Mask not-yet-existing `.devx.pformat` import 2025-03-20 21:36:56 -04:00
Tyler Goodlet ee1596da4d Handle cpython builds with `libedit` for `readline`
Since `uv`'s cpython distributions are built this way `pdbp`'s tab
completion was breaking (as was vi-mode). This adds a new
`.devx._enable_readline_feats()` import hook which checks for the
appropriate library and applies settings accordingly.
2025-03-20 21:36:56 -04:00
Tyler Goodlet 9262beb0c4 Add in some dev deps for @goodboy
Namely since i use `xonsh` for a main shell, this includes adding it as
well as related tooling. Obvi bump the `uv.lock`.

Some other stuff retained from `poetry` days,
- add usage-comments around various (optional) deps.
- add toml section separator lines.
- go with 2-space indent.
- add comment on `trio>0.27` needed for py3.13+
2025-03-20 21:36:56 -04:00
Tyler Goodlet d830899a9b Disable invalid line in `ruff` config? 2025-03-20 21:36:56 -04:00
Tyler Goodlet 06d47d9376 Add a `ruff.toml` with ignore set taken from old `pyproject.toml` content 2025-03-20 21:36:56 -04:00
Guillermo Rodriguez 11e1c8d14f Migrate to uv using "uvx migrate-to-uv", use msgspec from git due to python 3.13 compat 2025-03-20 21:36:56 -04:00
Tyler Goodlet 0f3d714d38 Provision for infected-`asyncio` debug mode support
It's **almost** there, we're just missing the final translation code to
get from an `asyncio` side task to be able to call
`.devx._debug..wait_for_parent_stdin_hijack()` to do root actor TTY
locking. Then we just need to ensure internals also do the right thing
with `greenback()` for equivalent sync `breakpoint()` style pause
points.

Since i'm deferring this until later, tossing in some xfail tests to
`test_infected_asyncio` with TODOs for the needed implementation as well
as eventual test org.

By "provision" it means we add:
- `greenback` init block to `_run_asyncio_task()` when debug mode is
  enabled (but which will currently rte when `asyncio` is detected)
  using `.bestow_portal()` around the `asyncio.Task`.
- a call to `_debug.maybe_init_greenback()` in the `run_as_asyncio_guest()`
  guest-mode entry point.
- as part of `._debug.Lock.is_main_trio_thread()` whenever the async-lib
  is not 'trio' error lock the backend name (which is obvi `'asyncio'`
  in this use case).
2025-03-20 21:36:42 -04:00
Tyler Goodlet ce89cabe1f Drop extra newline from log msg 2025-03-20 21:36:42 -04:00
Tyler Goodlet 3b8119ff81 Change all `| None` -> `|None` in `._runtime` 2025-03-20 21:36:42 -04:00
Tyler Goodlet 039de4dce7 Add todo-notes for hiding `@acm` frames
In the particular case of the `Portal.open_context().__aexit__()` frame,
due to usage of `contextlib.asynccontextmanager`, we can't easily hook
into monkeypatching a `__tracebackhide__` set nor catch-n-reraise around
the block exit without defining our own `.__aexit__()` impl. Thus, it's
prolly most sane to do something with an override of
`contextlib._AsyncGeneratorContextManager` or the public exposed
`AsyncContextDecorator` (which uses the former internally right?).

Also fixup some old `._invoke` mod paths in comments and just show
`str(eoc)` in `.open_stream().__aexit__()` terminated-by-EoC log msg
since the `repr()` form won't pprint the IPC msg nicely..
2025-03-20 21:36:42 -04:00
Tyler Goodlet 8dc804279c Tweak main thread predicate to ensure `trio.run()`
Change the name to `Lock.is_main_trio_thread()` indicating that when
`True` the thread is both the main one **and** the one that called
`trio.run()`. Add a todo for just copying the
`trio._util.is_main_thread()` impl (since it's private / may change) and
some brief notes about potential usage of
`trio.from_thread.check_cancelled()` to detect non-`.to_thread` thread
spawns.
2025-03-20 21:36:42 -04:00
Tyler Goodlet 54b9a37430 Refine and test `tractor.pause_from_sync()`
Now supports use from any `trio` task, any sync thread started with
`trio.to_thread.run_sync()` AND also via `breakpoint()` builtin API!
The only bit missing now is support for `asyncio` tasks when in infected
mode.. Bo

`greenback` setup/API adjustments:
- move `._rpc.maybe_import_gb()` to -> `devx._debug` and factor out the cached
  import checking into a sync func whilst placing the async `.ensure_portal()`
  bootstrapping into a new async `maybe_init_greenback()`.
- use the new init-er func inside `open_root_actor()` with the output
  predicating whether we override the `breakpoint()` hook.

core `devx._debug` implementation deatz:
- make `mk_mpdb()` only return the `pdp.Pdb` subtype instance since
  the sigint unshielding func is now accessible from the `Lock`
  singleton from anywhere.

- add non-main thread support (at least for `trio.to_thread` use cases)
  to our `Lock` with a new `.is_trio_thread()` predicate that delegates
  directly to `trio`'s internal version.

- do `Lock.is_trio_thread()` checks inside any methods which require
  special provisions when invoked from a non-main `trio` thread:
  - `.[un]shield_sigint()` methods since `signal.signal` usage is only
    allowed from cpython's main thread.
  - `.release()` since `trio.StrictFIFOLock` can only be called from
    a `trio` task.

- rework `.pause_from_sync()` itself to directly call `._set_trace()`
  and don't bother with `greenback._await()` when we're already calling
  it from a `.to_thread.run_sync()` thread, oh and try to use the
  thread/task name when setting `Lock.local_task_in_debug`.

- make it an RTE for now if you try to use `.pause_from_sync()` from any
  infected-`asyncio` task, but support is (hopefully) coming soon!

For testing we add a new `test_debugger.py::test_pause_from_sync()`
which includes a ctrl-c parametrization around the
`examples/debugging/sync_bp.py` script which includes all currently
supported/working usages:
- `tractor.pause_from_sync()`.
- via `breakpoint()` overload.
- from a `trio.to_thread.run_sync()` spawn.
2025-03-20 21:36:42 -04:00
Tyler Goodlet bc9a9a66bc First draft workin minus non-main-thread usage! 2025-03-20 21:36:42 -04:00
18 changed files with 61 additions and 410 deletions

View File

@ -4,15 +4,9 @@ import trio
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
try:
while True:
yield 'yo'
await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error():
@ -25,7 +19,7 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='cancel',
loglevel='error',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,7 +45,6 @@ async def spawn_until(depth=0):
)
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main():
"""The main ``tractor`` routine.

View File

@ -38,7 +38,6 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
# loglevel='runtime',
) as n:
# Spawn both actors, don't bother with collecting results

View File

@ -23,6 +23,5 @@ async def main():
n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die)
if __name__ == '__main__':
trio.run(main)

View File

@ -2,13 +2,10 @@ import trio
import tractor
async def main(
registry_addrs: tuple[str, int]|None = None
):
async def main():
async with tractor.open_root_actor(
debug_mode=True,
# loglevel='runtime',
):
while True:
await tractor.breakpoint()

View File

@ -3,20 +3,17 @@ import tractor
async def breakpoint_forever():
'''
Indefinitely re-enter debugger in child actor.
'''
"""Indefinitely re-enter debugger in child actor.
"""
while True:
await trio.sleep(0.1)
await tractor.pause()
await tractor.breakpoint()
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='cancel',
) as n:
portal = await n.run_in_actor(

View File

@ -3,26 +3,16 @@ import tractor
async def name_error():
getattr(doggypants) # noqa (on purpose)
getattr(doggypants)
async def main():
async with tractor.open_nursery(
debug_mode=True,
# loglevel='transport',
) as an:
) as n:
# TODO: ideally the REPL arrives at this frame in the parent,
# ABOVE the @api_frame of `Portal.run_in_actor()` (which
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
portal = await n.run_in_actor(name_error)
await portal.result()
if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ def sync_pause(
error: bool = False,
):
if use_builtin:
breakpoint(hide_tb=False)
breakpoint()
else:
tractor.pause_from_sync()
@ -20,20 +20,18 @@ def sync_pause(
async def start_n_sync_pause(
ctx: tractor.Context,
):
actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
# sync to requesting peer
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None:
async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
) as an:

View File

@ -36,7 +36,6 @@ def parse_ipaddr(arg):
if __name__ == "__main__":
__tracebackhide__: bool = True
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)

View File

@ -106,7 +106,6 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor.
'''
__tracebackhide__: bool = True
_state._current_actor = actor
trio_main = partial(
async_main,

View File

@ -22,10 +22,9 @@ from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
import os
import signal
import sys
from typing import Callable
import os
import warnings
@ -79,8 +78,6 @@ async def open_root_actor(
# enables the multi-process debugger support
debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False,
# internal logging
loglevel: str|None = None,
@ -102,36 +99,19 @@ async def open_root_actor(
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
if (
await _debug.maybe_init_greenback(
raise_not_found=False,
)
):
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
if (
debug_mode
and maybe_enable_greenback
and await _debug.maybe_init_greenback(
raise_not_found=False,
)
):
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug.pause_from_sync'
)
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# os.environ['PYTHONBREAKPOINT'] = None
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
@ -211,11 +191,7 @@ async def open_root_actor(
assert _log
# TODO: factor this into `.devx._stackscope`!!
if (
debug_mode
and
enable_stack_on_sig
):
if debug_mode:
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
@ -392,8 +368,6 @@ async def open_root_actor(
_state._last_actor_terminated = actor
# restore built-in `breakpoint()` hook state
if debug_mode:
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path

View File

@ -503,7 +503,7 @@ async def trio_proc(
})
# track subactor in current nursery
curr_actor: Actor = current_actor()
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up

View File

@ -30,16 +30,11 @@ if TYPE_CHECKING:
_current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`!
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
'_root_mailbox': (None, None),
'_registry_addrs': [],
# for `breakpoint()` support
'use_greenback': False,
}

View File

@ -1,177 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
FrameType,
FunctionType,
MethodType,
# CodeType,
)
from typing import (
# Any,
Callable,
# TYPE_CHECKING,
Type,
)
from tractor.msg import (
pretty_struct,
NamespacePath,
)
# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
FunctionType
|MethodType
):
'''
Attempt to get the function (or method) reference
from a given `FrameType`.
Verbatim from an SO:
https://stackoverflow.com/a/2220759
'''
args, _, _, value_dict = inspect.getargvalues(fr)
# we check the first parameter for the frame function is
# named 'self'
if (
len(args)
and
# TODO: other cases for `@classmethod` etc..?)
args[0] == 'self'
):
# in that case, 'self' will be referenced in value_dict
instance: object = value_dict.get('self')
if instance:
# return its class
return getattr(
instance,
'__class__',
None,
)
# return None otherwise
return None
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
func_name: str = frame.f_code.co_name
try:
return frame.f_globals[func_name]
except KeyError:
cls: Type|None = get_class_from_frame(frame)
if cls:
return getattr(
cls,
func_name,
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType
@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
def find_caller_info(
dunder_var: str = '__runtimeframe__',
iframes:int = 1,
check_frame_depth: bool = True,
) -> CallerInfo|None:
'''
Scan up the callstack for a frame with a `dunder_var: str` variable
and return the `iframes` frames above it.
By default we scan for a `__runtimeframe__` scope var which
denotes a `tractor` API above which (one frame up) is "user
app code" which "called into" the `tractor` method or func.
TODO: ex with `Portal.open_context()`
'''
# TODO: use this instead?
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
frames: list[inspect.FrameInfo] = inspect.stack()
for fi in frames:
assert (
fi.function
==
fi.frame.f_code.co_name
)
this_frame: FrameType = fi.frame
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
if dunder_val:
go_up_iframes: int = (
dunder_val # could be 0 or `True` i guess?
or
iframes
)
rt_frame: FrameType = fi.frame
call_frame = rt_frame
for i in range(go_up_iframes):
call_frame = call_frame.f_back
return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
)
return None

View File

@ -190,14 +190,11 @@ class Lock:
is_trio_main = (
# TODO: since this is private, @oremanj says
# we should just copy the impl for now..
(is_main_thread := trio._util.is_main_thread())
trio._util.is_main_thread()
and
(async_lib := sniffio.current_async_library()) == 'trio'
)
if (
not is_trio_main
and is_main_thread
):
if not is_trio_main:
log.warning(
f'Current async-lib detected by `sniffio`: {async_lib}\n'
)

View File

@ -23,31 +23,12 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
import multiprocessing as mp
from signal import (
signal,
SIGUSR1,
)
import traceback
from typing import TYPE_CHECKING
import trio
from tractor import (
_state,
log as logmod,
)
log = logmod.get_logger(__name__)
if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,
)
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
@ -60,15 +41,9 @@ def dump_task_tree() -> None:
recurse_child_tasks=True
)
)
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
log.devx(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
log = get_console_log('cancel')
log.pdb(
f'Dumping `stackscope` tree:\n\n'
f'{tree_str}\n'
)
# import logging
@ -81,13 +56,8 @@ def dump_task_tree() -> None:
# ).exception("Error printing task tree")
def signal_handler(
sig: int,
frame: object,
relay_to_subs: bool = True,
) -> None:
def signal_handler(sig: int, frame: object) -> None:
import traceback
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
@ -95,26 +65,6 @@ def signal_handler(
# not in async context -- print a normal traceback
traceback.print_stack()
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.devx(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
def enable_stack_on_sig(
@ -132,6 +82,3 @@ def enable_stack_on_sig(
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')

View File

@ -21,11 +21,6 @@ Log like a forester!
from collections.abc import Mapping
import sys
import logging
from logging import (
LoggerAdapter,
Logger,
StreamHandler,
)
import colorlog # type: ignore
import trio
@ -53,19 +48,20 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40
CUSTOM_LEVELS: dict[str, int] = {
LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'DEVX': 17,
'CANCEL': 18,
'CANCEL': 16,
'PDB': 500,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = {
'CRITICAL': 'red',
'ERROR': 'red',
'PDB': 'white',
'DEVX': 'cyan',
'WARNING': 'yellow',
'INFO': 'green',
'CANCEL': 'yellow',
@ -82,7 +78,7 @@ BOLD_PALETTE = {
# TODO: this isn't showing the correct '{filename}'
# as it did before..
class StackLevelAdapter(LoggerAdapter):
class StackLevelAdapter(logging.LoggerAdapter):
def transport(
self,
@ -90,8 +86,7 @@ class StackLevelAdapter(LoggerAdapter):
) -> None:
'''
IPC transport level msg IO; generally anything below
`._ipc.Channel` and friends.
IPC level msg-ing.
'''
return self.log(5, msg)
@ -107,11 +102,11 @@ class StackLevelAdapter(LoggerAdapter):
msg: str,
) -> None:
'''
Cancellation sequencing, mostly for runtime reporting.
Cancellation logging, mostly for runtime reporting.
'''
return self.log(
level=22,
level=16,
msg=msg,
# stacklevel=4,
)
@ -121,21 +116,11 @@ class StackLevelAdapter(LoggerAdapter):
msg: str,
) -> None:
'''
`pdb`-REPL (debugger) related statuses.
Debugger logging.
'''
return self.log(500, msg)
def devx(
self,
msg: str,
) -> None:
'''
"Developer experience" sub-sys statuses.
'''
return self.log(17, msg)
def log(
self,
level,
@ -151,7 +136,8 @@ class StackLevelAdapter(LoggerAdapter):
if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in CUSTOM_LEVELS.values()
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
@ -198,30 +184,8 @@ class StackLevelAdapter(LoggerAdapter):
)
# TODO IDEAs:
# -[ ] move to `.devx.pformat`?
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return `str`-ified unique for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
_conc_name_getters = {
'task': pformat_task_uid,
'task': lambda: trio.lowlevel.current_task().name,
'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
@ -229,10 +193,7 @@ _conc_name_getters = {
class ActorContextInfo(Mapping):
'''
Dyanmic lookup for local actor and task names.
'''
"Dyanmic lookup for local actor and task names"
_context_keys = (
'task',
'actor',
@ -263,7 +224,6 @@ def get_logger(
'''Return the package log or a sub-logger for ``name`` if provided.
'''
log: Logger
log = rlog = logging.getLogger(_root_name)
if (
@ -306,7 +266,7 @@ def get_logger(
logger = StackLevelAdapter(log, ActorContextInfo())
# additional levels
for name, val in CUSTOM_LEVELS.items():
for name, val in LEVELS.items():
logging.addLevelName(val, name)
# ensure customs levels exist as methods
@ -318,7 +278,7 @@ def get_logger(
def get_console_log(
level: str | None = None,
**kwargs,
) -> LoggerAdapter:
) -> logging.LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it.
@ -343,7 +303,7 @@ def get_console_log(
None,
)
):
handler = StreamHandler()
handler = logging.StreamHandler()
formatter = colorlog.ColoredFormatter(
LOG_FORMAT,
datefmt=DATE_FORMAT,
@ -363,19 +323,3 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False