diff --git a/tractor/_actor.py b/tractor/_actor.py index e85b16b..a9ce438 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -248,7 +248,7 @@ async def _invoke( # If we're cancelled before the task returns then the # cancel scope will not have been inserted yet log.warning( - f"Task {func} likely errored or cancelled before it started") + f"Task {func} likely errored or cancelled before start") finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -282,6 +282,9 @@ class Actor: _parent_main_data: Dict[str, str] _parent_chan_cs: Optional[trio.CancelScope] = None + # syncs for setup/teardown sequences + _server_down: Optional[trio.Event] = None + def __init__( self, name: str, @@ -675,36 +678,44 @@ class Actor: func = getattr(self, funcname) if funcname == 'cancel': - # don't start entire actor runtime cancellation if this - # actor is in debug mode + # don't start entire actor runtime + # cancellation if this actor is in debug + # mode pdb_complete = _debug._local_pdb_complete if pdb_complete: await pdb_complete.wait() - # we immediately start the runtime machinery shutdown + # we immediately start the runtime machinery + # shutdown with trio.CancelScope(shield=True): - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` + # self.cancel() was called so kill this + # msg loop and break out into + # ``_async_main()`` log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) - # await self._cancel_complete.wait() + f"Actor {self.uid} was remotely cancelled;" + " waiting on cancellation completion..") + await _invoke( + self, cid, chan, func, kwargs, is_rpc=False + ) loop_cs.cancel() break if funcname == '_cancel_task': - # we immediately start the runtime machinery shutdown + # we immediately start the runtime machinery + # shutdown with trio.CancelScope(shield=True): - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` + # self.cancel() was called so kill this + # msg loop and break out into + # ``_async_main()`` kwargs['chan'] = chan log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + f"Actor {self.uid} was remotely cancelled;" + " waiting on cancellation completion..") + await _invoke( + self, cid, chan, func, kwargs, is_rpc=False + ) continue else: # complain to client about restricted modules @@ -749,7 +760,8 @@ class Actor: log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") - # end of async for, channel disconnect vis ``trio.EndOfChannel`` + # end of async for, channel disconnect vis + # ``trio.EndOfChannel`` log.runtime( f"{chan} for {chan.uid} disconnected, cancelling tasks" ) @@ -1123,7 +1135,11 @@ class Actor: # stop channel server self.cancel_server() - await self._server_down.wait() + if self._server_down is not None: + await self._server_down.wait() + else: + log.warning( + f'{self.uid} was likely cancelled before it started') # cancel all rpc tasks permanently if self._service_n: @@ -1190,7 +1206,10 @@ class Actor: tasks = self._rpc_tasks if tasks: log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") - for (chan, cid), (scope, func, is_complete) in tasks.copy().items(): + for ( + (chan, cid), + (scope, func, is_complete), + ) in tasks.copy().items(): if only_chan is not None: if only_chan != chan: continue @@ -1250,14 +1269,16 @@ class Actor: class Arbiter(Actor): - """A special actor who knows all the other actors and always has + ''' + A special actor who knows all the other actors and always has access to a top level nursery. The arbiter is by default the first actor spawned on each host and is responsible for keeping track of all other actors for coordination purposes. If a new main process is launched and an arbiter is already running that arbiter will be used. - """ + + ''' is_arbiter = True def __init__(self, *args, **kwargs):