forked from goodboy/tractor
1
0
Fork 0

Make actor runtime cancellation immediate

immediate_remote_cancels
Tyler Goodlet 2021-10-07 23:13:47 -04:00
parent dfeebd6382
commit 7643bbf183
1 changed files with 58 additions and 32 deletions

View File

@ -503,8 +503,8 @@ class Actor:
log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.runtime("Signalling no more peer channels")
self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?)
if chan.connected():
@ -671,7 +671,28 @@ class Actor:
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
if funcname == '_cancel_task':
if funcname == 'cancel':
# don't start entire actor runtime cancellation if this
# actor is in debug mode
pdb_complete = _debug._local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery shutdown
await _invoke(self, cid, chan, func, kwargs)
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
elif funcname == '_cancel_task':
# XXX: a special case is made here for
# remote calls since we don't want the
# remote actor have to know which channel
@ -681,6 +702,7 @@ class Actor:
# Portal.run('self', '_cancel_task, cid=did)
# without passing the `chan` arg.
kwargs['chan'] = chan
else:
# complain to client about restricted modules
try:
@ -706,37 +728,28 @@ class Actor:
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
else:
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
# if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
else:
# channel disconnect
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
# end of async for, channel disconnect vis ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
@ -947,6 +960,9 @@ class Actor:
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
_lifetime_stack.close()
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
@ -976,11 +992,21 @@ class Actor:
raise
finally:
log.runtime("Root nursery complete")
log.info("Root nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.cancel("Closing all actor lifetime contexts")
log.info("Closing all actor lifetime contexts")
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if self.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
_lifetime_stack.close()
# Unregister actor from the arbiter
@ -1099,7 +1125,7 @@ class Actor:
if self._service_n:
self._service_n.cancel_scope.cancel()
log.cancel(f"{self.uid} was sucessfullly cancelled")
log.cancel(f"{self.uid} called `Actor.cancel()`")
self._cancel_complete.set()
return True