forked from goodboy/tractor
1
0
Fork 0

Offer a `@context(pld_spec=<TypeAlias>)` API

Instead of the WIP/prototyped `Portal.open_context()` offering
a `pld_spec` input arg, this changes to a proper decorator API for
specifying the "payload spec" on `@context` endpoints.

The impl change details actually cover 2-birds:
- monkey patch decorated functions with a new
  `._tractor_context_meta: dict[str, Any]` and insert any provided input
  `@context` kwargs: `_pld_spec`, `enc_hook`, `enc_hook`.
- use `inspect.get_annotations()` to scan for a `func` arg
  type-annotated with `tractor.Context` and use the name of that arg as
  the RPC task-side injected `Context`, thus injecting the needed arg
  by type instead of by name (a longstanding TODO); raise a type-error
  when not found.
- pull the `pld_spec` from the `._tractor_context_meta` attr both in the
  `.open_context()` parent-side and child-side `._invoke()`-cation of
  the RPC task and use the `msg._ops.maybe_limit_plds()` API to apply it
  internally in the runtime for each case.
runtime_to_msgspec
Tyler Goodlet 2024-06-16 22:50:43 -04:00
parent e6d4ec43b9
commit 5449bd5673
2 changed files with 92 additions and 22 deletions

View File

@ -1792,7 +1792,6 @@ async def open_context_from_portal(
portal: Portal, portal: Portal,
func: Callable, func: Callable,
pld_spec: TypeAlias|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
hide_tb: bool = True, hide_tb: bool = True,
@ -1838,12 +1837,20 @@ async def open_context_from_portal(
# NOTE: 2 bc of the wrapping `@acm` # NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa __runtimeframe__: int = 2 # noqa
# conduct target func method structural checks # if NOT an async func but decorated with `@context`, error.
if not inspect.iscoroutinefunction(func) and ( if (
getattr(func, '_tractor_contex_function', False) not inspect.iscoroutinefunction(func)
and getattr(func, '_tractor_context_meta', False)
): ):
raise TypeError( raise TypeError(
f'{func} must be an async generator function!') f'{func!r} must be an async function!'
)
ctx_meta: dict[str, Any]|None = getattr(
func,
'_tractor_context_meta',
None,
)
# TODO: i think from here onward should probably # TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new # just be factored into an `@acm` inside a new
@ -1890,12 +1897,9 @@ async def open_context_from_portal(
trio.open_nursery() as tn, trio.open_nursery() as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
spec=pld_spec, spec=ctx_meta.get('pld_spec'),
) as maybe_msgdec, ),
): ):
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# NOTE: this in an implicit runtime nursery used to, # NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as # - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user. # for cancellation of the scope opened by the user.
@ -2398,7 +2402,15 @@ def mk_context(
# a `contextlib.ContextDecorator`? # a `contextlib.ContextDecorator`?
# #
def context( def context(
func: Callable, func: Callable|None = None,
*,
# must be named!
pld_spec: Union[Type]|TypeAlias = Any,
dec_hook: Callable|None = None,
enc_hook: Callable|None = None,
) -> Callable: ) -> Callable:
''' '''
Mark an async function as an SC-supervised, inter-`Actor`, RPC Mark an async function as an SC-supervised, inter-`Actor`, RPC
@ -2409,15 +2421,54 @@ def context(
`tractor`. `tractor`.
''' '''
# XXX for the `@context(pld_spec=MyMsg|None)` case
if func is None:
return partial(
context,
pld_spec=pld_spec,
dec_hook=dec_hook,
enc_hook=enc_hook,
)
# TODO: from this, enforcing a `Start.sig` type
# check when invoking RPC tasks by ensuring the input
# args validate against the endpoint def.
sig: inspect.Signature = inspect.signature(func)
# params: inspect.Parameters = sig.parameters
# https://docs.python.org/3/library/inspect.html#inspect.get_annotations
annots: dict[str, Type] = inspect.get_annotations(
func,
eval_str=True,
)
name: str
param: Type
for name, param in annots.items():
if param is Context:
ctx_var_name: str = name
break
else:
raise TypeError(
'At least one (normally the first) argument to the `@context` function '
f'{func.__name__!r} must be typed as `tractor.Context`, for ex,\n\n'
f'`ctx: tractor.Context`\n'
)
# TODO: apply whatever solution ``mypy`` ends up picking for this: # TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912 # https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore # func._tractor_context_function = True # type: ignore
func._tractor_context_meta: dict[str, Any] = {
'ctx_var_name': ctx_var_name,
# `msgspec` related settings
'pld_spec': pld_spec,
'enc_hook': enc_hook,
'dec_hook': dec_hook,
sig: inspect.Signature = inspect.signature(func) # TODO: eventually we need to "signature-check" with these
params: Mapping = sig.parameters # vs. the `Start` msg fields!
if 'ctx' not in params: # => this would allow for TPC endpoint argument-type-spec
raise TypeError( # limiting and we could then error on
"The first argument to the context function " # invalid inputs passed to `.open_context(rpc_ep, arg0='blah')`
f"{func.__name__} must be `ctx: tractor.Context`" 'sig': sig,
) }
return func return func

View File

@ -69,6 +69,7 @@ from .msg import (
PayloadT, PayloadT,
NamespacePath, NamespacePath,
pretty_struct, pretty_struct,
_ops as msgops,
) )
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck, CancelAck,
@ -500,8 +501,19 @@ async def _invoke(
# handle decorated ``@tractor.context`` async function # handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): # - pull out any typed-pld-spec info and apply (below)
kwargs['ctx'] = ctx # - (TODO) store func-ref meta data for API-frame-info logging
elif (
ctx_meta := getattr(
func,
'_tractor_context_meta',
False,
)
):
# kwargs['ctx'] = ctx
# set the required `tractor.Context` typed input argument to
# the allocated RPC task context.
kwargs[ctx_meta['ctx_var_name']] = ctx
context_ep_func = True context_ep_func = True
# errors raised inside this block are propgated back to caller # errors raised inside this block are propgated back to caller
@ -595,7 +607,14 @@ async def _invoke(
# `@context` marked RPC function. # `@context` marked RPC function.
# - `._portal` is never set. # - `._portal` is never set.
try: try:
async with trio.open_nursery() as tn: async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=ctx_meta.get('pld_spec'),
dec_hook=ctx_meta.get('dec_hook'),
),
):
ctx._scope_nursery = tn ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)