Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet f067cf48a7 Unify some log msgs in `.to_asyncio`
Much like similar recent changes throughout the core, build out `msg:
str` depending on error cases and emit with `.cancel()` level as
appropes. Also mute (via level) some duplication in the cancel case
inside `_run_asyncio_task()` for console noise reduction.
2024-03-08 16:07:17 -05:00
Tyler Goodlet c56d4b0a79 Assign `ctx._local_error` ASAP from `.open_context()`
Such that `.outcome` related fields render nicely asap for logging
withing `Portal.open_context()` itself.
2024-03-08 16:03:13 -05:00
Tyler Goodlet 7cafb59ab7 Tweak `Context.repr_outcome()` for KBIs
Since apparently `str(KeyboardInterrupt()) == ''`? So instead add little
`<str> or repr(merr)` expressions throughout to avoid blank strings
rendering if various `repr()`/`.__str__()` outputs..
2024-03-08 15:46:42 -05:00
Tyler Goodlet 7458f99733 Add a `._state._runtime_vars['_registry_addrs']`
Such that it's set to whatever `Actor.reg_addrs: list[tuple]` is during
the actor's init-after-spawn guaranteeing each actor has at least the
registry infos from its parent. Ensure we read this if defined over
`_root._default_lo_addrs` in `._discovery` routines, namely
`.find_actor()` since it's the one API normally used without expecting
the runtime's `current_actor()` to be up.

Update the latest inter-peer cancellation test to use the `reg_addr`
fixture (and thus test this new runtime-vars value via `find_actor()`
usage) since it was failing if run *after* the infected `asyncio` suite
due to registry contact failure.
2024-03-08 15:34:20 -05:00
7 changed files with 86 additions and 33 deletions

View File

