Guarding for IPC failures in `._runtime._invoke()`

Took me longer then i wanted to figure out the source of
a failed-response to a remote-cancellation (in this case in `modden`
where a client was cancelling a workspace layer.. but disconnects before
receiving the ack msg) that was triggering an IPC error when sending the
error msg for the cancellation of a `Actor._cancel_task()`, but since
this (non-rpc) `._invoke()` task was trying to send to a now
disconnected canceller it was resulting in a `BrokenPipeError` (or similar)
error.

Now, we except for such IPC errors and only raise them when,
1. the transport `Channel` is for sure up (bc ow what's the point of
   trying to send an error on the thing that caused it..)
2. it's definitely for handling an RPC task

Similarly if the entire main invoke `try:` excepts,
- we only hide the call-stack frame from the debugger (with
  `__tracebackhide__: bool`) if it's an RPC task that has a connected
  channel since we always want to see the frame when debugging internal
  task or IPC failures.
- we don't bother trying to send errors to the context caller (actor)
  when it's a non-RPC request since failures on actor-runtime-internal
  tasks shouldn't really ever be reported remotely, only maybe raised
  locally.

Also some other tidying,
- this properly corrects for the self-cancel case where an RPC context
  is cancelled due to a local (runtime) task calling a method like
  `Actor.cancel_soon()`. We now set our own `.uid` as the
  `ContextCancelled.canceller` value so that other-end tasks know that
  the cancellation was due to a self-cancellation by the actor itself.
  We still need to properly test for this though!
- add a more detailed module doc-str.
- more explicit imports for `trio` core types throughout.
multihomed
Tyler Goodlet 2024-01-02 09:08:39 -05:00
parent f415fc43ce
commit 250275d98d
1 changed files with 130 additions and 56 deletions

View File

@ -15,7 +15,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor primitives and helpers
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.
"""
from __future__ import annotations
@ -41,8 +44,14 @@ import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
import trio
from trio import (
CancelScope,
)
from trio_typing import (
Nursery,
TaskStatus,
)
from ._ipc import Channel
from ._context import (
@ -90,10 +99,9 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
__tracebackhide__: bool = True
treat_as_gen: bool = False
failed_resp: bool = False
@ -110,9 +118,9 @@ async def _invoke(
# possibly a traceback (not sure what typing is for this..)
tb = None
cancel_scope = trio.CancelScope()
cancel_scope = CancelScope()
# activated cancel scope ref
cs: trio.CancelScope | None = None
cs: CancelScope | None = None
ctx = actor.get_context(
chan,
@ -124,6 +132,7 @@ async def _invoke(
)
context: bool = False
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
@ -165,6 +174,7 @@ async def _invoke(
except TypeError:
raise
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
@ -195,6 +205,7 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid})
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
@ -211,8 +222,20 @@ async def _invoke(
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of a context eventually!
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
@ -273,11 +296,12 @@ async def _invoke(
ctx._maybe_raise_remote_err(re)
fname: str = func.__name__
cs: trio.CancelScope = ctx._scope
cs: CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{actor.uid} cancelled by '
f'`{fname}()`@{our_uid} cancelled by '
)
# NOTE / TODO: if we end up having
@ -286,6 +310,8 @@ async def _invoke(
# need to change this logic branch since it
# will always enter..
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
# if the channel which spawned the ctx is the
@ -318,40 +344,76 @@ async def _invoke(
canceller=canceller,
)
# regular async function
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
else:
try:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
except trio.BrokenResourceError:
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}'
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
)
with cancel_scope as cs:
ctx._scope = cs
ctx._scope: CancelScope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(f'{fname}() result: {result}')
if not failed_resp:
# only send result if we know IPC isn't down
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
{'return': result,
'cid': cid}
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
except (
Exception,
BaseExceptionGroup,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail
# due to an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = True
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
@ -385,9 +447,13 @@ async def _invoke(
log.exception("Actor crashed:")
# always ship errors back to caller
err_msg = pack_error(err, tb=tb)
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
)
err_msg['cid'] = cid
if is_rpc:
try:
await chan.send(err_msg)
@ -398,10 +464,13 @@ async def _invoke(
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
) as ipc_err:
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?"
f"Failed to ship error to caller @ {chan.uid} !?\n"
f'{ipc_err}'
)
# error is probably from above coro running code *not from the
@ -428,7 +497,11 @@ async def _invoke(
log.warning(
f"Task {func} likely errored or cancelled before start")
else:
log.cancel(f'{func.__name__}({kwargs}) failed?')
log.cancel(
'Failed to de-alloc internal task!?\n'
f'cid: {cid}\n'
f'{func.__name__}({kwargs})'
)
finally:
if not actor._rpc_tasks:
@ -445,7 +518,7 @@ async def try_ship_error_to_parent(
err: Exception | BaseExceptionGroup,
) -> None:
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
@ -497,13 +570,13 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_root_n: trio.Nursery | None = None
_service_n: trio.Nursery | None = None
_server_n: trio.Nursery | None = None
_root_n: Nursery | None = None
_service_n: Nursery | None = None
_server_n: Nursery | None = None
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: trio.CancelScope | None = None
_parent_chan_cs: CancelScope | None = None
# syncs for setup/teardown sequences
_server_down: trio.Event | None = None
@ -1096,12 +1169,12 @@ class Actor:
async def _serve_forever(
self,
handler_nursery: trio.Nursery,
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
listen_sockaddrs: list[tuple[str, int]] | None = None,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the channel server, begin listening for new connections.
@ -1188,7 +1261,7 @@ class Actor:
self._cancel_called = True
# cancel all ongoing rpc tasks
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
@ -1248,7 +1321,7 @@ class Actor:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: trio.CancelScope = ctx._scope
scope: CancelScope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
return True
@ -1613,7 +1686,7 @@ async def async_main(
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# with CancelScope(shield=True):
# await _debug.breakpoint()
actor.lifetime_stack.close()
@ -1655,7 +1728,7 @@ async def async_main(
):
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
@ -1666,7 +1739,7 @@ async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
@ -1684,7 +1757,7 @@ async def process_messages(
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with trio.CancelScope(shield=shield) as loop_cs:
with CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
@ -1746,18 +1819,18 @@ async def process_messages(
if ns == 'self':
if funcname == 'cancel':
func = actor.cancel
func: Callable = actor.cancel
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete = _debug.Lock.local_pdb_complete
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
with CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
@ -1785,7 +1858,7 @@ async def process_messages(
# we immediately start the runtime machinery
# shutdown
# with trio.CancelScope(shield=True):
# with CancelScope(shield=True):
kwargs['chan'] = chan
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
@ -1810,7 +1883,7 @@ async def process_messages(
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func = getattr(actor, funcname)
func: Callable = getattr(actor, funcname)
else:
# complain to client about restricted modules
@ -1900,9 +1973,10 @@ async def process_messages(
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)