forked from goodboy/tractor
1
0
Fork 0

Merge pull request #263 from goodboy/early_deth_fixes

Early deth fixes
faster_daemon_cancels
goodboy 2021-11-08 21:23:09 -05:00 committed by GitHub
commit b527fdbe1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 22 deletions

View File

@ -248,7 +248,7 @@ async def _invoke(
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
f"Task {func} likely errored or cancelled before it started")
f"Task {func} likely errored or cancelled before start")
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
@ -282,6 +282,9 @@ class Actor:
_parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None
# syncs for setup/teardown sequences
_server_down: Optional[trio.Event] = None
def __init__(
self,
name: str,
@ -675,36 +678,44 @@ class Actor:
func = getattr(self, funcname)
if funcname == 'cancel':
# don't start entire actor runtime cancellation if this
# actor is in debug mode
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
pdb_complete = _debug._local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery shutdown
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
# await self._cancel_complete.wait()
f"Actor {self.uid} was remotely cancelled;"
" waiting on cancellation completion..")
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
loop_cs.cancel()
break
if funcname == '_cancel_task':
# we immediately start the runtime machinery shutdown
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
kwargs['chan'] = chan
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
f"Actor {self.uid} was remotely cancelled;"
" waiting on cancellation completion..")
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
continue
else:
# complain to client about restricted modules
@ -749,7 +760,8 @@ class Actor:
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
# end of async for, channel disconnect vis ``trio.EndOfChannel``
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
@ -1123,7 +1135,11 @@ class Actor:
# stop channel server
self.cancel_server()
await self._server_down.wait()
if self._server_down is not None:
await self._server_down.wait()
else:
log.warning(
f'{self.uid} was likely cancelled before it started')
# cancel all rpc tasks permanently
if self._service_n:
@ -1190,7 +1206,10 @@ class Actor:
tasks = self._rpc_tasks
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
for (
(chan, cid),
(scope, func, is_complete),
) in tasks.copy().items():
if only_chan is not None:
if only_chan != chan:
continue
@ -1250,14 +1269,16 @@ class Actor:
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has
'''
A special actor who knows all the other actors and always has
access to a top level nursery.
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
"""
'''
is_arbiter = True
def __init__(self, *args, **kwargs):

View File

@ -557,8 +557,13 @@ async def acquire_debug_lock(
'''
Grab root's debug lock on entry, release on exit.
This helper is for actor's who don't actually need
to acquired the debugger but want to wait until the
lock is free in the tree root.
'''
if not debug_mode():
yield None
return
async with trio.open_nursery() as n:
@ -627,4 +632,3 @@ async def maybe_wait_for_debugger(
log.warning(
'Root acquired TTY LOCK'
)
return