2024-03-13 19:57:15 +00:00
|
|
|
# tractor: structured concurrent "actors".
|
|
|
|
|
# Copyright 2018-eternity Tyler Goodlet.
|
|
|
|
|
|
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
|
# (at your option) any later version.
|
|
|
|
|
|
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
|
# GNU Affero General Public License for more details.
|
|
|
|
|
|
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
Remote (task) Procedure Call (scheduling) with SC transitive semantics.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
from contextlib import (
|
|
|
|
|
asynccontextmanager as acm,
|
2024-03-13 22:41:24 +00:00
|
|
|
aclosing,
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
|
|
|
|
from functools import partial
|
|
|
|
|
import inspect
|
|
|
|
|
from pprint import pformat
|
2024-06-14 19:49:30 +00:00
|
|
|
import traceback
|
2024-03-13 19:57:15 +00:00
|
|
|
from typing import (
|
|
|
|
|
Any,
|
|
|
|
|
Callable,
|
|
|
|
|
Coroutine,
|
|
|
|
|
TYPE_CHECKING,
|
|
|
|
|
)
|
|
|
|
|
import warnings
|
|
|
|
|
|
|
|
|
|
import trio
|
|
|
|
|
from trio import (
|
2025-06-16 01:22:08 +00:00
|
|
|
Cancelled,
|
2024-03-13 19:57:15 +00:00
|
|
|
CancelScope,
|
|
|
|
|
Nursery,
|
|
|
|
|
TaskStatus,
|
|
|
|
|
)
|
|
|
|
|
|
2025-03-13 23:41:30 +00:00
|
|
|
from .ipc import Channel
|
2024-03-13 19:57:15 +00:00
|
|
|
from ._context import (
|
|
|
|
|
Context,
|
|
|
|
|
)
|
|
|
|
|
from ._exceptions import (
|
2024-04-09 17:58:10 +00:00
|
|
|
ContextCancelled,
|
2024-06-14 19:49:30 +00:00
|
|
|
RemoteActorError,
|
2024-03-13 19:57:15 +00:00
|
|
|
ModuleNotExposed,
|
2024-04-09 17:58:10 +00:00
|
|
|
MsgTypeError,
|
|
|
|
|
TransportClosed,
|
2024-03-13 19:57:15 +00:00
|
|
|
pack_error,
|
|
|
|
|
unpack_error,
|
|
|
|
|
)
|
2025-06-16 01:22:08 +00:00
|
|
|
from .trionics import (
|
|
|
|
|
collapse_eg,
|
|
|
|
|
is_multi_cancelled,
|
|
|
|
|
maybe_raise_from_masking_exc,
|
|
|
|
|
)
|
2024-05-07 13:20:43 +00:00
|
|
|
from .devx import (
|
2025-05-13 16:13:12 +00:00
|
|
|
debug,
|
2024-05-07 13:20:43 +00:00
|
|
|
add_div,
|
2025-06-29 18:47:03 +00:00
|
|
|
pformat as _pformat,
|
2024-05-07 13:20:43 +00:00
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
from . import _state
|
|
|
|
|
from .log import get_logger
|
2024-04-14 23:31:50 +00:00
|
|
|
from .msg import (
|
|
|
|
|
current_codec,
|
|
|
|
|
MsgCodec,
|
2024-05-28 13:36:26 +00:00
|
|
|
PayloadT,
|
2024-04-14 23:31:50 +00:00
|
|
|
NamespacePath,
|
2025-06-29 18:47:03 +00:00
|
|
|
pretty_struct,
|
2024-06-17 02:50:43 +00:00
|
|
|
_ops as msgops,
|
2024-04-14 23:31:50 +00:00
|
|
|
)
|
2024-04-02 17:41:52 +00:00
|
|
|
from tractor.msg.types import (
|
2024-04-07 14:40:01 +00:00
|
|
|
CancelAck,
|
|
|
|
|
Error,
|
2024-04-30 16:15:46 +00:00
|
|
|
MsgType,
|
2024-04-07 14:40:01 +00:00
|
|
|
Return,
|
2024-04-02 17:41:52 +00:00
|
|
|
Start,
|
|
|
|
|
StartAck,
|
|
|
|
|
Started,
|
|
|
|
|
Stop,
|
|
|
|
|
Yield,
|
|
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from ._runtime import Actor
|
|
|
|
|
|
|
|
|
|
log = get_logger('tractor')
|
|
|
|
|
|
|
|
|
|
|
2024-06-28 23:06:17 +00:00
|
|
|
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
|
|
|
|
|
# func-type-cases implemented "on top of" `@context` defs:
|
|
|
|
|
# -[ ] std async func helper decorated with `@rpc_func`?
|
|
|
|
|
# -[ ] `Portal.open_stream_from()` with async-gens?
|
|
|
|
|
# |_ possibly a duplex form of this with a
|
|
|
|
|
# `sent_from_peer = yield send_to_peer` form, which would require
|
|
|
|
|
# syncing the send/recv side with possibly `.receive_nowait()`
|
|
|
|
|
# on each `yield`?
|
|
|
|
|
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
|
|
|
|
|
# user only defining a single-`yield` generator-func?
|
2024-03-13 19:57:15 +00:00
|
|
|
async def _invoke_non_context(
    actor: Actor,
    cancel_scope: CancelScope,
    ctx: Context,
    cid: str,
    chan: Channel,
    func: Callable,
    coro: Coroutine,
    kwargs: dict[str, Any],

    treat_as_gen: bool,
    is_rpc: bool,
    return_msg_type: Return|CancelAck = Return,

    task_status: TaskStatus[
        Context | BaseException
    ] = trio.TASK_STATUS_IGNORED,
):
    '''
    Run a non-`@context` RPC endpoint to completion, relaying its
    output over the IPC `chan`.

    Three endpoint "func types" are handled:

    - an async generator: ack with `functype='asyncgen'`, then
      forward each yielded item as a `Yield` msg and finish with
      a `Stop` msg.
    - a `treat_as_gen` (one-way `@tractor.stream` style) func:
      ack with `functype='asyncgen'`, await the coro, and send
      a `Stop` only if the local scope was not cancelled.
    - a plain async func: ack with `functype='asyncfunc'`, await
      the result and ship it back wrapped in `return_msg_type`.

    In every branch `ctx._scope` is set to the entered
    `cancel_scope` and `task_status.started(ctx)` signals the
    spawning nursery that the RPC task is up.

    '''
    # hide this runtime frame from user-facing tracebacks
    __tracebackhide__: bool = True
    cs: CancelScope|None = None  # ref when activated

    # ?TODO? can we unify this with the `context=True` impl below?
    if inspect.isasyncgen(coro):
        await chan.send(
            StartAck(
                cid=cid,
                functype='asyncgen',
            )
        )
        # XXX: massive gotcha! If the containing scope
        # is cancelled and we execute the below line,
        # any ``ActorNursery.__aexit__()`` WON'T be
        # triggered in the underlying async gen! So we
        # have to properly handle the closing (aclosing)
        # of the async gen in order to be sure the cancel
        # is propagated!
        with cancel_scope as cs:
            ctx._scope = cs
            task_status.started(ctx)
            async with aclosing(coro) as agen:
                async for item in agen:
                    # TODO: can we send values back in here?
                    # it's gonna require a `while True:` and
                    # some non-blocking way to retrieve new `asend()`
                    # values from the channel:
                    # to_send = await chan.recv_nowait()
                    # if to_send is not None:
                    #     to_yield = await coro.asend(to_send)
                    await chan.send(
                        Yield(
                            cid=cid,
                            pld=item,
                        )
                    )

        log.runtime(f"Finished iterating {coro}")
        # TODO: we should really support a proper
        # `StopAsyncIteration` system here for returning a final
        # value if desired
        await chan.send(
            Stop(cid=cid)
        )

    # one way @stream func that gets treated like an async gen
    # TODO: can we unify this with the `context=True` impl below?
    elif treat_as_gen:
        await chan.send(
            StartAck(
                cid=cid,
                functype='asyncgen',
            )
        )
        with cancel_scope as cs:
            ctx._scope = cs
            task_status.started(ctx)
            await coro

        if not cs.cancelled_caught:
            # task was not cancelled so we can instruct the
            # far end async gen to tear down
            await chan.send(
                Stop(cid=cid)
            )

    # simplest function/method request-response pattern
    # XXX: in the most minimally used case, just a scheduled internal runtime
    # call to `Actor._cancel_task()` from the ctx-peer task since we
    # don't (yet) have a dedicated IPC msg.
    # ------ - ------
    else:
        failed_resp: bool = False
        try:
            ack = StartAck(
                cid=cid,
                functype='asyncfunc',
            )
            await chan.send(ack)
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            BrokenPipeError,
        ) as ipc_err:
            # remember the send failure so we don't also try to
            # ship a result below over the same (dead?) channel.
            failed_resp = True
            if is_rpc:
                raise ipc_err
            else:
                # internal runtime request: log-and-continue instead
                # of crashing the msg loop.
                log.exception(
                    f'Failed to ack runtime RPC request\n\n'
                    f'{func} x=> {ctx.chan}\n\n'
                    f'{ack}\n'
                )

        with cancel_scope as cs:
            ctx._scope: CancelScope = cs
            task_status.started(ctx)
            result = await coro
            fname: str = func.__name__

            op_nested_task: str = _pformat.nest_from_op(
                input_op=f')> cid: {ctx.cid!r}',
                text=f'{ctx._task}',
                nest_indent=1,  # under >
            )
            log.runtime(
                f'RPC task complete\n'
                f'\n'
                f'{op_nested_task}\n'
                f'\n'
                f')> {fname}() -> {pformat(result)}\n'
            )

            # NOTE: only send result if we know IPC isn't down
            if (
                not failed_resp
                and chan.connected()
            ):
                try:
                    ret_msg = return_msg_type(
                        cid=cid,
                        pld=result,
                    )
                    await chan.send(ret_msg)
                except (
                    BrokenPipeError,
                    trio.BrokenResourceError,
                ):
                    # peer went away before we could deliver the
                    # result; warn but don't crash this actor.
                    log.warning(
                        'Failed to send RPC result?\n'
                        f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
                        f'x=> peer: {chan.uid}\n'
                    )
|
|
|
|
|
|
|
|
|
|
@acm
async def _errors_relayed_via_ipc(
    actor: Actor,
    chan: Channel,
    ctx: Context,
    is_rpc: bool,

    hide_tb: bool = True,
    debug_kbis: bool = False,
    task_status: TaskStatus[
        Context | BaseException
    ] = trio.TASK_STATUS_IGNORED,

) -> None:
    '''
    Wrap an RPC invoke body (the `yield` below) such that any
    crash is (best-effort) boxed and shipped back to the remote
    caller over `chan`, optionally engaging the debugger first
    when in debug mode.

    On exit (the nested `finally` blocks) the task's entry is
    popped from `actor._rpc_tasks` and, once no RPC tasks remain,
    `actor._ongoing_rpc_tasks` is set.

    '''
    # NOTE: we normally always hide this frame in call-stack tracebacks
    # if the crash originated from an RPC task (since normally the
    # user is only going to care about their own code not this
    # internal runtime frame) and we DID NOT
    # fail due to an IPC transport error!
    __tracebackhide__: bool = hide_tb

    # TODO: a debug nursery when in debug mode!
    # async with maybe_open_debugger_nursery() as debug_tn:
    # => see matching comment in side `.debug._pause()`
    rpc_err: BaseException|None = None
    try:
        yield  # run RPC invoke body

    # NOTE, never REPL any pseudo-expected tpt-disconnect.
    except TransportClosed as err:
        rpc_err = err
        log.warning(
            f'Tpt disconnect during remote-exc relay due to,\n'
            f'{err!r}\n'
        )
        raise err

    # box and ship RPC errors for wire-transit via
    # the task's requesting parent IPC-channel.
    except (
        Exception,
        BaseExceptionGroup,
        KeyboardInterrupt,
    ) as err:
        rpc_err = err

        # TODO: maybe we'll want different "levels" of debugging
        # eventualy such as ('app', 'supervisory', 'runtime') ?
        #
        # -[ ] this if check is duplicate with `._maybe_enter_pm()`..
        if not is_multi_cancelled(err):
            entered_debug: bool = False
            # only enter the debugger for errors that are neither
            # a peer-requested `ContextCancelled` (unless this side
            # requested it AND opted in via
            # `ctx._enter_debugger_on_cancel`) nor a KBI (unless
            # `debug_kbis` is set).
            if (
                (
                    not isinstance(err, ContextCancelled)
                    or (
                        isinstance(err, ContextCancelled)
                        and ctx._cancel_called

                        # if the root blocks the debugger lock request from a child
                        # we will get a remote-cancelled condition.
                        and ctx._enter_debugger_on_cancel
                    )
                )
                and
                (
                    not isinstance(err, KeyboardInterrupt)
                    or (
                        isinstance(err, KeyboardInterrupt)
                        and debug_kbis
                    )
                )
            ):
                # XXX QUESTION XXX: is there any case where we'll
                # want to debug IPC disconnects as a default?
                # => I can't think of a reason that inspecting this
                # type of failure will be useful for respawns or
                # recovery logic - the only case is some kind of
                # strange bug in our transport layer itself? Going
                # to keep this open ended for now.

                if _state.debug_mode():
                    log.exception(
                        f'RPC task crashed!\n'
                        f'Attempting to enter debugger\n'
                        f'\n'
                        f'{ctx}'
                    )

                entered_debug = await debug._maybe_enter_pm(
                    err,
                    api_frame=inspect.currentframe(),
                )
                if not entered_debug:
                    # if we prolly should have entered the REPL but
                    # didn't, maybe there was an internal error in
                    # the above code and we do want to show this
                    # frame!
                    if _state.debug_mode():
                        __tracebackhide__: bool = False

                    log.exception(
                        'RPC task crashed\n'
                        f'|_{ctx}'
                    )

        # ALWAYS try to ship RPC errors back to parent/caller task
        if is_rpc:

            # TODO: tests for this scenario:
            # - RPC caller closes connection before getting a response
            # should **not** crash this actor..
            await try_ship_error_to_remote(
                chan,
                err,
                cid=ctx.cid,
                remote_descr='caller',
                hide_tb=hide_tb,
            )

        # if the ctx cs is NOT allocated, the error is likely from
        # above `coro` invocation machinery NOT from inside the
        # `coro` itself, i.e. err is NOT a user application error.
        if ctx._scope is None:
            # we don't ever raise directly here to allow the
            # msg-loop-scheduler to continue running for this
            # channel.
            task_status.started(err)

        # always propagate KBIs at the sys-process level.
        if (
            isinstance(err, KeyboardInterrupt)

            # ?TODO? except when running in asyncio mode?
            # |_ wut if you want to open a `@context` FROM an
            #    infected_aio task?
            # and not actor.is_infected_aio()
        ):
            raise

    # RPC task bookkeeping.
    # since RPC tasks are scheduled inside a flat
    # `Actor._service_tn`, we add "handles" to each such that
    # they can be individually cancelled.
    finally:

        # if the error is not from user code and instead a failure of
        # an internal-runtime-RPC or IPC-connection, we do (prolly) want
        # to show this frame!
        if (
            rpc_err
            and (
                not is_rpc
                or
                not chan.connected()
            )
        ):
            __tracebackhide__: bool = False

        try:
            ctx: Context
            func: Callable
            is_complete: trio.Event
            (
                ctx,
                func,
                is_complete,
            ) = actor._rpc_tasks.pop(
                (chan, ctx.cid)
            )
            is_complete.set()

        except KeyError:
            # If we're cancelled before the task returns then the
            # cancel scope will not have been inserted yet
            if is_rpc:
                log.warning(
                    'RPC task likely crashed or cancelled before start?\n'
                    f'|_{ctx._task}\n'
                    f' >> {ctx.repr_rpc}\n'
                )
            # TODO: remove this right? rn the only non-`is_rpc` cases
            # are cancellation methods and according the RPC loop eps
            # for thoses below, nothing is ever registered in
            # `Actor._rpc_tasks` for those cases.. but should we?
            #
            # -[ ] maybe we should have an equiv `Actor._runtime_rpc_tasks`?
            # else:
            #     log.cancel(
            #         'Failed to de-alloc internal runtime cancel task?\n'
            #         f'|_{ctx._task}\n'
            #         f' >> {ctx.repr_rpc}\n'
            #     )

        finally:
            if not actor._rpc_tasks:
                log.runtime('All RPC tasks have completed')
                actor._ongoing_rpc_tasks.set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _invoke(
|
|
|
|
|
actor: Actor,
|
|
|
|
|
cid: str,
|
|
|
|
|
chan: Channel,
|
|
|
|
|
func: Callable,
|
|
|
|
|
kwargs: dict[str, Any],
|
|
|
|
|
|
|
|
|
|
is_rpc: bool = True,
|
2025-08-19 16:58:31 +00:00
|
|
|
hide_tb: bool = False,
|
2024-05-28 13:36:26 +00:00
|
|
|
return_msg_type: Return|CancelAck = Return,
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
task_status: TaskStatus[
|
|
|
|
|
Context | BaseException
|
|
|
|
|
] = trio.TASK_STATUS_IGNORED,
|
|
|
|
|
):
|
|
|
|
|
'''
|
|
|
|
|
Schedule a `trio` task-as-func and deliver result(s) over
|
|
|
|
|
connected IPC channel.
|
|
|
|
|
|
|
|
|
|
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_tn: Nursery`.
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
__tracebackhide__: bool = hide_tb
|
|
|
|
|
treat_as_gen: bool = False
|
|
|
|
|
|
2024-04-14 23:31:50 +00:00
|
|
|
if (
|
|
|
|
|
_state.debug_mode()
|
|
|
|
|
and
|
|
|
|
|
_state._runtime_vars['use_greenback']
|
|
|
|
|
):
|
2024-03-22 20:41:49 +00:00
|
|
|
# XXX for .pause_from_sync()` usage we need to make sure
|
|
|
|
|
# `greenback` is boostrapped in the subactor!
|
2025-05-13 16:13:12 +00:00
|
|
|
await debug.maybe_init_greenback()
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# TODO: possibly a specially formatted traceback
|
|
|
|
|
# (not sure what typing is for this..)?
|
2024-04-25 16:33:10 +00:00
|
|
|
# tb: TracebackType = None
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
cancel_scope = CancelScope()
|
|
|
|
|
ctx = actor.get_context(
|
|
|
|
|
chan=chan,
|
|
|
|
|
cid=cid,
|
|
|
|
|
nsf=NamespacePath.from_ref(func),
|
|
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# NOTE: no portal passed bc this is the "child"-side
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# We shouldn't ever need to pass this through right?
|
|
|
|
|
# it's up to the soon-to-be called rpc task to
|
|
|
|
|
# open the stream with this option.
|
|
|
|
|
# allow_overruns=True,
|
|
|
|
|
)
|
2024-05-20 20:18:42 +00:00
|
|
|
context_ep_func: bool = False
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-05-20 20:18:42 +00:00
|
|
|
# set the current IPC ctx var for this RPC task
|
|
|
|
|
_state._ctxvar_Context.set(ctx)
|
2024-05-07 13:20:43 +00:00
|
|
|
|
2024-03-13 19:57:15 +00:00
|
|
|
# TODO: deprecate this style..
|
|
|
|
|
if getattr(func, '_tractor_stream_function', False):
|
|
|
|
|
# handle decorated ``@tractor.stream`` async functions
|
|
|
|
|
sig = inspect.signature(func)
|
|
|
|
|
params = sig.parameters
|
|
|
|
|
|
|
|
|
|
# compat with old api
|
|
|
|
|
kwargs['ctx'] = ctx
|
|
|
|
|
treat_as_gen = True
|
|
|
|
|
|
|
|
|
|
if 'ctx' in params:
|
|
|
|
|
warnings.warn(
|
|
|
|
|
"`@tractor.stream decorated funcs should now declare "
|
|
|
|
|
"a `stream` arg, `ctx` is now designated for use with "
|
|
|
|
|
"@tractor.context",
|
|
|
|
|
DeprecationWarning,
|
|
|
|
|
stacklevel=2,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
elif 'stream' in params:
|
|
|
|
|
assert 'stream' in params
|
|
|
|
|
kwargs['stream'] = ctx
|
|
|
|
|
|
|
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# handle decorated ``@tractor.context`` async function
|
2024-06-17 02:50:43 +00:00
|
|
|
# - pull out any typed-pld-spec info and apply (below)
|
|
|
|
|
# - (TODO) store func-ref meta data for API-frame-info logging
|
|
|
|
|
elif (
|
|
|
|
|
ctx_meta := getattr(
|
|
|
|
|
func,
|
|
|
|
|
'_tractor_context_meta',
|
|
|
|
|
False,
|
|
|
|
|
)
|
|
|
|
|
):
|
|
|
|
|
# kwargs['ctx'] = ctx
|
|
|
|
|
# set the required `tractor.Context` typed input argument to
|
|
|
|
|
# the allocated RPC task context.
|
|
|
|
|
kwargs[ctx_meta['ctx_var_name']] = ctx
|
2024-05-20 20:18:42 +00:00
|
|
|
context_ep_func = True
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# errors raised inside this block are propgated back to caller
|
|
|
|
|
async with _errors_relayed_via_ipc(
|
|
|
|
|
actor,
|
|
|
|
|
chan,
|
|
|
|
|
ctx,
|
|
|
|
|
is_rpc,
|
|
|
|
|
hide_tb=hide_tb,
|
|
|
|
|
task_status=task_status,
|
|
|
|
|
):
|
|
|
|
|
if not (
|
2024-04-25 16:33:10 +00:00
|
|
|
inspect.isasyncgenfunction(func)
|
|
|
|
|
or
|
2024-03-13 19:57:15 +00:00
|
|
|
inspect.iscoroutinefunction(func)
|
|
|
|
|
):
|
|
|
|
|
raise TypeError(f'{func} must be an async function!')
|
|
|
|
|
|
|
|
|
|
# init coroutine with `kwargs` to immediately catch any
|
|
|
|
|
# type-sig errors.
|
|
|
|
|
try:
|
|
|
|
|
coro = func(**kwargs)
|
|
|
|
|
except TypeError:
|
|
|
|
|
raise
|
|
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# TODO: impl all these cases in terms of the `Context` one!
|
2024-05-20 20:18:42 +00:00
|
|
|
if not context_ep_func:
|
2024-03-13 19:57:15 +00:00
|
|
|
await _invoke_non_context(
|
|
|
|
|
actor,
|
|
|
|
|
cancel_scope,
|
|
|
|
|
ctx,
|
|
|
|
|
cid,
|
|
|
|
|
chan,
|
|
|
|
|
func,
|
|
|
|
|
coro,
|
|
|
|
|
kwargs,
|
|
|
|
|
treat_as_gen,
|
|
|
|
|
is_rpc,
|
2024-05-28 13:36:26 +00:00
|
|
|
return_msg_type,
|
2024-03-13 19:57:15 +00:00
|
|
|
task_status,
|
|
|
|
|
)
|
2024-04-25 16:33:10 +00:00
|
|
|
# XXX below fallthrough is ONLY for `@context` eps
|
2024-03-13 19:57:15 +00:00
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# our most general case: a remote SC-transitive,
|
|
|
|
|
# IPC-linked, cross-actor-task "context"
|
|
|
|
|
# ------ - ------
|
|
|
|
|
# TODO: every other "func type" should be implemented from
|
|
|
|
|
# a special case of this impl eventually!
|
|
|
|
|
# -[ ] streaming funcs should instead of being async-for
|
|
|
|
|
# handled directly here wrapped in
|
|
|
|
|
# a async-with-open_stream() closure that does the
|
|
|
|
|
# normal thing you'd expect a far end streaming context
|
|
|
|
|
# to (if written by the app-dev).
|
|
|
|
|
# -[ ] one off async funcs can literally just be called
|
|
|
|
|
# here and awaited directly, possibly just with a small
|
|
|
|
|
# wrapper that calls `Context.started()` and then does
|
|
|
|
|
# the `await coro()`?
|
|
|
|
|
|
2024-04-14 23:31:50 +00:00
|
|
|
# ------ - ------
|
|
|
|
|
# a "context" endpoint is the most general and
|
|
|
|
|
# "least sugary" type of RPC with support for
|
2024-03-13 19:57:15 +00:00
|
|
|
# bi-dir streaming B)
|
2024-04-14 23:31:50 +00:00
|
|
|
#
|
|
|
|
|
# the concurrency relation is simlar to a task nursery
|
|
|
|
|
# wherein a "parent" task (the one that enters
|
|
|
|
|
# `trio.open_nursery()` in some actor "opens" (via
|
|
|
|
|
# `Portal.open_context()`) an IPC ctx to another peer
|
|
|
|
|
# (which is maybe a sub-) actor who then schedules (aka
|
|
|
|
|
# `trio.Nursery.start()`s) a new "child" task to execute
|
|
|
|
|
# the `@context` annotated func; that is this func we're
|
|
|
|
|
# running directly below!
|
|
|
|
|
# ------ - ------
|
|
|
|
|
#
|
|
|
|
|
# StartAck: respond immediately with endpoint info
|
2024-04-02 17:41:52 +00:00
|
|
|
await chan.send(
|
|
|
|
|
StartAck(
|
|
|
|
|
cid=cid,
|
|
|
|
|
functype='context',
|
|
|
|
|
)
|
|
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# TODO: should we also use an `.open_context()` equiv
|
2024-04-14 23:31:50 +00:00
|
|
|
# for this child side by factoring the impl from
|
2024-03-13 19:57:15 +00:00
|
|
|
# `Portal.open_context()` into a common helper?
|
|
|
|
|
#
|
|
|
|
|
# NOTE: there are many different ctx state details
|
2024-04-14 23:31:50 +00:00
|
|
|
# in a child side instance according to current impl:
|
2024-03-13 19:57:15 +00:00
|
|
|
# - `.cancelled_caught` can never be `True`.
|
|
|
|
|
# -> the below scope is never exposed to the
|
|
|
|
|
# `@context` marked RPC function.
|
|
|
|
|
# - `._portal` is never set.
|
2025-06-16 01:22:08 +00:00
|
|
|
scope_err: BaseException|None = None
|
2024-03-13 19:57:15 +00:00
|
|
|
try:
|
2025-06-16 01:22:08 +00:00
|
|
|
# TODO: better `trionics` primitive/tooling usage here!
|
|
|
|
|
# -[ ] should would be nice to have our `TaskMngr`
|
|
|
|
|
# nursery here!
|
|
|
|
|
# -[ ] payload value checking like we do with
|
|
|
|
|
# `.started()` such that the debbuger can engage
|
|
|
|
|
# here in the child task instead of waiting for the
|
|
|
|
|
# parent to crash with it's own MTE..
|
|
|
|
|
#
|
|
|
|
|
tn: Nursery
|
2024-06-28 23:06:17 +00:00
|
|
|
rpc_ctx_cs: CancelScope
|
2024-06-17 02:50:43 +00:00
|
|
|
async with (
|
2025-07-29 19:01:47 +00:00
|
|
|
collapse_eg(hide_tb=False),
|
2025-06-16 01:22:08 +00:00
|
|
|
trio.open_nursery() as tn,
|
2024-06-17 02:50:43 +00:00
|
|
|
msgops.maybe_limit_plds(
|
|
|
|
|
ctx=ctx,
|
|
|
|
|
spec=ctx_meta.get('pld_spec'),
|
|
|
|
|
dec_hook=ctx_meta.get('dec_hook'),
|
|
|
|
|
),
|
2025-06-16 01:22:08 +00:00
|
|
|
|
|
|
|
|
# XXX NOTE, this being the "most embedded"
|
|
|
|
|
# scope ensures unasking of the `await coro` below
|
|
|
|
|
# *should* never be interfered with!!
|
|
|
|
|
maybe_raise_from_masking_exc(
|
2025-08-19 17:04:46 +00:00
|
|
|
unmask_from=(Cancelled,),
|
2025-06-16 01:22:08 +00:00
|
|
|
) as _mbme, # maybe boxed masked exc
|
2024-06-17 02:50:43 +00:00
|
|
|
):
|
2024-03-13 19:57:15 +00:00
|
|
|
ctx._scope_nursery = tn
|
2024-06-28 23:06:17 +00:00
|
|
|
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
2024-03-13 19:57:15 +00:00
|
|
|
task_status.started(ctx)
|
|
|
|
|
|
2025-06-16 01:22:08 +00:00
|
|
|
# invoke user endpoint fn.
|
2024-05-28 13:36:26 +00:00
|
|
|
res: Any|PayloadT = await coro
|
|
|
|
|
return_msg: Return|CancelAck = return_msg_type(
|
|
|
|
|
cid=cid,
|
|
|
|
|
pld=res,
|
2024-04-02 17:41:52 +00:00
|
|
|
)
|
2024-05-28 13:36:26 +00:00
|
|
|
# set and shuttle final result to "parent"-side task.
|
|
|
|
|
ctx._result = res
|
2025-03-12 20:41:42 +00:00
|
|
|
log.runtime(
|
|
|
|
|
f'Sending result msg and exiting {ctx.side!r}\n'
|
2025-07-07 15:02:47 +00:00
|
|
|
f'\n'
|
|
|
|
|
f'{pretty_struct.pformat(return_msg)}\n'
|
2025-03-12 20:41:42 +00:00
|
|
|
)
|
2025-08-15 18:05:46 +00:00
|
|
|
await chan.send(return_msg)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
|
|
|
|
# called by any of,
|
2024-04-14 23:31:50 +00:00
|
|
|
# - *this* child task manually calling `ctx.cancel()`.
|
2024-03-13 19:57:15 +00:00
|
|
|
# - the runtime calling `ctx._deliver_msg()` which
|
|
|
|
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
|
|
|
|
# which cancels the scope presuming the input error
|
|
|
|
|
# is not a `.cancel_acked` pleaser.
|
2024-06-28 23:06:17 +00:00
|
|
|
if rpc_ctx_cs.cancelled_caught:
|
2024-03-13 19:57:15 +00:00
|
|
|
our_uid: tuple = actor.uid
|
|
|
|
|
|
|
|
|
|
# first check for and raise any remote error
|
|
|
|
|
# before raising any context cancelled case
|
|
|
|
|
# so that real remote errors don't get masked as
|
|
|
|
|
# ``ContextCancelled``s.
|
|
|
|
|
if re := ctx._remote_error:
|
|
|
|
|
ctx._maybe_raise_remote_err(re)
|
|
|
|
|
|
2024-06-28 23:06:17 +00:00
|
|
|
if rpc_ctx_cs.cancel_called:
|
2024-03-13 19:57:15 +00:00
|
|
|
canceller: tuple = ctx.canceller
|
2024-04-25 20:19:39 +00:00
|
|
|
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# NOTE / TODO: if we end up having
|
|
|
|
|
# ``Actor._cancel_task()`` call
|
|
|
|
|
# ``Context.cancel()`` directly, we're going to
|
|
|
|
|
# need to change this logic branch since it
|
|
|
|
|
# will always enter..
|
|
|
|
|
if ctx._cancel_called:
|
|
|
|
|
# TODO: test for this!!!!!
|
|
|
|
|
canceller: tuple = our_uid
|
2024-04-25 20:19:39 +00:00
|
|
|
explain += 'itself '
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# if the channel which spawned the ctx is the
|
|
|
|
|
# one that cancelled it then we report that, vs.
|
|
|
|
|
# it being some other random actor that for ex.
|
|
|
|
|
# some actor who calls `Portal.cancel_actor()`
|
|
|
|
|
# and by side-effect cancels this ctx.
|
2024-04-25 20:19:39 +00:00
|
|
|
#
|
|
|
|
|
# TODO: determine if the ctx peer task was the
|
|
|
|
|
# exact task which cancelled, vs. some other
|
|
|
|
|
# task in the same actor.
|
2024-03-13 19:57:15 +00:00
|
|
|
elif canceller == ctx.chan.uid:
|
2024-04-25 20:19:39 +00:00
|
|
|
explain += f'its {ctx.peer_side!r}-side peer'
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-06-28 23:06:17 +00:00
|
|
|
elif canceller == our_uid:
|
|
|
|
|
explain += 'itself'
|
|
|
|
|
|
|
|
|
|
elif canceller:
|
2024-04-25 20:19:39 +00:00
|
|
|
explain += 'a remote peer'
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-06-28 23:06:17 +00:00
|
|
|
else:
|
|
|
|
|
explain += 'an unknown cause?'
|
|
|
|
|
|
2024-04-25 20:19:39 +00:00
|
|
|
explain += (
|
2024-05-07 13:20:43 +00:00
|
|
|
add_div(message=explain)
|
|
|
|
|
+
|
2024-03-13 19:57:15 +00:00
|
|
|
f'<= canceller: {canceller}\n'
|
2024-04-25 20:19:39 +00:00
|
|
|
f'=> cancellee: {our_uid}\n'
|
|
|
|
|
# TODO: better repr for ctx tasks..
|
|
|
|
|
f' |_{ctx.side!r} {ctx._task}'
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# TODO: instead just show the
|
|
|
|
|
# ctx.__str__() here?
|
|
|
|
|
# -[ ] textwrap.indent() it correctly!
|
|
|
|
|
# -[ ] BUT we need to wait until
|
|
|
|
|
# the state is filled out before emitting
|
|
|
|
|
# this msg right ow its kinda empty? bleh..
|
|
|
|
|
#
|
|
|
|
|
# f' |_{ctx}'
|
|
|
|
|
)
|
|
|
|
|
|
2024-04-14 23:31:50 +00:00
|
|
|
# task-contex was either cancelled by request
|
|
|
|
|
# using ``Portal.cancel_actor()`` or
|
|
|
|
|
# ``Context.cancel()`` on the far end, or it
|
|
|
|
|
# was cancelled by the local child (or callee)
|
|
|
|
|
# task, so relay this cancel signal to the
|
2024-03-13 19:57:15 +00:00
|
|
|
# other side.
|
|
|
|
|
ctxc = ContextCancelled(
|
2024-04-25 20:19:39 +00:00
|
|
|
message=explain,
|
2024-03-18 14:21:37 +00:00
|
|
|
boxed_type=trio.Cancelled,
|
2024-03-13 19:57:15 +00:00
|
|
|
canceller=canceller,
|
|
|
|
|
)
|
|
|
|
|
raise ctxc
|
|
|
|
|
|
|
|
|
|
# XXX: do we ever trigger this block any more?
|
|
|
|
|
except (
|
|
|
|
|
BaseExceptionGroup,
|
|
|
|
|
BaseException,
|
2025-02-26 00:37:30 +00:00
|
|
|
trio.Cancelled,
|
2025-06-16 01:22:08 +00:00
|
|
|
) as _scope_err:
|
|
|
|
|
scope_err = _scope_err
|
2024-05-07 13:20:43 +00:00
|
|
|
if (
|
2025-06-16 01:22:08 +00:00
|
|
|
isinstance(scope_err, RuntimeError)
|
|
|
|
|
and
|
|
|
|
|
scope_err.args
|
|
|
|
|
and
|
|
|
|
|
'Cancel scope stack corrupted' in scope_err.args[0]
|
2024-05-07 13:20:43 +00:00
|
|
|
):
|
|
|
|
|
log.exception('Cancel scope stack corrupted!?\n')
|
2025-05-13 16:13:12 +00:00
|
|
|
# debug.mk_pdb().set_trace()
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-04-14 23:31:50 +00:00
|
|
|
# always set this (child) side's exception as the
|
2024-03-13 19:57:15 +00:00
|
|
|
# local error on the context
|
2025-06-16 01:22:08 +00:00
|
|
|
ctx._local_error: BaseException = scope_err
|
2024-06-14 19:49:30 +00:00
|
|
|
# ^-TODO-^ question,
|
|
|
|
|
# does this matter other then for
|
|
|
|
|
# consistency/testing?
|
|
|
|
|
# |_ no user code should be in this scope at this point
|
|
|
|
|
# AND we already set this in the block below?
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2025-06-16 01:22:08 +00:00
|
|
|
# XXX if a remote error was set then likely the
|
|
|
|
|
# exc group was raised due to that, so
|
2024-03-13 19:57:15 +00:00
|
|
|
# and we instead raise that error immediately!
|
2025-06-16 01:22:08 +00:00
|
|
|
maybe_re: (
|
|
|
|
|
ContextCancelled|RemoteActorError
|
|
|
|
|
) = ctx.maybe_raise()
|
|
|
|
|
if maybe_re:
|
|
|
|
|
log.cancel(
|
|
|
|
|
f'Suppressing remote-exc from peer,\n'
|
|
|
|
|
f'{maybe_re!r}\n'
|
|
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# maybe TODO: pack in come kinda
|
|
|
|
|
# `trio.Cancelled.__traceback__` here so they can be
|
|
|
|
|
# unwrapped and displayed on the caller side? no se..
|
2025-06-16 01:22:08 +00:00
|
|
|
raise scope_err
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# `@context` entrypoint task bookkeeping.
|
|
|
|
|
# i.e. only pop the context tracking if used ;)
|
|
|
|
|
finally:
|
2025-06-16 01:22:08 +00:00
|
|
|
assert chan.aid
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
# don't pop the local context until we know the
|
|
|
|
|
# associated child isn't in debug any more
|
2025-05-13 16:13:12 +00:00
|
|
|
await debug.maybe_wait_for_debugger()
|
2024-03-13 19:57:15 +00:00
|
|
|
ctx: Context = actor._contexts.pop((
|
|
|
|
|
chan.uid,
|
|
|
|
|
cid,
|
|
|
|
|
))
|
|
|
|
|
|
2024-05-31 18:40:55 +00:00
|
|
|
logmeth: Callable = log.runtime
|
2024-03-13 19:57:15 +00:00
|
|
|
merr: Exception|None = ctx.maybe_error
|
2024-06-14 19:49:30 +00:00
|
|
|
message: str = 'IPC context terminated '
|
|
|
|
|
descr_str: str = (
|
|
|
|
|
f'after having {ctx.repr_state!r}\n'
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
2024-05-07 13:20:43 +00:00
|
|
|
if merr:
|
2024-06-14 19:49:30 +00:00
|
|
|
logmeth: Callable = log.error
|
2025-07-29 19:01:47 +00:00
|
|
|
if (
|
|
|
|
|
# ctxc: by `Context.cancel()`
|
|
|
|
|
isinstance(merr, ContextCancelled)
|
2024-06-14 19:49:30 +00:00
|
|
|
|
2025-07-29 19:01:47 +00:00
|
|
|
# out-of-layer cancellation, one of:
|
|
|
|
|
# - actorc: by `Portal.cancel_actor()`
|
|
|
|
|
# - OSc: by SIGINT or `Process.signal()`
|
|
|
|
|
or (
|
|
|
|
|
isinstance(merr, trio.Cancelled)
|
|
|
|
|
and
|
|
|
|
|
ctx.canceller
|
|
|
|
|
)
|
|
|
|
|
):
|
|
|
|
|
logmeth: Callable = log.cancel
|
|
|
|
|
descr_str += (
|
|
|
|
|
f' with {merr!r}\n'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
elif (
|
|
|
|
|
not isinstance(merr, RemoteActorError)
|
|
|
|
|
):
|
|
|
|
|
tb_str: str = ''.join(
|
|
|
|
|
traceback.format_exception(merr)
|
|
|
|
|
)
|
2024-06-14 19:49:30 +00:00
|
|
|
descr_str += (
|
|
|
|
|
f'\n{merr!r}\n' # needed?
|
|
|
|
|
f'{tb_str}\n'
|
|
|
|
|
)
|
2024-05-31 18:40:55 +00:00
|
|
|
else:
|
2025-07-29 19:01:47 +00:00
|
|
|
descr_str += (
|
|
|
|
|
f'{merr!r}\n'
|
|
|
|
|
)
|
2024-06-14 19:49:30 +00:00
|
|
|
else:
|
2025-07-29 19:01:47 +00:00
|
|
|
descr_str += (
|
|
|
|
|
f'\n'
|
|
|
|
|
f'with final result {ctx.outcome!r}\n'
|
|
|
|
|
)
|
2024-05-31 18:40:55 +00:00
|
|
|
|
2024-06-14 19:49:30 +00:00
|
|
|
logmeth(
|
2025-07-07 15:02:47 +00:00
|
|
|
f'{message}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'{descr_str}\n'
|
2024-06-14 19:49:30 +00:00
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
async def try_ship_error_to_remote(
    channel: Channel,
    err: Exception|BaseExceptionGroup,

    cid: str|None = None,
    remote_descr: str = 'parent',
    hide_tb: bool = True,

) -> None:
    '''
    Pack a local (runtime-internal) exception into an `Error` msg
    and best-effort `.send()` it over the given IPC `channel`.

    Transport/network failures during the send are swallowed but
    logged as critical(ly bad); any other failure is logged and
    re-raised with tb-hiding disabled.

    '''
    __tracebackhide__: bool = hide_tb

    # shield the shipment so a surrounding cancellation can't
    # interrupt the error delivery itself.
    with CancelScope(shield=True):
        try:
            # NOTE: normally only used for internal runtime errors
            # so ship to peer actor without a cid.
            msg: Error = pack_error(
                err,
                cid=cid,
                # TODO: special tb fmting for ctxc cases?
                # tb=tb,
            )
            await channel.send(msg)

        # XXX NOTE XXX in SC terms this is one of the worst things
        # that can happen and provides for a 2-general's dilemma..
        #
        # FURTHER, we should never really have to handle these
        # lowlevel excs from `trio` since the `Channel.send()`
        # layers downward should mostly be wrapping such cases in
        # a tpt-closed; hence the `.critical()` level report when
        # they leak up to here.
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            BrokenPipeError,
        ):
            log.critical(
                'IPC transport failure -> '
                f'failed to ship error to {remote_descr}!\n\n'
                f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n'
                f'\n'
                # TODO: use `.msg.preetty_struct` for this!
                f'{msg}\n'
            )

        except BaseException:
            # anything else is unexpected; surface it with a full
            # traceback and propagate to the caller.
            log.exception(
                'Errored while attempting error shipment?'
            )
            __tracebackhide__: bool = False
            raise
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
async def process_messages(
|
|
|
|
|
chan: Channel,
|
|
|
|
|
shield: bool = False,
|
|
|
|
|
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
|
|
|
|
|
2024-04-30 16:15:46 +00:00
|
|
|
) -> (
|
|
|
|
|
bool, # chan diconnected
|
|
|
|
|
MsgType, # last msg
|
|
|
|
|
):
|
2024-03-13 19:57:15 +00:00
|
|
|
'''
|
|
|
|
|
This is the low-level, per-IPC-channel, RPC task scheduler loop.
|
|
|
|
|
|
|
|
|
|
Receive (multiplexed) per-`Channel` RPC requests as msgs from
|
|
|
|
|
remote processes; schedule target async funcs as local
|
Heh, add back `Actor._root_tn`, it has purpose..
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.
Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.
Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that the any
`.trionics.maybe_open_nursery()` calls against optionally passed
`._[root/service]_tn` are allocated-if-not-provided (the
`._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
Bp).
- obvi adjust all internal usage to match new naming.
Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
service-tn and is either,
+ assigned in `._runtime.async_main()` for sub-actors OR,
+ assigned in `._root.open_root_actor()` for the root actor.
**THE primary reason** to keep this "upper" tn is that during
a full-`Actor`-cancellation condition (more details below) we want to
ensure that the IPC connection with a sub-actor's parent is **the last
thing to be cancelled**; this is most simply implemented by ensuring
that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
"policy" by scheduling a task in the `._root_tn` which,
* waits for the `._service_tn` to complete and then,
* cancels the `._root_tn.cancel_scope`,
* includes "sclangy" console logging throughout.
2025-08-19 23:24:20 +00:00
|
|
|
`trio.Task`s inside the `Actor._service_tn: Nursery`.
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
Depending on msg type, non-`cmd` (task spawning/starting)
|
|
|
|
|
request payloads (eg. `started`, `yield`, `return`, `error`)
|
|
|
|
|
are delivered to locally running, linked-via-`Context`, tasks
|
|
|
|
|
with any (boxed) errors and/or final results shipped back to
|
|
|
|
|
the remote side.
|
|
|
|
|
|
|
|
|
|
All higher level inter-actor comms ops are delivered in some
|
|
|
|
|
form by the msg processing here, including:
|
|
|
|
|
|
|
|
|
|
- lookup and invocation of any (async) funcs-as-tasks requested
|
|
|
|
|
by remote actors presuming the local actor has enabled their
|
|
|
|
|
containing module.
|
|
|
|
|
|
|
|
|
|
- IPC-session oriented `Context` and `MsgStream` msg payload
|
|
|
|
|
delivery such as `started`, `yield` and `return` msgs.
|
|
|
|
|
|
|
|
|
|
- cancellation handling for both `Context.cancel()` (which
|
|
|
|
|
translate to `Actor._cancel_task()` RPCs server side)
|
|
|
|
|
and `Actor.cancel()` process-wide-runtime-shutdown requests
|
|
|
|
|
(as utilized inside `Portal.cancel_actor()` ).
|
|
|
|
|
|
|
|
|
|
'''
|
2025-06-11 20:44:47 +00:00
|
|
|
actor: Actor = _state.current_actor()
|
Heh, add back `Actor._root_tn`, it has purpose..
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.
Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.
Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that the any
`.trionics.maybe_open_nursery()` calls against optionally passed
`._[root/service]_tn` are allocated-if-not-provided (the
`._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
Bp).
- obvi adjust all internal usage to match new naming.
Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
service-tn and is either,
+ assigned in `._runtime.async_main()` for sub-actors OR,
+ assigned in `._root.open_root_actor()` for the root actor.
**THE primary reason** to keep this "upper" tn is that during
a full-`Actor`-cancellation condition (more details below) we want to
ensure that the IPC connection with a sub-actor's parent is **the last
thing to be cancelled**; this is most simply implemented by ensuring
that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
"policy" by scheduling a task in the `._root_tn` which,
* waits for the `._service_tn` to complete and then,
* cancels the `._root_tn.cancel_scope`,
* includes "sclangy" console logging throughout.
2025-08-19 23:24:20 +00:00
|
|
|
assert actor._service_tn # runtime state sanity
|
2024-04-07 14:40:01 +00:00
|
|
|
|
2024-03-13 19:57:15 +00:00
|
|
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
|
|
|
|
# should use it?
|
2024-04-25 16:33:10 +00:00
|
|
|
# -[ ] existing GH https://github.com/python-trio/trio/issues/467
|
|
|
|
|
# -[ ] for other transports (like QUIC) we can possibly just
|
|
|
|
|
# entirely avoid the feeder mem-chans since each msg will be
|
|
|
|
|
# delivered with a ctx-id already?
|
|
|
|
|
#
|
|
|
|
|
# |_ for ex, from `aioquic` which exposed "stream ids":
|
|
|
|
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
|
|
|
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
2024-03-13 19:57:15 +00:00
|
|
|
nursery_cancelled_before_task: bool = False
|
2024-05-20 20:18:42 +00:00
|
|
|
msg: MsgType|None = None
|
2024-03-13 19:57:15 +00:00
|
|
|
try:
|
|
|
|
|
# NOTE: this internal scope allows for keeping this
|
|
|
|
|
# message loop running despite the current task having
|
|
|
|
|
# been cancelled (eg. `open_portal()` may call this method
|
|
|
|
|
# from a locally spawned task) and receive this scope
|
|
|
|
|
# using ``scope = Nursery.start()``
|
|
|
|
|
with CancelScope(shield=shield) as loop_cs:
|
|
|
|
|
task_status.started(loop_cs)
|
2024-04-07 14:40:01 +00:00
|
|
|
|
2024-03-13 19:57:15 +00:00
|
|
|
async for msg in chan:
|
|
|
|
|
log.transport( # type: ignore
|
2024-04-30 16:15:46 +00:00
|
|
|
f'IPC msg from peer\n'
|
|
|
|
|
f'<= {chan.uid}\n\n'
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-06-28 23:06:17 +00:00
|
|
|
# TODO: use of the pprinting of structs is
|
|
|
|
|
# FRAGILE and should prolly not be
|
|
|
|
|
#
|
|
|
|
|
# avoid fmting depending on loglevel for perf?
|
2024-04-30 16:15:46 +00:00
|
|
|
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
|
|
|
|
|
# - how to only log-level-aware actually call this?
|
2024-04-07 22:54:03 +00:00
|
|
|
# -[ ] use `.msg.pretty_struct` here now instead!
|
2024-04-30 16:15:46 +00:00
|
|
|
# f'{pretty_struct.pformat(msg)}\n'
|
|
|
|
|
f'{msg}\n'
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
|
|
|
|
|
2024-04-02 17:41:52 +00:00
|
|
|
match msg:
|
2024-04-07 22:54:03 +00:00
|
|
|
# msg for an ongoing IPC ctx session, deliver msg to
|
|
|
|
|
# local task.
|
2024-04-02 17:41:52 +00:00
|
|
|
case (
|
|
|
|
|
StartAck(cid=cid)
|
|
|
|
|
| Started(cid=cid)
|
|
|
|
|
| Yield(cid=cid)
|
|
|
|
|
| Stop(cid=cid)
|
|
|
|
|
| Return(cid=cid)
|
2024-04-07 14:40:01 +00:00
|
|
|
| CancelAck(cid=cid)
|
2024-04-09 17:58:10 +00:00
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# `.cid` indicates RPC-ctx-task scoped
|
2024-04-09 17:58:10 +00:00
|
|
|
| Error(cid=cid)
|
|
|
|
|
|
|
|
|
|
# recv-side `MsgType` decode violation
|
|
|
|
|
| MsgTypeError(cid=cid)
|
2024-04-02 17:41:52 +00:00
|
|
|
):
|
|
|
|
|
# deliver response to local caller/waiter
|
|
|
|
|
# via its per-remote-context memory channel.
|
2024-04-08 14:25:57 +00:00
|
|
|
await actor._deliver_ctx_payload(
|
2024-04-02 17:41:52 +00:00
|
|
|
chan,
|
|
|
|
|
cid,
|
|
|
|
|
msg,
|
|
|
|
|
)
|
2024-04-07 14:40:01 +00:00
|
|
|
|
2024-04-07 22:54:03 +00:00
|
|
|
# `Actor`(-internal) runtime cancel requests
|
2024-04-07 14:40:01 +00:00
|
|
|
case Start(
|
|
|
|
|
ns='self',
|
|
|
|
|
func='cancel',
|
|
|
|
|
cid=cid,
|
|
|
|
|
kwargs=kwargs,
|
|
|
|
|
):
|
|
|
|
|
# XXX NOTE XXX don't start entire actor
|
|
|
|
|
# runtime cancellation if this actor is
|
|
|
|
|
# currently in debug mode!
|
2025-05-13 16:13:12 +00:00
|
|
|
pdb_complete: trio.Event|None = debug.DebugStatus.repl_release
|
2024-04-07 14:40:01 +00:00
|
|
|
if pdb_complete:
|
|
|
|
|
await pdb_complete.wait()
|
|
|
|
|
|
|
|
|
|
# Either of `Actor.cancel()`/`.cancel_soon()`
|
|
|
|
|
# was called, so terminate this IPC msg
|
|
|
|
|
# loop, exit back out into `async_main()`,
|
|
|
|
|
# and immediately start the core runtime
|
|
|
|
|
# machinery shutdown!
|
|
|
|
|
with CancelScope(shield=True):
|
|
|
|
|
await _invoke(
|
|
|
|
|
actor,
|
|
|
|
|
cid,
|
|
|
|
|
chan,
|
|
|
|
|
actor.cancel,
|
2025-07-07 15:02:47 +00:00
|
|
|
kwargs | {'req_chan': chan},
|
2024-04-07 14:40:01 +00:00
|
|
|
is_rpc=False,
|
2024-05-28 13:36:26 +00:00
|
|
|
return_msg_type=CancelAck,
|
2024-04-07 14:40:01 +00:00
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
log.runtime(
|
2025-07-07 15:02:47 +00:00
|
|
|
'Cancelling RPC-msg-loop with peer\n'
|
|
|
|
|
f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n'
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
2024-04-07 14:40:01 +00:00
|
|
|
loop_cs.cancel()
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
case Start(
|
|
|
|
|
ns='self',
|
|
|
|
|
func='_cancel_task',
|
|
|
|
|
cid=cid,
|
|
|
|
|
kwargs=kwargs,
|
|
|
|
|
):
|
|
|
|
|
target_cid: str = kwargs['cid']
|
|
|
|
|
kwargs |= {
|
2025-06-29 18:47:03 +00:00
|
|
|
'requesting_aid': chan.aid,
|
2024-04-07 14:40:01 +00:00
|
|
|
'ipc_msg': msg,
|
|
|
|
|
|
|
|
|
|
# XXX NOTE! ONLY the rpc-task-owning
|
|
|
|
|
# parent IPC channel should be able to
|
|
|
|
|
# cancel it!
|
|
|
|
|
'parent_chan': chan,
|
|
|
|
|
}
|
|
|
|
|
try:
|
|
|
|
|
await _invoke(
|
|
|
|
|
actor,
|
|
|
|
|
cid,
|
|
|
|
|
chan,
|
|
|
|
|
actor._cancel_task,
|
|
|
|
|
kwargs,
|
|
|
|
|
is_rpc=False,
|
2024-05-28 13:36:26 +00:00
|
|
|
return_msg_type=CancelAck,
|
2024-04-07 14:40:01 +00:00
|
|
|
)
|
|
|
|
|
except BaseException:
|
|
|
|
|
log.exception(
|
|
|
|
|
'Failed to cancel task?\n'
|
|
|
|
|
f'<= canceller: {chan.uid}\n'
|
|
|
|
|
f' |_{chan}\n\n'
|
|
|
|
|
f'=> {actor}\n'
|
|
|
|
|
f' |_cid: {target_cid}\n'
|
|
|
|
|
)
|
2024-04-02 17:41:52 +00:00
|
|
|
|
2024-04-07 14:40:01 +00:00
|
|
|
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
|
2024-04-07 22:54:03 +00:00
|
|
|
# ------ - ------
|
|
|
|
|
# -[x] discard un-authed msgs as per,
|
|
|
|
|
# <TODO put issue for typed msging structs>
|
2024-04-02 17:41:52 +00:00
|
|
|
case Start(
|
|
|
|
|
cid=cid,
|
|
|
|
|
ns=ns,
|
|
|
|
|
func=funcname,
|
2024-04-05 23:07:12 +00:00
|
|
|
kwargs=kwargs, # type-spec this? see `msg.types`
|
2025-06-29 18:47:03 +00:00
|
|
|
uid=actor_uuid,
|
2024-04-02 17:41:52 +00:00
|
|
|
):
|
2025-06-29 18:47:03 +00:00
|
|
|
if actor_uuid != chan.aid.uid:
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
|
|
|
|
|
f'Channel.aid = {chan.aid!r}\n'
|
|
|
|
|
f'Start.uid = {actor_uuid!r}\n'
|
|
|
|
|
)
|
|
|
|
|
# await debug.pause()
|
|
|
|
|
op_repr: str = 'Start <=) '
|
|
|
|
|
req_repr: str = _pformat.nest_from_op(
|
|
|
|
|
input_op=op_repr,
|
|
|
|
|
op_suffix='',
|
|
|
|
|
nest_prefix='',
|
|
|
|
|
text=f'{chan}',
|
|
|
|
|
|
|
|
|
|
nest_indent=len(op_repr)-1,
|
|
|
|
|
rm_from_first_ln='<',
|
|
|
|
|
# ^XXX, subtract -1 to account for
|
|
|
|
|
# <Channel
|
|
|
|
|
# ^_chevron to be stripped
|
|
|
|
|
)
|
2024-05-07 13:20:43 +00:00
|
|
|
start_status: str = (
|
2025-06-29 18:47:03 +00:00
|
|
|
'Handling RPC request\n'
|
|
|
|
|
f'{req_repr}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'->{{ ipc-context-id: {cid!r}\n'
|
|
|
|
|
f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
|
2024-04-02 17:41:52 +00:00
|
|
|
)
|
2024-04-07 14:40:01 +00:00
|
|
|
|
|
|
|
|
# runtime-internal endpoint: `Actor.<funcname>`
|
|
|
|
|
# only registry methods exist now yah,
|
|
|
|
|
# like ``.register_actor()`` etc. ?
|
2024-04-02 17:41:52 +00:00
|
|
|
if ns == 'self':
|
2024-04-07 14:40:01 +00:00
|
|
|
func: Callable = getattr(actor, funcname)
|
2024-04-02 17:41:52 +00:00
|
|
|
|
2024-04-07 14:40:01 +00:00
|
|
|
# application RPC endpoint
|
2024-04-02 17:41:52 +00:00
|
|
|
else:
|
|
|
|
|
try:
|
2024-04-07 22:54:03 +00:00
|
|
|
func: Callable = actor._get_rpc_func(
|
|
|
|
|
ns,
|
|
|
|
|
funcname,
|
|
|
|
|
)
|
2024-04-02 17:41:52 +00:00
|
|
|
except (
|
|
|
|
|
ModuleNotExposed,
|
|
|
|
|
AttributeError,
|
|
|
|
|
) as err:
|
2024-04-07 14:40:01 +00:00
|
|
|
# always complain to requester
|
|
|
|
|
# client about un-enabled modules
|
2024-04-02 17:41:52 +00:00
|
|
|
err_msg: dict[str, dict] = pack_error(
|
|
|
|
|
err,
|
|
|
|
|
cid=cid,
|
|
|
|
|
)
|
|
|
|
|
await chan.send(err_msg)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# schedule a task for the requested RPC function
|
|
|
|
|
# in the actor's main "service nursery".
|
2024-04-07 14:40:01 +00:00
|
|
|
#
|
2024-04-02 17:41:52 +00:00
|
|
|
# TODO: possibly a service-tn per IPC channel for
|
|
|
|
|
# supervision isolation? would avoid having to
|
|
|
|
|
# manage RPC tasks individually in `._rpc_tasks`
|
|
|
|
|
# table?
|
2025-06-29 18:47:03 +00:00
|
|
|
start_status += '->( scheduling new task..\n'
|
2024-05-07 13:20:43 +00:00
|
|
|
log.runtime(start_status)
|
2024-03-13 19:57:15 +00:00
|
|
|
try:
|
Heh, add back `Actor._root_tn`, it has purpose..
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.
Also, the impl changes here make the test from 6410e45 (or wtv
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.
Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that the any
`.trionics.maybe_open_nursery()` calls against optionally passed
`._[root/service]_tn` are allocated-if-not-provided (the
`._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
Bp).
- obvi adjust all internal usage to match new naming.
Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
service-tn and is either,
+ assigned in `._runtime.async_main()` for sub-actors OR,
+ assigned in `._root.open_root_actor()` for the root actor.
**THE primary reason** to keep this "upper" tn is that during
a full-`Actor`-cancellation condition (more details below) we want to
ensure that the IPC connection with a sub-actor's parent is **the last
thing to be cancelled**; this is most simply implemented by ensuring
that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
"policy" by scheduling a task in the `._root_tn` which,
* waits for the `._service_tn` to complete and then,
* cancels the `._root_tn.cancel_scope`,
* includes "sclangy" console logging throughout.
2025-08-19 23:24:20 +00:00
|
|
|
ctx: Context = await actor._service_tn.start(
|
2024-04-02 17:41:52 +00:00
|
|
|
partial(
|
|
|
|
|
_invoke,
|
|
|
|
|
actor,
|
|
|
|
|
cid,
|
|
|
|
|
chan,
|
|
|
|
|
func,
|
|
|
|
|
kwargs,
|
|
|
|
|
),
|
|
|
|
|
name=funcname,
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
2024-04-02 17:41:52 +00:00
|
|
|
|
|
|
|
|
except (
|
|
|
|
|
RuntimeError,
|
|
|
|
|
BaseExceptionGroup,
|
|
|
|
|
):
|
|
|
|
|
# avoid reporting a benign race condition
|
|
|
|
|
# during actor runtime teardown.
|
|
|
|
|
nursery_cancelled_before_task: bool = True
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
# in the lone case where a ``Context`` is not
|
|
|
|
|
# delivered, it's likely going to be a locally
|
|
|
|
|
# scoped exception from ``_invoke()`` itself.
|
|
|
|
|
if isinstance(err := ctx, Exception):
|
|
|
|
|
log.warning(
|
2024-05-07 13:20:43 +00:00
|
|
|
start_status
|
|
|
|
|
+
|
|
|
|
|
' -> task for RPC failed?\n\n'
|
2024-04-02 17:41:52 +00:00
|
|
|
f'{err}'
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
2024-04-02 17:41:52 +00:00
|
|
|
continue
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-04-02 17:41:52 +00:00
|
|
|
else:
|
2024-04-07 14:40:01 +00:00
|
|
|
# mark our global state with ongoing rpc tasks
|
2024-04-02 17:41:52 +00:00
|
|
|
actor._ongoing_rpc_tasks = trio.Event()
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-04-02 17:41:52 +00:00
|
|
|
# store cancel scope such that the rpc task can be
|
|
|
|
|
# cancelled gracefully if requested
|
|
|
|
|
actor._rpc_tasks[(chan, cid)] = (
|
|
|
|
|
ctx,
|
|
|
|
|
func,
|
|
|
|
|
trio.Event(),
|
|
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2025-03-31 03:57:52 +00:00
|
|
|
# XXX RUNTIME-SCOPED! remote (likely internal) error
|
2024-04-25 16:33:10 +00:00
|
|
|
# (^- bc no `Error.cid` -^)
|
|
|
|
|
#
|
|
|
|
|
# NOTE: this is the non-rpc error case, that
|
|
|
|
|
# is, an error NOT raised inside a call to
|
|
|
|
|
# `_invoke()` (i.e. no cid was provided in the
|
|
|
|
|
# msg - see above). Raise error inline and
|
|
|
|
|
# mark the channel as "globally errored" for
|
|
|
|
|
# all downstream consuming primitives.
|
2024-04-14 23:31:50 +00:00
|
|
|
case Error():
|
2024-04-07 14:40:01 +00:00
|
|
|
chan._exc: Exception = unpack_error(
|
2024-04-02 17:41:52 +00:00
|
|
|
msg,
|
|
|
|
|
chan=chan,
|
|
|
|
|
)
|
2024-04-07 14:40:01 +00:00
|
|
|
raise chan._exc
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-04-14 23:31:50 +00:00
|
|
|
# unknown/invalid msg type?
|
|
|
|
|
case _:
|
|
|
|
|
codec: MsgCodec = current_codec()
|
|
|
|
|
message: str = (
|
|
|
|
|
f'Unhandled IPC msg for codec?\n\n'
|
|
|
|
|
f'|_{codec}\n\n'
|
|
|
|
|
f'{msg}\n'
|
|
|
|
|
)
|
|
|
|
|
log.exception(message)
|
|
|
|
|
raise RuntimeError(message)
|
|
|
|
|
|
2024-04-18 19:18:29 +00:00
|
|
|
log.transport(
|
2024-03-13 19:57:15 +00:00
|
|
|
'Waiting on next IPC msg from\n'
|
|
|
|
|
f'peer: {chan.uid}\n'
|
|
|
|
|
f'|_{chan}\n'
|
|
|
|
|
)
|
|
|
|
|
|
2024-04-07 22:54:03 +00:00
|
|
|
# END-OF `async for`:
|
|
|
|
|
# IPC disconnected via `trio.EndOfChannel`, likely
|
|
|
|
|
# due to a (graceful) `Channel.aclose()`.
|
2025-07-07 15:02:47 +00:00
|
|
|
|
|
|
|
|
chan_op_repr: str = '<=x] '
|
|
|
|
|
chan_repr: str = _pformat.nest_from_op(
|
|
|
|
|
input_op=chan_op_repr,
|
|
|
|
|
op_suffix='',
|
|
|
|
|
nest_prefix='',
|
|
|
|
|
text=chan.pformat(),
|
|
|
|
|
nest_indent=len(chan_op_repr)-1,
|
|
|
|
|
rm_from_first_ln='<',
|
|
|
|
|
)
|
2024-03-13 19:57:15 +00:00
|
|
|
log.runtime(
|
2025-07-07 15:02:47 +00:00
|
|
|
f'IPC channel disconnected\n'
|
|
|
|
|
f'{chan_repr}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'->c) cancelling RPC tasks.\n'
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
|
|
|
|
await actor.cancel_rpc_tasks(
|
2025-06-29 18:47:03 +00:00
|
|
|
req_aid=actor.aid,
|
2024-03-13 19:57:15 +00:00
|
|
|
# a "self cancel" in terms of the lifetime of the
|
|
|
|
|
# IPC connection which is presumed to be the
|
|
|
|
|
# source of any requests for spawned tasks.
|
|
|
|
|
parent_chan=chan,
|
|
|
|
|
)
|
|
|
|
|
|
2024-07-02 16:21:26 +00:00
|
|
|
except TransportClosed as tc:
|
2024-03-13 19:57:15 +00:00
|
|
|
# channels "breaking" (for TCP streams by EOF or 104
|
|
|
|
|
# connection-reset) is ok since we don't have a teardown
|
|
|
|
|
# handshake for them (yet) and instead we simply bail out of
|
|
|
|
|
# the message loop and expect the teardown sequence to clean
|
2024-04-07 22:54:03 +00:00
|
|
|
# up..
|
2024-04-30 16:15:46 +00:00
|
|
|
#
|
|
|
|
|
# TODO: maybe add a teardown handshake? and,
|
2024-07-02 16:21:26 +00:00
|
|
|
# -[x] don't show this msg if it's an ephemeral discovery ep call?
|
|
|
|
|
# |_ see the below `.report_n_maybe_raise()` impl as well as
|
|
|
|
|
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
|
|
|
|
|
# for different read-failure cases.
|
2024-04-07 22:54:03 +00:00
|
|
|
# -[ ] figure out how this will break with other transports?
|
2024-07-02 16:21:26 +00:00
|
|
|
tc.report_n_maybe_raise(
|
|
|
|
|
message=(
|
2025-04-03 17:28:36 +00:00
|
|
|
f'peer IPC channel closed abruptly?\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'<=x[\n'
|
|
|
|
|
f' {chan}\n'
|
2024-07-02 16:21:26 +00:00
|
|
|
f' |_{chan.raddr}\n\n'
|
|
|
|
|
)
|
|
|
|
|
+
|
|
|
|
|
tc.message
|
|
|
|
|
|
2024-03-13 19:57:15 +00:00
|
|
|
)
|
|
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# transport **WAS** disconnected
|
2024-04-30 16:15:46 +00:00
|
|
|
return (True, msg)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
|
|
|
|
except (
|
|
|
|
|
Exception,
|
|
|
|
|
BaseExceptionGroup,
|
|
|
|
|
) as err:
|
|
|
|
|
|
|
|
|
|
if nursery_cancelled_before_task:
|
Heh, add back `Actor._root_tn`, it has purpose..
Turns out I didn't read my own internals docs/comments and despite it
not being used previously, this adds the real use case: a root,
per-actor, scope which ensures parent comms are the last conc-thing to
be cancelled.
Also, the impl changes here make the test from 6410e45 (or whatever
it's rebased to) pass, i.e. we can support crash handling in the root
actor despite the root-tn having been (self) cancelled.
Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that any
`.trionics.maybe_open_nursery()` calls against optionally passed
`._[root/service]_tn` are allocated-if-not-provided (the
`._service_tn`-case being an i-guess-prep-for-the-future-anti-pattern
Bp).
- obvi adjust all internal usage to match new naming.
Serious/real-use-case changes,
- add (back) a `Actor._root_tn` which sits a scope "above" the
service-tn and is either,
+ assigned in `._runtime.async_main()` for sub-actors OR,
+ assigned in `._root.open_root_actor()` for the root actor.
**THE primary reason** to keep this "upper" tn is that during
a full-`Actor`-cancellation condition (more details below) we want to
ensure that the IPC connection with a sub-actor's parent is **the last
thing to be cancelled**; this is most simply implemented by ensuring
that the `Actor._parent_chan: .ipc.Channel` is handled in an upper
scope in `_rpc.process_messages()`-subtask-terms.
- for the root actor this `root_tn` is allocated in `.open_root_actor()`
body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
"policy" by scheduling a task in the `._root_tn` which,
* waits for the `._service_tn` to complete and then,
* cancels the `._root_tn.cancel_scope`,
* includes "sclangy" console logging throughout.
2025-08-19 23:24:20 +00:00
|
|
|
sn: Nursery = actor._service_tn
|
2024-03-13 19:57:15 +00:00
|
|
|
assert sn and sn.cancel_scope.cancel_called # sanity
|
|
|
|
|
log.cancel(
|
|
|
|
|
f'Service nursery cancelled before it handled {funcname}'
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
# ship any "internal" exception (i.e. one from internal
|
|
|
|
|
# machinery not from an rpc task) to parent
|
|
|
|
|
match err:
|
|
|
|
|
case ContextCancelled():
|
|
|
|
|
log.cancel(
|
|
|
|
|
f'Actor: {actor.uid} was context-cancelled with,\n'
|
|
|
|
|
f'str(err)'
|
|
|
|
|
)
|
|
|
|
|
case _:
|
|
|
|
|
log.exception("Actor errored:")
|
|
|
|
|
|
|
|
|
|
if actor._parent_chan:
|
|
|
|
|
await try_ship_error_to_remote(
|
|
|
|
|
actor._parent_chan,
|
|
|
|
|
err,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# if this is the `MainProcess` we expect the error broadcasting
|
|
|
|
|
# above to trigger an error at consuming portal "checkpoints"
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
# msg debugging for when the machinery is brokey
|
2024-05-07 13:20:43 +00:00
|
|
|
if msg is None:
|
2025-06-29 18:47:03 +00:00
|
|
|
message: str = 'Exiting RPC-loop without receiving a msg?'
|
2024-05-07 13:20:43 +00:00
|
|
|
else:
|
2025-06-29 18:47:03 +00:00
|
|
|
task_op_repr: str = ')>'
|
|
|
|
|
task: trio.Task = trio.lowlevel.current_task()
|
|
|
|
|
|
|
|
|
|
# maybe add cancelled opt prefix
|
|
|
|
|
if task._cancel_status.effectively_cancelled:
|
|
|
|
|
task_op_repr = 'c' + task_op_repr
|
|
|
|
|
|
|
|
|
|
task_repr: str = _pformat.nest_from_op(
|
|
|
|
|
input_op=task_op_repr,
|
|
|
|
|
text=f'{task!r}',
|
|
|
|
|
nest_indent=1,
|
|
|
|
|
)
|
|
|
|
|
# chan_op_repr: str = '<=} '
|
|
|
|
|
# chan_repr: str = _pformat.nest_from_op(
|
|
|
|
|
# input_op=chan_op_repr,
|
|
|
|
|
# op_suffix='',
|
|
|
|
|
# nest_prefix='',
|
|
|
|
|
# text=chan.pformat(),
|
|
|
|
|
# nest_indent=len(chan_op_repr)-1,
|
|
|
|
|
# rm_from_first_ln='<',
|
|
|
|
|
# )
|
2024-05-07 13:20:43 +00:00
|
|
|
message: str = (
|
2025-06-29 18:47:03 +00:00
|
|
|
f'Exiting RPC-loop with final msg\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
# f'{chan_repr}\n'
|
|
|
|
|
f'{task_repr}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'{pretty_struct.pformat(msg)}'
|
|
|
|
|
f'\n'
|
2024-05-07 13:20:43 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
log.runtime(message)
|
2024-03-13 19:57:15 +00:00
|
|
|
|
2024-04-25 16:33:10 +00:00
|
|
|
# transport **WAS NOT** disconnected
|
2024-04-30 16:15:46 +00:00
|
|
|
return (False, msg)
|