diff --git a/tractor/_entry.py b/tractor/_entry.py
index c860f2b..755225d 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -20,13 +20,13 @@ Sub-process entry points.
 """
 from functools import partial
 from typing import Tuple, Any
-import signal
 
 import trio  # type: ignore
 
 from .log import get_console_log, get_logger
 from . import _state
 from .to_asyncio import run_as_asyncio_guest
+from ._runtime import async_main, Actor
 
 
 log = get_logger(__name__)
@@ -63,7 +63,8 @@ def _mp_main(
     log.debug(f"parent_addr is {parent_addr}")
 
     trio_main = partial(
-        actor._async_main,
+        async_main,
+        actor,
         accept_addr,
         parent_addr=parent_addr
     )
@@ -82,7 +83,7 @@
 
 
 def _trio_main(
-    actor: 'Actor',  # type: ignore
+    actor: Actor,  # type: ignore
     *,
     parent_addr: Tuple[str, int] = None,
     infect_asyncio: bool = False,
@@ -106,7 +107,8 @@
     log.debug(f"parent_addr is {parent_addr}")
 
     trio_main = partial(
-        actor._async_main,
+        async_main,
+        actor,
         parent_addr=parent_addr
     )
diff --git a/tractor/_root.py b/tractor/_root.py
index a8cfde0..aa5b745 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -29,7 +29,7 @@ import warnings
 
 import trio
 
-from ._runtime import Actor, Arbiter
+from ._runtime import Actor, Arbiter, async_main
 from . import _debug
 from . import _spawn
 from . import _state
@@ -188,13 +188,14 @@
         # start the actor runtime in a new task
         async with trio.open_nursery() as nursery:
 
-            # ``Actor._async_main()`` creates an internal nursery and
+            # ``_runtime.async_main()`` creates an internal nursery and
             # thus blocks here until the entire underlying actor tree has
             # terminated thereby conducting structured concurrency.
 
             await nursery.start(
                 partial(
-                    actor._async_main,
+                    async_main,
+                    actor,
                     accept_addr=(host, port),
                     parent_addr=None
                 )
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 815020c..5fddfb1 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -391,7 +391,7 @@
     is_arbiter: bool = False
     msg_buffer_size: int = 2**6
 
-    # nursery placeholders filled in by `_async_main()` after fork
+    # nursery placeholders filled in by `async_main()` after fork
     _root_n: Optional[trio.Nursery] = None
     _service_n: Optional[trio.Nursery] = None
    _server_n: Optional[trio.Nursery] = None
@@ -940,199 +940,6 @@
             await self.cancel()
             raise
 
-    async def _async_main(
-        self,
-        accept_addr: Optional[tuple[str, int]] = None,
-
-        # XXX: currently ``parent_addr`` is only needed for the
-        # ``multiprocessing`` backend (which pickles state sent to
-        # the child instead of relaying it over the connect-back
-        # channel). Once that backend is removed we can likely just
-        # change this to a simple ``is_subactor: bool`` which will
-        # be False when running as root actor and True when as
-        # a subactor.
-        parent_addr: Optional[tuple[str, int]] = None,
-        task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-
-    ) -> None:
-        """
-        Start the channel server, maybe connect back to the parent, and
-        start the main task.
-
-        A "root-most" (or "top-level") nursery for this actor is opened here
-        and when cancelled effectively cancels the actor.
-
-        """
-        registered_with_arbiter = False
-        try:
-
-            # establish primary connection with immediate parent
-            self._parent_chan = None
-            if parent_addr is not None:
-
-                self._parent_chan, accept_addr_rent = await self._from_parent(
-                    parent_addr)
-
-                # either it's passed in because we're not a child
-                # or because we're running in mp mode
-                if accept_addr_rent is not None:
-                    accept_addr = accept_addr_rent
-
-            # load exposed/allowed RPC modules
-            # XXX: do this **after** establishing a channel to the parent
-            # but **before** starting the message loop for that channel
-            # such that import errors are properly propagated upwards
-            self.load_modules()
-
-            # The "root" nursery ensures the channel with the immediate
-            # parent is kept alive as a resilient service until
-            # cancellation steps have (mostly) occurred in
-            # a deterministic way.
-            async with trio.open_nursery() as root_nursery:
-                self._root_n = root_nursery
-                assert self._root_n
-
-                async with trio.open_nursery() as service_nursery:
-                    # This nursery is used to handle all inbound
-                    # connections to us such that if the TCP server
-                    # is killed, connections can continue to process
-                    # in the background until this nursery is cancelled.
-                    self._service_n = service_nursery
-                    assert self._service_n
-
-                    # Startup up the channel server with,
-                    # - subactor: the bind address is sent by our parent
-                    #   over our established channel
-                    # - root actor: the ``accept_addr`` passed to this method
-                    assert accept_addr
-                    host, port = accept_addr
-
-                    self._server_n = await service_nursery.start(
-                        partial(
-                            self._serve_forever,
-                            service_nursery,
-                            accept_host=host,
-                            accept_port=port
-                        )
-                    )
-                    accept_addr = self.accept_addr
-                    if _state._runtime_vars['_is_root']:
-                        _state._runtime_vars['_root_mailbox'] = accept_addr
-
-                    # Register with the arbiter if we're told its addr
-                    log.runtime(f"Registering {self} for role `{self.name}`")
-                    assert isinstance(self._arb_addr, tuple)
-
-                    async with get_arbiter(*self._arb_addr) as arb_portal:
-                        await arb_portal.run_from_ns(
-                            'self',
-                            'register_actor',
-                            uid=self.uid,
-                            sockaddr=accept_addr,
-                        )
-
-                    registered_with_arbiter = True
-
-                    # init steps complete
-                    task_status.started()
-
-                    # Begin handling our new connection back to our
-                    # parent. This is done last since we don't want to
-                    # start processing parent requests until our channel
-                    # server is 100% up and running.
-                    if self._parent_chan:
-                        await root_nursery.start(
-                            partial(
-                                process_messages,
-                                self,
-                                self._parent_chan,
-                                shield=True,
-                            )
-                        )
-                    log.runtime("Waiting on service nursery to complete")
-                log.runtime("Service nursery complete")
-                log.runtime("Waiting on root nursery to complete")
-
-            # Blocks here as expected until the root nursery is
-            # killed (i.e. this actor is cancelled or signalled by the parent)
-        except Exception as err:
-            log.info("Closing all actor lifetime contexts")
-            _lifetime_stack.close()
-
-            if not registered_with_arbiter:
-                # TODO: I guess we could try to connect back
-                # to the parent through a channel and engage a debugger
-                # once we have that all working with std streams locking?
-                log.exception(
-                    f"Actor errored and failed to register with arbiter "
-                    f"@ {self._arb_addr}?")
-                log.error(
-                    "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
-                    "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
-                    "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
-                    "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
-                )
-
-            if self._parent_chan:
-                await try_ship_error_to_parent(self._parent_chan, err)
-
-            # always!
-            log.exception("Actor errored:")
-            raise
-
-        finally:
-            log.info("Runtime nursery complete")
-
-            # tear down all lifetime contexts if not in guest mode
-            # XXX: should this just be in the entrypoint?
-            log.info("Closing all actor lifetime contexts")
-
-            # TODO: we can't actually do this bc the debugger
-            # uses the _service_n to spawn the lock task, BUT,
-            # in theory if we had the root nursery surround this finally
-            # block it might be actually possible to debug THIS
-            # machinery in the same way as user task code?
-            # if self.name == 'brokerd.ib':
-            #     with trio.CancelScope(shield=True):
-            #         await _debug.breakpoint()
-
-            _lifetime_stack.close()
-
-            # Unregister actor from the arbiter
-            if registered_with_arbiter and (
-                self._arb_addr is not None
-            ):
-                failed = False
-                with trio.move_on_after(0.5) as cs:
-                    cs.shield = True
-                    try:
-                        async with get_arbiter(*self._arb_addr) as arb_portal:
-                            await arb_portal.run_from_ns(
-                                'self',
-                                'unregister_actor',
-                                uid=self.uid
-                            )
-                    except OSError:
-                        failed = True
-                if cs.cancelled_caught:
-                    failed = True
-                if failed:
-                    log.warning(
-                        f"Failed to unregister {self.name} from arbiter")
-
-            # Ensure all peers (actors connected to us as clients) are finished
-            if not self._no_more_peers.is_set():
-                if any(
-                    chan.connected() for chan in chain(*self._peers.values())
-                ):
-                    log.runtime(
-                        f"Waiting for remaining peers {self._peers} to clear")
-                    with trio.CancelScope(shield=True):
-                        await self._no_more_peers.wait()
-            log.runtime("All peer channels are complete")
-
-        log.runtime("Runtime completed")
-
     async def _serve_forever(
         self,
         handler_nursery: trio.Nursery,
@@ -1346,6 +1153,200 @@
         return self._infected_aio
 
 
+async def async_main(
+    actor: Actor,
+    accept_addr: Optional[tuple[str, int]] = None,
+
+    # XXX: currently ``parent_addr`` is only needed for the
+    # ``multiprocessing`` backend (which pickles state sent to
+    # the child instead of relaying it over the connect-back
+    # channel). Once that backend is removed we can likely just
+    # change this to a simple ``is_subactor: bool`` which will
+    # be False when running as root actor and True when as
+    # a subactor.
+    parent_addr: Optional[tuple[str, int]] = None,
+    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Actor runtime entrypoint; start the IPC channel server, maybe connect
+    back to the parent, and start up all core machinery tasks.
+
+    A "root-most" (or "top-level") nursery for this actor is opened here
+    and when cancelled effectively cancels the actor.
+
+    '''
+    registered_with_arbiter = False
+    try:
+
+        # establish primary connection with immediate parent
+        actor._parent_chan = None
+        if parent_addr is not None:
+
+            actor._parent_chan, accept_addr_rent = await actor._from_parent(
+                parent_addr)
+
+            # either it's passed in because we're not a child
+            # or because we're running in mp mode
+            if accept_addr_rent is not None:
+                accept_addr = accept_addr_rent
+
+        # load exposed/allowed RPC modules
+        # XXX: do this **after** establishing a channel to the parent
+        # but **before** starting the message loop for that channel
+        # such that import errors are properly propagated upwards
+        actor.load_modules()
+
+        # The "root" nursery ensures the channel with the immediate
+        # parent is kept alive as a resilient service until
+        # cancellation steps have (mostly) occurred in
+        # a deterministic way.
+        async with trio.open_nursery() as root_nursery:
+            actor._root_n = root_nursery
+            assert actor._root_n
+
+            async with trio.open_nursery() as service_nursery:
+                # This nursery is used to handle all inbound
+                # connections to us such that if the TCP server
+                # is killed, connections can continue to process
+                # in the background until this nursery is cancelled.
+                actor._service_n = service_nursery
+                assert actor._service_n
+
+                # Start up the channel server with,
+                # - subactor: the bind address is sent by our parent
+                #   over our established channel
+                # - root actor: the ``accept_addr`` passed to this method
+                assert accept_addr
+                host, port = accept_addr
+
+                actor._server_n = await service_nursery.start(
+                    partial(
+                        actor._serve_forever,
+                        service_nursery,
+                        accept_host=host,
+                        accept_port=port
+                    )
+                )
+                accept_addr = actor.accept_addr
+                if _state._runtime_vars['_is_root']:
+                    _state._runtime_vars['_root_mailbox'] = accept_addr
+
+                # Register with the arbiter if we're told its addr
+                log.runtime(f"Registering {actor} for role `{actor.name}`")
+                assert isinstance(actor._arb_addr, tuple)
+
+                async with get_arbiter(*actor._arb_addr) as arb_portal:
+                    await arb_portal.run_from_ns(
+                        'self',
+                        'register_actor',
+                        uid=actor.uid,
+                        sockaddr=accept_addr,
+                    )
+
+                registered_with_arbiter = True
+
+                # init steps complete
+                task_status.started()
+
+                # Begin handling our new connection back to our
+                # parent. This is done last since we don't want to
+                # start processing parent requests until our channel
+                # server is 100% up and running.
+                if actor._parent_chan:
+                    await root_nursery.start(
+                        partial(
+                            process_messages,
+                            actor,
+                            actor._parent_chan,
+                            shield=True,
+                        )
+                    )
+                log.runtime("Waiting on service nursery to complete")
+            log.runtime("Service nursery complete")
+            log.runtime("Waiting on root nursery to complete")
+
+        # Blocks here as expected until the root nursery is
+        # killed (i.e. this actor is cancelled or signalled by the parent)
+    except Exception as err:
+        log.info("Closing all actor lifetime contexts")
+        _lifetime_stack.close()
+
+        if not registered_with_arbiter:
+            # TODO: I guess we could try to connect back
+            # to the parent through a channel and engage a debugger
+            # once we have that all working with std streams locking?
+            log.exception(
+                f"Actor errored and failed to register with arbiter "
+                f"@ {actor._arb_addr}?")
+            log.error(
+                "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
+                "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
+                "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
+                "\tTHIS IS HOW RELIABLE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
+            )
+
+        if actor._parent_chan:
+            await try_ship_error_to_parent(actor._parent_chan, err)
+
+        # always!
+        log.exception("Actor errored:")
+        raise
+
+    finally:
+        log.info("Runtime nursery complete")
+
+        # tear down all lifetime contexts if not in guest mode
+        # XXX: should this just be in the entrypoint?
+        log.info("Closing all actor lifetime contexts")
+
+        # TODO: we can't actually do this bc the debugger
+        # uses the _service_n to spawn the lock task, BUT,
+        # in theory if we had the root nursery surround this finally
+        # block it might be actually possible to debug THIS
+        # machinery in the same way as user task code?
+        # if actor.name == 'brokerd.ib':
+        #     with trio.CancelScope(shield=True):
+        #         await _debug.breakpoint()
+
+        _lifetime_stack.close()
+
+        # Unregister actor from the arbiter
+        if registered_with_arbiter and (
+            actor._arb_addr is not None
+        ):
+            failed = False
+            with trio.move_on_after(0.5) as cs:
+                cs.shield = True
+                try:
+                    async with get_arbiter(*actor._arb_addr) as arb_portal:
+                        await arb_portal.run_from_ns(
+                            'self',
+                            'unregister_actor',
+                            uid=actor.uid
+                        )
+                except OSError:
+                    failed = True
+            if cs.cancelled_caught:
+                failed = True
+            if failed:
+                log.warning(
+                    f"Failed to unregister {actor.name} from arbiter")
+
+        # Ensure all peers (actors connected to us as clients) are finished
+        if not actor._no_more_peers.is_set():
+            if any(
+                chan.connected() for chan in chain(*actor._peers.values())
+            ):
+                log.runtime(
+                    f"Waiting for remaining peers {actor._peers} to clear")
+                with trio.CancelScope(shield=True):
+                    await actor._no_more_peers.wait()
+        log.runtime("All peer channels are complete")
+
+    log.runtime("Runtime completed")
+
+
 async def process_messages(
     actor: Actor,
     chan: Channel,
@@ -1354,9 +1355,10 @@
 
 ) -> bool:
     '''
-    Process messages for the channel async-RPC style.
+    Process messages for the IPC transport channel async-RPC style.
 
-    Receive multiplexed RPC requests and deliver responses over ``chan``.
+    Receive multiplexed RPC requests, spawn handler tasks, and deliver
+    responses or boxed errors back over the channel to the "caller" task.
 
     '''
     # TODO: once https://github.com/python-trio/trio/issues/467 gets
@@ -1438,7 +1440,7 @@
                     with trio.CancelScope(shield=True):
                         # actor.cancel() was called so kill this
                         # msg loop and break out into
-                        # ``_async_main()``
+                        # ``async_main()``
                         log.cancel(
                             f"Actor {actor.uid} was remotely cancelled "
                             f"by {chan.uid}"
                         )
@@ -1457,7 +1459,7 @@
                     with trio.CancelScope(shield=True):
                         # actor.cancel() was called so kill this
                         # msg loop and break out into
-                        # ``_async_main()``
+                        # ``async_main()``
                         kwargs['chan'] = chan
                         log.cancel(
                             f'Remote request to cancel task\n'
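
For reference, below is a minimal runnable sketch (not part of the patch) of the startup pattern that both the old ``Actor._async_main()`` method and the new module-level ``async_main()`` rely on: a long-running task function that takes the runtime object as an explicit first argument and signals readiness back to ``await nursery.start()`` via ``task_status.started()``. Only ``trio`` is required; ``Runtime`` and this module layout are illustrative stand-ins, not tractor's real API.

    # Illustrative sketch only: ``Runtime`` is a hypothetical stand-in for
    # the ``Actor`` runtime-state object, not tractor's actual class.
    from functools import partial

    import trio


    class Runtime:
        '''Stand-in for the runtime-state object passed as first arg.'''
        def __init__(self, name: str) -> None:
            self.name = name


    async def async_main(
        runtime: Runtime,
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        # init steps (server bind, parent connect, registry) would go here
        task_status.started()  # unblocks ``await nursery.start()`` below
        await trio.sleep_forever()  # stand-in for the runtime's main loop


    async def main() -> None:
        runtime = Runtime('root')
        async with trio.open_nursery() as nursery:
            # blocks until ``task_status.started()`` is called above
            await nursery.start(partial(async_main, runtime))
            nursery.cancel_scope.cancel()  # would otherwise run forever


    if __name__ == '__main__':
        trio.run(main)

Passing the runtime object explicitly (``partial(async_main, actor, ...)``) rather than binding it as ``self`` is the patch's key mechanical change; the ``nursery.start()``/``task_status.started()`` handshake is unchanged.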