forked from goodboy/tractor
1
0
Fork 0

Do immediate remote task cancels

As for `Actor.cancel()` requests, do the same for
`Actor._cancel_task()` but use `_invoke()` to ensure
correct msg transactions with caller. Don't cancel task
cancels on a cancel-all-tasks operation in attempt at
more determinism.
immediate_remote_cancels
Tyler Goodlet 2021-10-10 11:42:32 -04:00
parent 41f0992445
commit bb9d9c74b1
1 changed files with 33 additions and 30 deletions

View File

@ -682,29 +682,30 @@ class Actor:
await pdb_complete.wait() await pdb_complete.wait()
# we immediately start the runtime machinery shutdown # we immediately start the runtime machinery shutdown
await _invoke(self, cid, chan, func, kwargs, is_rpc=False) with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
# await self._cancel_complete.wait()
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel() loop_cs.cancel()
break break
elif funcname == '_cancel_task': if funcname == '_cancel_task':
# XXX: a special case is made here for
# remote calls since we don't want the
# remote actor have to know which channel
# the task is associated with and we can't
# pass non-primitive types between actors.
# This means you can use:
# Portal.run('self', '_cancel_task, cid=did)
# without passing the `chan` arg.
kwargs['chan'] = chan
# we immediately start the runtime machinery shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
kwargs['chan'] = chan
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
continue
else: else:
# complain to client about restricted modules # complain to client about restricted modules
try: try:
@ -995,7 +996,7 @@ class Actor:
raise raise
finally: finally:
log.info("Root nursery complete") log.info("Runtime nursery complete")
# tear down all lifetime contexts if not in guest mode # tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint? # XXX: should this just be in the entrypoint?
@ -1094,7 +1095,7 @@ class Actor:
self._service_n.start_soon(self.cancel) self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool: async def cancel(self) -> bool:
"""Cancel this actor. """Cancel this actor's runtime.
The "deterministic" teardown sequence in order is: The "deterministic" teardown sequence in order is:
- cancel all ongoing rpc tasks by cancel scope - cancel all ongoing rpc tasks by cancel scope
@ -1187,18 +1188,20 @@ class Actor:
registered for each. registered for each.
""" """
tasks = self._rpc_tasks tasks = self._rpc_tasks
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") if tasks:
for (chan, cid) in tasks.copy(): log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
if only_chan is not None: for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
if only_chan != chan: if only_chan is not None:
continue if only_chan != chan:
continue
# TODO: this should really done in a nursery batch # TODO: this should really done in a nursery batch
await self._cancel_task(cid, chan) if func != self._cancel_task:
await self._cancel_task(cid, chan)
log.cancel( log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None: def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby """Cancel the internal channel server nursery thereby