Try out always delivering `ContextCancelled`
Previously, on a task cancel request there was no real response other then the `None` returned from the `Actor._cancel_task()` method and sometimes this might get lost if the cancel task was cancelled by a runtime cancel request (i.e. an "actor cancel"). Instead let's try always checking if the task's cancel scope is cancelled and if so relay back to the caller a `ContextCancelled` which can then be explicitly handled by actor nursery machinery as well as individual cancel APIs (`Portal.cancel_actor()`, and maybe later if we decide to expose the `tractor.Context` on every `Portal.run()` call). Also, - fix up a bunch of cancellation related logging - add an `Actor.cancel_called` flag much like `trio`'s cancel scopezombie_lord_infinite
parent
cef9ab7353
commit
66137030d9
|
@ -194,7 +194,17 @@ async def _invoke(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
if cs.cancelled_caught:
|
||||||
|
raise ContextCancelled(
|
||||||
|
'cancelled',
|
||||||
|
suberror_type=trio.Cancelled,
|
||||||
|
)
|
||||||
|
|
||||||
|
except (
|
||||||
|
Exception,
|
||||||
|
trio.MultiError,
|
||||||
|
# trio.Cancelled,
|
||||||
|
) as err:
|
||||||
|
|
||||||
if not is_multi_cancelled(err):
|
if not is_multi_cancelled(err):
|
||||||
|
|
||||||
|
@ -248,7 +258,7 @@ async def _invoke(
|
||||||
# If we're cancelled before the task returns then the
|
# If we're cancelled before the task returns then the
|
||||||
# cancel scope will not have been inserted yet
|
# cancel scope will not have been inserted yet
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Task {func} likely errored or cancelled before it started")
|
f"Task {func} likely errored or cancelled before start")
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
log.runtime("All RPC tasks have completed")
|
log.runtime("All RPC tasks have completed")
|
||||||
|
@ -358,6 +368,14 @@ class Actor:
|
||||||
Tuple[Any, Any, Any, Any, Any]] = None
|
Tuple[Any, Any, Any, Any, Any]] = None
|
||||||
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
|
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cancel_called(self) -> bool:
|
||||||
|
'''
|
||||||
|
Same principle as ``trio.CancelScope.cancel_called``.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._cancel_called
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: Tuple[str, str]
|
self, uid: Tuple[str, str]
|
||||||
) -> Tuple[trio.Event, Channel]:
|
) -> Tuple[trio.Event, Channel]:
|
||||||
|
@ -618,6 +636,7 @@ class Actor:
|
||||||
# worked out we'll likely want to use that!
|
# worked out we'll likely want to use that!
|
||||||
msg = None
|
msg = None
|
||||||
nursery_cancelled_before_task: bool = False
|
nursery_cancelled_before_task: bool = False
|
||||||
|
uid = chan.uid
|
||||||
|
|
||||||
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
||||||
try:
|
try:
|
||||||
|
@ -641,7 +660,7 @@ class Actor:
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Msg loop signalled to terminate for"
|
f"Msg loop signalled to terminate for"
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {uid}")
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -676,38 +695,46 @@ class Actor:
|
||||||
f"{ns}.{funcname}({kwargs})")
|
f"{ns}.{funcname}({kwargs})")
|
||||||
if ns == 'self':
|
if ns == 'self':
|
||||||
func = getattr(self, funcname)
|
func = getattr(self, funcname)
|
||||||
if funcname == 'cancel':
|
|
||||||
|
|
||||||
# don't start entire actor runtime cancellation if this
|
if funcname == 'cancel':
|
||||||
# actor is in debug mode
|
# self.cancel() was called so kill this
|
||||||
|
# msg loop and break out into
|
||||||
|
# ``_async_main()``
|
||||||
|
|
||||||
|
log.cancel(
|
||||||
|
f"{self.uid} remote cancel msg from {uid}")
|
||||||
|
|
||||||
|
# don't start entire actor runtime
|
||||||
|
# cancellation if this actor is in debug
|
||||||
|
# mode
|
||||||
pdb_complete = _debug._local_pdb_complete
|
pdb_complete = _debug._local_pdb_complete
|
||||||
if pdb_complete:
|
if pdb_complete:
|
||||||
|
log.cancel(
|
||||||
|
f'{self.uid} is in debug, wait for unlock')
|
||||||
await pdb_complete.wait()
|
await pdb_complete.wait()
|
||||||
|
|
||||||
# we immediately start the runtime machinery shutdown
|
# we immediately start the runtime machinery
|
||||||
|
# shutdown
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# self.cancel() was called so kill this msg loop
|
await _invoke(
|
||||||
# and break out into ``_async_main()``
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
log.cancel(
|
)
|
||||||
f"Actor {self.uid} was remotely cancelled; "
|
|
||||||
"waiting on cancellation completion..")
|
|
||||||
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
|
|
||||||
# await self._cancel_complete.wait()
|
|
||||||
|
|
||||||
loop_cs.cancel()
|
loop_cs.cancel()
|
||||||
break
|
continue
|
||||||
|
|
||||||
if funcname == '_cancel_task':
|
if funcname == '_cancel_task':
|
||||||
|
task_cid = kwargs['cid']
|
||||||
# we immediately start the runtime machinery shutdown
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
# self.cancel() was called so kill this msg loop
|
|
||||||
# and break out into ``_async_main()``
|
|
||||||
kwargs['chan'] = chan
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Actor {self.uid} was remotely cancelled; "
|
f'Actor {uid} requests cancel for {task_cid}')
|
||||||
"waiting on cancellation completion..")
|
|
||||||
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
|
# we immediately start the runtime machinery
|
||||||
|
# shutdown
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
kwargs['chan'] = chan
|
||||||
|
await _invoke(
|
||||||
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
# complain to client about restricted modules
|
||||||
|
@ -752,7 +779,8 @@ class Actor:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
|
|
||||||
# end of async for, channel disconnect vis ``trio.EndOfChannel``
|
# end of async for, channel disconnect vis
|
||||||
|
# ``trio.EndOfChannel``
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||||
)
|
)
|
||||||
|
@ -1193,7 +1221,10 @@ class Actor:
|
||||||
tasks = self._rpc_tasks
|
tasks = self._rpc_tasks
|
||||||
if tasks:
|
if tasks:
|
||||||
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
|
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
|
||||||
for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
|
for (
|
||||||
|
(chan, cid),
|
||||||
|
(scope, func, is_complete),
|
||||||
|
) in tasks.copy().items():
|
||||||
if only_chan is not None:
|
if only_chan is not None:
|
||||||
if only_chan != chan:
|
if only_chan != chan:
|
||||||
continue
|
continue
|
||||||
|
@ -1202,6 +1233,7 @@ class Actor:
|
||||||
if func != self._cancel_task:
|
if func != self._cancel_task:
|
||||||
await self._cancel_task(cid, chan)
|
await self._cancel_task(cid, chan)
|
||||||
|
|
||||||
|
if tasks:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Waiting for remaining rpc tasks to complete {tasks}")
|
f"Waiting for remaining rpc tasks to complete {tasks}")
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
Loading…
Reference in New Issue