forked from goodboy/tractor
1
0
Fork 0

Another `._rpc` mod passthrough

- tweaking logging to include more `MsgType` dumps on IPC faults.
- removing some commented cruft.
- comment formatting / cleanups / add-ons.
- more type annots.
- fill out some TODO content.
runtime_to_msgspec
Tyler Goodlet 2024-04-25 12:33:10 -04:00
parent 6b30c86eca
commit 188ff0e0e5
1 changed files with 80 additions and 83 deletions

View File

@ -181,12 +181,11 @@ async def _invoke_non_context(
# way: using the linked IPC context machinery. # way: using the linked IPC context machinery.
failed_resp: bool = False failed_resp: bool = False
try: try:
await chan.send( ack = StartAck(
StartAck( cid=cid,
cid=cid, functype='asyncfunc',
functype='asyncfunc',
)
) )
await chan.send(ack)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -194,12 +193,11 @@ async def _invoke_non_context(
) as ipc_err: ) as ipc_err:
failed_resp = True failed_resp = True
if is_rpc: if is_rpc:
raise raise ipc_err
else: else:
# TODO: should this be an `.exception()` call? log.exception(
log.warning( f'Failed to respond to runtime RPC request for\n\n'
f'Failed to respond to non-rpc request: {func}\n' f'{ack}\n'
f'{ipc_err}'
) )
with cancel_scope as cs: with cancel_scope as cs:
@ -220,20 +218,19 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
await chan.send( ret_msg = return_msg(
return_msg( cid=cid,
cid=cid, pld=result,
pld=result,
)
) )
await chan.send(ret_msg)
except ( except (
BrokenPipeError, BrokenPipeError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.warning( log.warning(
'Failed to return result:\n' 'Failed to send RPC result?\n'
f'{func}@{actor.uid}\n' f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
f'remote chan: {chan.uid}' f'x=> peer: {chan.uid}\n'
) )
@acm @acm
@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here? __tracebackhide__: bool = hide_tb
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc(
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
# always hide this frame from debug REPL if the crash # NOTE: always hide this frame from debug REPL call stack
# originated from an rpc task and we DID NOT fail due to # if the crash originated from an RPC task and we DID NOT
# an IPC transport error! # fail due to an IPC transport error!
if ( if (
is_rpc is_rpc
and chan.connected() and
chan.connected()
): ):
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False entered_debug: bool = False
if ( if (
( (
@ -310,19 +303,18 @@ async def _errors_relayed_via_ipc(
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
f'|_{ctx}' f'|_{ctx}'
) )
# always (try to) ship RPC errors back to caller # ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc: if is_rpc:
#
# TODO: tests for this scenario: # TODO: tests for this scenario:
# - RPC caller closes connection before getting a response # - RPC caller closes connection before getting a response
# should **not** crash this actor.. # should **not** crash this actor..
await try_ship_error_to_remote( await try_ship_error_to_remote(
chan, chan,
err, err,
@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc(
hide_tb=hide_tb, hide_tb=hide_tb,
) )
# error is probably from above coro running code *not from # if the ctx cs is NOT allocated, the error is likely from
# the target rpc invocation since a scope was never # above `coro` invocation machinery NOT from inside the
# allocated around the coroutine await. # `coro` itself, i.e. err is NOT a user application error.
if ctx._scope is None: if ctx._scope is None:
# we don't ever raise directly here to allow the # we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this # msg-loop-scheduler to continue running for this
# channel. # channel.
task_status.started(err) task_status.started(err)
# always reraise KBIs so they propagate at the sys-process # always reraise KBIs so they propagate at the sys-process level.
# level.
if isinstance(err, KeyboardInterrupt): if isinstance(err, KeyboardInterrupt):
raise raise
# RPC task bookeeping.
# RPC task bookeeping # since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled.
finally: finally:
try: try:
ctx, func, is_complete = actor._rpc_tasks.pop( ctx: Context
func: Callable
is_complete: trio.Event
(
ctx,
func,
is_complete,
) = actor._rpc_tasks.pop(
(chan, ctx.cid) (chan, ctx.cid)
) )
is_complete.set() is_complete.set()
except KeyError: except KeyError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning( log.warning(
'RPC task likely errored or cancelled before start?' 'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed") log.runtime('All RPC tasks have completed')
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
@ -414,19 +414,16 @@ async def _invoke(
# TODO: possibly a specially formatted traceback # TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)? # (not sure what typing is for this..)?
# tb = None # tb: TracebackType = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
# activated cancel scope ref cs: CancelScope|None = None # ref when activated
cs: CancelScope|None = None
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it? # NOTE: no portal passed bc this is the "child"-side
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
@ -459,8 +456,8 @@ async def _invoke(
kwargs['stream'] = ctx kwargs['stream'] = ctx
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
context = True context = True
@ -474,7 +471,8 @@ async def _invoke(
task_status=task_status, task_status=task_status,
): ):
if not ( if not (
inspect.isasyncgenfunction(func) or inspect.isasyncgenfunction(func)
or
inspect.iscoroutinefunction(func) inspect.iscoroutinefunction(func)
): ):
raise TypeError(f'{func} must be an async function!') raise TypeError(f'{func} must be an async function!')
@ -486,8 +484,7 @@ async def _invoke(
except TypeError: except TypeError:
raise raise
# TODO: implement all these cases in terms of the # TODO: impl all these cases in terms of the `Context` one!
# `Context` one!
if not context: if not context:
await _invoke_non_context( await _invoke_non_context(
actor, actor,
@ -503,7 +500,7 @@ async def _invoke(
return_msg, return_msg,
task_status, task_status,
) )
# below is only for `@context` funcs # XXX below fallthrough is ONLY for `@context` eps
return return
# our most general case: a remote SC-transitive, # our most general case: a remote SC-transitive,
@ -580,9 +577,6 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
# - currently a never-should-happen-fallthrough case
# inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right?
if ctx._scope.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
@ -598,9 +592,7 @@ async def _invoke(
if cs.cancel_called: if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
msg: str = ( msg: str = 'actor was cancelled by '
'actor was cancelled by '
)
# NOTE / TODO: if we end up having # NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call # ``Actor._cancel_task()`` call
@ -623,6 +615,8 @@ async def _invoke(
else: else:
msg += 'a remote peer' msg += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------' div_chars: str = '------ - ------'
div_offset: int = ( div_offset: int = (
round(len(msg)/2)+1 round(len(msg)/2)+1
@ -702,11 +696,9 @@ async def _invoke(
ctx: Context = actor._contexts.pop(( ctx: Context = actor._contexts.pop((
chan.uid, chan.uid,
cid, cid,
# ctx.side,
)) ))
merr: Exception|None = ctx.maybe_error merr: Exception|None = ctx.maybe_error
( (
res_type_str, res_type_str,
res_str, res_str,
@ -720,7 +712,7 @@ async def _invoke(
) )
log.runtime( log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n' f'{ctx}'
) )
@ -806,13 +798,19 @@ async def process_messages(
and `Actor.cancel()` process-wide-runtime-shutdown requests and `Actor.cancel()` process-wide-runtime-shutdown requests
(as utilized inside `Portal.cancel_actor()` ). (as utilized inside `Portal.cancel_actor()` ).
''' '''
assert actor._service_n # state sanity assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
# https://github.com/python-trio/trio/issues/467 # -[ ] existing GH https://github.com/python-trio/trio/issues/467
# -[ ] for other transports (like QUIC) we can possibly just
# entirely avoid the feeder mem-chans since each msg will be
# delivered with a ctx-id already?
#
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime( log.runtime(
'Entering RPC msg loop:\n' 'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
@ -850,7 +848,7 @@ async def process_messages(
| Return(cid=cid) | Return(cid=cid)
| CancelAck(cid=cid) | CancelAck(cid=cid)
# `.cid` means RPC-ctx-task specific # `.cid` indicates RPC-ctx-task scoped
| Error(cid=cid) | Error(cid=cid)
# recv-side `MsgType` decode violation # recv-side `MsgType` decode violation
@ -1046,16 +1044,16 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
# runtime-scoped remote error (since no `.cid`) # runtime-scoped remote (internal) error
# (^- bc no `Error.cid` -^)
#
# NOTE: this is the non-rpc error case, that
# is, an error NOT raised inside a call to
# `_invoke()` (i.e. no cid was provided in the
# msg - see above). Raise error inline and
# mark the channel as "globally errored" for
# all downstream consuming primitives.
case Error(): case Error():
# NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
# assert chan.uid
chan._exc: Exception = unpack_error( chan._exc: Exception = unpack_error(
msg, msg,
chan=chan, chan=chan,
@ -1111,7 +1109,7 @@ async def process_messages(
f'|_{chan.raddr}\n' f'|_{chan.raddr}\n'
) )
# transport **was** disconnected # transport **WAS** disconnected
return True return True
except ( except (
@ -1150,12 +1148,11 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
log.runtime( log.runtime(
'Exiting IPC msg loop with\n' 'Exiting IPC msg loop with final msg\n\n'
f'peer: {chan.uid}\n' f'<= peer: {chan.uid}\n'
f'|_{chan}\n\n' f'|_{chan}\n\n'
'final msg:\n' f'{pformat(msg)}\n\n'
f'{pformat(msg)}\n'
) )
# transport **was not** disconnected # transport **WAS NOT** disconnected
return False return False