forked from goodboy/tractor
Restructure actor runtime nursery scoping
In an effort to acquire more deterministic actor cancellation, this adds a clearer and more resilient (whilst possibly a bit slower) internal nursery structure with explicit semantics for clarifying the task-scope shutdown sequence. Namely, on cancellation, the explicit steps are now:

- cancel all currently running rpc tasks and wait for them to complete
- cancel the channel server and wait for it to complete
- cancel the msg loop for the channel with the immediate parent
- de-register with arbiter if possible
- wait on remaining connections to release
- exit process

To accomplish this, add a new nursery called the "service nursery" which spawns all rpc tasks **instead of using** the "root nursery". The root is now used solely for async launching the msg loop for the primary channel with the parent such that it is (nearly) the last thing torn down on cancellation.

In the future it should also be possible to have `self.cancel()` return a result to the parent once the runtime is sure that the rest of the shutdown is atomic; this would allow for a true unbounded shield in `Portal.cancel_actor()`. This will likely require that the error handling blocks in `Actor._async_main()` are moved "inside" the root nursery block such that the msg loop with the parent truly is the last thing to terminate.
branch dereg_on_channel_aclose
parent 90c7fa6963
commit 8477d21499
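For orientation before the diff, here is a minimal, runnable trio sketch of the nursery layout this commit introduces: a "root" nursery that only hosts the msg loop with the immediate parent, wrapping a "service" nursery that owns the channel server and all rpc tasks. This is illustrative only, not tractor's actual code; the helper names (`serve_channel_server`, `run_rpc_task`, `msg_loop_with_parent`) are hypothetical stand-ins for the `Actor` methods touched below.

import trio


async def serve_channel_server():
    # stand-in for the TCP channel server task
    await trio.sleep_forever()


async def run_rpc_task():
    # stand-in for a spawned rpc task
    await trio.sleep_forever()


async def msg_loop_with_parent(service_n: trio.Nursery):
    # stand-in for the msg loop with the immediate parent; here it just
    # pretends the parent requested a cancel after a moment
    await trio.sleep(0.1)
    service_n.cancel_scope.cancel()


async def _async_main_sketch():
    # the "root" nursery only hosts the msg loop with the immediate parent,
    # so it is (nearly) the last thing torn down
    async with trio.open_nursery() as root_n:

        # the "service" nursery owns the channel server and all rpc tasks;
        # cancelling it leaves the parent msg loop in the root nursery alive
        async with trio.open_nursery() as service_n:
            service_n.start_soon(serve_channel_server)
            service_n.start_soon(run_rpc_task)

            # started last so parent requests are only processed once the
            # server and rpc machinery are up
            root_n.start_soon(msg_loop_with_parent, service_n)

        print("service nursery complete")
    print("root nursery complete")


if __name__ == '__main__':
    trio.run(_async_main_sketch)

Cancelling the service nursery tears down the server and rpc tasks while the parent msg loop in the root nursery keeps running until the very end, which is what makes the shutdown order above deterministic.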
@@ -167,9 +167,10 @@ class Actor:
     """
     is_arbiter: bool = False

-    # placeholders filled in by `_async_main` after fork
-    _root_nursery: trio.Nursery
-    _server_nursery: trio.Nursery
+    # nursery placeholders filled in by `_async_main()` after fork
+    _root_n: trio.Nursery = None
+    _service_n: trio.Nursery = None
+    _server_n: Optional[trio.Nursery] = None

     # Information about `__main__` from parent
     _parent_main_data: Dict[str, str]
@@ -293,7 +294,7 @@ class Actor:
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
-        self._no_more_peers = trio.Event()
+        self._no_more_peers = trio.Event()  # unset
         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")

@@ -427,13 +428,13 @@ class Actor:
         msg = None
         log.debug(f"Entering msg loop for {chan} from {chan.uid}")
         try:
-            with trio.CancelScope(shield=shield) as cs:
+            with trio.CancelScope(shield=shield) as loop_cs:
                 # this internal scope allows for keeping this message
                 # loop running despite the current task having been
                 # cancelled (eg. `open_portal()` may call this method from
                 # a locally spawned task) and recieve this scope using
                 # ``scope = Nursery.start()``
-                task_status.started(cs)
+                task_status.started(loop_cs)
                 async for msg in chan:
                     if msg is None:  # loop terminate sentinel
                         log.debug(
@@ -496,7 +497,7 @@ class Actor:

                    # spin up a task for the requested function
                    log.debug(f"Spawning task for {func}")
-                    cs = await self._root_nursery.start(
+                    cs = await self._service_n.start(
                        partial(_invoke, self, cid, chan, func, kwargs),
                        name=funcname,
                    )
@@ -514,6 +515,13 @@ class Actor:
                        # cancelled gracefully if requested
                        self._rpc_tasks[(chan, cid)] = (
                            cs, func, trio.Event())
+                    else:
+                        # self.cancel() was called so kill this msg loop
+                        # and break out into ``_async_main()``
+                        log.warning(f"{self.uid} was remotely cancelled")
+                        loop_cs.cancel()
+                        break
+
                    log.debug(
                        f"Waiting on next msg for {chan} from {chan.uid}")
                else:
@@ -540,6 +548,46 @@ class Actor:
                f"Exiting msg loop for {chan} from {chan.uid} "
                f"with last msg:\n{msg}")

+    async def _chan_to_parent(
+        self,
+        parent_addr: Optional[Tuple[str, int]],
+    ) -> Channel:
+        try:
+            # Connect back to the parent actor and conduct initial
+            # handshake. From this point on if we error, we
+            # attempt to ship the exception back to the parent.
+            chan = Channel(
+                destaddr=parent_addr,
+            )
+            await chan.connect()
+
+            # Initial handshake: swap names.
+            await self._do_handshake(chan)
+
+            if self._spawn_method == "trio":
+                # Receive runtime state from our parent
+                parent_data = await chan.recv()
+                log.debug(
+                    "Recieved state from parent:\n"
+                    f"{parent_data}"
+                )
+                accept_addr = (
+                    parent_data.pop('bind_host'),
+                    parent_data.pop('bind_port'),
+                )
+                for attr, value in parent_data.items():
+                    setattr(self, attr, value)
+
+            return chan, accept_addr
+
+        except OSError:  # failed to connect
+            log.warning(
+                f"Failed to connect to parent @ {parent_addr},"
+                " closing server")
+            await self.cancel()
+            # self._parent_chan = None
+            raise
+
    async def _async_main(
        self,
        accept_addr: Optional[Tuple[str, int]] = None,
@@ -561,88 +609,84 @@ class Actor:
        """
        registered_with_arbiter = False
        try:
-            async with trio.open_nursery() as nursery:
-                self._root_nursery = nursery
-
-                # TODO: just make `parent_addr` a bool system (see above)?
-                if parent_addr is not None:
-                    try:
-                        # Connect back to the parent actor and conduct initial
-                        # handshake. From this point on if we error, we
-                        # attempt to ship the exception back to the parent.
-                        chan = self._parent_chan = Channel(
-                            destaddr=parent_addr,
-                        )
-                        await chan.connect()
-
-                        # Initial handshake: swap names.
-                        await self._do_handshake(chan)
-
-                        if self._spawn_method == "trio":
-                            # Receive runtime state from our parent
-                            parent_data = await chan.recv()
-                            log.debug(
-                                "Recieved state from parent:\n"
-                                f"{parent_data}"
-                            )
-                            accept_addr = (
-                                parent_data.pop('bind_host'),
-                                parent_data.pop('bind_port'),
-                            )
-                            for attr, value in parent_data.items():
-                                setattr(self, attr, value)
-
-                    except OSError:  # failed to connect
-                        log.warning(
-                            f"Failed to connect to parent @ {parent_addr},"
-                            " closing server")
-                        await self.cancel()
-                        self._parent_chan = None
-                        raise
-
-                # load exposed/allowed RPC modules
-                # XXX: do this **after** establishing a channel to the parent
-                # but **before** starting the message loop for that channel
-                # such that import errors are properly propagated upwards
-                self.load_modules()
-
-                # Startup up channel server with,
-                # - subactor: the bind address sent to us by our parent
-                #   over our established channel
-                # - root actor: the ``accept_addr`` passed to this method
-                assert accept_addr
-                host, port = accept_addr
-                await nursery.start(
-                    partial(
-                        self._serve_forever,
-                        accept_host=host,
-                        accept_port=port
-                    )
-                )
-
-                # Begin handling our new connection back to parent.
-                # This is done here since we don't want to start
-                # processing parent requests until our server is
-                # 100% up and running.
-                if self._parent_chan:
-                    nursery.start_soon(
-                        self._process_messages, self._parent_chan)
-
-                # Register with the arbiter if we're told its addr
-                log.debug(f"Registering {self} for role `{self.name}`")
-                assert isinstance(self._arb_addr, tuple)
-                async with get_arbiter(*self._arb_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'register_actor',
-                        uid=self.uid, sockaddr=self.accept_addr)
-                registered_with_arbiter = True
-
-                task_status.started()
-                log.debug("Waiting on root nursery to complete")
-
-            # Blocks here as expected until the channel server is
+
+            # establish primary connection with immediate parent
+            self._parent_chan = None
+            if parent_addr is not None:
+                self._parent_chan, accept_addr = await self._chan_to_parent(
+                    parent_addr)
+
+            # load exposed/allowed RPC modules
+            # XXX: do this **after** establishing a channel to the parent
+            # but **before** starting the message loop for that channel
+            # such that import errors are properly propagated upwards
+            self.load_modules()
+
+            # The "root" nursery ensures the channel with the immediate
+            # parent is kept alive as a reslient service until
+            # cancellation steps have (mostly) ocurred in
+            # a deterministic way.
+            async with trio.open_nursery() as root_nursery:
+                self._root_n = root_nursery
+
+                async with trio.open_nursery() as service_nursery:
+                    # This nursery is used to handle all inbound
+                    # connections to us such that if the TCP server
+                    # is killed, connections can continue to process
+                    # in the background until this nursery is cancelled.
+                    self._service_n = service_nursery
+
+                    # Startup up the channel server with,
+                    # - subactor: the bind address sent to us by our parent
+                    #   over our established channel
+                    # - root actor: the ``accept_addr`` passed to this method
+                    assert accept_addr
+                    host, port = accept_addr
+                    self._server_n = await service_nursery.start(
+                        partial(
+                            self._serve_forever,
+                            service_nursery,
+                            accept_host=host,
+                            accept_port=port
+                        )
+                    )
+
+                    # Register with the arbiter if we're told its addr
+                    log.debug(f"Registering {self} for role `{self.name}`")
+                    assert isinstance(self._arb_addr, tuple)
+
+                    async with get_arbiter(*self._arb_addr) as arb_portal:
+                        await arb_portal.run(
+                            'self',
+                            'register_actor',
+                            uid=self.uid,
+                            sockaddr=self.accept_addr,
+                        )
+
+                    registered_with_arbiter = True
+
+                    # init steps complete
+                    task_status.started()
+
+                    # Begin handling our new connection back to our
+                    # parent. This is done last since we don't want to
+                    # start processing parent requests until our channel
+                    # server is 100% up and running.
+                    if self._parent_chan:
+                        await root_nursery.start(
+                            partial(
+                                self._process_messages,
+                                self._parent_chan,
+                                shield=True,
+                            )
+                        )
+                    log.info("Waiting on service nursery to complete")
+                log.info("Service nursery complete")
+                log.info("Waiting on root nursery to complete")
+
+            # Blocks here as expected until the root nursery is
            # killed (i.e. this actor is cancelled or signalled by the parent)
-        except Exception as err:
+        except (trio.MultiError, Exception) as err:
            if not registered_with_arbiter:
                # TODO: I guess we could try to connect back
                # to the parent through a channel and engage a debugger
@@ -658,47 +702,57 @@ class Actor:
                )

            if self._parent_chan:
-                try:
-                    # internal error so ship to parent without cid
-                    await self._parent_chan.send(pack_error(err))
-                except trio.ClosedResourceError:
-                    log.error(
-                        f"Failed to ship error to parent "
-                        f"{self._parent_chan.uid}, channel was closed")
+                with trio.CancelScope(shield=True):
+                    try:
+                        # internal error so ship to parent without cid
+                        await self._parent_chan.send(pack_error(err))
+                    except trio.ClosedResourceError:
+                        log.error(
+                            f"Failed to ship error to parent "
+                            f"{self._parent_chan.uid}, channel was closed")
            log.exception("Actor errored:")

-            if isinstance(err, ModuleNotFoundError):
-                raise
-            else:
-                # XXX wait, why?
-                # causes a hang if I always raise..
-                # A parent process does something weird here?
-                # i'm so lost now..
-                raise
+            # always!
+            raise

        finally:
-            if registered_with_arbiter:
-                with trio.move_on_after(3) as cs:
-                    cs.shield = True
-                    await self._do_unreg(self._arb_addr)
+            log.info("Root nursery complete")

-            # terminate actor once all it's peers (actors that connected
-            # to it as clients) have disappeared
+            # UNregister actor from the arbiter
+            if registered_with_arbiter and (
+                self._arb_addr is not None
+            ):
+                failed = False
+                with trio.move_on_after(5) as cs:
+                    cs.shield = True
+                    try:
+                        async with get_arbiter(*self._arb_addr) as arb_portal:
+                            await arb_portal.run(
+                                'self', 'unregister_actor', uid=self.uid)
+                    except OSError:
+                        failed = True
+                if cs.cancelled_caught:
+                    failed = True
+                if failed:
+                    log.warning(
+                        f"Failed to unregister {self.name} from arbiter")
+
+            # Ensure all peers (actors connected to us as clients) are finished
            if not self._no_more_peers.is_set():
                if any(
                    chan.connected() for chan in chain(*self._peers.values())
                ):
                    log.debug(
                        f"Waiting for remaining peers {self._peers} to clear")
-                    await self._no_more_peers.wait()
+                    with trio.CancelScope(shield=True):
+                        await self._no_more_peers.wait()
            log.debug("All peer channels are complete")

-            # tear down channel server no matter what since we errored
-            # or completed
-            self.cancel_server()
+            log.debug("Runtime completed")

    async def _serve_forever(
        self,
+        handler_nursery: trio.Nursery,
        *,
        # (host, port) to bind for channel server
        accept_host: Tuple[str, int] = None,
@@ -710,48 +764,55 @@ class Actor:
        This will cause an actor to continue living (blocking) until
        ``cancel_server()`` is called.
        """
-        async with trio.open_nursery() as nursery:
-            self._server_nursery = nursery
-            # TODO: might want to consider having a separate nursery
-            # for the stream handler such that the server can be cancelled
-            # whilst leaving existing channels up
-            listeners: List[trio.abc.Listener] = await nursery.start(
-                partial(
-                    trio.serve_tcp,
-                    self._stream_handler,
-                    # new connections will stay alive even if this server
-                    # is cancelled
-                    handler_nursery=self._root_nursery,
-                    port=accept_port, host=accept_host,
-                )
-            )
-            log.debug("Started tcp server(s) on"  # type: ignore
-                      f" {[l.socket for l in listeners]}")
-            self._listeners.extend(listeners)
-            task_status.started()
-
-    async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
-        # UNregister actor from the arbiter
-        try:
-            if arbiter_addr is not None:
-                async with get_arbiter(*arbiter_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'unregister_actor', uid=self.uid)
-        except OSError:
-            log.warning(f"Unable to unregister {self.name} from arbiter")
+        self._server_down = trio.Event()
+        try:
+            async with trio.open_nursery() as server_n:
+                listeners: List[trio.abc.Listener] = await server_n.start(
+                    partial(
+                        trio.serve_tcp,
+                        self._stream_handler,
+                        # new connections will stay alive even if this server
+                        # is cancelled
+                        handler_nursery=handler_nursery,
+                        port=accept_port,
+                        host=accept_host,
+                    )
+                )
+                log.debug("Started tcp server(s) on"  # type: ignore
+                          f" {[l.socket for l in listeners]}")
+                self._listeners.extend(listeners)
+                task_status.started(server_n)
+        finally:
+            # signal the server is down since nursery above terminated
+            self._server_down.set()

    async def cancel(self) -> None:
        """Cancel this actor.

-        The sequence in order is:
-            - cancelling all rpc tasks
-            - cancelling the channel server
-            - cancel the "root" nursery
+        The "deterministic" teardown sequence in order is:
+            - cancel all ongoing rpc tasks by cancel scope
+            - cancel the channel server to prevent new inbound
+              connections
+            - cancel the "service" nursery reponsible for
+              spawning new rpc tasks
+            - return control the parent channel message loop
        """
        # cancel all ongoing rpc tasks
-        await self.cancel_rpc_tasks()
-        self.cancel_server()
-        self._root_nursery.cancel_scope.cancel()
+        with trio.CancelScope(shield=True):
+            await self.cancel_rpc_tasks()
+            self.cancel_server()
+            await self._server_down.wait()
+            self._service_n.cancel_scope.cancel()
+
+        return True
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()

    async def _cancel_task(self, cid, chan):
        """Cancel a local task by call-id / channel.
@@ -804,11 +865,12 @@ class Actor:
        """Cancel the internal channel server nursery thereby
        preventing any new inbound connections from being established.
        """
-        log.debug("Shutting down channel server")
-        self._server_nursery.cancel_scope.cancel()
+        if self._server_n:
+            log.debug("Shutting down channel server")
+            self._server_n.cancel_scope.cancel()

    @property
-    def accept_addr(self) -> Tuple[str, int]:
+    def accept_addr(self) -> Optional[Tuple[str, int]]:
        """Primary address to which the channel server is bound.
        """
        # throws OSError on failure
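As a side note on the `_server_down` event added in `_serve_forever()` and awaited in `cancel()`: the following is a small, runnable trio sketch (assumed names, not the committed code) of the pattern where a server task hands its nursery back via `task_status.started()` and signals complete teardown through an event set in a `finally:` block.

import trio


async def serve_forever_sketch(
    server_down: trio.Event,
    task_status=trio.TASK_STATUS_IGNORED,
):
    try:
        async with trio.open_nursery() as server_n:
            # hand the server's nursery back to the caller so it can be
            # cancelled directly later (stand-in for `trio.serve_tcp(...)`)
            task_status.started(server_n)
            await trio.sleep_forever()
    finally:
        # signal the server is down since the nursery above terminated
        server_down.set()


async def main():
    server_down = trio.Event()
    async with trio.open_nursery() as n:
        server_n = await n.start(serve_forever_sketch, server_down)

        # ...later, during an ordered teardown: stop the server and then
        # wait until it has actually wound down before continuing
        server_n.cancel_scope.cancel()
        await server_down.wait()
        print("channel server fully torn down")


if __name__ == '__main__':
    trio.run(main)

Waiting on the event (rather than just issuing the cancel) is what lets the teardown in `cancel()` proceed step by step instead of racing the server shutdown.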