commit
7ddc4db041
|
@ -150,7 +150,7 @@ async def _invoke(
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# If we're cancelled before the task returns then the
|
# If we're cancelled before the task returns then the
|
||||||
# cancel scope will not have been inserted yet
|
# cancel scope will not have been inserted yet
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Task {func} likely errored or cancelled before it started")
|
f"Task {func} likely errored or cancelled before it started")
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
|
@ -520,7 +520,7 @@ class Actor:
|
||||||
# deadlock and other weird behaviour)
|
# deadlock and other weird behaviour)
|
||||||
if func != self.cancel:
|
if func != self.cancel:
|
||||||
if isinstance(cs, Exception):
|
if isinstance(cs, Exception):
|
||||||
log.warn(f"Task for RPC func {func} failed with"
|
log.warning(f"Task for RPC func {func} failed with"
|
||||||
f"{cs}")
|
f"{cs}")
|
||||||
else:
|
else:
|
||||||
# mark that we have ongoing rpc tasks
|
# mark that we have ongoing rpc tasks
|
||||||
|
@ -547,7 +547,7 @@ class Actor:
|
||||||
log.debug(
|
log.debug(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
||||||
)
|
)
|
||||||
self.cancel_rpc_tasks(chan)
|
await self.cancel_rpc_tasks(chan)
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
log.error(f"{chan} form {chan.uid} broke")
|
||||||
|
@ -757,7 +757,7 @@ class Actor:
|
||||||
|
|
||||||
# tear down all lifetime contexts
|
# tear down all lifetime contexts
|
||||||
# api idea: ``tractor.open_context()``
|
# api idea: ``tractor.open_context()``
|
||||||
log.warn("Closing all actor lifetime contexts")
|
log.warning("Closing all actor lifetime contexts")
|
||||||
self._lifetime_stack.close()
|
self._lifetime_stack.close()
|
||||||
|
|
||||||
# Unregister actor from the arbiter
|
# Unregister actor from the arbiter
|
||||||
|
@ -855,14 +855,14 @@ class Actor:
|
||||||
# kill all ongoing tasks
|
# kill all ongoing tasks
|
||||||
await self.cancel_rpc_tasks()
|
await self.cancel_rpc_tasks()
|
||||||
|
|
||||||
# cancel all rpc tasks permanently
|
|
||||||
if self._service_n:
|
|
||||||
self._service_n.cancel_scope.cancel()
|
|
||||||
|
|
||||||
# stop channel server
|
# stop channel server
|
||||||
self.cancel_server()
|
self.cancel_server()
|
||||||
await self._server_down.wait()
|
await self._server_down.wait()
|
||||||
|
|
||||||
|
# cancel all rpc tasks permanently
|
||||||
|
if self._service_n:
|
||||||
|
self._service_n.cancel_scope.cancel()
|
||||||
|
|
||||||
log.warning(f"{self.uid} was sucessfullly cancelled")
|
log.warning(f"{self.uid} was sucessfullly cancelled")
|
||||||
self._cancel_complete.set()
|
self._cancel_complete.set()
|
||||||
return True
|
return True
|
||||||
|
@ -1083,18 +1083,12 @@ async def _start_actor(
|
||||||
try:
|
try:
|
||||||
result = await main()
|
result = await main()
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
try:
|
log.exception("Actor crashed:")
|
||||||
log.exception("Actor crashed:")
|
await _debug._maybe_enter_pm(err)
|
||||||
await _debug._maybe_enter_pm(err)
|
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
finally:
|
await actor.cancel()
|
||||||
await actor.cancel()
|
|
||||||
|
|
||||||
# XXX: the actor is cancelled when this context is complete
|
|
||||||
# given that there are no more active peer channels connected
|
|
||||||
actor.cancel_server()
|
|
||||||
|
|
||||||
# unset module state
|
# unset module state
|
||||||
_state._current_actor = None
|
_state._current_actor = None
|
||||||
|
|
|
@ -234,8 +234,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
|
||||||
os.close(child_w)
|
os.close(child_w)
|
||||||
else:
|
else:
|
||||||
# This shouldn't happen really
|
# This shouldn't happen really
|
||||||
warnings.warn('forkserver: waitpid returned '
|
warnings.warning('forkserver: waitpid returned '
|
||||||
'unexpected pid %d' % pid)
|
'unexpected pid %d' % pid)
|
||||||
|
|
||||||
if listener in rfds:
|
if listener in rfds:
|
||||||
# Incoming fork request
|
# Incoming fork request
|
||||||
|
|
|
@ -275,7 +275,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
# ria_nursery scope end
|
# ria_nursery scope end
|
||||||
|
|
||||||
# XXX: do we need a `trio.Cancelled` catch here as well?
|
# XXX: do we need a `trio.Cancelled` catch here as well?
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError, trio.Cancelled) as err:
|
||||||
# If actor-local error was raised while waiting on
|
# If actor-local error was raised while waiting on
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
|
|
Loading…
Reference in New Issue