forked from goodboy/tractor
1
0
Fork 0

Even moar bitty `Context` refinements

- set `._state._ctxvar_Context` just after `StartAck` inside
  `open_context_from_portal()` so that `current_ipc_ctx()` always
  works on the 'parent' side.
- always set `.canceller` to any `MsgTypeError.src_uid` and otherwise to
  any maybe-detected `.src_uid` (i.e. for RAEs).
- always set `.canceller` to us when we rx a ctxc which reports us as
  its canceller; this is a sanity check on definite "self cancellation".
- adjust `._is_self_cancelled()` logic to only be `True` when
  `._remote_error` is both a ctxc with a `.canceller` set to us AND
  when `Context.canceller` is also set to us (since the change above)
  as a little bit of extra rigor.
- fill-in/fix some `.repr_state` edge cases:
  - merge self-vs.-peer ctxc cases to one block and distinguish via
    nested `._is_self_cancelled()` check.
  - set 'errored' for all exception matched cases despite `.canceller`.
  - add pre-`Return` phase statuses:
   |_'pre-started' and 'syncing-to-child' depending on side and when
     `._stream` has not (yet) been set.
   |_'streaming' and 'streaming-finished' depending on side when
     `._stream` is set and whether it was stopped/closed.
- tweak drainage log-message to use "outcome" instead of "result".
- use new `.devx.pformat.pformat_cs()` inside `_maybe_cancel_and_set_remote_error()`
  but, IFF the log level is at least 'cancel'.
runtime_to_msgspec
Tyler Goodlet 2024-05-08 13:35:29 -04:00
parent 45f37870af
commit 343b7c9712
1 changed files with 126 additions and 71 deletions

View File

