Compare commits
13 Commits
9639c3b455
...
8842b758d7
Author | SHA1 | Date |
---|---|---|
|
8842b758d7 | |
|
54ee624632 | |
|
e8f2dfc088 | |
|
d2282f4275 | |
|
83ce2275b9 | |
|
9f757ffa63 | |
|
0c6d512ba4 | |
|
fc130d06b8 | |
|
73423ef2b7 | |
|
b1f2a6b394 | |
|
9489a2f84d | |
|
92eaed6fec | |
|
217d54b9d1 |
|
@ -24,14 +24,10 @@ from tractor._testing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO cases:
|
# XXX TODO cases:
|
||||||
# - [ ] peer cancelled itself - so other peers should
|
|
||||||
# get errors reflecting that the peer was itself the .canceller?
|
|
||||||
|
|
||||||
# - [x] WE cancelled the peer and thus should not see any raised
|
# - [x] WE cancelled the peer and thus should not see any raised
|
||||||
# `ContextCancelled` as it should be reaped silently?
|
# `ContextCancelled` as it should be reaped silently?
|
||||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||||
# already covers this case?
|
# already covers this case?
|
||||||
|
|
||||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||||
# Portal.cancel_actor().
|
# Portal.cancel_actor().
|
||||||
# => all other connected peers should get that cancel requesting peer's
|
# => all other connected peers should get that cancel requesting peer's
|
||||||
|
@ -44,16 +40,6 @@ from tractor._testing import (
|
||||||
# that also spawned a remote task task in that same peer-parent.
|
# that also spawned a remote task task in that same peer-parent.
|
||||||
|
|
||||||
|
|
||||||
# def test_self_cancel():
|
|
||||||
# '''
|
|
||||||
# 2 cases:
|
|
||||||
# - calls `Actor.cancel()` locally in some task
|
|
||||||
# - calls LocalPortal.cancel_actor()` ?
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# ...
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def open_stream_then_sleep_forever(
|
async def open_stream_then_sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
@ -806,7 +792,7 @@ async def basic_echo_server(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str = 'wittle_bruv',
|
peer_name: str = 'wittle_bruv',
|
||||||
|
|
||||||
err_after: int|None = None,
|
err_after_imsg: int|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -835,8 +821,9 @@ async def basic_echo_server(
|
||||||
await ipc.send(resp)
|
await ipc.send(resp)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
err_after
|
err_after_imsg
|
||||||
and i > err_after
|
and
|
||||||
|
i > err_after_imsg
|
||||||
):
|
):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'Simulated error in `{peer_name}`'
|
f'Simulated error in `{peer_name}`'
|
||||||
|
@ -978,7 +965,8 @@ async def tell_little_bro(
|
||||||
actor_name: str,
|
actor_name: str,
|
||||||
|
|
||||||
caller: str = '',
|
caller: str = '',
|
||||||
err_after: int|None = None,
|
err_after: float|None = None,
|
||||||
|
rng_seed: int = 50,
|
||||||
):
|
):
|
||||||
# contact target actor, do a stream dialog.
|
# contact target actor, do a stream dialog.
|
||||||
async with (
|
async with (
|
||||||
|
@ -989,14 +977,18 @@ async def tell_little_bro(
|
||||||
basic_echo_server,
|
basic_echo_server,
|
||||||
|
|
||||||
# XXX proxy any delayed err condition
|
# XXX proxy any delayed err condition
|
||||||
err_after=err_after,
|
err_after_imsg=(
|
||||||
|
err_after * rng_seed
|
||||||
|
if err_after is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
) as (sub_ctx, first),
|
) as (sub_ctx, first),
|
||||||
|
|
||||||
sub_ctx.open_stream() as echo_ipc,
|
sub_ctx.open_stream() as echo_ipc,
|
||||||
):
|
):
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
for i in range(100):
|
for i in range(rng_seed):
|
||||||
msg: tuple = (
|
msg: tuple = (
|
||||||
uid,
|
uid,
|
||||||
i,
|
i,
|
||||||
|
@ -1021,13 +1013,13 @@ async def tell_little_bro(
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'raise_sub_spawn_error_after',
|
'raise_sub_spawn_error_after',
|
||||||
[None, 50],
|
[None, 0.5],
|
||||||
)
|
)
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
raise_sub_spawn_error_after: int|None,
|
raise_sub_spawn_error_after: float|None,
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# and the server's spawned child should cancel and terminate!
|
# and the server's spawned child should cancel and terminate!
|
||||||
peer_name: str = 'little_bro'
|
peer_name: str = 'little_bro'
|
||||||
|
|
||||||
|
|
||||||
def check_inner_rte(rae: RemoteActorError):
|
def check_inner_rte(rae: RemoteActorError):
|
||||||
'''
|
'''
|
||||||
Validate the little_bro's relayed inception!
|
Validate the little_bro's relayed inception!
|
||||||
|
@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.wait_for_result(hide_tb=False)
|
||||||
|
|
||||||
# in remote (relayed inception) error
|
# in remote (relayed inception) error
|
||||||
# case, we should error on the line above!
|
# case, we should error on the line above!
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
|
@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
assert isinstance(res, ContextCancelled)
|
assert isinstance(res, ContextCancelled)
|
||||||
assert client_ctx.cancel_acked
|
assert client_ctx.cancel_acked
|
||||||
assert res.canceller == root.uid
|
assert res.canceller == root.uid
|
||||||
|
assert not raise_sub_spawn_error_after
|
||||||
|
|
||||||
|
# cancelling the spawner sub should
|
||||||
|
# transitively cancel it's sub, the little
|
||||||
|
# bruv.
|
||||||
|
print('root cancelling server/client sub-actors')
|
||||||
|
await spawn_ctx.cancel()
|
||||||
|
async with tractor.find_actor(
|
||||||
|
name=peer_name,
|
||||||
|
) as sub:
|
||||||
|
assert not sub
|
||||||
|
|
||||||
|
# XXX, only for tracing
|
||||||
|
# except BaseException as _berr:
|
||||||
|
# berr = _berr
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
# raise berr
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
_err = rae
|
_err = rae
|
||||||
|
@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
raise
|
raise
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
|
|
||||||
else:
|
|
||||||
assert not raise_sub_spawn_error_after
|
|
||||||
|
|
||||||
# cancelling the spawner sub should
|
|
||||||
# transitively cancel it's sub, the little
|
|
||||||
# bruv.
|
|
||||||
print('root cancelling server/client sub-actors')
|
|
||||||
await spawn_ctx.cancel()
|
|
||||||
async with tractor.find_actor(
|
|
||||||
name=peer_name,
|
|
||||||
) as sub:
|
|
||||||
assert not sub
|
|
||||||
|
|
||||||
|
# await tractor.pause()
|
||||||
# await server.cancel_actor()
|
# await server.cancel_actor()
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
|
@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||||
# called directly fot this confext.
|
# called directly for this confext.
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
_ctxc = ctxc
|
_ctxc = ctxc
|
||||||
print(
|
print(
|
||||||
|
@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# assert spawn_ctx.cancelled_caught
|
# assert spawn_ctx.cancelled_caught
|
||||||
|
|
||||||
|
async def _main():
|
||||||
|
with trio.fail_after(
|
||||||
|
3 if not debug_mode
|
||||||
|
else 999
|
||||||
|
):
|
||||||
|
await main()
|
||||||
|
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(_main)
|
||||||
|
|
||||||
rae: RemoteActorError = excinfo.value
|
rae: RemoteActorError = excinfo.value
|
||||||
check_inner_rte(rae)
|
check_inner_rte(rae)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(_main)
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
'''
|
||||||
|
`tractor.log`-wrapping unit tests.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import importlib
|
||||||
|
from pathlib import Path
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
def test_root_pkg_not_duplicated_in_logger_name():
|
||||||
|
'''
|
||||||
|
When both `pkg_name` and `name` are passed and they have
|
||||||
|
a common `<root_name>.< >` prefix, ensure that it is not
|
||||||
|
duplicated in the child's `StackLevelAdapter.name: str`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
project_name: str = 'pylib'
|
||||||
|
pkg_path: str = 'pylib.subpkg.mod'
|
||||||
|
|
||||||
|
proj_log = tractor.log.get_logger(
|
||||||
|
pkg_name=project_name,
|
||||||
|
mk_sublog=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
sublog = tractor.log.get_logger(
|
||||||
|
pkg_name=project_name,
|
||||||
|
name=pkg_path,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert proj_log is not sublog
|
||||||
|
assert sublog.name.count(proj_log.name) == 1
|
||||||
|
assert 'mod' not in sublog.name
|
||||||
|
|
||||||
|
|
||||||
|
# ?TODO, move this into internal libs?
|
||||||
|
# -[ ] we already use it in `modden.config._pymod` as well
|
||||||
|
def load_module_from_path(
|
||||||
|
path: Path,
|
||||||
|
module_name: str|None = None,
|
||||||
|
) -> ModuleType:
|
||||||
|
'''
|
||||||
|
Taken from SO,
|
||||||
|
https://stackoverflow.com/a/67208147
|
||||||
|
|
||||||
|
which is based on stdlib docs,
|
||||||
|
https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
|
||||||
|
|
||||||
|
'''
|
||||||
|
module_name = module_name or path.stem
|
||||||
|
spec = importlib.util.spec_from_file_location(
|
||||||
|
module_name,
|
||||||
|
str(path),
|
||||||
|
)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules[module_name] = module
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
def test_implicit_mod_name_applied_for_child(
|
||||||
|
testdir: pytest.Pytester,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that when `.log.get_logger(pkg_name='pylib')` is called
|
||||||
|
from a given sub-mod from within the `pylib` pkg-path, we
|
||||||
|
implicitly set the equiv of `name=__name__` from the caller's
|
||||||
|
module.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# tractor.log.get_console_log(level=loglevel)
|
||||||
|
proj_name: str = 'snakelib'
|
||||||
|
mod_code: str = (
|
||||||
|
f'import tractor\n'
|
||||||
|
f'\n'
|
||||||
|
f'log = tractor.log.get_logger(pkg_name="{proj_name}")\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# create a sub-module for each pkg layer
|
||||||
|
_lib = testdir.mkpydir(proj_name)
|
||||||
|
pkg: Path = Path(_lib)
|
||||||
|
subpkg: Path = pkg / 'subpkg'
|
||||||
|
subpkg.mkdir()
|
||||||
|
|
||||||
|
pkgmod: Path = subpkg / "__init__.py"
|
||||||
|
pkgmod.touch()
|
||||||
|
|
||||||
|
_submod: Path = testdir.makepyfile(
|
||||||
|
_mod=mod_code,
|
||||||
|
)
|
||||||
|
|
||||||
|
pkg_mod = pkg / 'mod.py'
|
||||||
|
pkg_subpkg_submod = subpkg / 'submod.py'
|
||||||
|
shutil.copyfile(
|
||||||
|
_submod,
|
||||||
|
pkg_mod,
|
||||||
|
)
|
||||||
|
shutil.copyfile(
|
||||||
|
_submod,
|
||||||
|
pkg_subpkg_submod,
|
||||||
|
)
|
||||||
|
testdir.chdir()
|
||||||
|
|
||||||
|
# XXX NOTE, once the "top level" pkg mod has been
|
||||||
|
# imported, we can then use `import` syntax to
|
||||||
|
# import it's sub-pkgs and modules.
|
||||||
|
pkgmod = load_module_from_path(
|
||||||
|
Path(pkg / '__init__.py'),
|
||||||
|
module_name=proj_name,
|
||||||
|
)
|
||||||
|
pkg_root_log = tractor.log.get_logger(
|
||||||
|
pkg_name=proj_name,
|
||||||
|
mk_sublog=False,
|
||||||
|
)
|
||||||
|
assert pkg_root_log.name == proj_name
|
||||||
|
assert not pkg_root_log.logger.getChildren()
|
||||||
|
|
||||||
|
from snakelib import mod
|
||||||
|
assert mod.log.name == proj_name
|
||||||
|
|
||||||
|
from snakelib.subpkg import submod
|
||||||
|
assert (
|
||||||
|
submod.log.name
|
||||||
|
==
|
||||||
|
submod.__package__ # ?TODO, use this in `.get_logger()` instead?
|
||||||
|
==
|
||||||
|
f'{proj_name}.subpkg'
|
||||||
|
)
|
||||||
|
|
||||||
|
sub_logs = pkg_root_log.logger.getChildren()
|
||||||
|
assert len(sub_logs) == 1 # only one nested sub-pkg module
|
||||||
|
assert submod.log.logger in sub_logs
|
||||||
|
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
|
||||||
|
# TODO, moar tests against existing feats:
|
||||||
|
# ------ - ------
|
||||||
|
# - [ ] color settings?
|
||||||
|
# - [ ] header contents like,
|
||||||
|
# - actor + thread + task names from various conc-primitives,
|
||||||
|
# - [ ] `StackLevelAdapter` extensions,
|
||||||
|
# - our custom levels/methods: `transport|runtime|cance|pdb|devx`
|
||||||
|
# - [ ] custom-headers support?
|
||||||
|
#
|
||||||
|
|
||||||
|
# TODO, test driven dev of new-ideas/long-wanted feats,
|
||||||
|
# ------ - ------
|
||||||
|
# - [ ] https://github.com/goodboy/tractor/issues/244
|
||||||
|
# - [ ] @catern mentioned using a sync / deterministic sys
|
||||||
|
# and in particular `svlogd`?
|
||||||
|
# |_ https://smarden.org/runit/svlogd.8
|
||||||
|
|
||||||
|
# - [ ] using adapter vs. filters?
|
||||||
|
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
|
||||||
|
|
||||||
|
# - [ ] `.at_least_level()` optimization which short circuits wtv
|
||||||
|
# `logging` is doing behind the scenes when the level filters
|
||||||
|
# the emission..?
|
||||||
|
|
||||||
|
# - [ ] use of `.log.get_console_log()` in subactors and the
|
||||||
|
# subtleties of ensuring it actually emits from a subproc.
|
||||||
|
|
||||||
|
# - [ ] this idea of activating per-subsys emissions with some
|
||||||
|
# kind of `.name` filter passed to the runtime or maybe configured
|
||||||
|
# via the root `StackLevelAdapter`?
|
||||||
|
|
||||||
|
# - [ ] use of `logging.dict.dictConfig()` to simplify the impl
|
||||||
|
# of any of ^^ ??
|
||||||
|
# - https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
|
||||||
|
# - https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
|
||||||
|
# - https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig
|
|
@ -0,0 +1,239 @@
|
||||||
|
'''
|
||||||
|
Define the details of inter-actor "out-of-band" (OoB) cancel
|
||||||
|
semantics, that is how cancellation works when a cancel request comes
|
||||||
|
from the different concurrency (primitive's) "layer" then where the
|
||||||
|
eventual `trio.Task` actually raises a signal.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from functools import partial
|
||||||
|
# from contextlib import asynccontextmanager as acm
|
||||||
|
# import itertools
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor import ( # typing
|
||||||
|
ActorNursery,
|
||||||
|
Portal,
|
||||||
|
Context,
|
||||||
|
# ContextCancelled,
|
||||||
|
# RemoteActorError,
|
||||||
|
)
|
||||||
|
# from tractor._testing import (
|
||||||
|
# tractor_test,
|
||||||
|
# expect_ctxc,
|
||||||
|
# )
|
||||||
|
|
||||||
|
# XXX TODO cases:
|
||||||
|
# - [ ] peer cancelled itself - so other peers should
|
||||||
|
# get errors reflecting that the peer was itself the .canceller?
|
||||||
|
|
||||||
|
# def test_self_cancel():
|
||||||
|
# '''
|
||||||
|
# 2 cases:
|
||||||
|
# - calls `Actor.cancel()` locally in some task
|
||||||
|
# - calls LocalPortal.cancel_actor()` ?
|
||||||
|
#
|
||||||
|
# things to ensure!
|
||||||
|
# -[ ] the ctxc raised in a child should ideally show the tb of the
|
||||||
|
# underlying `Cancelled` checkpoint, i.e.
|
||||||
|
# `raise scope_error from ctxc`?
|
||||||
|
#
|
||||||
|
# -[ ] a self-cancelled context, if not allowed to block on
|
||||||
|
# `ctx.result()` at some point will hang since the `ctx._scope`
|
||||||
|
# is never `.cancel_called`; cases for this include,
|
||||||
|
# - an `open_ctx()` which never starteds before being OoB actor
|
||||||
|
# cancelled.
|
||||||
|
# |_ parent task will be blocked in `.open_context()` for the
|
||||||
|
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
|
||||||
|
# will never have been signalled..
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# ...
|
||||||
|
|
||||||
|
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
|
||||||
|
# but with the `Lock.acquire()` from a `@context` to ensure the
|
||||||
|
# implicit ignore-case-non-unmasking.
|
||||||
|
#
|
||||||
|
# @tractor.context
|
||||||
|
# async def acquire_actor_global_lock(
|
||||||
|
# ctx: tractor.Context,
|
||||||
|
# ignore_special_cases: bool,
|
||||||
|
# ):
|
||||||
|
|
||||||
|
# async with maybe_unmask_excs(
|
||||||
|
# ignore_special_cases=ignore_special_cases,
|
||||||
|
# ):
|
||||||
|
# await ctx.started('locked')
|
||||||
|
|
||||||
|
# # block til cancelled
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def sleep_forever(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
# ignore_special_cases: bool,
|
||||||
|
do_started: bool,
|
||||||
|
):
|
||||||
|
|
||||||
|
# async with maybe_unmask_excs(
|
||||||
|
# ignore_special_cases=ignore_special_cases,
|
||||||
|
# ):
|
||||||
|
# await ctx.started('locked')
|
||||||
|
if do_started:
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
# block til cancelled
|
||||||
|
print('sleepin on child-side..')
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'cancel_ctx',
|
||||||
|
[True, False],
|
||||||
|
)
|
||||||
|
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
cancel_ctx: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
The most "basic" out-of-band-task self-cancellation case where
|
||||||
|
`Portal.open_context()` is entered in a bg task and the
|
||||||
|
parent-task (of the containing nursery) calls `Context.cancel()`
|
||||||
|
without the child knowing; the `Context._scope` should be
|
||||||
|
`.cancel_called` when the IPC ctx's child-side relays
|
||||||
|
a `ContextCancelled` with a `.canceller` set to the parent
|
||||||
|
actor('s task).
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(
|
||||||
|
2 if not debug_mode else 999,
|
||||||
|
):
|
||||||
|
an: ActorNursery
|
||||||
|
async with (
|
||||||
|
tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel='devx',
|
||||||
|
enable_stack_on_sig=True,
|
||||||
|
) as an,
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
ptl: Portal = await an.start_actor(
|
||||||
|
'sub',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _open_ctx_async(
|
||||||
|
do_started: bool = True,
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
# do we expect to never enter the
|
||||||
|
# `.open_context()` below.
|
||||||
|
if not do_started:
|
||||||
|
task_status.started()
|
||||||
|
|
||||||
|
async with ptl.open_context(
|
||||||
|
sleep_forever,
|
||||||
|
do_started=do_started,
|
||||||
|
) as (ctx, first):
|
||||||
|
task_status.started(ctx)
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# XXX, this is the key OoB part!
|
||||||
|
#
|
||||||
|
# - start the `.open_context()` in a bg task which
|
||||||
|
# blocks inside the embedded scope-body,
|
||||||
|
#
|
||||||
|
# - when we call `Context.cancel()` it **is
|
||||||
|
# not** from the same task which eventually runs
|
||||||
|
# `.__aexit__()`,
|
||||||
|
#
|
||||||
|
# - since the bg "opener" task will be in
|
||||||
|
# a `trio.sleep_forever()`, it must be interrupted
|
||||||
|
# by the `ContextCancelled` delivered from the
|
||||||
|
# child-side; `Context._scope: CancelScope` MUST
|
||||||
|
# be `.cancel_called`!
|
||||||
|
#
|
||||||
|
print('ASYNC opening IPC context in subtask..')
|
||||||
|
maybe_ctx: Context|None = await tn.start(partial(
|
||||||
|
_open_ctx_async,
|
||||||
|
))
|
||||||
|
|
||||||
|
if (
|
||||||
|
maybe_ctx
|
||||||
|
and
|
||||||
|
cancel_ctx
|
||||||
|
):
|
||||||
|
print('cancelling first IPC ctx!')
|
||||||
|
await maybe_ctx.cancel()
|
||||||
|
|
||||||
|
# XXX, note that despite `maybe_context.cancel()`
|
||||||
|
# being called above, it's the parent (bg) task
|
||||||
|
# which was originally never interrupted in
|
||||||
|
# the `ctx._scope` body due to missing case logic in
|
||||||
|
# `ctx._maybe_cancel_and_set_remote_error()`.
|
||||||
|
#
|
||||||
|
# It didn't matter that the subactor process was
|
||||||
|
# already terminated and reaped, nothing was
|
||||||
|
# cancelling the ctx-parent task's scope!
|
||||||
|
#
|
||||||
|
print('cancelling subactor!')
|
||||||
|
await ptl.cancel_actor()
|
||||||
|
|
||||||
|
if maybe_ctx:
|
||||||
|
try:
|
||||||
|
await maybe_ctx.wait_for_result()
|
||||||
|
except tractor.ContextCancelled as ctxc:
|
||||||
|
assert not cancel_ctx
|
||||||
|
assert (
|
||||||
|
ctxc.canceller
|
||||||
|
==
|
||||||
|
tractor.current_actor().aid.uid
|
||||||
|
)
|
||||||
|
# don't re-raise since it'll trigger
|
||||||
|
# an EG from the above tn.
|
||||||
|
|
||||||
|
if cancel_ctx:
|
||||||
|
# graceful self-cancel
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# ctx parent task should see OoB ctxc due to
|
||||||
|
# `ptl.cancel_actor()`.
|
||||||
|
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert 'root' in excinfo.value.canceller[0]
|
||||||
|
|
||||||
|
|
||||||
|
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
|
||||||
|
# debug_mode: bool,
|
||||||
|
# loglevel: str,
|
||||||
|
# ):
|
||||||
|
# '''
|
||||||
|
# Demos OoB cancellation from the perspective of a ctx opened with
|
||||||
|
# a child subactor where the parent cancels the child at the "actor
|
||||||
|
# layer" using `Portal.cancel_actor()` and thus the
|
||||||
|
# `ContextCancelled.canceller` received by the ctx's parent-side
|
||||||
|
# task will appear to be a "self cancellation" even though that
|
||||||
|
# specific task itself was not cancelled and thus
|
||||||
|
# `Context.cancel_called ==False`.
|
||||||
|
# '''
|
||||||
|
# TODO, do we have an existing implied ctx
|
||||||
|
# cancel test like this?
|
||||||
|
# with trio.move_on_after(0.5):# as cs:
|
||||||
|
# await _open_ctx_async(
|
||||||
|
# do_started=False,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
|
# in-line ctx scope should definitely raise
|
||||||
|
# a ctxc with `.canceller = 'root'`
|
||||||
|
# async with ptl.open_context(
|
||||||
|
# sleep_forever,
|
||||||
|
# do_started=True,
|
||||||
|
# ) as pair:
|
||||||
|
|
|
@ -442,25 +442,25 @@ class Context:
|
||||||
'''
|
'''
|
||||||
Records whether cancellation has been requested for this context
|
Records whether cancellation has been requested for this context
|
||||||
by a call to `.cancel()` either due to,
|
by a call to `.cancel()` either due to,
|
||||||
- either an explicit call by some local task,
|
- an explicit call by some local task,
|
||||||
- or an implicit call due to an error caught inside
|
- or an implicit call due to an error caught inside
|
||||||
the ``Portal.open_context()`` block.
|
the `Portal.open_context()` block.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._cancel_called
|
return self._cancel_called
|
||||||
|
|
||||||
@cancel_called.setter
|
# XXX, to debug who frickin sets it..
|
||||||
def cancel_called(self, val: bool) -> None:
|
# @cancel_called.setter
|
||||||
'''
|
# def cancel_called(self, val: bool) -> None:
|
||||||
Set the self-cancelled request `bool` value.
|
# '''
|
||||||
|
# Set the self-cancelled request `bool` value.
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
# to debug who frickin sets it..
|
# if val:
|
||||||
# if val:
|
# from .devx import pause_from_sync
|
||||||
# from .devx import pause_from_sync
|
# pause_from_sync()
|
||||||
# pause_from_sync()
|
|
||||||
|
|
||||||
self._cancel_called = val
|
# self._cancel_called = val
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
|
@ -635,6 +635,71 @@ class Context:
|
||||||
'''
|
'''
|
||||||
await self.chan.send(Stop(cid=self.cid))
|
await self.chan.send(Stop(cid=self.cid))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def parent_task(self) -> trio.Task:
|
||||||
|
'''
|
||||||
|
This IPC context's "owning task" which is a `trio.Task`
|
||||||
|
on one of the "sides" of the IPC.
|
||||||
|
|
||||||
|
Note that the "parent_" prefix here refers to the local
|
||||||
|
`trio` task tree using the same interface as
|
||||||
|
`trio.Nursery.parent_task` whereas for IPC contexts,
|
||||||
|
a different cross-actor task hierarchy exists:
|
||||||
|
|
||||||
|
- a "parent"-side which originally entered
|
||||||
|
`Portal.open_context()`,
|
||||||
|
|
||||||
|
- the "child"-side which was spawned and scheduled to invoke
|
||||||
|
a function decorated with `@tractor.context`.
|
||||||
|
|
||||||
|
This task is thus a handle to mem-domain-distinct/per-process
|
||||||
|
`Nursery.parent_task` depending on in which of the above
|
||||||
|
"sides" this context exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._task
|
||||||
|
|
||||||
|
def _is_blocked_on_rx_chan(self) -> bool:
|
||||||
|
'''
|
||||||
|
Predicate to indicate whether the owner `._task: trio.Task` is
|
||||||
|
currently blocked (by `.receive()`-ing) on its underlying RPC
|
||||||
|
feeder `._rx_chan`.
|
||||||
|
|
||||||
|
This knowledge is highly useful when handling so called
|
||||||
|
"out-of-band" (OoB) cancellation conditions where a peer
|
||||||
|
actor's task transmitted some remote error/cancel-msg and we
|
||||||
|
must know whether to signal-via-cancel currently executing
|
||||||
|
"user-code" (user defined code embedded in `ctx._scope`) or
|
||||||
|
simply to forward the IPC-msg-as-error **without calling**
|
||||||
|
`._scope.cancel()`.
|
||||||
|
|
||||||
|
In the latter case it is presumed that if the owner task is
|
||||||
|
blocking for the next IPC msg, it will eventually receive,
|
||||||
|
process and raise the equivalent local error **without**
|
||||||
|
requiring `._scope.cancel()` to be explicitly called by the
|
||||||
|
*delivering OoB RPC-task* (via `_deliver_msg()`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
# NOTE, see the mem-chan meth-impls for *why* this
|
||||||
|
# logic works,
|
||||||
|
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
|
||||||
|
#
|
||||||
|
# XXX realize that this is NOT an
|
||||||
|
# official/will-be-loudly-deprecated API:
|
||||||
|
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
|
||||||
|
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
|
||||||
|
#
|
||||||
|
# orig repo intro in the mem-chan change over patch:
|
||||||
|
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
|
||||||
|
# |_https://github.com/python-trio/trio/pull/616
|
||||||
|
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
|
||||||
|
#
|
||||||
|
return (
|
||||||
|
self._task.custom_sleep_data
|
||||||
|
is
|
||||||
|
self._rx_chan
|
||||||
|
)
|
||||||
|
|
||||||
def _maybe_cancel_and_set_remote_error(
|
def _maybe_cancel_and_set_remote_error(
|
||||||
self,
|
self,
|
||||||
error: BaseException,
|
error: BaseException,
|
||||||
|
@ -787,13 +852,27 @@ class Context:
|
||||||
if self._canceller is None:
|
if self._canceller is None:
|
||||||
log.error('Ctx has no canceller set!?')
|
log.error('Ctx has no canceller set!?')
|
||||||
|
|
||||||
|
cs: trio.CancelScope = self._scope
|
||||||
|
|
||||||
|
# ?TODO? see comment @ .start_remote_task()`
|
||||||
|
#
|
||||||
|
# if not cs:
|
||||||
|
# from .devx import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
# raise RuntimeError(
|
||||||
|
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
|
||||||
|
# f'{self}\n'
|
||||||
|
# f'\n'
|
||||||
|
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
|
||||||
|
# )
|
||||||
|
|
||||||
# Cancel the local `._scope`, catch that
|
# Cancel the local `._scope`, catch that
|
||||||
# `._scope.cancelled_caught` and re-raise any remote error
|
# `._scope.cancelled_caught` and re-raise any remote error
|
||||||
# once exiting (or manually calling `.wait_for_result()`) the
|
# once exiting (or manually calling `.wait_for_result()`) the
|
||||||
# `.open_context()` block.
|
# `.open_context()` block.
|
||||||
cs: trio.CancelScope = self._scope
|
|
||||||
if (
|
if (
|
||||||
cs
|
cs
|
||||||
|
and not cs.cancel_called
|
||||||
|
|
||||||
# XXX this is an expected cancel request response
|
# XXX this is an expected cancel request response
|
||||||
# message and we **don't need to raise it** in the
|
# message and we **don't need to raise it** in the
|
||||||
|
@ -802,8 +881,7 @@ class Context:
|
||||||
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
||||||
# always should be set.
|
# always should be set.
|
||||||
and not self._is_self_cancelled()
|
and not self._is_self_cancelled()
|
||||||
and not cs.cancel_called
|
# and not cs.cancelled_caught
|
||||||
and not cs.cancelled_caught
|
|
||||||
):
|
):
|
||||||
if (
|
if (
|
||||||
msgerr
|
msgerr
|
||||||
|
@ -814,7 +892,7 @@ class Context:
|
||||||
not self._cancel_on_msgerr
|
not self._cancel_on_msgerr
|
||||||
):
|
):
|
||||||
message: str = (
|
message: str = (
|
||||||
'NOT Cancelling `Context._scope` since,\n'
|
f'NOT Cancelling `Context._scope` since,\n'
|
||||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||||
f'AND we got a msg-type-error!\n'
|
f'AND we got a msg-type-error!\n'
|
||||||
f'{error}\n'
|
f'{error}\n'
|
||||||
|
@ -824,13 +902,43 @@ class Context:
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import pause_from_sync
|
cs.cancel()
|
||||||
# pause_from_sync()
|
|
||||||
self._scope.cancel()
|
# TODO, explicit condition for OoB (self-)cancellation?
|
||||||
else:
|
# - we called `Portal.cancel_actor()` from this actor
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
# and the peer ctx task delivered ctxc due to it.
|
||||||
|
# - currently `self._is_self_cancelled()` will be true
|
||||||
|
# since the ctxc.canceller check will match us even though it
|
||||||
|
# wasn't from this ctx specifically!
|
||||||
|
elif (
|
||||||
|
cs
|
||||||
|
and self._is_self_cancelled()
|
||||||
|
and not cs.cancel_called
|
||||||
|
):
|
||||||
|
message: str = (
|
||||||
|
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
|
||||||
|
'\n'
|
||||||
|
)
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
# mk_pdb().set_trace()
|
# mk_pdb().set_trace()
|
||||||
|
# TODO XXX, required to fix timeout failure in
|
||||||
|
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
|
||||||
|
#
|
||||||
|
|
||||||
|
# XXX NOTE XXX, this is SUPER SUBTLE!
|
||||||
|
# we only want to cancel our embedded `._scope`
|
||||||
|
# if the ctx's current/using task is NOT blocked
|
||||||
|
# on `._rx_chan.receive()` and on some other
|
||||||
|
# `trio`-checkpoint since in the former case
|
||||||
|
# any `._remote_error` will be relayed through
|
||||||
|
# the rx-chan and appropriately raised by the owning
|
||||||
|
# `._task` directly. IF the owner task is however
|
||||||
|
# blocking elsewhere we need to interrupt it **now**.
|
||||||
|
if not self._is_blocked_on_rx_chan():
|
||||||
|
cs.cancel()
|
||||||
|
else:
|
||||||
|
# rx_stats = self._rx_chan.statistics()
|
||||||
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
|
|
||||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
||||||
if (
|
if (
|
||||||
|
@ -854,6 +962,7 @@ class Context:
|
||||||
+
|
+
|
||||||
cs_fmt
|
cs_fmt
|
||||||
)
|
)
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
message
|
message
|
||||||
+
|
+
|
||||||
|
@ -946,8 +1055,9 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
side: str = self.side
|
side: str = self.side
|
||||||
# XXX for debug via the `@.setter`
|
self._cancel_called = True
|
||||||
self.cancel_called = True
|
# ^ XXX for debug via the `@.setter`
|
||||||
|
# self.cancel_called = True
|
||||||
|
|
||||||
header: str = (
|
header: str = (
|
||||||
f'Cancelling ctx from {side!r}-side\n'
|
f'Cancelling ctx from {side!r}-side\n'
|
||||||
|
@ -2011,6 +2121,9 @@ async def open_context_from_portal(
|
||||||
f'|_{portal.actor}\n'
|
f'|_{portal.actor}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ?TODO? could we move this to inside the `tn` block?
|
||||||
|
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
|
||||||
|
# -> would allow a `if not ._scope: => raise RTE` ?
|
||||||
ctx: Context = await portal.actor.start_remote_task(
|
ctx: Context = await portal.actor.start_remote_task(
|
||||||
portal.channel,
|
portal.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
@ -2037,6 +2150,7 @@ async def open_context_from_portal(
|
||||||
scope_err: BaseException|None = None
|
scope_err: BaseException|None = None
|
||||||
ctxc_from_child: ContextCancelled|None = None
|
ctxc_from_child: ContextCancelled|None = None
|
||||||
try:
|
try:
|
||||||
|
# from .devx import pause
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
|
@ -2059,6 +2173,10 @@ async def open_context_from_portal(
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
try:
|
try:
|
||||||
|
log.runtime(
|
||||||
|
f'IPC ctx parent waiting on Started msg..\n'
|
||||||
|
f'ctx.cid: {ctx.cid!r}\n'
|
||||||
|
)
|
||||||
started_msg, first = await ctx._pld_rx.recv_msg(
|
started_msg, first = await ctx._pld_rx.recv_msg(
|
||||||
ipc=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
|
@ -2067,16 +2185,16 @@ async def open_context_from_portal(
|
||||||
)
|
)
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
ctx_cs: trio.CancelScope = ctx._scope
|
ctx_cs: trio.CancelScope = ctx._scope
|
||||||
|
log.cancel(
|
||||||
|
f'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||||
|
f'.cid: {ctx.cid!r}\n'
|
||||||
|
f'.maybe_error: {ctx.maybe_error!r}\n'
|
||||||
|
)
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
if not ctx_cs.cancel_called:
|
if not ctx_cs.cancel_called:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# from .devx import pause
|
|
||||||
# await pause(shield=True)
|
|
||||||
|
|
||||||
log.cancel(
|
|
||||||
'IPC ctx was cancelled during "child" task sync due to\n\n'
|
|
||||||
f'{ctx.maybe_error}\n'
|
|
||||||
)
|
|
||||||
# OW if the ctx's scope was cancelled manually,
|
# OW if the ctx's scope was cancelled manually,
|
||||||
# likely the `Context` was cancelled via a call to
|
# likely the `Context` was cancelled via a call to
|
||||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||||
|
@ -2272,13 +2390,16 @@ async def open_context_from_portal(
|
||||||
match scope_err:
|
match scope_err:
|
||||||
case trio.Cancelled():
|
case trio.Cancelled():
|
||||||
logmeth = log.cancel
|
logmeth = log.cancel
|
||||||
|
cause: str = 'cancelled'
|
||||||
|
|
||||||
# XXX explicitly report on any non-graceful-taskc cases
|
# XXX explicitly report on any non-graceful-taskc cases
|
||||||
case _:
|
case _:
|
||||||
|
cause: str = 'errored'
|
||||||
logmeth = log.exception
|
logmeth = log.exception
|
||||||
|
|
||||||
logmeth(
|
logmeth(
|
||||||
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
|
f'ctx {ctx.side!r}-side {cause!r} with,\n'
|
||||||
|
f'{ctx.repr_outcome()!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if debug_mode():
|
if debug_mode():
|
||||||
|
@ -2303,6 +2424,7 @@ async def open_context_from_portal(
|
||||||
# told us it's cancelled ;p
|
# told us it's cancelled ;p
|
||||||
if ctxc_from_child is None:
|
if ctxc_from_child is None:
|
||||||
try:
|
try:
|
||||||
|
# await pause(shield=True)
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
@ -2459,8 +2581,10 @@ async def open_context_from_portal(
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context cancelled by local {ctx.side!r}-side task\n'
|
f'Context cancelled by local {ctx.side!r}-side task\n'
|
||||||
f'c)>\n'
|
f'c)>\n'
|
||||||
f' |_{ctx._task}\n\n'
|
f' |_{ctx.parent_task}\n'
|
||||||
f'{repr(scope_err)}\n'
|
f' .cid={ctx.cid!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{scope_err!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: should we add a `._cancel_req_received`
|
# TODO: should we add a `._cancel_req_received`
|
||||||
|
|
|
@ -446,12 +446,12 @@ class ActorNursery:
|
||||||
@acm
|
@acm
|
||||||
async def _open_and_supervise_one_cancels_all_nursery(
|
async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
tb_hide: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# normally don't need to show user by default
|
# normally don't need to show user by default
|
||||||
__tracebackhide__: bool = tb_hide
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
outer_err: BaseException|None = None
|
outer_err: BaseException|None = None
|
||||||
inner_err: BaseException|None = None
|
inner_err: BaseException|None = None
|
||||||
|
|
198
tractor/log.py
198
tractor/log.py
|
@ -14,11 +14,22 @@
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
Log like a forester!
|
An enhanced logging subsys.
|
||||||
|
|
||||||
"""
|
An extended logging layer using (for now) the stdlib's `logging`
|
||||||
|
+ `colorlog` which embeds concurrency-primitive/runtime info into
|
||||||
|
records (headers) to help you better grok your distributed systems
|
||||||
|
built on `tractor`.
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
from collections.abc import Mapping
|
from collections.abc import Mapping
|
||||||
|
from inspect import (
|
||||||
|
FrameInfo,
|
||||||
|
getmodule,
|
||||||
|
stack,
|
||||||
|
)
|
||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
from logging import (
|
from logging import (
|
||||||
|
@ -26,8 +37,10 @@ from logging import (
|
||||||
Logger,
|
Logger,
|
||||||
StreamHandler,
|
StreamHandler,
|
||||||
)
|
)
|
||||||
import colorlog # type: ignore
|
from types import ModuleType
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
import colorlog # type: ignore
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
|
@ -39,7 +52,7 @@ _default_loglevel: str = 'ERROR'
|
||||||
# Super sexy formatting thanks to ``colorlog``.
|
# Super sexy formatting thanks to ``colorlog``.
|
||||||
# (NOTE: we use the '{' format style)
|
# (NOTE: we use the '{' format style)
|
||||||
# Here, `thin_white` is just the layperson's gray.
|
# Here, `thin_white` is just the layperson's gray.
|
||||||
LOG_FORMAT = (
|
LOG_FORMAT: str = (
|
||||||
# "{bold_white}{log_color}{asctime}{reset}"
|
# "{bold_white}{log_color}{asctime}{reset}"
|
||||||
"{log_color}{asctime}{reset}"
|
"{log_color}{asctime}{reset}"
|
||||||
" {bold_white}{thin_white}({reset}"
|
" {bold_white}{thin_white}({reset}"
|
||||||
|
@ -51,7 +64,7 @@ LOG_FORMAT = (
|
||||||
" {reset}{bold_white}{thin_white}{message}"
|
" {reset}{bold_white}{thin_white}{message}"
|
||||||
)
|
)
|
||||||
|
|
||||||
DATE_FORMAT = '%b %d %H:%M:%S'
|
DATE_FORMAT: str = '%b %d %H:%M:%S'
|
||||||
|
|
||||||
# FYI, ERROR is 40
|
# FYI, ERROR is 40
|
||||||
# TODO: use a `bidict` to avoid the :155 check?
|
# TODO: use a `bidict` to avoid the :155 check?
|
||||||
|
@ -75,7 +88,10 @@ STD_PALETTE = {
|
||||||
'TRANSPORT': 'cyan',
|
'TRANSPORT': 'cyan',
|
||||||
}
|
}
|
||||||
|
|
||||||
BOLD_PALETTE = {
|
BOLD_PALETTE: dict[
|
||||||
|
str,
|
||||||
|
dict[int, str],
|
||||||
|
] = {
|
||||||
'bold': {
|
'bold': {
|
||||||
level: f"bold_{color}" for level, color in STD_PALETTE.items()}
|
level: f"bold_{color}" for level, color in STD_PALETTE.items()}
|
||||||
}
|
}
|
||||||
|
@ -97,10 +113,17 @@ def at_least_level(
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# TODO: this isn't showing the correct '{filename}'
|
# TODO, compare with using a "filter" instead?
|
||||||
# as it did before..
|
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
|
||||||
|
# |_corresponding dict-config,
|
||||||
|
# https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig/7507842#7507842
|
||||||
|
# - [ ] what's the benefit/tradeoffs?
|
||||||
|
#
|
||||||
class StackLevelAdapter(LoggerAdapter):
|
class StackLevelAdapter(LoggerAdapter):
|
||||||
|
'''
|
||||||
|
A (software) stack oriented logger "adapter".
|
||||||
|
|
||||||
|
'''
|
||||||
def at_least_level(
|
def at_least_level(
|
||||||
self,
|
self,
|
||||||
level: str,
|
level: str,
|
||||||
|
@ -284,7 +307,9 @@ class ActorContextInfo(Mapping):
|
||||||
|
|
||||||
def get_logger(
|
def get_logger(
|
||||||
name: str|None = None,
|
name: str|None = None,
|
||||||
_root_name: str = _proj_name,
|
pkg_name: str = _proj_name,
|
||||||
|
# XXX, deprecated, use ^
|
||||||
|
_root_name: str|None = None,
|
||||||
|
|
||||||
logger: Logger|None = None,
|
logger: Logger|None = None,
|
||||||
|
|
||||||
|
@ -293,22 +318,89 @@ def get_logger(
|
||||||
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
|
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
|
||||||
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
|
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
|
||||||
subsys_spec: str|None = None,
|
subsys_spec: str|None = None,
|
||||||
|
mk_sublog: bool = True,
|
||||||
|
|
||||||
) -> StackLevelAdapter:
|
) -> StackLevelAdapter:
|
||||||
'''
|
'''
|
||||||
Return the `tractor`-library root logger or a sub-logger for
|
Return the `tractor`-library root logger or a sub-logger for
|
||||||
`name` if provided.
|
`name` if provided.
|
||||||
|
|
||||||
|
When `name` is left null we try to auto-detect the caller's
|
||||||
|
`mod.__name__` and use that as a the sub-logger key.
|
||||||
|
This allows for example creating a module level instance like,
|
||||||
|
|
||||||
|
.. code:: python
|
||||||
|
|
||||||
|
log = tractor.log.get_logger(_root_name='mylib')
|
||||||
|
|
||||||
|
and by default all console record headers will show the caller's
|
||||||
|
(of any `log.<level>()`-method) correct sub-pkg's
|
||||||
|
+ py-module-file.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
if _root_name:
|
||||||
|
msg: str = (
|
||||||
|
'The `_root_name: str` param of `get_logger()` is now deprecated.\n'
|
||||||
|
'Use the new `pkg_name: str` instead, it is the same usage.\n'
|
||||||
|
)
|
||||||
|
warnings.warn(
|
||||||
|
msg,
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
|
pkg_name: str = _root_name or pkg_name
|
||||||
log: Logger
|
log: Logger
|
||||||
log = rlog = logger or logging.getLogger(_root_name)
|
log = rlog = logger or logging.getLogger(pkg_name)
|
||||||
|
|
||||||
|
# Implicitly introspect the caller's module-name whenever `name`
|
||||||
|
# if left as the null default.
|
||||||
|
#
|
||||||
|
# When the `pkg_name` is `in` in the `mod.__name__` we presume
|
||||||
|
# this instance can be created as a sub-`StackLevelAdapter` and
|
||||||
|
# that the intention is get free module-path tracing and
|
||||||
|
# filtering (well once we implement that) oriented around the
|
||||||
|
# py-module code hierarchy of the consuming project.
|
||||||
|
if (
|
||||||
|
pkg_name != _proj_name
|
||||||
|
and
|
||||||
|
name is None
|
||||||
|
and
|
||||||
|
mk_sublog
|
||||||
|
):
|
||||||
|
callstack: list[FrameInfo] = stack()
|
||||||
|
caller_fi: FrameInfo = callstack[1]
|
||||||
|
caller_mod: ModuleType = getmodule(caller_fi.frame)
|
||||||
|
if caller_mod:
|
||||||
|
# ?how is this `mod.__name__` defined?
|
||||||
|
# -> well by how the mod is imported..
|
||||||
|
# |_https://stackoverflow.com/a/15883682
|
||||||
|
mod_name: str = caller_mod.__name__
|
||||||
|
mod_pkg: str = caller_mod.__package__
|
||||||
|
log.info(
|
||||||
|
f'Generating sub-logger name,\n'
|
||||||
|
f'{mod_pkg}.{mod_name}\n'
|
||||||
|
)
|
||||||
|
# if pkg_name in caller_mod.__package__:
|
||||||
|
# from tractor.devx.debug import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
|
||||||
|
if (
|
||||||
|
pkg_name
|
||||||
|
# and
|
||||||
|
# pkg_name in mod_name
|
||||||
|
):
|
||||||
|
name = mod_name
|
||||||
|
|
||||||
|
# XXX, lowlevel debuggin..
|
||||||
|
# if pkg_name != _proj_name:
|
||||||
|
# from tractor.devx.debug import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
|
||||||
if (
|
if (
|
||||||
name
|
|
||||||
and
|
|
||||||
name != _proj_name
|
name != _proj_name
|
||||||
|
and
|
||||||
|
name
|
||||||
):
|
):
|
||||||
|
|
||||||
# NOTE: for handling for modules that use `get_logger(__name__)`
|
# NOTE: for handling for modules that use `get_logger(__name__)`
|
||||||
# we make the following stylistic choice:
|
# we make the following stylistic choice:
|
||||||
# - always avoid duplicate project-package token
|
# - always avoid duplicate project-package token
|
||||||
|
@ -318,24 +410,63 @@ def get_logger(
|
||||||
# since in python the {filename} is always this same
|
# since in python the {filename} is always this same
|
||||||
# module-file.
|
# module-file.
|
||||||
|
|
||||||
sub_name: None|str = None
|
rname: str = pkg_name
|
||||||
rname, _, sub_name = name.partition('.')
|
pkg_path: str = name
|
||||||
pkgpath, _, modfilename = sub_name.rpartition('.')
|
|
||||||
|
|
||||||
# NOTE: for tractor itself never include the last level
|
# ex. modden.runtime.progman
|
||||||
# module key in the name such that something like: eg.
|
# -> rname='modden', _, pkg_path='runtime.progman'
|
||||||
# 'tractor.trionics._broadcast` only includes the first
|
if pkg_name in name:
|
||||||
# 2 tokens in the (coloured) name part.
|
rname, _, pkg_path = name.partition('.')
|
||||||
if rname == 'tractor':
|
|
||||||
sub_name = pkgpath
|
|
||||||
|
|
||||||
if _root_name in sub_name:
|
# ex. modden.runtime.progman
|
||||||
duplicate, _, sub_name = sub_name.partition('.')
|
# -> pkgpath='runtime', _, leaf_mod='progman'
|
||||||
|
subpkg_path, _, leaf_mod = pkg_path.rpartition('.')
|
||||||
|
|
||||||
if not sub_name:
|
# NOTE: special usage for passing `name=__name__`,
|
||||||
|
#
|
||||||
|
# - remove duplication of any root-pkg-name in the
|
||||||
|
# (sub/child-)logger name; i.e. never include the
|
||||||
|
# `pkg_name` *twice* in the top-most-pkg-name/level
|
||||||
|
#
|
||||||
|
# -> this happens normally since it is added to `.getChild()`
|
||||||
|
# and as the name of its root-logger.
|
||||||
|
#
|
||||||
|
# => So for ex. (module key in the name) something like
|
||||||
|
# `name='tractor.trionics._broadcast` is passed,
|
||||||
|
# only includes the first 2 sub-pkg name-tokens in the
|
||||||
|
# child-logger's name; the colored "pkg-namespace" header
|
||||||
|
# will then correctly show the same value as `name`.
|
||||||
|
if rname == pkg_name:
|
||||||
|
pkg_path = subpkg_path
|
||||||
|
|
||||||
|
# XXX, do some double-checks for duplication of,
|
||||||
|
# - root-pkg-name, already in root logger
|
||||||
|
# - leaf-module-name already in `{filename}` header-field
|
||||||
|
if pkg_name in pkg_path:
|
||||||
|
_duplicate, _, pkg_path = pkg_path.partition('.')
|
||||||
|
if _duplicate:
|
||||||
|
# assert _duplicate == rname
|
||||||
|
_root_log.warning(
|
||||||
|
f'Duplicate pkg-name in sub-logger key?\n'
|
||||||
|
f'pkg_name = {pkg_name!r}\n'
|
||||||
|
f'pkg_path = {pkg_path!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
leaf_mod
|
||||||
|
and
|
||||||
|
leaf_mod in pkg_path
|
||||||
|
):
|
||||||
|
_root_log.warning(
|
||||||
|
f'Duplicate leaf-module-name in sub-logger key?\n'
|
||||||
|
f'leaf_mod = {leaf_mod!r}\n'
|
||||||
|
f'pkg_path = {pkg_path!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if not pkg_path:
|
||||||
log = rlog
|
log = rlog
|
||||||
else:
|
elif mk_sublog:
|
||||||
log = rlog.getChild(sub_name)
|
log = rlog.getChild(pkg_path)
|
||||||
|
|
||||||
log.level = rlog.level
|
log.level = rlog.level
|
||||||
|
|
||||||
|
@ -350,8 +481,13 @@ def get_logger(
|
||||||
for name, val in CUSTOM_LEVELS.items():
|
for name, val in CUSTOM_LEVELS.items():
|
||||||
logging.addLevelName(val, name)
|
logging.addLevelName(val, name)
|
||||||
|
|
||||||
# ensure customs levels exist as methods
|
# ensure our custom adapter levels exist as methods
|
||||||
assert getattr(logger, name.lower()), f'Logger does not define {name}'
|
assert getattr(
|
||||||
|
logger,
|
||||||
|
name.lower()
|
||||||
|
), (
|
||||||
|
f'Logger does not define {name}'
|
||||||
|
)
|
||||||
|
|
||||||
return logger
|
return logger
|
||||||
|
|
||||||
|
@ -425,4 +561,4 @@ def get_loglevel() -> str:
|
||||||
|
|
||||||
|
|
||||||
# global module logger for tractor itself
|
# global module logger for tractor itself
|
||||||
log: StackLevelAdapter = get_logger('tractor')
|
_root_log: StackLevelAdapter = get_logger('tractor')
|
||||||
|
|
|
@ -613,10 +613,9 @@ async def drain_to_final_msg(
|
||||||
# msg: dict = await ctx._rx_chan.receive()
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
# if res_cs.cancelled_caught:
|
# if res_cs.cancelled_caught:
|
||||||
#
|
#
|
||||||
# -[ ] make sure pause points work here for REPLing
|
# -[x] make sure pause points work here for REPLing
|
||||||
# the runtime itself; i.e. ensure there's no hangs!
|
# the runtime itself; i.e. ensure there's no hangs!
|
||||||
# |_from tractor.devx.debug import pause
|
# |_see masked code below in .cancel_called path
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
# NOTE: we get here if the far end was
|
||||||
# `ContextCancelled` in 2 cases:
|
# `ContextCancelled` in 2 cases:
|
||||||
|
@ -652,6 +651,10 @@ async def drain_to_final_msg(
|
||||||
f'IPC ctx cancelled externally during result drain ?\n'
|
f'IPC ctx cancelled externally during result drain ?\n'
|
||||||
f'{ctx}'
|
f'{ctx}'
|
||||||
)
|
)
|
||||||
|
# XXX, for tracing `Cancelled`..
|
||||||
|
# from tractor.devx.debug import pause
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
|
|
Loading…
Reference in New Issue