forked from goodboy/tractor
Remote cancellation runtime-internal vars renames
- `Context._cancel_called_remote` -> `._cancelled_remote` since "called" implies the cancellation was "requested" when it could be due to another error and the actor uid is the value - only set once the far end task scope is terminated due to either error or cancel, which has nothing to do with *what* caused the cancellation. - `Actor._cancel_called_remote` -> `._cancel_called_by_remote` which emphasizes that this variable is **only set** IFF some remote actor **requested that** this actor's runtime be cancelled via `Actor.cancel()`.ctx_cancel_semantics_and_overruns
parent
ead9e418de
commit
a0276f41c2
|
@ -103,7 +103,7 @@ class Context:
|
||||||
|
|
||||||
# cancellation state
|
# cancellation state
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
_cancel_called_remote: tuple | None = None
|
_cancelled_remote: tuple | None = None
|
||||||
_cancel_msg: str | None = None
|
_cancel_msg: str | None = None
|
||||||
_scope: trio.CancelScope | None = None
|
_scope: trio.CancelScope | None = None
|
||||||
_enter_debugger_on_cancel: bool = True
|
_enter_debugger_on_cancel: bool = True
|
||||||
|
@ -126,7 +126,7 @@ class Context:
|
||||||
causing this side of the context to also be cancelled.
|
causing this side of the context to also be cancelled.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
remote_uid = self._cancel_called_remote
|
remote_uid = self._cancelled_remote
|
||||||
if remote_uid:
|
if remote_uid:
|
||||||
return tuple(remote_uid)
|
return tuple(remote_uid)
|
||||||
|
|
||||||
|
@ -209,6 +209,10 @@ class Context:
|
||||||
# that error as the reason.
|
# that error as the reason.
|
||||||
self._remote_error = error
|
self._remote_error = error
|
||||||
|
|
||||||
|
# always record the remote actor's uid since its cancellation
|
||||||
|
# state is directly linked to ours (the local one).
|
||||||
|
self._cancelled_remote = self.chan.uid
|
||||||
|
|
||||||
if (
|
if (
|
||||||
isinstance(error, ContextCancelled)
|
isinstance(error, ContextCancelled)
|
||||||
):
|
):
|
||||||
|
@ -218,9 +222,12 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._cancel_called:
|
if self._cancel_called:
|
||||||
|
# from ._debug import breakpoint
|
||||||
|
# await breakpoint()
|
||||||
|
|
||||||
# this is an expected cancel request response message
|
# this is an expected cancel request response message
|
||||||
# and we don't need to raise it in scope since it will
|
# and we **don't need to raise it** in local cancel
|
||||||
# potentially override a real error
|
# scope since it will potentially override a real error.
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -236,7 +243,7 @@ class Context:
|
||||||
):
|
):
|
||||||
# from trio.testing import wait_all_tasks_blocked
|
# from trio.testing import wait_all_tasks_blocked
|
||||||
# await wait_all_tasks_blocked()
|
# await wait_all_tasks_blocked()
|
||||||
self._cancel_called_remote = self.chan.uid
|
# self._cancelled_remote = self.chan.uid
|
||||||
self._scope.cancel()
|
self._scope.cancel()
|
||||||
|
|
||||||
# NOTE: this usage actually works here B)
|
# NOTE: this usage actually works here B)
|
||||||
|
@ -252,7 +259,7 @@ class Context:
|
||||||
async def cancel(
|
async def cancel(
|
||||||
self,
|
self,
|
||||||
msg: str | None = None,
|
msg: str | None = None,
|
||||||
timeout: float = 0.5,
|
timeout: float = 0.616,
|
||||||
# timeout: float = 1000,
|
# timeout: float = 1000,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -281,7 +288,7 @@ class Context:
|
||||||
|
|
||||||
cid = self.cid
|
cid = self.cid
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
# cs.shield = True
|
cs.shield = True
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Cancelling stream {cid} to "
|
f"Cancelling stream {cid} to "
|
||||||
f"{self._portal.channel.uid}")
|
f"{self._portal.channel.uid}")
|
||||||
|
|
|
@ -254,7 +254,7 @@ async def _invoke(
|
||||||
fname = func.__name__
|
fname = func.__name__
|
||||||
cs: trio.CancelScope = ctx._scope
|
cs: trio.CancelScope = ctx._scope
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
canceller = ctx._cancel_called_remote
|
canceller = ctx._cancelled_remote
|
||||||
# await _debug.breakpoint()
|
# await _debug.breakpoint()
|
||||||
|
|
||||||
# NOTE / TODO: if we end up having
|
# NOTE / TODO: if we end up having
|
||||||
|
@ -505,7 +505,7 @@ class Actor:
|
||||||
self.uid = (name, uid or str(uuid.uuid4()))
|
self.uid = (name, uid or str(uuid.uuid4()))
|
||||||
|
|
||||||
self._cancel_complete = trio.Event()
|
self._cancel_complete = trio.Event()
|
||||||
self._cancel_called_remote: tuple[str, tuple] | None = None
|
self._cancel_called_by_remote: tuple[str, tuple] | None = None
|
||||||
self._cancel_called: bool = False
|
self._cancel_called: bool = False
|
||||||
|
|
||||||
# retreive and store parent `__main__` data which
|
# retreive and store parent `__main__` data which
|
||||||
|
@ -1069,7 +1069,7 @@ class Actor:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
log.cancel(f"{self.uid} is trying to cancel")
|
log.cancel(f"{self.uid} is trying to cancel")
|
||||||
self._cancel_called_remote: tuple = requesting_uid
|
self._cancel_called_by_remote: tuple = requesting_uid
|
||||||
self._cancel_called = True
|
self._cancel_called = True
|
||||||
|
|
||||||
# cancel all ongoing rpc tasks
|
# cancel all ongoing rpc tasks
|
||||||
|
@ -1141,10 +1141,10 @@ class Actor:
|
||||||
f"peer: {chan.uid}\n")
|
f"peer: {chan.uid}\n")
|
||||||
|
|
||||||
if (
|
if (
|
||||||
ctx._cancel_called_remote is None
|
ctx._cancelled_remote is None
|
||||||
and requesting_uid
|
and requesting_uid
|
||||||
):
|
):
|
||||||
ctx._cancel_called_remote: tuple = requesting_uid
|
ctx._cancelled_remote: tuple = requesting_uid
|
||||||
|
|
||||||
# don't allow cancelling this function mid-execution
|
# don't allow cancelling this function mid-execution
|
||||||
# (is this necessary?)
|
# (is this necessary?)
|
||||||
|
|
Loading…
Reference in New Issue