forked from goodboy/tractor
1
0
Fork 0

Appease the great mypy

dereg_on_channel_aclose
Tyler Goodlet 2020-08-08 20:57:18 -04:00
parent 42be410076
commit b3eba00c3a
2 changed files with 20 additions and 11 deletions

View File

@ -168,8 +168,8 @@ class Actor:
is_arbiter: bool = False is_arbiter: bool = False
# nursery placeholders filled in by `_async_main()` after fork # nursery placeholders filled in by `_async_main()` after fork
_root_n: trio.Nursery = None _root_n: Optional[trio.Nursery] = None
_service_n: trio.Nursery = None _service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None
# Information about `__main__` from parent # Information about `__main__` from parent
@ -497,6 +497,7 @@ class Actor:
# spin up a task for the requested function # spin up a task for the requested function
log.debug(f"Spawning task for {func}") log.debug(f"Spawning task for {func}")
assert self._service_n
cs = await self._service_n.start( cs = await self._service_n.start(
partial(_invoke, self, cid, chan, func, kwargs), partial(_invoke, self, cid, chan, func, kwargs),
name=funcname, name=funcname,
@ -551,7 +552,7 @@ class Actor:
async def _chan_to_parent( async def _chan_to_parent(
self, self,
parent_addr: Optional[Tuple[str, int]], parent_addr: Optional[Tuple[str, int]],
) -> Channel: ) -> Tuple[Channel, Optional[Tuple[str, int]]]:
try: try:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we # handshake. From this point on if we error, we
@ -564,6 +565,8 @@ class Actor:
# Initial handshake: swap names. # Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
accept_addr: Optional[Tuple[str, int]] = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive runtime state from our parent # Receive runtime state from our parent
parent_data = await chan.recv() parent_data = await chan.recv()
@ -578,9 +581,6 @@ class Actor:
for attr, value in parent_data.items(): for attr, value in parent_data.items():
setattr(self, attr, value) setattr(self, attr, value)
else: # mp
accept_addr = None
return chan, accept_addr return chan, accept_addr
except OSError: # failed to connect except OSError: # failed to connect
@ -636,6 +636,7 @@ class Actor:
# a deterministic way. # a deterministic way.
async with trio.open_nursery() as root_nursery: async with trio.open_nursery() as root_nursery:
self._root_n = root_nursery self._root_n = root_nursery
assert self._root_n
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
# This nursery is used to handle all inbound # This nursery is used to handle all inbound
@ -643,6 +644,7 @@ class Actor:
# is killed, connections can continue to process # is killed, connections can continue to process
# in the background until this nursery is cancelled. # in the background until this nursery is cancelled.
self._service_n = service_nursery self._service_n = service_nursery
assert self._service_n
# Startup up the channel server with, # Startup up the channel server with,
# - subactor: the bind address sent to us by our parent # - subactor: the bind address sent to us by our parent
@ -650,6 +652,7 @@ class Actor:
# - root actor: the ``accept_addr`` passed to this method # - root actor: the ``accept_addr`` passed to this method
assert accept_addr assert accept_addr
host, port = accept_addr host, port = accept_addr
self._server_n = await service_nursery.start( self._server_n = await service_nursery.start(
partial( partial(
self._serve_forever, self._serve_forever,
@ -765,7 +768,7 @@ class Actor:
# (host, port) to bind for channel server # (host, port) to bind for channel server
accept_host: Tuple[str, int] = None, accept_host: Tuple[str, int] = None,
accept_port: int = 0, accept_port: int = 0,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Start the channel server, begin listening for new connections. """Start the channel server, begin listening for new connections.
@ -794,7 +797,7 @@ class Actor:
# signal the server is down since nursery above terminated # signal the server is down since nursery above terminated
self._server_down.set() self._server_down.set()
async def cancel(self) -> None: async def cancel(self) -> bool:
"""Cancel this actor. """Cancel this actor.
The "deterministic" teardown sequence in order is: The "deterministic" teardown sequence in order is:
@ -807,10 +810,16 @@ class Actor:
""" """
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# kill all ongoing tasks
await self.cancel_rpc_tasks() await self.cancel_rpc_tasks()
# stop channel server
self.cancel_server() self.cancel_server()
await self._server_down.wait() await self._server_down.wait()
self._service_n.cancel_scope.cancel()
# rekt all channel loops
if self._service_n:
self._service_n.cancel_scope.cancel()
return True return True

View File

@ -338,9 +338,9 @@ async def open_portal(
if channel.uid is None: if channel.uid is None:
await actor._do_handshake(channel) await actor._do_handshake(channel)
msg_loop_cs = None msg_loop_cs: Optional[trio.CancelScope] = None
if start_msg_loop: if start_msg_loop:
msg_loop_cs: trio.CancelScope = await nursery.start( msg_loop_cs = await nursery.start(
partial( partial(
actor._process_messages, actor._process_messages,
channel, channel,