@ -37,8 +37,9 @@ import inspect
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable,
AsyncGenerator, AsyncGenerator,
Callable,
Mapping,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
Union, Union,
@ -59,7 +60,10 @@ from ._exceptions import (
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
from .log import get_logger from .log import (
get_logger,
at_least_level,
)
from .msg import ( from .msg import (
Error, Error,
MsgType, MsgType,
@ -83,6 +87,7 @@ from ._streaming import MsgStream
from ._state import ( from ._state import (
current_actor, current_actor,
debug_mode, debug_mode,
_ctxvar_Context,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -204,7 +209,7 @@ class Context:
# cancelled that the other side is as well, so maybe we should # cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the # instead just have a `.canceller` pulled from the
# `ContextCancelled`? # `ContextCancelled`?
_canceller: tuple[str, str] | None = None _canceller: tuple[str, str]|None = None
# NOTE: we try to ensure assignment of a "cancel msg" since # NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any # there's always going to be an "underlying reason" that any
@ -384,8 +389,12 @@ class Context:
re: BaseException|None = ( re: BaseException|None = (
remote_error remote_error
or self._remote_error or
self._remote_error
) )
# XXX we only report "this context" as self-cancelled
# once we've received a ctxc from our direct-peer task
# (aka we're `.cancel_acked`).
if not re: if not re:
return False return False
@ -396,10 +405,10 @@ class Context:
our_canceller = self.canceller our_canceller = self.canceller
return bool( return bool(
isinstance(re, ContextCancelled) isinstance((ctxc := re), ContextCancelled)
and from_uid == self.chan.uid and from_uid == self.chan.uid
and re.canceller == our_uid and ctxc.canceller == our_uid
and our_canceller == from_uid and our_canceller == our_uid
) )
@property @property
@ -619,15 +628,27 @@ class Context:
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
msgerr: bool = False
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
msgerr: bool = False
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
# NOTE in the case error is a ctxc the canceller will
# either be another peer or us. in the case where it's us
# we mark ourself as the canceller of ourselves (a ctx
# "self cancel" from this side's perspective), if instead
# the far end was cancelled by some other (inter-) peer,
# we want to mark our canceller as the actor that was
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc
else:
whom = 'a remote peer (not us)'
self._canceller = error.src_uid
whom: str = (
'us' if error.canceller == self._actor.uid
else 'a remote peer (not us)'
)
log.cancel( log.cancel(
f'IPC context was cancelled by {whom}!\n\n' f'IPC context was cancelled by {whom}!\n\n'
f'{error}' f'{error}'
@ -635,6 +656,7 @@ class Context:
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgerr = True msgerr = True
self._canceller = error.src_uid
log.error( log.error(
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n' f'{error}\n'
@ -642,28 +664,25 @@ class Context:
) )
else: else:
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src_uid: tuple = getattr(
error,
'src_uid',
None,
)
# we mark the source actor as our canceller
self._canceller = maybe_error_src_uid
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
# f'{pformat(self)}\n' # f'{pformat(self)}\n'
f'{error}\n' f'{error}\n'
) )
# always record the cancelling actor's uid since its if self._canceller is None:
# cancellation state is linked and we want to know log.error('Ctx has no canceller set!?')
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
@ -707,27 +726,34 @@ class Context:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
if cs: if (
cs
and
at_least_level(log=log, level='cancel')
):
fmt_str: str = self.pformat( fmt_str: str = self.pformat(
extra_fields={ extra_fields={
'._is_self_cancelled()': self._is_self_cancelled(), '._is_self_cancelled()': self._is_self_cancelled(),
'._cancel_on_msgerr': self._cancel_on_msgerr, '._cancel_on_msgerr': self._cancel_on_msgerr,
'._scope': cs,
'._scope.cancel_called': cs.cancel_called,
'._scope.cancelled_caught': cs.cancelled_caught,
'._scope._cancel_status': cs._cancel_status,
} }
) )
from .devx.pformat import pformat_cs
cs_fmt: str = pformat_cs(
cs,
var_name='Context._scope',
)
fmt_str += (
'\n'
+
cs_fmt
)
log.cancel( log.cancel(
message message
+ +
fmt_str fmt_str
) )
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# TODO: add to `Channel`? # TODO: also add to `Channel`?
@property @property
def dst_maddr(self) -> str: def dst_maddr(self) -> str:
chan: Channel = self.chan chan: Channel = self.chan
@ -1100,7 +1126,8 @@ class Context:
f'ctx id: {self.cid}' f'ctx id: {self.cid}'
) )
# TODO: replace all the instances of this!! XD # TODO: replace all the `._maybe_raise_remote_err()` usage
# with instances of this!!
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True, hide_tb: bool = True,
@ -1111,6 +1138,7 @@ class Context:
if re := self._remote_error: if re := self._remote_error:
return self._maybe_raise_remote_err( return self._maybe_raise_remote_err(
re, re,
hide_tb=hide_tb,
**kwargs, **kwargs,
) )
@ -1212,7 +1240,6 @@ class Context:
# runtime frames from the tb explicitly? # runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607 # https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise remote_error # from None raise remote_error # from None
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
@ -1263,8 +1290,15 @@ class Context:
# wait for a final context result/error by "draining" # wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield" # (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end. # msgs still in transit from the far end.
#
# XXX NOTE XXX: this call shouldn't really ever raise
# (other then internal error), instead delivering an
# `Error`-msg and that being `.maybe_raise()`-ed below
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
( (
return_msg, outcome_msg,
drained_msgs, drained_msgs,
) = await msgops.drain_to_final_msg( ) = await msgops.drain_to_final_msg(
ctx=self, ctx=self,
@ -1282,14 +1316,19 @@ class Context:
f'{msg}\n' f'{msg}\n'
) )
log.cancel( drained_status: str = (
'Ctx drained to final result msgs\n' 'Ctx drained to final outcome msg\n\n'
f'{return_msg}\n\n' f'{outcome_msg}\n'
)
f'pre-result drained msgs:\n' if drained_msgs:
drained_status += (
'\n'
f'The pre-drained msgs are\n'
f'{pformat(drained_msgs)}\n' f'{pformat(drained_msgs)}\n'
) )
log.cancel(drained_status)
self.maybe_raise( self.maybe_raise(
# NOTE: obvi we don't care if we # NOTE: obvi we don't care if we
# overran the far end if we're already # overran the far end if we're already
@ -1319,7 +1358,7 @@ class Context:
@property @property
def maybe_error(self) -> BaseException|None: def maybe_error(self) -> BaseException|None:
le: Exception|None = self._local_error le: BaseException|None = self._local_error
re: RemoteActorError|ContextCancelled|None = self._remote_error re: RemoteActorError|ContextCancelled|None = self._remote_error
match (le, re): match (le, re):
@ -1347,7 +1386,7 @@ class Context:
# ContextCancelled(canceller=), # ContextCancelled(canceller=),
# ): # ):
error: Exception|None = le or re error: BaseException|None = le or re
if error: if error:
return error return error
@ -1462,52 +1501,63 @@ class Context:
''' '''
merr: Exception|None = self.maybe_error merr: Exception|None = self.maybe_error
outcome: Unresolved|Exception|Any = self.outcome outcome: Unresolved|Exception|Any = self.outcome
status: str|None = None
match ( match (
outcome, outcome,
merr, merr,
): ):
# "graceful" ctx cancellation
case ( case (
Unresolved, Unresolved,
ContextCancelled(), ContextCancelled(),
) if self.cancel_acked: ):
if self._is_self_cancelled():
status = 'self-cancelled' status = 'self-cancelled'
elif (
case (
Unresolved,
ContextCancelled(),
) if (
self.canceller self.canceller
and not self._cancel_called and not self._cancel_called
): ):
status = 'peer-cancelled' status = 'peer-cancelled'
# (remote) error condition
case ( case (
Unresolved, Unresolved,
BaseException(), BaseException(), # any error-type
) if self.canceller: ):
status = 'errored' status = 'errored'
# result already returned
case ( case (
_, # any non-unresolved value _, # any non-unresolved value
None, None,
) if self._final_result_is_set(): ) if self._final_result_is_set():
status = 'returned' status = 'returned'
# normal operation but still in a pre-`Return`-result
# dialog phase
case ( case (
Unresolved, # noqa (weird.. ruff) Unresolved, # noqa (ruff, you so weird..)
None, None, # no (remote) error set
): ):
if stream := self._stream: if stream := self._stream:
if stream.closed: if stream.closed:
status = 'streaming-finished' status = 'streaming-finished'
else: else:
status = 'streaming' status = 'streaming'
elif self._started_called: elif self._started_called:
status = 'started' status = 'started'
case _: else:
status = 'unknown!?' if self.side == 'child':
status = 'pre-started'
else:
status = 'syncing-to-child'
if status is None:
status = '??unknown??'
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
return status return status
@ -1738,7 +1788,6 @@ class Context:
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}' f'{flow_body}'
f'{pformat(re)}\n' f'{pformat(re)}\n'
) )
self._cancel_msg: dict = msg self._cancel_msg: dict = msg
@ -2003,6 +2052,7 @@ async def open_context_from_portal(
) )
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
assert ctx._caller_info assert ctx._caller_info
_ctxvar_Context.set(ctx)
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered # `Started`-msg any cancellation triggered
@ -2156,7 +2206,7 @@ async def open_context_from_portal(
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
# exit silently. # finish silently.
if ( if (
ctx._cancel_called ctx._cancel_called
and and
@ -2281,6 +2331,11 @@ async def open_context_from_portal(
try: try:
result_or_err: Exception|Any = await ctx.result() result_or_err: Exception|Any = await ctx.result()
except BaseException as berr: except BaseException as berr:
# cancelled before (or maybe during?) final result capture
# if isinstance(trio.Cancelled, berr):
# from .devx import mk_pdb
# mk_pdb.set_trace()
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.result()` we still want to # raised in `Context.result()` we still want to
# save that error on the ctx's state to # save that error on the ctx's state to
@ -2476,12 +2531,12 @@ def mk_context(
_caller_info=caller_info, _caller_info=caller_info,
**kwargs, **kwargs,
) )
# TODO: we can drop the old placeholder yah?
# ctx._result: int | Any = id(ctx)
ctx._result = Unresolved ctx._result = Unresolved
return ctx return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable: def context(func: Callable) -> Callable:
''' '''
Mark an (async) function as an SC-supervised, inter-`Actor`, Mark an (async) function as an SC-supervised, inter-`Actor`,
@ -2495,8 +2550,8 @@ def context(func: Callable) -> Callable:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912 # https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore func._tractor_context_function = True # type: ignore
sig = inspect.signature(func) sig: inspect.Signature = inspect.signature(func)
params = sig.parameters params: Mapping = sig.parameters
if 'ctx' not in params: if 'ctx' not in params:
raise TypeError( raise TypeError(
"The first argument to the context function " "The first argument to the context function "