@ -939,6 +939,7 @@ async def tell_little_bro(
def test_peer_spawns_and_cancels_service_subactor( def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool, debug_mode: bool,
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int],
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -956,6 +957,7 @@ def test_peer_spawns_and_cancels_service_subactor(
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an: ) as an:
server: Portal = await an.start_actor( server: Portal = await an.start_actor(
(server_name := 'spawn_server'), (server_name := 'spawn_server'),

View File

@ -1485,7 +1485,11 @@ class Context:
TODO: implement this using `outcome.Outcome` types? TODO: implement this using `outcome.Outcome` types?
''' '''
return self.maybe_error or self._result return (
self.maybe_error
or
self._result
)
# @property # @property
def repr_outcome( def repr_outcome(
@ -1520,16 +1524,28 @@ class Context:
# #
# just the type name for now to avoid long lines # just the type name for now to avoid long lines
# when tons of cancels.. # when tons of cancels..
return type(merr).__name__ return (
str(type(merr).__name__)
or
repr(merr)
)
# just the type name # just the type name
# else: # but wen? # else: # but wen?
# return type(merr).__name__ # return type(merr).__name__
# for all other errors show their regular output # for all other errors show their regular output
return str(merr) return (
str(merr)
or
repr(merr)
)
return str(self._result) return (
str(self._result)
or
repr(self._result)
)
async def started( async def started(
self, self,

View File

@ -35,7 +35,10 @@ from ._portal import (
open_portal, open_portal,
LocalPortal, LocalPortal,
) )
from ._state import current_actor, _runtime_vars from ._state import (
current_actor,
_runtime_vars,
)
if TYPE_CHECKING: if TYPE_CHECKING:
@ -205,7 +208,11 @@ async def find_actor(
# every call since something may change it globally (eg. # every call since something may change it globally (eg.
# like in our discovery test suite)! # like in our discovery test suite)!
from . import _root from . import _root
registry_addrs = _root._default_lo_addrs registry_addrs = (
_runtime_vars['_registry_addrs']
or
_root._default_lo_addrs
)
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[tuple[str, int]] AsyncContextManager[tuple[str, int]]

View File

@ -30,7 +30,7 @@ from typing import (
Any, Any,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
Type, # Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -41,8 +41,7 @@ from async_generator import asynccontextmanager
from .trionics import maybe_open_nursery from .trionics import maybe_open_nursery
from .devx import ( from .devx import (
# acquire_debug_lock, # _debug,
# pause,
maybe_wait_for_debugger, maybe_wait_for_debugger,
) )
from ._state import ( from ._state import (
@ -673,6 +672,7 @@ class Portal:
# `Nursery.cancel_scope.cancel()`) # `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
ctx._local_error: BaseException = scope_err
ctxc_from_callee = ctxc ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
@ -684,7 +684,7 @@ class Portal:
# debugging the tractor-runtime itself using it's # debugging the tractor-runtime itself using it's
# own `.devx.` tooling! # own `.devx.` tooling!
# #
# await pause() # await _debug.pause()
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
@ -745,18 +745,20 @@ class Portal:
) as caller_err: ) as caller_err:
scope_err = caller_err scope_err = caller_err
ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR. # XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in # NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case # the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above ^^^ !! # handled in the block above ^^^ !!
# await _debug.pause()
log.cancel( log.cancel(
'Context terminated due to\n\n' 'Context terminated due to\n\n'
f'{caller_err}\n' f'.outcome => {ctx.repr_outcome()}\n'
) )
if debug_mode(): if debug_mode():
# async with acquire_debug_lock(self.actor.uid): # async with _debug.acquire_debug_lock(self.actor.uid):
# pass # pass
# TODO: factor ^ into below for non-root cases? # TODO: factor ^ into below for non-root cases?
was_acquired: bool = await maybe_wait_for_debugger( was_acquired: bool = await maybe_wait_for_debugger(
@ -818,6 +820,7 @@ class Portal:
# this task didn't know until final teardown # this task didn't know until final teardown
# / value collection. # / value collection.
scope_err = berr scope_err = berr
ctx._local_error: BaseException = scope_err
raise raise
# yes! this worx Bp # yes! this worx Bp
@ -927,8 +930,10 @@ class Portal:
# should be stored as the `Context._local_error` and # should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`. # used in determining `Context.cancelled_caught: bool`.
if scope_err is not None: if scope_err is not None:
ctx._local_error: BaseException = scope_err # sanity, tho can remove?
etype: Type[BaseException] = type(scope_err) assert ctx._local_error is scope_err
# ctx._local_error: BaseException = scope_err
# etype: Type[BaseException] = type(scope_err)
# CASE 2 # CASE 2
if ( if (

View File

@ -896,6 +896,7 @@ class Actor:
self._reg_addrs: list[tuple[str, int]] = [] self._reg_addrs: list[tuple[str, int]] = []
if registry_addrs: if registry_addrs:
self.reg_addrs: list[tuple[str, int]] = registry_addrs self.reg_addrs: list[tuple[str, int]] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property @property
def reg_addrs(self) -> list[tuple[str, int]]: def reg_addrs(self) -> list[tuple[str, int]]:

View File

@ -33,7 +33,8 @@ _last_actor_terminated: Actor|None = None
_runtime_vars: dict[str, Any] = { _runtime_vars: dict[str, Any] = {
'_debug_mode': False, '_debug_mode': False,
'_is_root': False, '_is_root': False,
'_root_mailbox': (None, None) '_root_mailbox': (None, None),
'_registry_addrs': [],
} }

View File

@ -216,7 +216,14 @@ def _run_asyncio_task(
try: try:
result = await coro result = await coro
except BaseException as aio_err: except BaseException as aio_err:
log.exception('asyncio task errored') if isinstance(aio_err, CancelledError):
log.runtime(
'`asyncio` task was cancelled..\n'
)
else:
log.exception(
'`asyncio` task errored\n'
)
chan._aio_err = aio_err chan._aio_err = aio_err
raise raise
@ -271,12 +278,22 @@ def _run_asyncio_task(
except BaseException as terr: except BaseException as terr:
task_err = terr task_err = terr
msg: str = (
'Infected `asyncio` task {etype_str}\n'
f'|_{task}\n'
)
if isinstance(terr, CancelledError): if isinstance(terr, CancelledError):
log.cancel(f'`asyncio` task cancelled: {task.get_name()}') log.cancel(
msg.format(etype_str='cancelled')
)
else: else:
log.exception(f'`asyncio` task: {task.get_name()} errored') log.exception(
msg.format(etype_str='cancelled')
)
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), (
'`asyncio` task error mismatch?!?'
)
if aio_err is not None: if aio_err is not None:
# XXX: uhh is this true? # XXX: uhh is this true?
@ -289,19 +306,23 @@ def _run_asyncio_task(
# We might want to change this in the future though. # We might want to change this in the future though.
from_aio.close() from_aio.close()
if type(aio_err) is CancelledError: if task_err is None:
log.cancel("infected task was cancelled") assert aio_err
aio_err.with_traceback(aio_err.__traceback__)
# log.error(
# 'infected task errorred'
# )
# TODO: show that the cancellation originated # TODO: show that the cancellation originated
# from the ``trio`` side? right? # from the ``trio`` side? right?
# elif type(aio_err) is CancelledError:
# log.cancel(
# 'infected task was cancelled'
# )
# if cancel_scope.cancelled: # if cancel_scope.cancelled:
# raise aio_err from err # raise aio_err from err
elif task_err is None:
assert aio_err
aio_err.with_traceback(aio_err.__traceback__)
log.error('infected task errorred')
# XXX: alway cancel the scope on error # XXX: alway cancel the scope on error
# in case the trio task is blocking # in case the trio task is blocking
# on a checkpoint. # on a checkpoint.