diff --git a/tractor/_actor.py b/tractor/_actor.py index fe1f67f..d46471d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -168,7 +168,7 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: List[str] = {}, + rpc_module_paths: List[str] = [], statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5e99723..a26cb8e 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -107,8 +107,8 @@ async def exhaust_portal( async def cancel_on_completion( portal: Portal, actor: Actor, - errors: List[Exception], - task_status=trio.TASK_STATUS_IGNORED, + errors: Dict[Tuple[str, str], Exception], + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Cancel actor gracefully once it's "main" portal's result arrives. @@ -127,29 +127,26 @@ async def cancel_on_completion( f"Cancelling {portal.channel.uid} after error {result}" ) else: - log.info(f"Cancelling {portal.channel.uid} gracefully") + log.info( + f"Cancelling {portal.channel.uid} gracefully " + "after result {result}") # cancel the process now that we have a final result await portal.cancel_actor() - # XXX: lol, this will never get run without a shield above.. - # if cs.cancelled_caught: - # log.warning( - # "Result waiter was cancelled, process may have died") - async def new_proc( name: str, - actor_nursery: 'ActorNursery', + actor_nursery: 'ActorNursery', # type: ignore subactor: Actor, - errors: Dict[str, Exception], + errors: Dict[Tuple[str, str], Exception], # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], begin_wait_phase: trio.Event, use_trip: bool = True, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED -) -> mp.Process: +) -> None: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ @@ -217,7 +214,7 @@ async def new_proc( else: fs_info = (None, None, None, None, None) - proc = _ctx.Process( + proc = _ctx.Process( # type: ignore target=subactor._mp_main, args=( bind_addr, diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4f97b5e..a20f8bb 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -26,7 +26,7 @@ class ActorNursery: actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: Dict[str, Exception], + errors: Dict[Tuple[str, str], Exception], ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor @@ -57,7 +57,7 @@ class ActorNursery: subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, + rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, @@ -68,7 +68,9 @@ class ActorNursery: # start a task to spawn a process # blocks until process has been started and a portal setup nursery = nursery or self._da_nursery - return await nursery.start( + + # XXX: the type ignore is actually due to a `mypy` bug + return await nursery.start( # type: ignore _spawn.new_proc, name, self, @@ -186,8 +188,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # XXX we use these nurseries because TRIP is doing all its stuff with # an `@asynccontextmanager` which has an internal nursery *and* the - # task that opens a nursery must also close it. - errors: Dict[str, Exception] = {} + # task that opens a nursery **must also close it**. + errors: Dict[Tuple[str, str], Exception] = {} async with trio.open_nursery() as da_nursery: try: async with trio.open_nursery() as ria_nursery: @@ -241,7 +243,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: with trio.CancelScope(shield=True): await anursery.cancel() if len(errors) > 1: - raise trio.MultiError(errors.values()) + raise trio.MultiError(tuple(errors.values())) else: raise list(errors.values())[0] log.debug(f"Nursery teardown complete")