Guard against TCP server never started on cancel
parent
dbe5d96d66
commit
6b0366fe04
|
@ -248,7 +248,7 @@ async def _invoke(
|
||||||
# If we're cancelled before the task returns then the
|
# If we're cancelled before the task returns then the
|
||||||
# cancel scope will not have been inserted yet
|
# cancel scope will not have been inserted yet
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Task {func} likely errored or cancelled before it started")
|
f"Task {func} likely errored or cancelled before start")
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
log.runtime("All RPC tasks have completed")
|
log.runtime("All RPC tasks have completed")
|
||||||
|
@ -282,6 +282,9 @@ class Actor:
|
||||||
_parent_main_data: Dict[str, str]
|
_parent_main_data: Dict[str, str]
|
||||||
_parent_chan_cs: Optional[trio.CancelScope] = None
|
_parent_chan_cs: Optional[trio.CancelScope] = None
|
||||||
|
|
||||||
|
# syncs for setup/teardown sequences
|
||||||
|
_server_down: Optional[trio.Event] = None
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -675,36 +678,44 @@ class Actor:
|
||||||
func = getattr(self, funcname)
|
func = getattr(self, funcname)
|
||||||
if funcname == 'cancel':
|
if funcname == 'cancel':
|
||||||
|
|
||||||
# don't start entire actor runtime cancellation if this
|
# don't start entire actor runtime
|
||||||
# actor is in debug mode
|
# cancellation if this actor is in debug
|
||||||
|
# mode
|
||||||
pdb_complete = _debug._local_pdb_complete
|
pdb_complete = _debug._local_pdb_complete
|
||||||
if pdb_complete:
|
if pdb_complete:
|
||||||
await pdb_complete.wait()
|
await pdb_complete.wait()
|
||||||
|
|
||||||
# we immediately start the runtime machinery shutdown
|
# we immediately start the runtime machinery
|
||||||
|
# shutdown
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# self.cancel() was called so kill this msg loop
|
# self.cancel() was called so kill this
|
||||||
# and break out into ``_async_main()``
|
# msg loop and break out into
|
||||||
|
# ``_async_main()``
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Actor {self.uid} was remotely cancelled; "
|
f"Actor {self.uid} was remotely cancelled;"
|
||||||
"waiting on cancellation completion..")
|
" waiting on cancellation completion..")
|
||||||
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
|
await _invoke(
|
||||||
# await self._cancel_complete.wait()
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
|
)
|
||||||
|
|
||||||
loop_cs.cancel()
|
loop_cs.cancel()
|
||||||
break
|
break
|
||||||
|
|
||||||
if funcname == '_cancel_task':
|
if funcname == '_cancel_task':
|
||||||
|
|
||||||
# we immediately start the runtime machinery shutdown
|
# we immediately start the runtime machinery
|
||||||
|
# shutdown
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# self.cancel() was called so kill this msg loop
|
# self.cancel() was called so kill this
|
||||||
# and break out into ``_async_main()``
|
# msg loop and break out into
|
||||||
|
# ``_async_main()``
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Actor {self.uid} was remotely cancelled; "
|
f"Actor {self.uid} was remotely cancelled;"
|
||||||
"waiting on cancellation completion..")
|
" waiting on cancellation completion..")
|
||||||
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
|
await _invoke(
|
||||||
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
# complain to client about restricted modules
|
||||||
|
@ -749,7 +760,8 @@ class Actor:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
|
|
||||||
# end of async for, channel disconnect vis ``trio.EndOfChannel``
|
# end of async for, channel disconnect vis
|
||||||
|
# ``trio.EndOfChannel``
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||||
)
|
)
|
||||||
|
@ -1123,7 +1135,11 @@ class Actor:
|
||||||
|
|
||||||
# stop channel server
|
# stop channel server
|
||||||
self.cancel_server()
|
self.cancel_server()
|
||||||
await self._server_down.wait()
|
if self._server_down is not None:
|
||||||
|
await self._server_down.wait()
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'{self.uid} was likely cancelled before it started')
|
||||||
|
|
||||||
# cancel all rpc tasks permanently
|
# cancel all rpc tasks permanently
|
||||||
if self._service_n:
|
if self._service_n:
|
||||||
|
@ -1190,7 +1206,10 @@ class Actor:
|
||||||
tasks = self._rpc_tasks
|
tasks = self._rpc_tasks
|
||||||
if tasks:
|
if tasks:
|
||||||
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
|
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
|
||||||
for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
|
for (
|
||||||
|
(chan, cid),
|
||||||
|
(scope, func, is_complete),
|
||||||
|
) in tasks.copy().items():
|
||||||
if only_chan is not None:
|
if only_chan is not None:
|
||||||
if only_chan != chan:
|
if only_chan != chan:
|
||||||
continue
|
continue
|
||||||
|
@ -1250,14 +1269,16 @@ class Actor:
|
||||||
|
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
'''
|
||||||
|
A special actor who knows all the other actors and always has
|
||||||
access to a top level nursery.
|
access to a top level nursery.
|
||||||
|
|
||||||
The arbiter is by default the first actor spawned on each host
|
The arbiter is by default the first actor spawned on each host
|
||||||
and is responsible for keeping track of all other actors for
|
and is responsible for keeping track of all other actors for
|
||||||
coordination purposes. If a new main process is launched and an
|
coordination purposes. If a new main process is launched and an
|
||||||
arbiter is already running that arbiter will be used.
|
arbiter is already running that arbiter will be used.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
is_arbiter = True
|
is_arbiter = True
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
|
Loading…
Reference in New Issue