Set `_ctxvar_Context` for child-side RPC tasks

Just inside `._invoke()` after the `ctx: Context` is retrieved.

Also try our best to *not hide* internal frames when a non-user-code
crash happens, normally either due to a runtime RPC EP bug or
a transport failure.
runtime_to_msgspec
Tyler Goodlet 2024-05-20 16:18:42 -04:00
parent e78fdf2f69
commit 4ef77bb64f
1 changed files with 37 additions and 18 deletions

View File

@ -70,7 +70,6 @@ from .msg import (
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck, CancelAck,
Error, Error,
Msg,
MsgType, MsgType,
Return, Return,
Start, Start,
@ -250,10 +249,17 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# NOTE: we normally always hide this frame in call-stack tracebacks
# if the crash originated from an RPC task (since normally the
# user is only going to care about their own code not this
# internal runtime frame) and we DID NOT
# fail due to an IPC transport error!
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode! # TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn: # async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()` # => see matching comment in side `._debug._pause()`
rpc_err: BaseException|None = None
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -264,16 +270,7 @@ async def _errors_relayed_via_ipc(
BaseExceptionGroup, BaseExceptionGroup,
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
rpc_err = err
# NOTE: always hide this frame from debug REPL call stack
# if the crash originated from an RPC task and we DID NOT
# fail due to an IPC transport error!
if (
is_rpc
and
chan.connected()
):
__tracebackhide__: bool = hide_tb
# TODO: maybe we'll want different "levels" of debugging # TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ? # eventualy such as ('app', 'supervisory', 'runtime') ?
@ -318,11 +315,19 @@ async def _errors_relayed_via_ipc(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but
# didn't, maybe there was an internal error in
# the above code and we do want to show this
# frame!
if _state.debug_mode():
__tracebackhide__: bool = False
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
f'|_{ctx}' f'|_{ctx}'
) )
# ALWAYS try to ship RPC errors back to parent/caller task # ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc: if is_rpc:
@ -355,6 +360,20 @@ async def _errors_relayed_via_ipc(
# `Actor._service_n`, we add "handles" to each such that # `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled. # they can be individually ccancelled.
finally: finally:
# if the error is not from user code and instead a failure
# of a runtime RPC or transport failure we do prolly want to
# show this frame
if (
rpc_err
and (
not is_rpc
or
not chan.connected()
)
):
__tracebackhide__: bool = False
try: try:
ctx: Context ctx: Context
func: Callable func: Callable
@ -444,9 +463,10 @@ async def _invoke(
# open the stream with this option. # open the stream with this option.
# allow_overruns=True, # allow_overruns=True,
) )
context: bool = False context_ep_func: bool = False
assert not _state._ctxvar_Context.get() # set the current IPC ctx var for this RPC task
_state._ctxvar_Context.set(ctx)
# TODO: deprecate this style.. # TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
@ -475,7 +495,7 @@ async def _invoke(
# handle decorated ``@tractor.context`` async function # handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): elif getattr(func, '_tractor_context_function', False):
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
context = True context_ep_func = True
# errors raised inside this block are propgated back to caller # errors raised inside this block are propgated back to caller
async with _errors_relayed_via_ipc( async with _errors_relayed_via_ipc(
@ -501,7 +521,7 @@ async def _invoke(
raise raise
# TODO: impl all these cases in terms of the `Context` one! # TODO: impl all these cases in terms of the `Context` one!
if not context: if not context_ep_func:
await _invoke_non_context( await _invoke_non_context(
actor, actor,
cancel_scope, cancel_scope,
@ -571,7 +591,6 @@ async def _invoke(
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
ctx._scope_nursery = tn ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx) task_status.started(ctx)
# TODO: should would be nice to have our # TODO: should would be nice to have our
@ -831,7 +850,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ). (as utilized inside `Portal.cancel_actor()` ).
''' '''
assert actor._service_n # state sanity assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
@ -844,7 +863,7 @@ async def process_messages(
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175 # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659 # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: Msg|None = None msg: MsgType|None = None
try: try:
# NOTE: this internal scope allows for keeping this # NOTE: this internal scope allows for keeping this
# message loop running despite the current task having # message loop running despite the current task having