Compare commits
4 Commits
4c3c3e4b56
...
f067cf48a7
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | f067cf48a7 | |
Tyler Goodlet | c56d4b0a79 | |
Tyler Goodlet | 7cafb59ab7 | |
Tyler Goodlet | 7458f99733 |
|
@ -939,6 +939,7 @@ async def tell_little_bro(
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -956,6 +957,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
) as an:
|
) as an:
|
||||||
server: Portal = await an.start_actor(
|
server: Portal = await an.start_actor(
|
||||||
(server_name := 'spawn_server'),
|
(server_name := 'spawn_server'),
|
||||||
|
|
|
@ -1485,7 +1485,11 @@ class Context:
|
||||||
TODO: implement this using `outcome.Outcome` types?
|
TODO: implement this using `outcome.Outcome` types?
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.maybe_error or self._result
|
return (
|
||||||
|
self.maybe_error
|
||||||
|
or
|
||||||
|
self._result
|
||||||
|
)
|
||||||
|
|
||||||
# @property
|
# @property
|
||||||
def repr_outcome(
|
def repr_outcome(
|
||||||
|
@ -1520,16 +1524,28 @@ class Context:
|
||||||
#
|
#
|
||||||
# just the type name for now to avoid long lines
|
# just the type name for now to avoid long lines
|
||||||
# when tons of cancels..
|
# when tons of cancels..
|
||||||
return type(merr).__name__
|
return (
|
||||||
|
str(type(merr).__name__)
|
||||||
|
or
|
||||||
|
repr(merr)
|
||||||
|
)
|
||||||
|
|
||||||
# just the type name
|
# just the type name
|
||||||
# else: # but wen?
|
# else: # but wen?
|
||||||
# return type(merr).__name__
|
# return type(merr).__name__
|
||||||
|
|
||||||
# for all other errors show their regular output
|
# for all other errors show their regular output
|
||||||
return str(merr)
|
return (
|
||||||
|
str(merr)
|
||||||
|
or
|
||||||
|
repr(merr)
|
||||||
|
)
|
||||||
|
|
||||||
return str(self._result)
|
return (
|
||||||
|
str(self._result)
|
||||||
|
or
|
||||||
|
repr(self._result)
|
||||||
|
)
|
||||||
|
|
||||||
async def started(
|
async def started(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -35,7 +35,10 @@ from ._portal import (
|
||||||
open_portal,
|
open_portal,
|
||||||
LocalPortal,
|
LocalPortal,
|
||||||
)
|
)
|
||||||
from ._state import current_actor, _runtime_vars
|
from ._state import (
|
||||||
|
current_actor,
|
||||||
|
_runtime_vars,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -205,7 +208,11 @@ async def find_actor(
|
||||||
# every call since something may change it globally (eg.
|
# every call since something may change it globally (eg.
|
||||||
# like in our discovery test suite)!
|
# like in our discovery test suite)!
|
||||||
from . import _root
|
from . import _root
|
||||||
registry_addrs = _root._default_lo_addrs
|
registry_addrs = (
|
||||||
|
_runtime_vars['_registry_addrs']
|
||||||
|
or
|
||||||
|
_root._default_lo_addrs
|
||||||
|
)
|
||||||
|
|
||||||
maybe_portals: list[
|
maybe_portals: list[
|
||||||
AsyncContextManager[tuple[str, int]]
|
AsyncContextManager[tuple[str, int]]
|
||||||
|
|
|
@ -30,7 +30,7 @@ from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
Type,
|
# Type,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
@ -41,8 +41,7 @@ from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from .trionics import maybe_open_nursery
|
from .trionics import maybe_open_nursery
|
||||||
from .devx import (
|
from .devx import (
|
||||||
# acquire_debug_lock,
|
# _debug,
|
||||||
# pause,
|
|
||||||
maybe_wait_for_debugger,
|
maybe_wait_for_debugger,
|
||||||
)
|
)
|
||||||
from ._state import (
|
from ._state import (
|
||||||
|
@ -673,6 +672,7 @@ class Portal:
|
||||||
# `Nursery.cancel_scope.cancel()`)
|
# `Nursery.cancel_scope.cancel()`)
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
scope_err = ctxc
|
scope_err = ctxc
|
||||||
|
ctx._local_error: BaseException = scope_err
|
||||||
ctxc_from_callee = ctxc
|
ctxc_from_callee = ctxc
|
||||||
|
|
||||||
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
|
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
|
||||||
|
@ -684,7 +684,7 @@ class Portal:
|
||||||
# debugging the tractor-runtime itself using it's
|
# debugging the tractor-runtime itself using it's
|
||||||
# own `.devx.` tooling!
|
# own `.devx.` tooling!
|
||||||
#
|
#
|
||||||
# await pause()
|
# await _debug.pause()
|
||||||
|
|
||||||
# CASE 2: context was cancelled by local task calling
|
# CASE 2: context was cancelled by local task calling
|
||||||
# `.cancel()`, we don't raise and the exit block should
|
# `.cancel()`, we don't raise and the exit block should
|
||||||
|
@ -745,18 +745,20 @@ class Portal:
|
||||||
|
|
||||||
) as caller_err:
|
) as caller_err:
|
||||||
scope_err = caller_err
|
scope_err = caller_err
|
||||||
|
ctx._local_error: BaseException = scope_err
|
||||||
|
|
||||||
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
|
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
|
||||||
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
|
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
|
||||||
# the `ContextCancelled` "self cancellation absorbed" case
|
# the `ContextCancelled` "self cancellation absorbed" case
|
||||||
# handled in the block above ^^^ !!
|
# handled in the block above ^^^ !!
|
||||||
|
# await _debug.pause()
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Context terminated due to\n\n'
|
'Context terminated due to\n\n'
|
||||||
f'{caller_err}\n'
|
f'.outcome => {ctx.repr_outcome()}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if debug_mode():
|
if debug_mode():
|
||||||
# async with acquire_debug_lock(self.actor.uid):
|
# async with _debug.acquire_debug_lock(self.actor.uid):
|
||||||
# pass
|
# pass
|
||||||
# TODO: factor ^ into below for non-root cases?
|
# TODO: factor ^ into below for non-root cases?
|
||||||
was_acquired: bool = await maybe_wait_for_debugger(
|
was_acquired: bool = await maybe_wait_for_debugger(
|
||||||
|
@ -818,6 +820,7 @@ class Portal:
|
||||||
# this task didn't know until final teardown
|
# this task didn't know until final teardown
|
||||||
# / value collection.
|
# / value collection.
|
||||||
scope_err = berr
|
scope_err = berr
|
||||||
|
ctx._local_error: BaseException = scope_err
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# yes! this worx Bp
|
# yes! this worx Bp
|
||||||
|
@ -927,8 +930,10 @@ class Portal:
|
||||||
# should be stored as the `Context._local_error` and
|
# should be stored as the `Context._local_error` and
|
||||||
# used in determining `Context.cancelled_caught: bool`.
|
# used in determining `Context.cancelled_caught: bool`.
|
||||||
if scope_err is not None:
|
if scope_err is not None:
|
||||||
ctx._local_error: BaseException = scope_err
|
# sanity, tho can remove?
|
||||||
etype: Type[BaseException] = type(scope_err)
|
assert ctx._local_error is scope_err
|
||||||
|
# ctx._local_error: BaseException = scope_err
|
||||||
|
# etype: Type[BaseException] = type(scope_err)
|
||||||
|
|
||||||
# CASE 2
|
# CASE 2
|
||||||
if (
|
if (
|
||||||
|
|
|
@ -811,10 +811,10 @@ class Actor:
|
||||||
name: str,
|
name: str,
|
||||||
*,
|
*,
|
||||||
enable_modules: list[str] = [],
|
enable_modules: list[str] = [],
|
||||||
uid: str | None = None,
|
uid: str|None = None,
|
||||||
loglevel: str | None = None,
|
loglevel: str|None = None,
|
||||||
registry_addrs: list[tuple[str, int]] | None = None,
|
registry_addrs: list[tuple[str, int]]|None = None,
|
||||||
spawn_method: str | None = None,
|
spawn_method: str|None = None,
|
||||||
|
|
||||||
# TODO: remove!
|
# TODO: remove!
|
||||||
arbiter_addr: tuple[str, int] | None = None,
|
arbiter_addr: tuple[str, int] | None = None,
|
||||||
|
@ -896,6 +896,7 @@ class Actor:
|
||||||
self._reg_addrs: list[tuple[str, int]] = []
|
self._reg_addrs: list[tuple[str, int]] = []
|
||||||
if registry_addrs:
|
if registry_addrs:
|
||||||
self.reg_addrs: list[tuple[str, int]] = registry_addrs
|
self.reg_addrs: list[tuple[str, int]] = registry_addrs
|
||||||
|
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def reg_addrs(self) -> list[tuple[str, int]]:
|
def reg_addrs(self) -> list[tuple[str, int]]:
|
||||||
|
|
|
@ -33,7 +33,8 @@ _last_actor_terminated: Actor|None = None
|
||||||
_runtime_vars: dict[str, Any] = {
|
_runtime_vars: dict[str, Any] = {
|
||||||
'_debug_mode': False,
|
'_debug_mode': False,
|
||||||
'_is_root': False,
|
'_is_root': False,
|
||||||
'_root_mailbox': (None, None)
|
'_root_mailbox': (None, None),
|
||||||
|
'_registry_addrs': [],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -216,7 +216,14 @@ def _run_asyncio_task(
|
||||||
try:
|
try:
|
||||||
result = await coro
|
result = await coro
|
||||||
except BaseException as aio_err:
|
except BaseException as aio_err:
|
||||||
log.exception('asyncio task errored')
|
if isinstance(aio_err, CancelledError):
|
||||||
|
log.runtime(
|
||||||
|
'`asyncio` task was cancelled..\n'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.exception(
|
||||||
|
'`asyncio` task errored\n'
|
||||||
|
)
|
||||||
chan._aio_err = aio_err
|
chan._aio_err = aio_err
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -271,12 +278,22 @@ def _run_asyncio_task(
|
||||||
except BaseException as terr:
|
except BaseException as terr:
|
||||||
task_err = terr
|
task_err = terr
|
||||||
|
|
||||||
|
msg: str = (
|
||||||
|
'Infected `asyncio` task {etype_str}\n'
|
||||||
|
f'|_{task}\n'
|
||||||
|
)
|
||||||
if isinstance(terr, CancelledError):
|
if isinstance(terr, CancelledError):
|
||||||
log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
|
log.cancel(
|
||||||
|
msg.format(etype_str='cancelled')
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.exception(f'`asyncio` task: {task.get_name()} errored')
|
log.exception(
|
||||||
|
msg.format(etype_str='cancelled')
|
||||||
|
)
|
||||||
|
|
||||||
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
|
assert type(terr) is type(aio_err), (
|
||||||
|
'`asyncio` task error mismatch?!?'
|
||||||
|
)
|
||||||
|
|
||||||
if aio_err is not None:
|
if aio_err is not None:
|
||||||
# XXX: uhh is this true?
|
# XXX: uhh is this true?
|
||||||
|
@ -289,18 +306,22 @@ def _run_asyncio_task(
|
||||||
# We might want to change this in the future though.
|
# We might want to change this in the future though.
|
||||||
from_aio.close()
|
from_aio.close()
|
||||||
|
|
||||||
if type(aio_err) is CancelledError:
|
if task_err is None:
|
||||||
log.cancel("infected task was cancelled")
|
|
||||||
|
|
||||||
# TODO: show that the cancellation originated
|
|
||||||
# from the ``trio`` side? right?
|
|
||||||
# if cancel_scope.cancelled:
|
|
||||||
# raise aio_err from err
|
|
||||||
|
|
||||||
elif task_err is None:
|
|
||||||
assert aio_err
|
assert aio_err
|
||||||
aio_err.with_traceback(aio_err.__traceback__)
|
aio_err.with_traceback(aio_err.__traceback__)
|
||||||
log.error('infected task errorred')
|
# log.error(
|
||||||
|
# 'infected task errorred'
|
||||||
|
# )
|
||||||
|
|
||||||
|
# TODO: show that the cancellation originated
|
||||||
|
# from the ``trio`` side? right?
|
||||||
|
# elif type(aio_err) is CancelledError:
|
||||||
|
# log.cancel(
|
||||||
|
# 'infected task was cancelled'
|
||||||
|
# )
|
||||||
|
|
||||||
|
# if cancel_scope.cancelled:
|
||||||
|
# raise aio_err from err
|
||||||
|
|
||||||
# XXX: alway cancel the scope on error
|
# XXX: alway cancel the scope on error
|
||||||
# in case the trio task is blocking
|
# in case the trio task is blocking
|
||||||
|
|
Loading…
Reference in New Issue