forked from goodboy/tractor
Make actor runtime cancellation immediate
parent
f72eabd42a
commit
2afbc3898f
|
@ -506,8 +506,8 @@ class Actor:
|
||||||
log.runtime(f"Peers is {self._peers}")
|
log.runtime(f"Peers is {self._peers}")
|
||||||
|
|
||||||
if not self._peers: # no more channels connected
|
if not self._peers: # no more channels connected
|
||||||
self._no_more_peers.set()
|
|
||||||
log.runtime("Signalling no more peer channels")
|
log.runtime("Signalling no more peer channels")
|
||||||
|
self._no_more_peers.set()
|
||||||
|
|
||||||
# # XXX: is this necessary (GC should do it?)
|
# # XXX: is this necessary (GC should do it?)
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
|
@ -674,7 +674,28 @@ class Actor:
|
||||||
f"{ns}.{funcname}({kwargs})")
|
f"{ns}.{funcname}({kwargs})")
|
||||||
if ns == 'self':
|
if ns == 'self':
|
||||||
func = getattr(self, funcname)
|
func = getattr(self, funcname)
|
||||||
if funcname == '_cancel_task':
|
if funcname == 'cancel':
|
||||||
|
|
||||||
|
# don't start entire actor runtime cancellation if this
|
||||||
|
# actor is in debug mode
|
||||||
|
pdb_complete = _debug._local_pdb_complete
|
||||||
|
if pdb_complete:
|
||||||
|
await pdb_complete.wait()
|
||||||
|
|
||||||
|
# we immediately start the runtime machinery shutdown
|
||||||
|
await _invoke(self, cid, chan, func, kwargs)
|
||||||
|
|
||||||
|
# self.cancel() was called so kill this msg loop
|
||||||
|
# and break out into ``_async_main()``
|
||||||
|
log.cancel(
|
||||||
|
f"Actor {self.uid} was remotely cancelled; "
|
||||||
|
"waiting on cancellation completion..")
|
||||||
|
await self._cancel_complete.wait()
|
||||||
|
loop_cs.cancel()
|
||||||
|
break
|
||||||
|
|
||||||
|
elif funcname == '_cancel_task':
|
||||||
|
|
||||||
# XXX: a special case is made here for
|
# XXX: a special case is made here for
|
||||||
# remote calls since we don't want the
|
# remote calls since we don't want the
|
||||||
# remote actor have to know which channel
|
# remote actor have to know which channel
|
||||||
|
@ -684,6 +705,7 @@ class Actor:
|
||||||
# Portal.run('self', '_cancel_task, cid=did)
|
# Portal.run('self', '_cancel_task, cid=did)
|
||||||
# without passing the `chan` arg.
|
# without passing the `chan` arg.
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
# complain to client about restricted modules
|
||||||
try:
|
try:
|
||||||
|
@ -709,37 +731,28 @@ class Actor:
|
||||||
|
|
||||||
# never allow cancelling cancel requests (results in
|
# never allow cancelling cancel requests (results in
|
||||||
# deadlock and other weird behaviour)
|
# deadlock and other weird behaviour)
|
||||||
if func != self.cancel:
|
# if func != self.cancel:
|
||||||
if isinstance(cs, Exception):
|
if isinstance(cs, Exception):
|
||||||
log.warning(
|
|
||||||
f"Task for RPC func {func} failed with"
|
|
||||||
f"{cs}")
|
|
||||||
else:
|
|
||||||
# mark that we have ongoing rpc tasks
|
|
||||||
self._ongoing_rpc_tasks = trio.Event()
|
|
||||||
log.runtime(f"RPC func is {func}")
|
|
||||||
# store cancel scope such that the rpc task can be
|
|
||||||
# cancelled gracefully if requested
|
|
||||||
self._rpc_tasks[(chan, cid)] = (
|
|
||||||
cs, func, trio.Event())
|
|
||||||
else:
|
|
||||||
# self.cancel() was called so kill this msg loop
|
|
||||||
# and break out into ``_async_main()``
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Actor {self.uid} was remotely cancelled; "
|
f"Task for RPC func {func} failed with"
|
||||||
"waiting on cancellation completion..")
|
f"{cs}")
|
||||||
await self._cancel_complete.wait()
|
else:
|
||||||
loop_cs.cancel()
|
# mark that we have ongoing rpc tasks
|
||||||
break
|
self._ongoing_rpc_tasks = trio.Event()
|
||||||
|
log.runtime(f"RPC func is {func}")
|
||||||
|
# store cancel scope such that the rpc task can be
|
||||||
|
# cancelled gracefully if requested
|
||||||
|
self._rpc_tasks[(chan, cid)] = (
|
||||||
|
cs, func, trio.Event())
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
else:
|
|
||||||
# channel disconnect
|
# end of async for, channel disconnect vis ``trio.EndOfChannel``
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||||
)
|
)
|
||||||
await self.cancel_rpc_tasks(chan)
|
await self.cancel_rpc_tasks(chan)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
|
@ -950,6 +963,9 @@ class Actor:
|
||||||
# Blocks here as expected until the root nursery is
|
# Blocks here as expected until the root nursery is
|
||||||
# killed (i.e. this actor is cancelled or signalled by the parent)
|
# killed (i.e. this actor is cancelled or signalled by the parent)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
log.info("Closing all actor lifetime contexts")
|
||||||
|
_lifetime_stack.close()
|
||||||
|
|
||||||
if not registered_with_arbiter:
|
if not registered_with_arbiter:
|
||||||
# TODO: I guess we could try to connect back
|
# TODO: I guess we could try to connect back
|
||||||
# to the parent through a channel and engage a debugger
|
# to the parent through a channel and engage a debugger
|
||||||
|
@ -979,11 +995,21 @@ class Actor:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.runtime("Root nursery complete")
|
log.info("Root nursery complete")
|
||||||
|
|
||||||
# tear down all lifetime contexts if not in guest mode
|
# tear down all lifetime contexts if not in guest mode
|
||||||
# XXX: should this just be in the entrypoint?
|
# XXX: should this just be in the entrypoint?
|
||||||
log.cancel("Closing all actor lifetime contexts")
|
log.info("Closing all actor lifetime contexts")
|
||||||
|
|
||||||
|
# TODO: we can't actually do this bc the debugger
|
||||||
|
# uses the _service_n to spawn the lock task, BUT,
|
||||||
|
# in theory if we had the root nursery surround this finally
|
||||||
|
# block it might be actually possible to debug THIS
|
||||||
|
# machinery in the same way as user task code?
|
||||||
|
# if self.name == 'brokerd.ib':
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await _debug.breakpoint()
|
||||||
|
|
||||||
_lifetime_stack.close()
|
_lifetime_stack.close()
|
||||||
|
|
||||||
# Unregister actor from the arbiter
|
# Unregister actor from the arbiter
|
||||||
|
@ -1102,7 +1128,7 @@ class Actor:
|
||||||
if self._service_n:
|
if self._service_n:
|
||||||
self._service_n.cancel_scope.cancel()
|
self._service_n.cancel_scope.cancel()
|
||||||
|
|
||||||
log.cancel(f"{self.uid} was sucessfullly cancelled")
|
log.cancel(f"{self.uid} called `Actor.cancel()`")
|
||||||
self._cancel_complete.set()
|
self._cancel_complete.set()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue