Add a "current IPC `Context`" `ContextVar`

Expose it from `._state.current_ipc_ctx()` and set it inside
`._rpc._invoke()` for child and inside `Portal.open_context()` for
parent.

Still need to write a few more tests (particularly demonstrating usage
throughout multiple nested nurseries on each side) but this suffices as
a proto for testing with some debugger request-from-subactor stuff.

Other,
- use new `.devx.pformat.add_div()` for ctxc messages.
- add a block to always traceback dump on corrupted cs stacks.
- better handle non-RAEs exception output-formatting in context
  termination summary log message.
- use a summary for `start_status` for msg logging in RPC loop.
runtime_to_msgspec
Tyler Goodlet 2024-05-07 09:20:43 -04:00
parent b278164f83
commit fbc21a1dec
3 changed files with 106 additions and 50 deletions

View File

@ -25,6 +25,7 @@ from tractor._exceptions import (
StreamOverrun, StreamOverrun,
ContextCancelled, ContextCancelled,
) )
from tractor._state import current_ipc_ctx
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
@ -144,6 +145,8 @@ async def simple_setup_teardown(
global _state global _state
_state = True _state = True
assert current_ipc_ctx() is ctx
# signal to parent that we're up # signal to parent that we're up
await ctx.started(data + 1) await ctx.started(data + 1)
@ -204,6 +207,7 @@ def test_simple_context(
block_forever=callee_blocks_forever, block_forever=callee_blocks_forever,
) as (ctx, sent), ) as (ctx, sent),
): ):
assert current_ipc_ctx() is ctx
assert sent == 11 assert sent == 11
if callee_blocks_forever: if callee_blocks_forever:

View File

