forked from goodboy/tractor
1
0
Fork 0

Add better actor cancellation tracking

Add `Actor._cancel_called` and `._cancel_complete` making it possible to
determine whether the actor has started the cancellation sequence and
whether that sequence has fully completed. This allows for blocking in
internal machinery tasks as necessary. Also, always trigger the end of
ongoing rpc tasks even if the last task errors; there's no guarantee the
trio cancellation semantics will guarantee us a nice internal "state"
without this.
bug_in_debug
Tyler Goodlet 2020-10-13 11:48:52 -04:00
parent 0ce6d2b55c
commit 573b8fef73
1 changed files with 25 additions and 13 deletions

View File

@ -152,7 +152,7 @@ async def _invoke(
# cancel scope will not have been inserted yet
log.warn(
f"Task {func} likely errored or cancelled before it started")
finally:
if not actor._rpc_tasks:
log.info("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set()
@ -198,7 +198,9 @@ class Actor:
"""
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))
self._is_cancelled: bool = False
self._cancel_complete = trio.Event()
self._cancel_called: bool = False
# retreive and store parent `__main__` data which
# will be passed to children
@ -531,7 +533,10 @@ class Actor:
else:
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.warning(f"{self.uid} was remotely cancelled")
log.warning(
f"{self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
@ -540,8 +545,9 @@ class Actor:
else:
# channel disconnect
log.debug(
f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks")
await self.cancel_rpc_tasks(chan)
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
self.cancel_rpc_tasks(chan)
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
@ -833,7 +839,8 @@ class Actor:
spawning new rpc tasks
- return control the parent channel message loop
"""
self._is_cancelled = True
log.warning(f"{self.uid} is trying to cancel")
self._cancel_called = True
# cancel all ongoing rpc tasks
with trio.CancelScope(shield=True):
@ -848,14 +855,16 @@ class Actor:
# kill all ongoing tasks
await self.cancel_rpc_tasks()
# cancel all rpc tasks permanently
if self._service_n:
self._service_n.cancel_scope.cancel()
# stop channel server
self.cancel_server()
await self._server_down.wait()
# rekt all channel loops
if self._service_n:
self._service_n.cancel_scope.cancel()
log.warning(f"{self.uid} was sucessfullly cancelled")
self._cancel_complete.set()
return True
# XXX: hard kill logic if needed?
@ -895,6 +904,9 @@ class Actor:
scope.cancel()
# wait for _invoke to mark the task complete
log.debug(
f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
await is_complete.wait()
log.debug(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"