forked from goodboy/tractor
1
0
Fork 0

Type checker fixes

try_trip^2
Tyler Goodlet 2020-01-20 21:06:49 -05:00
parent e1a55a6f4f
commit 4b0554b61f
3 changed files with 18 additions and 19 deletions

View File

@ -168,7 +168,7 @@ class Actor:
def __init__(
self,
name: str,
rpc_module_paths: List[str] = {},
rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None,
uid: str = None,
loglevel: str = None,

View File

@ -107,8 +107,8 @@ async def exhaust_portal(
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: List[Exception],
task_status=trio.TASK_STATUS_IGNORED,
errors: Dict[Tuple[str, str], Exception],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
@ -127,29 +127,26 @@ async def cancel_on_completion(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.info(f"Cancelling {portal.channel.uid} gracefully")
log.info(
f"Cancelling {portal.channel.uid} gracefully "
"after result {result}")
# cancel the process now that we have a final result
await portal.cancel_actor()
# XXX: lol, this will never get run without a shield above..
# if cs.cancelled_caught:
# log.warning(
# "Result waiter was cancelled, process may have died")
async def new_proc(
name: str,
actor_nursery: 'ActorNursery',
actor_nursery: 'ActorNursery', # type: ignore
subactor: Actor,
errors: Dict[str, Exception],
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
begin_wait_phase: trio.Event,
use_trip: bool = True,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> mp.Process:
) -> None:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
@ -217,7 +214,7 @@ async def new_proc(
else:
fs_info = (None, None, None, None, None)
proc = _ctx.Process(
proc = _ctx.Process( # type: ignore
target=subactor._mp_main,
args=(
bind_addr,

View File

@ -26,7 +26,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[str, Exception],
errors: Dict[Tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -57,7 +57,7 @@ class ActorNursery:
subactor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths,
rpc_module_paths=rpc_module_paths or [],
statespace=statespace, # global proc state vars
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
@ -68,7 +68,9 @@ class ActorNursery:
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
return await nursery.start(
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
_spawn.new_proc,
name,
self,
@ -186,8 +188,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# XXX we use these nurseries because TRIP is doing all its stuff with
# an `@asynccontextmanager` which has an internal nursery *and* the
# task that opens a nursery must also close it.
errors: Dict[str, Exception] = {}
# task that opens a nursery **must also close it**.
errors: Dict[Tuple[str, str], Exception] = {}
async with trio.open_nursery() as da_nursery:
try:
async with trio.open_nursery() as ria_nursery:
@ -241,7 +243,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
with trio.CancelScope(shield=True):
await anursery.cancel()
if len(errors) > 1:
raise trio.MultiError(errors.values())
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
log.debug(f"Nursery teardown complete")