Add `@context(pld_spec=<TypeAlias>)` TODO list

Longer run we don't want `tractor` app devs having to call
`msg._ops.limit_plds()` from every child endpoint.. so this starts
a list of decorator API ideas and obviously ties in with an ideal final
API design that will come with py3.13 and typed funcs. Obviously this
is directly fueled by,

- https://github.com/goodboy/tractor/issues/365

Other,
- type with direct `trio.lowlevel.Task` import.
- use `log.exception()` to show tbs for all error-terminations in
  `.open_context()` (for now) and always explicitly mention the `.side`.
runtime_to_msgspec
Tyler Goodlet 2024-06-14 15:27:35 -04:00
parent 7a89b59a3f
commit d528e7ab4d
1 changed files with 28 additions and 15 deletions

View File

@ -58,6 +58,7 @@ from typing import (
import warnings import warnings
# ------ - ------ # ------ - ------
import trio import trio
from trio.lowlevel import Task
# ------ - ------ # ------ - ------
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
@ -121,7 +122,7 @@ class Unresolved:
@dataclass @dataclass
class Context: class Context:
''' '''
An inter-actor, SC transitive, `trio.Task` communication context. An inter-actor, SC transitive, `Task` communication context.
NB: This class should **never be instatiated directly**, it is allocated NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways: by the runtime in 2 ways:
@ -134,7 +135,7 @@ class Context:
Allows maintaining task or protocol specific state between Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing 2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task `Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is actor from a `Portal`. On the "callee" side a context is
always allocated inside `._rpc._invoke()`. always allocated inside `._rpc._invoke()`.
@ -214,7 +215,7 @@ class Context:
# which is exactly the primitive that allows for # which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC. # cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope|None = None _scope: trio.CancelScope|None = None
_task: trio.lowlevel.Task|None = None _task: Task|None = None
# TODO: cs around result waiting so we can cancel any # TODO: cs around result waiting so we can cancel any
# permanently blocking `._rx_chan.receive()` call in # permanently blocking `._rx_chan.receive()` call in
@ -258,14 +259,14 @@ class Context:
# a call to `.cancel()` which triggers `ContextCancelled`. # a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str|dict|None = None _cancel_msg: str|dict|None = None
# NOTE: this state var used by the runtime to determine if the # NOTE: this state-var is used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via # `pdbp` REPL is allowed to engage on contexts terminated via
# a `ContextCancelled` due to a call to `.cancel()` triggering # a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side: # "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging # - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "callee" # the crash handler REPL in such cases where the "callee"
# raises the cancellation, # raises the cancellation,
# - `.devx._debug.lock_tty_for_child()` will set it to `False` if # - `.devx._debug.lock_stdio_for_peer()` will set it to `False` if
# the global tty-lock has been configured to filter out some # the global tty-lock has been configured to filter out some
# actors from being able to acquire the debugger lock. # actors from being able to acquire the debugger lock.
_enter_debugger_on_cancel: bool = True _enter_debugger_on_cancel: bool = True
@ -861,7 +862,7 @@ class Context:
) -> None: ) -> None:
''' '''
Cancel this inter-actor IPC context by requestng the Cancel this inter-actor IPC context by requestng the
remote side's cancel-scope-linked `trio.Task` by calling remote side's cancel-scope-linked `Task` by calling
`._scope.cancel()` and delivering an `ContextCancelled` `._scope.cancel()` and delivering an `ContextCancelled`
ack msg in reponse. ack msg in reponse.
@ -1030,7 +1031,7 @@ class Context:
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case: # for "graceful cancellation" case:
# #
# Whenever a "side" of a context (a `trio.Task` running in # Whenever a "side" of a context (a `Task` running in
# an actor) **is** the side which requested ctx # an actor) **is** the side which requested ctx
# cancellation (likekly via ``Context.cancel()``), we # cancellation (likekly via ``Context.cancel()``), we
# **don't** want to re-raise any eventually received # **don't** want to re-raise any eventually received
@ -1089,7 +1090,8 @@ class Context:
else: else:
log.warning( log.warning(
'Local error already set for ctx?\n' 'Local error already set for ctx?\n'
f'{self._local_error}\n' f'{self._local_error}\n\n'
f'{self}'
) )
return remote_error return remote_error
@ -2117,8 +2119,9 @@ async def open_context_from_portal(
# the `ContextCancelled` "self cancellation absorbed" case # the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above ^^^ !! # handled in the block above ^^^ !!
# await _debug.pause() # await _debug.pause()
log.cancel( # log.cancel(
'Context terminated due to\n\n' log.exception(
f'{ctx.side}-side of `Context` terminated with '
f'.outcome => {ctx.repr_outcome()}\n' f'.outcome => {ctx.repr_outcome()}\n'
) )
@ -2319,7 +2322,7 @@ async def open_context_from_portal(
# type_only=True, # type_only=True,
) )
log.cancel( log.cancel(
f'Context terminated due to local scope error:\n\n' f'Context terminated due to local {ctx.side!r}-side error:\n\n'
f'{ctx.chan.uid} => {outcome_str}\n' f'{ctx.chan.uid} => {outcome_str}\n'
) )
@ -2385,15 +2388,25 @@ def mk_context(
# TODO: use the new type-parameters to annotate this in 3.13? # TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types # -[ ] https://peps.python.org/pep-0718/#unknown-types
# -[ ] allow for `pld_spec` input(s) ideally breaking down,
# |_ `start: ParameterSpec`,
# |_ `started: TypeAlias`,
# |_ `yields: TypeAlias`,
# |_ `return: TypeAlias`,
# |_ `invalid_policy: str|Callable` ?
# -[ ] prolly implement the `@acm` wrapper using
# a `contextlib.ContextDecorator`?
#
def context( def context(
func: Callable, func: Callable,
) -> Callable: ) -> Callable:
''' '''
Mark an (async) function as an SC-supervised, inter-`Actor`, Mark an async function as an SC-supervised, inter-`Actor`, RPC
child-`trio.Task`, IPC endpoint otherwise known more scheduled child-side `Task`, IPC endpoint otherwise
colloquially as a (RPC) "context". known more colloquially as a (RPC) "context".
Functions annotated the fundamental IPC endpoint type offered by `tractor`. Functions annotated the fundamental IPC endpoint type offered by
`tractor`.
''' '''
# TODO: apply whatever solution ``mypy`` ends up picking for this: # TODO: apply whatever solution ``mypy`` ends up picking for this: