# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote (task) Procedure Call (scheduling) with SC transitive semantics.
'''
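# NOTE (illustrative sketch only, not part of the runtime): a rough
# example of the kind of app-level endpoint whose scheduling this
# module implements; the `echo()` name and payloads are hypothetical,
# and the `@tractor.context`/`Context.started()`/`Portal.open_context()`
# APIs referenced below are assumed from the public `tractor` surface.
#
# @tractor.context
# async def echo(
#     ctx: Context,
#     msg: str,
# ) -> None:
#     # ack the caller and deliver a first value,
#     await ctx.started('ready')
#     # then stream further values over the linked IPC ctx.
#     async with ctx.open_stream() as stream:
#         await stream.send(msg)
#
# # caller side, in some other actor:
# # async with portal.open_context(echo, msg='hi') as (ctx, first):
# #     ...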
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from functools import partial
import inspect
from pprint import pformat
from typing import (
Any,
Callable,
Coroutine,
TYPE_CHECKING,
)
import warnings
import trio
from trio import (
CancelScope,
Nursery,
TaskStatus,
)
from .msg import NamespacePath
from ._ipc import Channel
from ._context import (
Context,
)
from ._exceptions import (
ContextCancelled,
ModuleNotExposed,
MsgTypeError,
TransportClosed,
is_multi_cancelled,
pack_error,
unpack_error,
)
from .devx import (
maybe_wait_for_debugger,
_debug,
)
from . import _state
from .log import get_logger
from tractor.msg.types import (
CancelAck,
Error,
Msg,
Return,
Start,
StartAck,
Started,
Stop,
Yield,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger('tractor')
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
ctx: Context,
cid: str,
chan: Channel,
func: Callable,
coro: Coroutine,
kwargs: dict[str, Any],
treat_as_gen: bool,
is_rpc: bool,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send(
Yield(
cid=cid,
pld=item,
)
)
log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send(
Stop(cid=cid)
)
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send(
StartAck(
cid=cid,
functype='asyncgen',
)
)
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manually construct the response dict-packet-responses as
# above
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send(
Stop(cid=cid)
)
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
await chan.send(
StartAck(
cid=cid,
functype='asyncfunc',
)
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
)
with cancel_scope as cs:
ctx._scope: CancelScope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
return_msg(
cid=cid,
pld=result,
)
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
@acm
async def _errors_relayed_via_ipc(
actor: Actor,
chan: Channel,
ctx: Context,
is_rpc: bool,
hide_tb: bool = False,
debug_kbis: bool = False,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
) -> None:
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
try:
yield # run RPC invoke body
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
Exception,
BaseExceptionGroup,
KeyboardInterrupt,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail due to
# an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = hide_tb
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
# eventually such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False
if (
(
not isinstance(err, ContextCancelled)
or (
isinstance(err, ContextCancelled)
and ctx._cancel_called
# if the root blocks the debugger lock request from a child
# we will get a remote-cancelled condition.
and ctx._enter_debugger_on_cancel
)
)
and
(
not isinstance(err, KeyboardInterrupt)
or (
isinstance(err, KeyboardInterrupt)
and debug_kbis
)
)
):
# await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
# type of failure will be useful for respawns or
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller
if is_rpc:
#
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
await try_ship_error_to_remote(
chan,
err,
cid=ctx.cid,
remote_descr='caller',
hide_tb=hide_tb,
)
# error is probably from the above coro-running code, *not* from
# the target rpc invocation, since a scope was never
# allocated around the coroutine await.
if ctx._scope is None:
# we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this
# channel.
task_status.started(err)
# always reraise KBIs so they propagate at the sys-process
# level.
if isinstance(err, KeyboardInterrupt):
raise
# RPC task bookkeeping
finally:
try:
ctx, func, is_complete = actor._rpc_tasks.pop(
(chan, ctx.cid)
)
is_complete.set()
except KeyError:
if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set()
async def _invoke(
actor: Actor,
cid: str,
chan: Channel,
func: Callable,
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
'''
Schedule a `trio` task-as-func and deliver result(s) over
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
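# NOTE (illustrative only): this fn is normally scheduled from the
# `process_messages()` loop further below, roughly as per this sketch
# of the real call-site,
#
# ctx: Context = await actor._service_n.start(
#     partial(_invoke, actor, cid, chan, func, kwargs),
#     name=funcname,
# )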
__tracebackhide__: bool = hide_tb
treat_as_gen: bool = False
if _state.debug_mode():
# XXX for `.pause_from_sync()` usage we need to make sure
# `greenback` is bootstrapped in the subactor!
await _debug.maybe_init_greenback()
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
# tb = None
cancel_scope = CancelScope()
# activated cancel scope ref
cs: CancelScope|None = None
ctx = actor.get_context(
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
# allow_overruns=True,
)
context: bool = False
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
params = sig.parameters
# compat with old api
kwargs['ctx'] = ctx
treat_as_gen = True
if 'ctx' in params:
warnings.warn(
"`@tractor.stream decorated funcs should now declare "
"a `stream` arg, `ctx` is now designated for use with "
"@tractor.context",
DeprecationWarning,
stacklevel=2,
)
elif 'stream' in params:
assert 'stream' in params
kwargs['stream'] = ctx
elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx
context = True
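# NOTE (illustrative only): the two decorated endpoint styles
# dispatched above; the fn names are hypothetical but the decorator
# and injected arg names match the `kwargs` handling above,
#
# @tractor.stream  # deprecated, receives the ctx via a `stream` arg
# async def legacy_streamer(stream) -> None:
#     ...
#
# @tractor.context  # current style, receives a `ctx: Context` kwarg
# async def ctx_ep(ctx: Context) -> None:
#     ...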
# errors raised inside this block are propagated back to the caller
async with _errors_relayed_via_ipc(
actor,
chan,
ctx,
is_rpc,
hide_tb=hide_tb,
task_status=task_status,
):
if not (
inspect.isasyncgenfunction(func) or
inspect.iscoroutinefunction(func)
):
raise TypeError(f'{func} must be an async function!')
# init coroutine with `kwargs` to immediately catch any
# type-sig errors.
try:
coro = func(**kwargs)
except TypeError:
raise
# TODO: implement all these cases in terms of the
# `Context` one!
if not context:
await _invoke_non_context(
actor,
cancel_scope,
ctx,
cid,
chan,
func,
coro,
kwargs,
treat_as_gen,
is_rpc,
return_msg,
task_status,
)
# below is only for `@context` funcs
return
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
# a "context" endpoint type is the most general and
# "least sugary" type of RPC ep with support for
# bi-dir streaming B)
# StartAck
await chan.send(
StartAck(
cid=cid,
functype='context',
)
)
# TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from
# `Portal.open_context()` into a common helper?
#
# NOTE: there are many different ctx state details
# in a callee side instance according to current impl:
# - `.cancelled_caught` can never be `True`.
# -> the below scope is never exposed to the
# `@context` marked RPC function.
# - `._portal` is never set.
try:
async with trio.open_nursery() as tn:
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: would be nice to have our
# `TaskMngr` nursery here!
res: Any = await coro
ctx._result = res
# deliver final result to caller side.
await chan.send(
return_msg(
cid=cid,
pld=res,
)
)
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
# - *this* callee task manually calling `ctx.cancel()`.
# - the runtime calling `ctx._deliver_msg()` which
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
# - currently a never-should-happen-fallthrough case
# inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right?
if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error
# before raising any context cancelled case
# so that real remote errors don't get masked as
# ``ContextCancelled``s.
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller
msg: str = (
'actor was cancelled by '
)
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
# ``Context.cancel()`` directly, we're going to
# need to change this logic branch since it
# will always enter..
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor, for ex. one
# that calls `Portal.cancel_actor()` and by
# side-effect cancels this ctx.
elif canceller == ctx.chan.uid:
msg += 'its caller'
else:
msg += 'a remote peer'
div_chars: str = '------ - ------'
div_offset: int = (
round(len(msg)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
msg += (
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_{ctx._task}()'
# TODO: instead just show the
# ctx.__str__() here?
# -[ ] textwrap.indent() it correctly!
# -[ ] BUT we need to wait until
# the state is filled out before emitting
# this msg right now it's kinda empty? bleh..
#
# f' |_{ctx}'
)
# task-context was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()``
# on the far end, or it was cancelled by the local
# (callee) task, so relay this cancel signal to the
# other side.
ctxc = ContextCancelled(
message=msg,
boxed_type=trio.Cancelled,
canceller=canceller,
)
# assign local error so that the `.outcome`
# resolves to an error for both reporting and
# state checks.
ctx._local_error = ctxc
raise ctxc
# XXX: do we ever trigger this block any more?
except (
BaseExceptionGroup,
trio.Cancelled,
BaseException,
) as scope_error:
# always set this (callee) side's exception as the
# local error on the context
ctx._local_error: BaseException = scope_error
# if a remote error was set then likely the
# exception group was raised due to that, so
# we instead raise that error immediately!
ctx.maybe_raise()
# maybe TODO: pack in some kinda
# `trio.Cancelled.__traceback__` here so they can be
# unwrapped and displayed on the caller side? dunno..
raise
# `@context` entrypoint task bookkeeping.
# i.e. only pop the context tracking if used ;)
finally:
assert chan.uid
# don't pop the local context until we know the
# associated child isn't in debug any more
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((
chan.uid,
cid,
# ctx.side,
))
merr: Exception|None = ctx.maybe_error
(
res_type_str,
res_str,
) = (
('error', f'{type(merr)}',)
if merr
else (
'result',
f'`{repr(ctx.outcome)}`',
)
)
log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n'
)
async def try_ship_error_to_remote(
channel: Channel,
err: Exception|BaseExceptionGroup,
cid: str|None = None,
remote_descr: str = 'parent',
hide_tb: bool = True,
) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
an IPC channel `.send()` with transport/network failures and
local cancellation ignored but logged as critical(ly bad).
'''
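# NOTE (illustrative only): typical call-sites are the RPC error
# relay in `_errors_relayed_via_ipc()` and the msg-loop's own
# exception handler below, eg. a sketch of the former,
#
# await try_ship_error_to_remote(
#     chan,
#     err,
#     cid=ctx.cid,
#     remote_descr='caller',
#     hide_tb=hide_tb,
# )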
__tracebackhide__: bool = hide_tb
with CancelScope(shield=True):
try:
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
# msg: dict = pack_error(
msg: Error = pack_error(
err,
cid=cid,
# TODO: special tb fmting for ctxc cases?
# tb=tb,
)
await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
log.critical(
'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n'
# TODO: use `.msg.pretty_struct` for this!
f'{msg}\n'
)
async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
This is the low-level, per-IPC-channel, RPC task scheduler loop.
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
are delivered to locally running, linked-via-`Context`, tasks
with any (boxed) errors and/or final results shipped back to
the remote side.
All higher level inter-actor comms ops are delivered in some
form by the msg processing here, including:
- lookup and invocation of any (async) funcs-as-tasks requested
by remote actors presuming the local actor has enabled their
containing module.
- IPC-session oriented `Context` and `MsgStream` msg payload
delivery such as `started`, `yield` and `return` msgs.
- cancellation handling for both `Context.cancel()` (which
translate to `Actor._cancel_task()` RPCs server side)
and `Actor.cancel()` process-wide-runtime-shutdown requests
(as utilized inside `Portal.cancel_actor()` ).
'''
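# NOTE (illustrative only): the rough per-request msg sequence this
# loop mediates for a single RPC task, using the msg types imported
# from `tractor.msg.types` above,
#
#  caller                       callee (this loop)
#  ------                       ------------------
#  Start(cid, ns, func, ..)  -> schedules `_invoke()` in `._service_n`
#                            <- StartAck(cid, functype=..)
#                            <- Yield(cid, pld=..)   (streaming eps only)
#                            <- Stop(cid) | Return(cid, pld=..)
#                            <- Error(cid, ..)       (on any boxed failure)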
assert actor._service_n # state sanity
# TODO: once `trio` gets an "obvious way" for req/resp we
# should use it?
# https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: Msg|None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
# been cancelled (eg. `open_portal()` may call this method
# from a locally spawned task) and receive this scope
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs)
async for msg in chan:
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pformat()` sub-call..?
# -[ ] use `.msg.pretty_struct` here now instead!
f'{pformat(msg)}\n'
)
match msg:
# msg for an ongoing IPC ctx session, deliver msg to
# local task.
case (
StartAck(cid=cid)
| Started(cid=cid)
| Yield(cid=cid)
| Stop(cid=cid)
| Return(cid=cid)
| CancelAck(cid=cid)
# `.cid` means RPC-ctx-task specific
| Error(cid=cid)
# recv-side `MsgType` decode violation
| MsgTypeError(cid=cid)
):
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
# `Actor`(-internal) runtime cancel requests
case Start(
ns='self',
func='cancel',
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
actor.cancel,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
case Start(
ns='self',
func='_cancel_task',
cid=cid,
kwargs=kwargs,
):
target_cid: str = kwargs['cid']
kwargs |= {
'requesting_uid': chan.uid,
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
}
try:
await _invoke(
actor,
cid,
chan,
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
# ------ - ------
# -[x] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
cid=cid,
ns=ns,
func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
# only registry methods exist now yah,
# like ``.register_actor()`` etc. ?
if ns == 'self':
func: Callable = getattr(actor, funcname)
# application RPC endpoint
else:
try:
func: Callable = actor._get_rpc_func(
ns,
funcname,
)
except (
ModuleNotExposed,
AttributeError,
) as err:
# always complain to requester
# client about un-enabled modules
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
# TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
log.runtime(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
try:
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
cid,
chan,
func,
kwargs,
),
name=funcname,
)
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task: bool = True
break
# in the lone case where a ``Context`` is not
# delivered, it's likely going to be a locally
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
'Task for RPC failed?\n'
f'|_ {func}()\n\n'
f'{err}'
)
continue
else:
# mark our global state with ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
ctx,
func,
trio.Event(),
)
# XXX remote (runtime scoped) error or unknown
# msg (type).
case Error() | _:
# NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
log.exception(
f'Unhandled IPC msg:\n\n'
f'{msg}\n'
)
# assert chan.uid
chan._exc: Exception = unpack_error(
msg,
chan=chan,
)
raise chan._exc
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
# END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`.
log.runtime(
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
)
await actor.cancel_rpc_tasks(
req_uid=actor.uid,
# a "self cancel" in terms of the lifetime of the
# IPC connection which is presumed to be the
# source of any requests for spawned tasks.
parent_chan=chan,
)
except (
TransportClosed,
):
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up..
# TODO: add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime(
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'|_{chan.raddr}\n'
)
# transport **was** disconnected
return True
except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was context-cancelled with,\n'
f'{err}'
)
case _:
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_remote(
actor._parent_chan,
err,
)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
finally:
# msg debugging for when the machinery is brokey
log.runtime(
'Exiting IPC msg loop with\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n\n'
'final msg:\n'
f'{pformat(msg)}\n'
)
# transport **was not** disconnected
return False