@ -57,6 +57,7 @@ from ._exceptions import (
from .devx import ( from .devx import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
_debug, _debug,
add_div,
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
@ -250,6 +251,9 @@ async def _errors_relayed_via_ipc(
) -> None: ) -> None:
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -273,6 +277,8 @@ async def _errors_relayed_via_ipc(
# TODO: maybe we'll want different "levels" of debugging # TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ? # eventualy such as ('app', 'supervisory', 'runtime') ?
#
# -[ ] this if check is duplicate with `._maybe_enter_pm()`..
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
entered_debug: bool = False entered_debug: bool = False
if ( if (
@ -296,7 +302,6 @@ async def _errors_relayed_via_ipc(
) )
) )
): ):
# await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this # => I can't think of a reason that inspecting this
@ -304,7 +309,14 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err) log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
entered_debug = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if not entered_debug: if not entered_debug:
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
@ -434,6 +446,8 @@ async def _invoke(
) )
context: bool = False context: bool = False
assert not _state._ctxvar_Context.get()
# TODO: deprecate this style.. # TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions # handle decorated ``@tractor.stream`` async functions
@ -557,6 +571,7 @@ async def _invoke(
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
ctx._scope_nursery = tn ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx) task_status.started(ctx)
# TODO: should would be nice to have our # TODO: should would be nice to have our
@ -592,7 +607,6 @@ async def _invoke(
cs: CancelScope = ctx._scope cs: CancelScope = ctx._scope
if cs.cancel_called: if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by ' explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -621,23 +635,9 @@ async def _invoke(
else: else:
explain += 'a remote peer' explain += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------'
div_offset: int = (
round(len(explain)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
explain += ( explain += (
div_str + add_div(message=explain)
+
f'<= canceller: {canceller}\n' f'<= canceller: {canceller}\n'
f'=> cancellee: {our_uid}\n' f'=> cancellee: {our_uid}\n'
# TODO: better repr for ctx tasks.. # TODO: better repr for ctx tasks..
@ -664,10 +664,10 @@ async def _invoke(
boxed_type=trio.Cancelled, boxed_type=trio.Cancelled,
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # does this matter other then for
# resolves to an error for both reporting and # consistentcy/testing? |_ no user code should be
# state checks. # in this scope at this point..
ctx._local_error = ctxc # ctx._local_error = ctxc
raise ctxc raise ctxc
# XXX: do we ever trigger this block any more? # XXX: do we ever trigger this block any more?
@ -677,6 +677,13 @@ async def _invoke(
BaseException, BaseException,
) as scope_error: ) as scope_error:
if (
isinstance(scope_error, RuntimeError)
and scope_error.args
and 'Cancel scope stack corrupted' in scope_error.args[0]
):
log.exception('Cancel scope stack corrupted!?\n')
# _debug.mk_pdb().set_trace()
# always set this (child) side's exception as the # always set this (child) side's exception as the
# local error on the context # local error on the context
@ -710,17 +717,32 @@ async def _invoke(
res_type_str, res_type_str,
res_str, res_str,
) = ( ) = (
('error', f'{type(merr)}',) ('error', f'{type(merr)}',) if merr
if merr
else ( else (
'result', 'result',
f'`{repr(ctx.outcome)}`', f'`{repr(ctx.outcome)}`',
) )
) )
log.runtime( message: str = (
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}' f'{ctx}'
) )
if merr:
from tractor import RemoteActorError
if not isinstance(merr, RemoteActorError):
fmt_merr: str = (
f'\n{merr!r}\n'
# f'{merr.args[0]!r}\n'
)
else:
fmt_merr = f'\n{merr!r}'
log.error(
message
+
fmt_merr
)
else:
log.runtime(message)
async def try_ship_error_to_remote( async def try_ship_error_to_remote(
@ -955,12 +977,19 @@ async def process_messages(
kwargs=kwargs, # type-spec this? see `msg.types` kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid, uid=actorid,
): ):
log.runtime( start_status: str = (
'Handling RPC `Start` request\n' 'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n' f'<= peer: {actorid}\n\n'
f' |_{ns}.{funcname}({kwargs})\n\n' f' |_{chan}\n'
f' |_cid: {cid}\n\n'
# f' |_{ns}.{funcname}({kwargs})\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
f'{pretty_struct.pformat(msg)}\n' # f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
) )
# runtime-internal endpoint: `Actor.<funcname>` # runtime-internal endpoint: `Actor.<funcname>`
@ -989,6 +1018,10 @@ async def process_messages(
await chan.send(err_msg) await chan.send(err_msg)
continue continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function # schedule a task for the requested RPC function
# in the actor's main "service nursery". # in the actor's main "service nursery".
# #
@ -996,18 +1029,8 @@ async def process_messages(
# supervision isolation? would avoid having to # supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks` # manage RPC tasks individually in `._rpc_tasks`
# table? # table?
log.runtime( start_status += ' -> scheduling new task..\n'
f'Spawning task for RPC request\n' log.runtime(start_status)
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
try: try:
ctx: Context = await actor._service_n.start( ctx: Context = await actor._service_n.start(
partial( partial(
@ -1035,8 +1058,9 @@ async def process_messages(
# scoped exception from ``_invoke()`` itself. # scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception): if isinstance(err := ctx, Exception):
log.warning( log.warning(
'Task for RPC failed?' start_status
f'|_ {func}()\n\n' +
' -> task for RPC failed?\n\n'
f'{err}' f'{err}'
) )
continue continue
@ -1155,12 +1179,17 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
log.runtime( if msg is None:
'Exiting IPC msg loop with final msg\n\n' message: str = 'Exiting IPC msg loop without receiving a msg?'
f'<= peer: {chan.uid}\n' else:
f' |_{chan}\n\n' message: str = (
f'{pretty_struct.pformat(msg)}' 'Exiting IPC msg loop with final msg\n\n'
) f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)
# transport **WAS NOT** disconnected # transport **WAS NOT** disconnected
return (False, msg) return (False, msg)

View File

@ -19,13 +19,19 @@ Per process state
""" """
from __future__ import annotations from __future__ import annotations
from contextvars import (
ContextVar,
)
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
) )
from trio.lowlevel import current_task
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
from ._context import Context
_current_actor: Actor|None = None # type: ignore # noqa _current_actor: Actor|None = None # type: ignore # noqa
@ -110,3 +116,20 @@ def debug_mode() -> bool:
def is_root_process() -> bool: def is_root_process() -> bool:
return _runtime_vars['_is_root'] return _runtime_vars['_is_root']
_ctxvar_Context: ContextVar[Context] = ContextVar(
'ipc_context',
default=None,
)
def current_ipc_ctx() -> Context:
ctx: Context = _ctxvar_Context.get()
if not ctx:
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'
f'|_{current_task()}\n'
)
return ctx