Add a `Context.portal`, more cancel tooing
Might as well add a public maybe-getter for use on the "parent" side since it can be handy to check out-of-band cancellation conditions (like from `Portal.cancel_actor()`). Buncha bitty tweaks for more easily debugging cancel conditions: - add a `@.cancel_called.setter` for hooking into `.cancel_called = True` being set in hard to decipher "who cancelled us" scenarios. - use a new `self_ctxc: bool` var in `.cancel()` to capture the output state from `._is_self_cancelled(remote_error)` at call time so it can be compared against the measured value at crash-time (when REPL-ing it can often have already changed due to runtime teardown sequencing vs. the crash handler hook entry). - proxy `hide_tb` to `.drain_to_final_msg()` from `.wait_for_result()`. - use `remote_error.sender` attr directly instead of through `RAE.msgdata: dict` lookup. - change var name `our_uid` -> `peer_uid`; it's not "ours".. Other various docs/comment updates: - extend the main class doc to include some other name ideas. - change over all remaining `.result()` refs to `.wait_for_result()`. - doc more details on how we want `.outcome` to eventually signature.aio_abandons
parent
9133f42b07
commit
9f9b0b17dc
|
@ -121,10 +121,19 @@ class Unresolved:
|
||||||
@dataclass
|
@dataclass
|
||||||
class Context:
|
class Context:
|
||||||
'''
|
'''
|
||||||
An inter-actor, SC transitive, `Task` communication context.
|
An inter-actor, SC transitive, `trio.Task` (pair)
|
||||||
|
communication context.
|
||||||
|
|
||||||
NB: This class should **never be instatiated directly**, it is allocated
|
(We've also considered other names and ideas:
|
||||||
by the runtime in 2 ways:
|
- "communicating tasks scope": cts
|
||||||
|
- "distributed task scope": dts
|
||||||
|
- "communicating tasks context": ctc
|
||||||
|
|
||||||
|
**Got a better idea for naming? Make an issue dawg!**
|
||||||
|
)
|
||||||
|
|
||||||
|
NB: This class should **never be instatiated directly**, it is
|
||||||
|
allocated by the runtime in 2 ways:
|
||||||
- by entering `Portal.open_context()` which is the primary
|
- by entering `Portal.open_context()` which is the primary
|
||||||
public API for any "parent" task or,
|
public API for any "parent" task or,
|
||||||
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
|
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
|
||||||
|
@ -210,6 +219,16 @@ class Context:
|
||||||
# more the the `Context` is needed?
|
# more the the `Context` is needed?
|
||||||
_portal: Portal | None = None
|
_portal: Portal | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def portal(self) -> Portal|None:
|
||||||
|
'''
|
||||||
|
Return any wrapping memory-`Portal` if this is
|
||||||
|
a 'parent'-side task which called `Portal.open_context()`,
|
||||||
|
otherwise `None`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._portal
|
||||||
|
|
||||||
# NOTE: each side of the context has its own cancel scope
|
# NOTE: each side of the context has its own cancel scope
|
||||||
# which is exactly the primitive that allows for
|
# which is exactly the primitive that allows for
|
||||||
# cross-actor-task-supervision and thus SC.
|
# cross-actor-task-supervision and thus SC.
|
||||||
|
@ -299,6 +318,8 @@ class Context:
|
||||||
# boxed exception. NOW, it's used for spawning overrun queuing
|
# boxed exception. NOW, it's used for spawning overrun queuing
|
||||||
# tasks when `.allow_overruns == True` !!!
|
# tasks when `.allow_overruns == True` !!!
|
||||||
_scope_nursery: trio.Nursery|None = None
|
_scope_nursery: trio.Nursery|None = None
|
||||||
|
# ^-TODO-^ change name?
|
||||||
|
# -> `._scope_tn` "scope task nursery"
|
||||||
|
|
||||||
# streaming overrun state tracking
|
# streaming overrun state tracking
|
||||||
_in_overrun: bool = False
|
_in_overrun: bool = False
|
||||||
|
@ -408,10 +429,23 @@ class Context:
|
||||||
'''
|
'''
|
||||||
return self._cancel_called
|
return self._cancel_called
|
||||||
|
|
||||||
|
@cancel_called.setter
|
||||||
|
def cancel_called(self, val: bool) -> None:
|
||||||
|
'''
|
||||||
|
Set the self-cancelled request `bool` value.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# to debug who frickin sets it..
|
||||||
|
# if val:
|
||||||
|
# from .devx import pause_from_sync
|
||||||
|
# pause_from_sync()
|
||||||
|
|
||||||
|
self._cancel_called = val
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
'''
|
'''
|
||||||
``Actor.uid: tuple[str, str]`` of the (remote)
|
`Actor.uid: tuple[str, str]` of the (remote)
|
||||||
actor-process who's task was cancelled thus causing this
|
actor-process who's task was cancelled thus causing this
|
||||||
(side of the) context to also be cancelled.
|
(side of the) context to also be cancelled.
|
||||||
|
|
||||||
|
@ -515,7 +549,7 @@ class Context:
|
||||||
|
|
||||||
# the local scope was never cancelled
|
# the local scope was never cancelled
|
||||||
# and instead likely we received a remote side
|
# and instead likely we received a remote side
|
||||||
# # cancellation that was raised inside `.result()`
|
# # cancellation that was raised inside `.wait_for_result()`
|
||||||
# or (
|
# or (
|
||||||
# (se := self._local_error)
|
# (se := self._local_error)
|
||||||
# and se is re
|
# and se is re
|
||||||
|
@ -585,6 +619,10 @@ class Context:
|
||||||
self,
|
self,
|
||||||
error: BaseException,
|
error: BaseException,
|
||||||
|
|
||||||
|
# TODO: manual toggle for cases where we wouldn't normally
|
||||||
|
# mark ourselves cancelled but want to?
|
||||||
|
# set_cancel_called: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
(Maybe) cancel this local scope due to a received remote
|
(Maybe) cancel this local scope due to a received remote
|
||||||
|
@ -603,7 +641,7 @@ class Context:
|
||||||
- `Portal.open_context()`
|
- `Portal.open_context()`
|
||||||
- `Portal.result()`
|
- `Portal.result()`
|
||||||
- `Context.open_stream()`
|
- `Context.open_stream()`
|
||||||
- `Context.result()`
|
- `Context.wait_for_result()`
|
||||||
|
|
||||||
when called/closed by actor local task(s).
|
when called/closed by actor local task(s).
|
||||||
|
|
||||||
|
@ -729,7 +767,7 @@ class Context:
|
||||||
|
|
||||||
# Cancel the local `._scope`, catch that
|
# Cancel the local `._scope`, catch that
|
||||||
# `._scope.cancelled_caught` and re-raise any remote error
|
# `._scope.cancelled_caught` and re-raise any remote error
|
||||||
# once exiting (or manually calling `.result()`) the
|
# once exiting (or manually calling `.wait_for_result()`) the
|
||||||
# `.open_context()` block.
|
# `.open_context()` block.
|
||||||
cs: trio.CancelScope = self._scope
|
cs: trio.CancelScope = self._scope
|
||||||
if (
|
if (
|
||||||
|
@ -764,8 +802,9 @@ class Context:
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
|
# from .devx import pause_from_sync
|
||||||
|
# pause_from_sync()
|
||||||
self._scope.cancel()
|
self._scope.cancel()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
|
@ -845,15 +884,15 @@ class Context:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def repr_api(self) -> str:
|
def repr_api(self) -> str:
|
||||||
|
return 'Portal.open_context()'
|
||||||
|
|
||||||
|
# TODO: use `.dev._frame_stack` scanning to find caller!
|
||||||
# ci: CallerInfo|None = self._caller_info
|
# ci: CallerInfo|None = self._caller_info
|
||||||
# if ci:
|
# if ci:
|
||||||
# return (
|
# return (
|
||||||
# f'{ci.api_nsp}()\n'
|
# f'{ci.api_nsp}()\n'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# TODO: use `.dev._frame_stack` scanning to find caller!
|
|
||||||
return 'Portal.open_context()'
|
|
||||||
|
|
||||||
async def cancel(
|
async def cancel(
|
||||||
self,
|
self,
|
||||||
timeout: float = 0.616,
|
timeout: float = 0.616,
|
||||||
|
@ -889,7 +928,8 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
side: str = self.side
|
side: str = self.side
|
||||||
self._cancel_called: bool = True
|
# XXX for debug via the `@.setter`
|
||||||
|
self.cancel_called = True
|
||||||
|
|
||||||
header: str = (
|
header: str = (
|
||||||
f'Cancelling ctx with peer from {side.upper()} side\n\n'
|
f'Cancelling ctx with peer from {side.upper()} side\n\n'
|
||||||
|
@ -912,7 +952,7 @@ class Context:
|
||||||
# `._scope.cancel()` since we expect the eventual
|
# `._scope.cancel()` since we expect the eventual
|
||||||
# `ContextCancelled` from the other side to trigger this
|
# `ContextCancelled` from the other side to trigger this
|
||||||
# when the runtime finally receives it during teardown
|
# when the runtime finally receives it during teardown
|
||||||
# (normally in `.result()` called from
|
# (normally in `.wait_for_result()` called from
|
||||||
# `Portal.open_context().__aexit__()`)
|
# `Portal.open_context().__aexit__()`)
|
||||||
if side == 'parent':
|
if side == 'parent':
|
||||||
if not self._portal:
|
if not self._portal:
|
||||||
|
@ -1025,10 +1065,10 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
our_uid: tuple = self.chan.uid
|
peer_uid: tuple = self.chan.uid
|
||||||
|
|
||||||
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
||||||
# for "graceful cancellation" case:
|
# for "graceful cancellation" case(s):
|
||||||
#
|
#
|
||||||
# Whenever a "side" of a context (a `Task` running in
|
# Whenever a "side" of a context (a `Task` running in
|
||||||
# an actor) **is** the side which requested ctx
|
# an actor) **is** the side which requested ctx
|
||||||
|
@ -1045,9 +1085,11 @@ class Context:
|
||||||
# set to the `Actor.uid` of THIS task (i.e. the
|
# set to the `Actor.uid` of THIS task (i.e. the
|
||||||
# cancellation requesting task's actor is the actor
|
# cancellation requesting task's actor is the actor
|
||||||
# checking whether it should absorb the ctxc).
|
# checking whether it should absorb the ctxc).
|
||||||
|
self_ctxc: bool = self._is_self_cancelled(remote_error)
|
||||||
if (
|
if (
|
||||||
|
self_ctxc
|
||||||
|
and
|
||||||
not raise_ctxc_from_self_call
|
not raise_ctxc_from_self_call
|
||||||
and self._is_self_cancelled(remote_error)
|
|
||||||
|
|
||||||
# TODO: ?potentially it is useful to emit certain
|
# TODO: ?potentially it is useful to emit certain
|
||||||
# warning/cancel logs for the cases where the
|
# warning/cancel logs for the cases where the
|
||||||
|
@ -1077,8 +1119,8 @@ class Context:
|
||||||
and isinstance(remote_error, RemoteActorError)
|
and isinstance(remote_error, RemoteActorError)
|
||||||
and remote_error.boxed_type is StreamOverrun
|
and remote_error.boxed_type is StreamOverrun
|
||||||
|
|
||||||
# and tuple(remote_error.msgdata['sender']) == our_uid
|
# and tuple(remote_error.msgdata['sender']) == peer_uid
|
||||||
and tuple(remote_error.sender) == our_uid
|
and tuple(remote_error.sender) == peer_uid
|
||||||
):
|
):
|
||||||
# NOTE: we set the local scope error to any "self
|
# NOTE: we set the local scope error to any "self
|
||||||
# cancellation" error-response thus "absorbing"
|
# cancellation" error-response thus "absorbing"
|
||||||
|
@ -1140,9 +1182,9 @@ class Context:
|
||||||
of the remote cancellation.
|
of the remote cancellation.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
assert self._portal, (
|
assert self._portal, (
|
||||||
"Context.result() can not be called from callee side!"
|
'`Context.wait_for_result()` can not be called from callee side!'
|
||||||
)
|
)
|
||||||
if self._final_result_is_set():
|
if self._final_result_is_set():
|
||||||
return self._result
|
return self._result
|
||||||
|
@ -1197,10 +1239,11 @@ class Context:
|
||||||
# raising something we know might happen
|
# raising something we know might happen
|
||||||
# during cancellation ;)
|
# during cancellation ;)
|
||||||
(not self._cancel_called)
|
(not self._cancel_called)
|
||||||
)
|
),
|
||||||
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
# TODO: eventually make `.outcome: Outcome` and thus return
|
# TODO: eventually make `.outcome: Outcome` and thus return
|
||||||
# `self.outcome.unwrap()` here!
|
# `self.outcome.unwrap()` here?
|
||||||
return self.outcome
|
return self.outcome
|
||||||
|
|
||||||
# TODO: switch this with above!
|
# TODO: switch this with above!
|
||||||
|
@ -1284,17 +1327,24 @@ class Context:
|
||||||
Any|
|
Any|
|
||||||
RemoteActorError|
|
RemoteActorError|
|
||||||
ContextCancelled
|
ContextCancelled
|
||||||
|
# TODO: make this a `outcome.Outcome`!
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
The final "outcome" from an IPC context which can either be
|
Return the "final outcome" (state) of the far end peer task
|
||||||
some Value returned from the target `@context`-decorated
|
non-blocking. If the remote task has not completed then this
|
||||||
remote task-as-func, or an `Error` wrapping an exception
|
field always resolves to the module defined `Unresolved`
|
||||||
raised from an RPC task fault or cancellation.
|
handle.
|
||||||
|
|
||||||
Note that if the remote task has not terminated then this
|
------ - ------
|
||||||
field always resolves to the module defined `Unresolved` handle.
|
TODO->( this is doc-driven-dev content not yet actual ;P )
|
||||||
|
|
||||||
TODO: implement this using `outcome.Outcome` types?
|
The final "outcome" from an IPC context which can be any of:
|
||||||
|
- some `outcome.Value` which boxes the returned output from the peer task's
|
||||||
|
`@context`-decorated remote task-as-func, or
|
||||||
|
- an `outcome.Error` wrapping an exception raised that same RPC task
|
||||||
|
after a fault or cancellation, or
|
||||||
|
- an unresolved `outcome.Outcome` when the peer task is still
|
||||||
|
executing and has not yet completed.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return (
|
return (
|
||||||
|
@ -1583,7 +1633,7 @@ class Context:
|
||||||
|
|
||||||
- NEVER `return` early before delivering the msg!
|
- NEVER `return` early before delivering the msg!
|
||||||
bc if the error is a ctxc and there is a task waiting on
|
bc if the error is a ctxc and there is a task waiting on
|
||||||
`.result()` we need the msg to be
|
`.wait_for_result()` we need the msg to be
|
||||||
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
||||||
that the error is relayed to that waiter task and thus
|
that the error is relayed to that waiter task and thus
|
||||||
raised in user code!
|
raised in user code!
|
||||||
|
@ -1828,7 +1878,7 @@ async def open_context_from_portal(
|
||||||
When the "callee" (side that is "called"/started by a call
|
When the "callee" (side that is "called"/started by a call
|
||||||
to *this* method) returns, the caller side (this) unblocks
|
to *this* method) returns, the caller side (this) unblocks
|
||||||
and any final value delivered from the other end can be
|
and any final value delivered from the other end can be
|
||||||
retrieved using the `Contex.result()` api.
|
retrieved using the `Contex.wait_for_result()` api.
|
||||||
|
|
||||||
The yielded ``Context`` instance further allows for opening
|
The yielded ``Context`` instance further allows for opening
|
||||||
bidirectional streams, explicit cancellation and
|
bidirectional streams, explicit cancellation and
|
||||||
|
@ -1965,14 +2015,14 @@ async def open_context_from_portal(
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
||||||
# ??TODO??: do we still want to consider this or is
|
# ??TODO??: do we still want to consider this or is
|
||||||
# the `else:` block handling via a `.result()`
|
# the `else:` block handling via a `.wait_for_result()`
|
||||||
# call below enough??
|
# call below enough??
|
||||||
#
|
#
|
||||||
# -[ ] pretty sure `.result()` internals do the
|
# -[ ] pretty sure `.wait_for_result()` internals do the
|
||||||
# same as our ctxc handler below so it ended up
|
# same as our ctxc handler below so it ended up
|
||||||
# being same (repeated?) behaviour, but ideally we
|
# being same (repeated?) behaviour, but ideally we
|
||||||
# wouldn't have that duplication either by somehow
|
# wouldn't have that duplication either by somehow
|
||||||
# factoring the `.result()` handler impl in a way
|
# factoring the `.wait_for_result()` handler impl in a way
|
||||||
# that we can re-use it around the `yield` ^ here
|
# that we can re-use it around the `yield` ^ here
|
||||||
# or vice versa?
|
# or vice versa?
|
||||||
#
|
#
|
||||||
|
@ -2110,7 +2160,7 @@ async def open_context_from_portal(
|
||||||
# AND a group-exc is only raised if there was > 1
|
# AND a group-exc is only raised if there was > 1
|
||||||
# tasks started *here* in the "caller" / opener
|
# tasks started *here* in the "caller" / opener
|
||||||
# block. If any one of those tasks calls
|
# block. If any one of those tasks calls
|
||||||
# `.result()` or `MsgStream.receive()`
|
# `.wait_for_result()` or `MsgStream.receive()`
|
||||||
# `._maybe_raise_remote_err()` will be transitively
|
# `._maybe_raise_remote_err()` will be transitively
|
||||||
# called and the remote error raised causing all
|
# called and the remote error raised causing all
|
||||||
# tasks to be cancelled.
|
# tasks to be cancelled.
|
||||||
|
@ -2180,7 +2230,7 @@ async def open_context_from_portal(
|
||||||
f'|_{ctx._task}\n'
|
f'|_{ctx._task}\n'
|
||||||
)
|
)
|
||||||
# XXX NOTE XXX: the below call to
|
# XXX NOTE XXX: the below call to
|
||||||
# `Context.result()` will ALWAYS raise
|
# `Context.wait_for_result()` will ALWAYS raise
|
||||||
# a `ContextCancelled` (via an embedded call to
|
# a `ContextCancelled` (via an embedded call to
|
||||||
# `Context._maybe_raise_remote_err()`) IFF
|
# `Context._maybe_raise_remote_err()`) IFF
|
||||||
# a `Context._remote_error` was set by the runtime
|
# a `Context._remote_error` was set by the runtime
|
||||||
|
@ -2190,10 +2240,10 @@ async def open_context_from_portal(
|
||||||
# ALWAYS SET any time "callee" side fails and causes "caller
|
# ALWAYS SET any time "callee" side fails and causes "caller
|
||||||
# side" cancellation via a `ContextCancelled` here.
|
# side" cancellation via a `ContextCancelled` here.
|
||||||
try:
|
try:
|
||||||
result_or_err: Exception|Any = await ctx.result()
|
result_or_err: Exception|Any = await ctx.wait_for_result()
|
||||||
except BaseException as berr:
|
except BaseException as berr:
|
||||||
# on normal teardown, if we get some error
|
# on normal teardown, if we get some error
|
||||||
# raised in `Context.result()` we still want to
|
# raised in `Context.wait_for_result()` we still want to
|
||||||
# save that error on the ctx's state to
|
# save that error on the ctx's state to
|
||||||
# determine things like `.cancelled_caught` for
|
# determine things like `.cancelled_caught` for
|
||||||
# cases where there was remote cancellation but
|
# cases where there was remote cancellation but
|
||||||
|
|
Loading…
Reference in New Issue