# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote (task) Procedure Call (scheduling) with SC transitive semantics.
'''
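# A rough, illustrative sketch of the per-request msg sequence this
# module implements (see `tractor.msg.types` for the canonical spec;
# the field values below are made up):
#
# parent -> child:  Start(cid=cid, ns='some.mod', func='ep', kwargs={}, uid=uid)
# child  -> parent: StartAck(cid=cid, functype='context'|'asyncgen'|'asyncfunc')
# child  -> parent: Started(cid=cid, pld=...)  # `@context` eps only
# child  -> parent: Yield(cid=cid, pld=...)    # streaming eps, 0..N times
# child  -> parent: Stop(cid=cid)              # end-of-stream
# child  -> parent: Return(cid=cid, pld=...)   # final result
# child  -> parent: Error(...)                 # boxed via `pack_error()`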
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from functools import partial
import inspect
from pprint import pformat
import traceback
from typing import (
Any,
Callable,
Coroutine,
TYPE_CHECKING,
)
import warnings
import trio
from trio import (
CancelScope,
Nursery,
TaskStatus,
)
from ._ipc import Channel
from ._context import (
Context,
)
from ._exceptions import (
ContextCancelled,
RemoteActorError,
ModuleNotExposed,
MsgTypeError,
TransportClosed,
is_multi_cancelled,
pack_error,
unpack_error,
)
from .devx import (
maybe_wait_for_debugger,
_debug,
add_div,
)
from . import _state
from .log import get_logger
from .msg import (
current_codec,
MsgCodec,
PayloadT,
NamespacePath,
# pretty_struct,
_ops as msgops,
)
from tractor.msg.types import (
CancelAck,
Error,
MsgType,
Return,
Start,
StartAck,
Started,
Stop,
Yield,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger('tractor')
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
# func-type-cases implemented "on top of" `@context` defs:
# -[ ] std async func helper decorated with `@rpc_func`?
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
# user only defining a single-`yield` generator-func?
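# As a purely hypothetical sketch of that `@rpc_acm` idea (no such
# decorator exists yet; all names below are made up):
#
# @rpc_acm
# async def db_conn(ctx: Context, dsn: str):
#     conn = await open_db(dsn)   # setup
#     try:
#         yield conn              # single yield ~= the "started" value
#     finally:
#         await conn.aclose()     # teardown when the ctx exits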
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
ctx: Context,
cid: str,
chan: Channel,
func: Callable,
coro: Coroutine,
kwargs: dict[str, Any],
treat_as_gen: bool,
is_rpc: bool,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
__tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# ?TODO? can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send(
Yield(
cid=cid,
pld=item,
)
)
log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send(
Stop(cid=cid)
)
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send(
Stop(cid=cid)
)
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else:
failed_resp: bool = False
try:
ack = StartAck(
cid=cid,
functype='asyncfunc',
)
await chan.send(ack)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
failed_resp = True
if is_rpc:
raise ipc_err
else:
log.exception(
f'Failed to ack runtime RPC request\n\n'
f'{func} x=> {ctx.chan}\n\n'
f'{ack}\n'
)
with cancel_scope as cs:
ctx._scope: CancelScope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
ret_msg = return_msg_type(
cid=cid,
pld=result,
)
await chan.send(ret_msg)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to send RPC result?\n'
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
f'x=> peer: {chan.uid}\n'
)
@acm
async def _errors_relayed_via_ipc(
actor: Actor,
chan: Channel,
ctx: Context,
is_rpc: bool,
hide_tb: bool = False,
debug_kbis: bool = False,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
) -> None:
# NOTE: we normally always hide this frame in call-stack tracebacks
# if the crash originated from an RPC task (since normally the
# user is only going to care about their own code not this
# internal runtime frame) and we DID NOT
# fail due to an IPC transport error!
__tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
rpc_err: BaseException|None = None
try:
yield # run RPC invoke body
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
Exception,
BaseExceptionGroup,
KeyboardInterrupt,
) as err:
rpc_err = err
# TODO: maybe we'll want different "levels" of debugging
# eventually such as ('app', 'supervisory', 'runtime')?
#
# -[ ] this if check is duplicate with `._maybe_enter_pm()`..
if not is_multi_cancelled(err):
entered_debug: bool = False
if (
(
not isinstance(err, ContextCancelled)
or (
isinstance(err, ContextCancelled)
and ctx._cancel_called
# if the root blocks the debugger lock request from a child
# we will get a remote-cancelled condition.
and ctx._enter_debugger_on_cancel
)
)
and
(
not isinstance(err, KeyboardInterrupt)
or (
isinstance(err, KeyboardInterrupt)
and debug_kbis
)
)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
# type of failure will be useful for respawns or
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
entered_debug = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if not entered_debug:
# if we prolly should have entered the REPL but
# didn't, maybe there was an internal error in
# the above code and we do want to show this
# frame!
if _state.debug_mode():
__tracebackhide__: bool = False
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc:
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
await try_ship_error_to_remote(
chan,
err,
cid=ctx.cid,
remote_descr='caller',
hide_tb=hide_tb,
)
# if the ctx cs is NOT allocated, the error is likely from
# above `coro` invocation machinery NOT from inside the
# `coro` itself, i.e. err is NOT a user application error.
if ctx._scope is None:
# we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this
# channel.
task_status.started(err)
# always propagate KBIs at the sys-process level.
if (
isinstance(err, KeyboardInterrupt)
# ?TODO? except when running in asyncio mode?
# |_ wut if you want to open a `@context` FROM an
# infected_aio task?
# and not actor.is_infected_aio()
):
raise
# RPC task bookkeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# they can be individually cancelled.
finally:
# if the error is not from user code and instead a failure
# of a runtime RPC or transport failure we do prolly want to
# show this frame
if (
rpc_err
and (
not is_rpc
or
not chan.connected()
)
):
__tracebackhide__: bool = False
try:
ctx: Context
func: Callable
is_complete: trio.Event
(
ctx,
func,
is_complete,
) = actor._rpc_tasks.pop(
(chan, ctx.cid)
)
is_complete.set()
except KeyError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
# TODO: remove this right? rn the only non-`is_rpc` cases
# are cancellation methods and according to the RPC loop eps
# for those below, nothing is ever registered in
# `Actor._rpc_tasks` for those cases.. but should we?
#
# -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
# else:
# log.cancel(
# 'Failed to de-alloc internal runtime cancel task?\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# )
finally:
if not actor._rpc_tasks:
log.runtime('All RPC tasks have completed')
actor._ongoing_rpc_tasks.set()
async def _invoke(
actor: Actor,
cid: str,
chan: Channel,
func: Callable,
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
'''
Schedule a `trio` task-as-func and deliver result(s) over
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
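# Scheduling-wise, a sketch of how the msg loop below typically starts
# this func (via `Nursery.start()` so that either the new `Context` or
# a startup error comes back through `task_status.started()`):
#
# ctx: Context = await actor._service_n.start(
#     partial(_invoke, actor, cid, chan, func, kwargs),
#     name=funcname,
# )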
__tracebackhide__: bool = hide_tb
treat_as_gen: bool = False
if (
_state.debug_mode()
and
_state._runtime_vars['use_greenback']
):
# XXX for `.pause_from_sync()` usage we need to make sure
# `greenback` is bootstrapped in the subactor!
await _debug.maybe_init_greenback()
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
# tb: TracebackType = None
cancel_scope = CancelScope()
ctx = actor.get_context(
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
# NOTE: no portal passed bc this is the "child"-side
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
# allow_overruns=True,
)
context_ep_func: bool = False
# set the current IPC ctx var for this RPC task
_state._ctxvar_Context.set(ctx)
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
params = sig.parameters
# compat with old api
kwargs['ctx'] = ctx
treat_as_gen = True
if 'ctx' in params:
warnings.warn(
"`@tractor.stream` decorated funcs should now declare "
"a `stream` arg; `ctx` is now designated for use with "
"`@tractor.context`",
DeprecationWarning,
stacklevel=2,
)
elif 'stream' in params:
assert 'stream' in params
kwargs['stream'] = ctx
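# E.g. the (deprecated) endpoint style handled here declares the IPC
# primitive as a `stream` (or legacy `ctx`) param which gets bound to
# the allocated `Context` above - a sketch only, names illustrative:
#
# @tractor.stream
# async def legacy_streamer(stream, limit: int):
#     ...  # app code drives the stream/ctx directly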
# handle decorated ``@tractor.context`` async function
# - pull out any typed-pld-spec info and apply (below)
# - (TODO) store func-ref meta data for API-frame-info logging
elif (
ctx_meta := getattr(
func,
'_tractor_context_meta',
False,
)
):
# kwargs['ctx'] = ctx
# set the required `tractor.Context` typed input argument to
# the allocated RPC task context.
kwargs[ctx_meta['ctx_var_name']] = ctx
context_ep_func = True
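# E.g. the modern endpoint form dispatched here - a sketch only, where
# the ctx-param name is whatever `ctx_meta['ctx_var_name']` says:
#
# @tractor.context
# async def my_ep(ctx: Context, **kwargs):
#     await ctx.started('setup-done')  # -> `Started` msg to the parent
#     ...                              # optionally stream, compute, etc.
#     return 'final'                   # -> shipped as a `Return` msg below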
# errors raised inside this block are propagated back to the caller
async with _errors_relayed_via_ipc(
actor,
chan,
ctx,
is_rpc,
hide_tb=hide_tb,
task_status=task_status,
):
if not (
inspect.isasyncgenfunction(func)
or
inspect.iscoroutinefunction(func)
):
raise TypeError(f'{func} must be an async function!')
# init coroutine with `kwargs` to immediately catch any
# type-sig errors.
try:
coro = func(**kwargs)
except TypeError:
raise
# TODO: impl all these cases in terms of the `Context` one!
if not context_ep_func:
await _invoke_non_context(
actor,
cancel_scope,
ctx,
cid,
chan,
func,
coro,
kwargs,
treat_as_gen,
is_rpc,
return_msg_type,
task_status,
)
# XXX below fallthrough is ONLY for `@context` eps
return
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
# ------ - ------
# a "context" endpoint is the most general and
# "least sugary" type of RPC with support for
# bi-dir streaming B)
#
# the concurrency relation is similar to a task nursery
# wherein a "parent" task (the one that enters
# `trio.open_nursery()` in some actor "opens" (via
# `Portal.open_context()`) an IPC ctx to another peer
# (which is maybe a sub-) actor who then schedules (aka
# `trio.Nursery.start()`s) a new "child" task to execute
# the `@context` annotated func; that is this func we're
# running directly below!
# ------ - ------
#
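# E.g. the "parent" side that drives this child task does so roughly
# via (a sketch; the endpoint and args are illustrative),
#
# async with portal.open_context(my_ep, arg=1) as (ctx, first):
#     ...  # `first` is whatever the child passed to `.started()`
#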
# StartAck: respond immediately with endpoint info
await chan.send(
StartAck(
cid=cid,
functype='context',
)
)
# TODO: should we also use an `.open_context()` equiv
# for this child side by factoring the impl from
# `Portal.open_context()` into a common helper?
#
# NOTE: there are many different ctx state details
# in a child side instance according to current impl:
# - `.cancelled_caught` can never be `True`.
# -> the below scope is never exposed to the
# `@context` marked RPC function.
# - `._portal` is never set.
try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=ctx_meta.get('pld_spec'),
dec_hook=ctx_meta.get('dec_hook'),
),
):
ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
# -[ ] should would be nice to have our `TaskMngr`
# nursery here!
# -[ ] payload value checking like we do with
# `.started()` such that the debugger can engage
# here in the child task instead of waiting for the
# parent to crash with its own MTE..
res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type(
cid=cid,
pld=res,
)
# set and shuttle final result to "parent"-side task.
ctx._result = res
await chan.send(return_msg)
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
# - *this* child task manually calling `ctx.cancel()`.
# - the runtime calling `ctx._deliver_msg()` which
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
if rpc_ctx_cs.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error
# before raising any context cancelled case
# so that real remote errors don't get masked as
# ``ContextCancelled``s.
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
if rpc_ctx_cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
# ``Context.cancel()`` directly, we're going to
# need to change this logic branch since it
# will always enter..
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
explain += 'itself '
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor, e.g. one
# that calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
#
# TODO: determine if the ctx peer task was the
# exact task which cancelled, vs. some other
# task in the same actor.
elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer'
elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer'
else:
explain += 'an unknown cause?'
explain += (
add_div(message=explain)
+
f'<= canceller: {canceller}\n'
f'=> cancellee: {our_uid}\n'
# TODO: better repr for ctx tasks..
f' |_{ctx.side!r} {ctx._task}'
# TODO: instead just show the
# ctx.__str__() here?
# -[ ] textwrap.indent() it correctly!
# -[ ] BUT we need to wait until
# the state is filled out before emitting
# this msg; right now it's kinda empty? bleh..
#
# f' |_{ctx}'
)
# task-context was either cancelled by request
# using ``Portal.cancel_actor()`` or
# ``Context.cancel()`` on the far end, or it
# was cancelled by the local child (or callee)
# task, so relay this cancel signal to the
# other side.
ctxc = ContextCancelled(
message=explain,
boxed_type=trio.Cancelled,
canceller=canceller,
)
raise ctxc
# XXX: do we ever trigger this block any more?
except (
BaseExceptionGroup,
trio.Cancelled,
BaseException,
) as scope_error:
if (
isinstance(scope_error, RuntimeError)
and scope_error.args
and 'Cancel scope stack corrupted' in scope_error.args[0]
):
log.exception('Cancel scope stack corrupted!?\n')
# _debug.mk_pdb().set_trace()
# always set this (child) side's exception as the
# local error on the context
ctx._local_error: BaseException = scope_error
# ^-TODO-^ question,
# does this matter other than for
# consistency/testing?
# |_ no user code should be in this scope at this point
# AND we already set this in the block below?
# if a remote error was set then likely the
# exception group was raised due to that, so
# and we instead raise that error immediately!
ctx.maybe_raise()
# maybe TODO: pack in some kinda
# `trio.Cancelled.__traceback__` here so they can be
# unwrapped and displayed on the caller side? no se..
raise
# `@context` entrypoint task bookkeeping.
# i.e. only pop the context tracking if used ;)
finally:
assert chan.uid
# don't pop the local context until we know the
# associated child isn't in debug any more
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((
chan.uid,
cid,
))
logmeth: Callable = log.runtime
merr: Exception|None = ctx.maybe_error
message: str = 'IPC context terminated '
descr_str: str = (
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
)
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nand final result {ctx.outcome!r}\n'
logmeth(
message
+
descr_str
)
async def try_ship_error_to_remote(
channel: Channel,
err: Exception|BaseExceptionGroup,
cid: str|None = None,
remote_descr: str = 'parent',
hide_tb: bool = True,
) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
an IPC channel `.send()`; transport/network failures and
local cancellation are ignored but logged as critical(ly bad).
'''
__tracebackhide__: bool = hide_tb
with CancelScope(shield=True):
try:
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
# msg: dict = pack_error(
msg: Error = pack_error(
err,
cid=cid,
# TODO: special tb fmting for ctxc cases?
# tb=tb,
)
await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
log.critical(
'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n'
# TODO: use `.msg.pretty_struct` for this!
f'{msg}\n'
)
except BaseException:
log.exception(
'Errored while attempting error shipment?'
)
__tracebackhide__: bool = False
raise
async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> (
bool, # chan disconnected
MsgType, # last msg
):
'''
This is the low-level, per-IPC-channel, RPC task scheduler loop.
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
are delivered to locally running, linked-via-`Context`, tasks
with any (boxed) errors and/or final results shipped back to
the remote side.
All higher level inter-actor comms ops are delivered in some
form by the msg processing here, including:
- lookup and invocation of any (async) funcs-as-tasks requested
by remote actors presuming the local actor has enabled their
containing module.
- IPC-session oriented `Context` and `MsgStream` msg payload
delivery such as `started`, `yield` and `return` msgs.
- cancellation handling for both `Context.cancel()` (which
translate to `Actor._cancel_task()` RPCs server side)
and `Actor.cancel()` process-wide-runtime-shutdown requests
(as utilized inside `Portal.cancel_actor()` ).
'''
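# Usage sketch (the real call-sites live in the runtime's channel
# handler and `open_portal()`; the kwargs here are illustrative):
#
# loop_cs: CancelScope = await nursery.start(
#     partial(process_messages, actor, chan, shield=True),
# )
# ...
# loop_cs.cancel()  # tear the msg-loop down early if needed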
assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
# -[ ] existing GH https://github.com/python-trio/trio/issues/467
# -[ ] for other transports (like QUIC) we can possibly just
# entirely avoid the feeder mem-chans since each msg will be
# delivered with a ctx-id already?
#
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
nursery_cancelled_before_task: bool = False
msg: MsgType|None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
# been cancelled (eg. `open_portal()` may call this method
# from a locally spawned task) and receive this scope
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs)
async for msg in chan:
log.transport( # type: ignore
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
# TODO: use of the pprinting of structs is
# FRAGILE and should prolly not be
#
# avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead!
# f'{pretty_struct.pformat(msg)}\n'
f'{msg}\n'
)
match msg:
# msg for an ongoing IPC ctx session, deliver msg to
# local task.
case (
StartAck(cid=cid)
| Started(cid=cid)
| Yield(cid=cid)
| Stop(cid=cid)
| Return(cid=cid)
| CancelAck(cid=cid)
# `.cid` indicates RPC-ctx-task scoped
| Error(cid=cid)
# recv-side `MsgType` decode violation
| MsgTypeError(cid=cid)
):
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
# `Actor`(-internal) runtime cancel requests
case Start(
ns='self',
func='cancel',
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.DebugStatus.repl_release
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
actor.cancel,
kwargs,
is_rpc=False,
return_msg_type=CancelAck,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
case Start(
ns='self',
func='_cancel_task',
cid=cid,
kwargs=kwargs,
):
target_cid: str = kwargs['cid']
kwargs |= {
'requesting_uid': chan.uid,
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
}
try:
await _invoke(
actor,
cid,
chan,
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg_type=CancelAck,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
# ------ - ------
# -[x] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
cid=cid,
ns=ns,
func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
start_status: str = (
'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n\n'
f' |_{chan}\n'
f' |_cid: {cid}\n\n'
# f' |_{ns}.{funcname}({kwargs})\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
# only registry methods exist now yah,
# like ``.register_actor()`` etc. ?
if ns == 'self':
func: Callable = getattr(actor, funcname)
# application RPC endpoint
else:
try:
func: Callable = actor._get_rpc_func(
ns,
funcname,
)
except (
ModuleNotExposed,
AttributeError,
) as err:
# always complain to requester
# client about un-enabled modules
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
# TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
start_status += ' -> scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
cid,
chan,
func,
kwargs,
),
name=funcname,
)
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task: bool = True
break
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
start_status
+
' -> task for RPC failed?\n\n'
f'{err}'
)
continue
else:
# mark our global state with ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
ctx,
func,
trio.Event(),
)
# runtime-scoped remote (internal) error
# (^- bc no `Error.cid` -^)
#
# NOTE: this is the non-rpc error case, that
# is, an error NOT raised inside a call to
# `_invoke()` (i.e. no cid was provided in the
# msg - see above). Raise error inline and
# mark the channel as "globally errored" for
# all downstream consuming primitives.
case Error():
chan._exc: Exception = unpack_error(
msg,
chan=chan,
)
raise chan._exc
# unknown/invalid msg type?
case _:
codec: MsgCodec = current_codec()
message: str = (
f'Unhandled IPC msg for codec?\n\n'
f'|_{codec}\n\n'
f'{msg}\n'
)
log.exception(message)
raise RuntimeError(message)
log.transport(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
# END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`.
log.runtime(
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
)
await actor.cancel_rpc_tasks(
req_uid=actor.uid,
# a "self cancel" in terms of the lifetime of the
# IPC connection which is presumed to be the
# source of any requests for spawned tasks.
parent_chan=chan,
)
except TransportClosed as tc:
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up..
#
# TODO: maybe add a teardown handshake? and,
# -[x] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
)
# transport **WAS** disconnected
return (True, msg)
except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was context-cancelled with,\n'
f'{err}'
)
case _:
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_remote(
actor._parent_chan,
err,
)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
finally:
# msg debugging for when the machinery is brokey
if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?'
else:
message: str = (
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
# f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)
# transport **WAS NOT** disconnected
return (False, msg)