Make `async_main()` a module func

we_bein_all_matchy
Tyler Goodlet 2022-08-03 15:29:34 -04:00
parent a3a5bc267e
commit 208d56af2c
3 changed files with 210 additions and 205 deletions

View File

@ -20,13 +20,13 @@ Sub-process entry points.
"""
from functools import partial
from typing import Tuple, Any
import signal
import trio # type: ignore
from .log import get_console_log, get_logger
from . import _state
from .to_asyncio import run_as_asyncio_guest
from ._runtime import async_main, Actor
log = get_logger(__name__)
@ -63,7 +63,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
async_main,
actor,
accept_addr,
parent_addr=parent_addr
)
@ -82,7 +83,7 @@ def _mp_main(
def _trio_main(
actor: 'Actor', # type: ignore
actor: Actor, # type: ignore
*,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
@ -106,7 +107,8 @@ def _trio_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
async_main,
actor,
parent_addr=parent_addr
)

View File

@ -29,7 +29,7 @@ import warnings
import trio
from ._runtime import Actor, Arbiter
from ._runtime import Actor, Arbiter, async_main
from . import _debug
from . import _spawn
from . import _state
@ -188,13 +188,14 @@ async def open_root_actor(
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
# ``Actor._async_main()`` creates an internal nursery and
# ``_runtime.async_main()`` creates an internal nursery and
# thus blocks here until the entire underlying actor tree has
# terminated thereby conducting structured concurrency.
await nursery.start(
partial(
actor._async_main,
async_main,
actor,
accept_addr=(host, port),
parent_addr=None
)

View File

@ -391,7 +391,7 @@ class Actor:
is_arbiter: bool = False
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `_async_main()` after fork
# nursery placeholders filled in by `async_main()` after fork
_root_n: Optional[trio.Nursery] = None
_service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None
@ -940,199 +940,6 @@ class Actor:
await self.cancel()
raise
async def _async_main(
self,
accept_addr: Optional[tuple[str, int]] = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
# the child instead of relaying it over the connect-back
# channel). Once that backend is removed we can likely just
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
"""
Start the channel server, maybe connect back to the parent, and
start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
registered_with_arbiter = False
try:
# establish primary connection with immediate parent
self._parent_chan = None
if parent_addr is not None:
self._parent_chan, accept_addr_rent = await self._from_parent(
parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
# but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
self.load_modules()
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
async with trio.open_nursery() as root_nursery:
self._root_n = root_nursery
assert self._root_n
async with trio.open_nursery() as service_nursery:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
self._service_n = service_nursery
assert self._service_n
# Startup up the channel server with,
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
self._server_n = await service_nursery.start(
partial(
self._serve_forever,
service_nursery,
accept_host=host,
accept_port=port
)
)
accept_addr = self.accept_addr
if _state._runtime_vars['_is_root']:
_state._runtime_vars['_root_mailbox'] = accept_addr
# Register with the arbiter if we're told its addr
log.runtime(f"Registering {self} for role `{self.name}`")
assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'register_actor',
uid=self.uid,
sockaddr=accept_addr,
)
registered_with_arbiter = True
# init steps complete
task_status.started()
# Begin handling our new connection back to our
# parent. This is done last since we don't want to
# start processing parent requests until our channel
# server is 100% up and running.
if self._parent_chan:
await root_nursery.start(
partial(
process_messages,
self,
self._parent_chan,
shield=True,
)
)
log.runtime("Waiting on service nursery to complete")
log.runtime("Service nursery complete")
log.runtime("Waiting on root nursery to complete")
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
_lifetime_stack.close()
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {self._arb_addr}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
)
if self._parent_chan:
await try_ship_error_to_parent(self._parent_chan, err)
# always!
log.exception("Actor errored:")
raise
finally:
log.info("Runtime nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.info("Closing all actor lifetime contexts")
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if self.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
_lifetime_stack.close()
# Unregister actor from the arbiter
if registered_with_arbiter and (
self._arb_addr is not None
):
failed = False
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'unregister_actor',
uid=self.uid
)
except OSError:
failed = True
if cs.cancelled_caught:
failed = True
if failed:
log.warning(
f"Failed to unregister {self.name} from arbiter")
# Ensure all peers (actors connected to us as clients) are finished
if not self._no_more_peers.is_set():
if any(
chan.connected() for chan in chain(*self._peers.values())
):
log.runtime(
f"Waiting for remaining peers {self._peers} to clear")
with trio.CancelScope(shield=True):
await self._no_more_peers.wait()
log.runtime("All peer channels are complete")
log.runtime("Runtime completed")
async def _serve_forever(
self,
handler_nursery: trio.Nursery,
@ -1346,6 +1153,200 @@ class Actor:
return self._infected_aio
async def async_main(
actor: Actor,
accept_addr: Optional[tuple[str, int]] = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
# the child instead of relaying it over the connect-back
# channel). Once that backend is removed we can likely just
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Actor runtime entrypoint; start the IPC channel server, maybe connect
back to the parent, and startup all core machinery tasks.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
'''
registered_with_arbiter = False
try:
# establish primary connection with immediate parent
actor._parent_chan = None
if parent_addr is not None:
actor._parent_chan, accept_addr_rent = await actor._from_parent(
parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
# but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
actor.load_modules()
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
async with trio.open_nursery() as root_nursery:
actor._root_n = root_nursery
assert actor._root_n
async with trio.open_nursery() as service_nursery:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
assert actor._service_n
# Startup up the channel server with,
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
accept_host=host,
accept_port=port
)
)
accept_addr = actor.accept_addr
if _state._runtime_vars['_is_root']:
_state._runtime_vars['_root_mailbox'] = accept_addr
# Register with the arbiter if we're told its addr
log.runtime(f"Registering {actor} for role `{actor.name}`")
assert isinstance(actor._arb_addr, tuple)
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
)
registered_with_arbiter = True
# init steps complete
task_status.started()
# Begin handling our new connection back to our
# parent. This is done last since we don't want to
# start processing parent requests until our channel
# server is 100% up and running.
if actor._parent_chan:
await root_nursery.start(
partial(
process_messages,
actor,
actor._parent_chan,
shield=True,
)
)
log.runtime("Waiting on service nursery to complete")
log.runtime("Service nursery complete")
log.runtime("Waiting on root nursery to complete")
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
_lifetime_stack.close()
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor._arb_addr}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
)
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
# always!
log.exception("Actor errored:")
raise
finally:
log.info("Runtime nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.info("Closing all actor lifetime contexts")
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
_lifetime_stack.close()
# Unregister actor from the arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
):
failed = False
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_arbiter(*actor._arb_addr) as arb_portal:
await arb_portal.run_from_ns(
'self',
'unregister_actor',
uid=actor.uid
)
except OSError:
failed = True
if cs.cancelled_caught:
failed = True
if failed:
log.warning(
f"Failed to unregister {actor.name} from arbiter")
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
if any(
chan.connected() for chan in chain(*actor._peers.values())
):
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with trio.CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
log.runtime("Runtime completed")
async def process_messages(
actor: Actor,
chan: Channel,
@ -1354,9 +1355,10 @@ async def process_messages(
) -> bool:
'''
Process messages for the channel async-RPC style.
Process messages for the IPC transport channel async-RPC style.
Receive multiplexed RPC requests and deliver responses over ``chan``.
Receive multiplexed RPC requests, spawn handler tasks and deliver
responses over or boxed errors back to the "caller" task.
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
@ -1438,7 +1440,7 @@ async def process_messages(
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
# ``async_main()``
log.cancel(
f"Actor {actor.uid} was remotely cancelled "
f"by {chan.uid}"
@ -1457,7 +1459,7 @@ async def process_messages(
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
# ``async_main()``
kwargs['chan'] = chan
log.cancel(
f'Remote request to cancel task\n'