Factor `breakpoint()` blocking into `@acm`
Call it `maybe_block_bp()` can wrap the `open_root_actor()` body with it. Main reason is to guarantee we can bp inside actor runtime bootup as needed when debugging internals! Prolly should factor this to another module tho? ALSO, ensure we RTE on recurrent entries to `open_root_actor()` from within an existing tree! There was actually `test_spawning` test somehow getting away with this!? Should never be possible or allowed!leslies_extra_appendix
parent
1cb2337c7c
commit
0c60914cc4
709
tractor/_root.py
709
tractor/_root.py
|
@ -18,7 +18,9 @@
|
|||
Root actor runtime ignition(s).
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
import importlib
|
||||
import inspect
|
||||
|
@ -26,7 +28,10 @@ import logging
|
|||
import os
|
||||
import signal
|
||||
import sys
|
||||
from typing import Callable
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
)
|
||||
import warnings
|
||||
|
||||
|
||||
|
@ -47,21 +52,95 @@ from .ipc import (
|
|||
_connect_chan,
|
||||
)
|
||||
from ._addr import (
|
||||
Address,
|
||||
UnwrappedAddress,
|
||||
default_lo_addrs,
|
||||
mk_uuid,
|
||||
preferred_transport,
|
||||
wrap_address,
|
||||
)
|
||||
from ._exceptions import is_multi_cancelled
|
||||
from ._exceptions import (
|
||||
ActorFailure,
|
||||
is_multi_cancelled,
|
||||
)
|
||||
|
||||
|
||||
logger = log.get_logger('tractor')
|
||||
|
||||
|
||||
# TODO: stick this in a `@acm` defined in `devx._debug`?
|
||||
# -[ ] also maybe consider making this a `wrapt`-deco to
|
||||
# save an indent level?
|
||||
#
|
||||
@acm
|
||||
async def maybe_block_bp(
|
||||
debug_mode: bool,
|
||||
maybe_enable_greenback: bool,
|
||||
) -> bool:
|
||||
# Override the global debugger hook to make it play nice with
|
||||
# ``trio``, see much discussion in:
|
||||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||
builtin_bp_handler: Callable = sys.breakpointhook
|
||||
orig_bp_path: str|None = os.environ.get(
|
||||
'PYTHONBREAKPOINT',
|
||||
None,
|
||||
)
|
||||
bp_blocked: bool
|
||||
if (
|
||||
debug_mode
|
||||
and maybe_enable_greenback
|
||||
and (
|
||||
maybe_mod := await _debug.maybe_init_greenback(
|
||||
raise_not_found=False,
|
||||
)
|
||||
)
|
||||
):
|
||||
logger.info(
|
||||
f'Found `greenback` installed @ {maybe_mod}\n'
|
||||
'Enabling `tractor.pause_from_sync()` support!\n'
|
||||
)
|
||||
os.environ['PYTHONBREAKPOINT'] = (
|
||||
'tractor.devx._debug._sync_pause_from_builtin'
|
||||
)
|
||||
_state._runtime_vars['use_greenback'] = True
|
||||
bp_blocked = False
|
||||
|
||||
else:
|
||||
# TODO: disable `breakpoint()` by default (without
|
||||
# `greenback`) since it will break any multi-actor
|
||||
# usage by a clobbered TTY's stdstreams!
|
||||
def block_bps(*args, **kwargs):
|
||||
raise RuntimeError(
|
||||
'Trying to use `breakpoint()` eh?\n\n'
|
||||
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
|
||||
'If you need to use it please install `greenback` and set '
|
||||
'`debug_mode=True` when opening the runtime '
|
||||
'(either via `.open_nursery()` or `open_root_actor()`)\n'
|
||||
)
|
||||
|
||||
sys.breakpointhook = block_bps
|
||||
# lol ok,
|
||||
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
|
||||
os.environ['PYTHONBREAKPOINT'] = "0"
|
||||
bp_blocked = True
|
||||
|
||||
try:
|
||||
yield bp_blocked
|
||||
finally:
|
||||
# restore any prior built-in `breakpoint()` hook state
|
||||
if builtin_bp_handler is not None:
|
||||
sys.breakpointhook = builtin_bp_handler
|
||||
|
||||
if orig_bp_path is not None:
|
||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
||||
|
||||
else:
|
||||
# clear env back to having no entry
|
||||
os.environ.pop('PYTHONBREAKPOINT', None)
|
||||
|
||||
|
||||
@acm
|
||||
async def open_root_actor(
|
||||
|
||||
*,
|
||||
# defaults are above
|
||||
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||
|
@ -111,355 +190,323 @@ async def open_root_actor(
|
|||
Runtime init entry point for ``tractor``.
|
||||
|
||||
'''
|
||||
_debug.hide_runtime_frames()
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
||||
#
|
||||
# Override the global debugger hook to make it play nice with
|
||||
# ``trio``, see much discussion in:
|
||||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||
builtin_bp_handler: Callable = sys.breakpointhook
|
||||
orig_bp_path: str|None = os.environ.get(
|
||||
'PYTHONBREAKPOINT',
|
||||
None,
|
||||
)
|
||||
if (
|
||||
debug_mode
|
||||
and maybe_enable_greenback
|
||||
and (
|
||||
maybe_mod := await _debug.maybe_init_greenback(
|
||||
raise_not_found=False,
|
||||
)
|
||||
# XXX NEVER allow nested actor-trees!
|
||||
if already_actor := _state.current_actor(err_on_no_runtime=False):
|
||||
rtvs: dict[str, Any] = _state._runtime_vars
|
||||
root_mailbox: list[str, int] = rtvs['_root_mailbox']
|
||||
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
|
||||
raise ActorFailure(
|
||||
f'A current actor already exists !?\n'
|
||||
f'({already_actor}\n'
|
||||
f'\n'
|
||||
f'You can NOT open a second root actor from within '
|
||||
f'an existing tree and the current root of this '
|
||||
f'already exists !!\n'
|
||||
f'\n'
|
||||
f'_root_mailbox: {root_mailbox!r}\n'
|
||||
f'_registry_addrs: {registry_addrs!r}\n'
|
||||
)
|
||||
|
||||
async with maybe_block_bp(
|
||||
debug_mode=debug_mode,
|
||||
maybe_enable_greenback=maybe_enable_greenback,
|
||||
):
|
||||
logger.info(
|
||||
f'Found `greenback` installed @ {maybe_mod}\n'
|
||||
'Enabling `tractor.pause_from_sync()` support!\n'
|
||||
)
|
||||
os.environ['PYTHONBREAKPOINT'] = (
|
||||
'tractor.devx._debug._sync_pause_from_builtin'
|
||||
)
|
||||
_state._runtime_vars['use_greenback'] = True
|
||||
_debug.hide_runtime_frames()
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
else:
|
||||
# TODO: disable `breakpoint()` by default (without
|
||||
# `greenback`) since it will break any multi-actor
|
||||
# usage by a clobbered TTY's stdstreams!
|
||||
def block_bps(*args, **kwargs):
|
||||
raise RuntimeError(
|
||||
'Trying to use `breakpoint()` eh?\n\n'
|
||||
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
|
||||
'If you need to use it please install `greenback` and set '
|
||||
'`debug_mode=True` when opening the runtime '
|
||||
'(either via `.open_nursery()` or `open_root_actor()`)\n'
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
# on our debugger lock state.
|
||||
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
# mark top most level process as root actor
|
||||
_state._runtime_vars['_is_root'] = True
|
||||
|
||||
# caps based rpc list
|
||||
enable_modules = (
|
||||
enable_modules
|
||||
or
|
||||
[]
|
||||
)
|
||||
|
||||
if rpc_module_paths:
|
||||
warnings.warn(
|
||||
"`rpc_module_paths` is now deprecated, use "
|
||||
" `enable_modules` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
enable_modules.extend(rpc_module_paths)
|
||||
|
||||
if start_method is not None:
|
||||
_spawn.try_set_start_method(start_method)
|
||||
|
||||
# TODO! remove this ASAP!
|
||||
if arbiter_addr is not None:
|
||||
warnings.warn(
|
||||
'`arbiter_addr` is now deprecated\n'
|
||||
'Use `registry_addrs: list[tuple]` instead..',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addrs = [arbiter_addr]
|
||||
|
||||
if not registry_addrs:
|
||||
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
|
||||
enable_transports
|
||||
)
|
||||
|
||||
sys.breakpointhook = block_bps
|
||||
# lol ok,
|
||||
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
|
||||
os.environ['PYTHONBREAKPOINT'] = "0"
|
||||
assert registry_addrs
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
# on our debugger lock state.
|
||||
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
loglevel = (
|
||||
loglevel
|
||||
or log._default_loglevel
|
||||
).upper()
|
||||
|
||||
# mark top most level process as root actor
|
||||
_state._runtime_vars['_is_root'] = True
|
||||
|
||||
# caps based rpc list
|
||||
enable_modules = (
|
||||
enable_modules
|
||||
or
|
||||
[]
|
||||
)
|
||||
|
||||
if rpc_module_paths:
|
||||
warnings.warn(
|
||||
"`rpc_module_paths` is now deprecated, use "
|
||||
" `enable_modules` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
enable_modules.extend(rpc_module_paths)
|
||||
|
||||
if start_method is not None:
|
||||
_spawn.try_set_start_method(start_method)
|
||||
|
||||
if arbiter_addr is not None:
|
||||
warnings.warn(
|
||||
'`arbiter_addr` is now deprecated\n'
|
||||
'Use `registry_addrs: list[tuple]` instead..',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addrs = [arbiter_addr]
|
||||
|
||||
if not registry_addrs:
|
||||
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
|
||||
enable_transports
|
||||
)
|
||||
|
||||
assert registry_addrs
|
||||
|
||||
loglevel = (
|
||||
loglevel
|
||||
or log._default_loglevel
|
||||
).upper()
|
||||
|
||||
if (
|
||||
debug_mode
|
||||
and _spawn._spawn_method == 'trio'
|
||||
):
|
||||
_state._runtime_vars['_debug_mode'] = True
|
||||
|
||||
# expose internal debug module to every actor allowing for
|
||||
# use of ``await tractor.pause()``
|
||||
enable_modules.append('tractor.devx._debug')
|
||||
|
||||
# if debug mode get's enabled *at least* use that level of
|
||||
# logging for some informative console prompts.
|
||||
if (
|
||||
logging.getLevelName(
|
||||
# lul, need the upper case for the -> int map?
|
||||
# sweet "dynamic function behaviour" stdlib...
|
||||
loglevel,
|
||||
) > logging.getLevelName('PDB')
|
||||
debug_mode
|
||||
and _spawn._spawn_method == 'trio'
|
||||
):
|
||||
loglevel = 'PDB'
|
||||
_state._runtime_vars['_debug_mode'] = True
|
||||
|
||||
# expose internal debug module to every actor allowing for
|
||||
# use of ``await tractor.pause()``
|
||||
enable_modules.append('tractor.devx._debug')
|
||||
|
||||
# if debug mode get's enabled *at least* use that level of
|
||||
# logging for some informative console prompts.
|
||||
if (
|
||||
logging.getLevelName(
|
||||
# lul, need the upper case for the -> int map?
|
||||
# sweet "dynamic function behaviour" stdlib...
|
||||
loglevel,
|
||||
) > logging.getLevelName('PDB')
|
||||
):
|
||||
loglevel = 'PDB'
|
||||
|
||||
|
||||
elif debug_mode:
|
||||
raise RuntimeError(
|
||||
"Debug mode is only supported for the `trio` backend!"
|
||||
)
|
||||
|
||||
assert loglevel
|
||||
_log = log.get_console_log(loglevel)
|
||||
assert _log
|
||||
|
||||
# TODO: factor this into `.devx._stackscope`!!
|
||||
if (
|
||||
debug_mode
|
||||
and
|
||||
enable_stack_on_sig
|
||||
):
|
||||
from .devx._stackscope import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
async def ping_tpt_socket(
|
||||
addr: UnwrappedAddress,
|
||||
timeout: float = 1,
|
||||
) -> None:
|
||||
'''
|
||||
Attempt temporary connection to see if a registry is
|
||||
listening at the requested address by a tranport layer
|
||||
ping.
|
||||
|
||||
If a connection can't be made quickly we assume none no
|
||||
server is listening at that addr.
|
||||
|
||||
'''
|
||||
try:
|
||||
# TODO: this connect-and-bail forces us to have to
|
||||
# carefully rewrap TCP 104-connection-reset errors as
|
||||
# EOF so as to avoid propagating cancel-causing errors
|
||||
# to the channel-msg loop machinery. Likely it would
|
||||
# be better to eventually have a "discovery" protocol
|
||||
# with basic handshake instead?
|
||||
with trio.move_on_after(timeout):
|
||||
async with _connect_chan(addr):
|
||||
ponged_addrs.append(addr)
|
||||
|
||||
except OSError:
|
||||
# TODO: make this a "discovery" log level?
|
||||
logger.info(
|
||||
f'No actor registry found @ {addr}\n'
|
||||
)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
for addr in registry_addrs:
|
||||
tn.start_soon(
|
||||
ping_tpt_socket,
|
||||
addr,
|
||||
)
|
||||
|
||||
trans_bind_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
# Create a new local root-actor instance which IS NOT THE
|
||||
# REGISTRAR
|
||||
if ponged_addrs:
|
||||
if ensure_registry:
|
||||
elif debug_mode:
|
||||
raise RuntimeError(
|
||||
f'Failed to open `{name}`@{ponged_addrs}: '
|
||||
'registry socket(s) already bound'
|
||||
"Debug mode is only supported for the `trio` backend!"
|
||||
)
|
||||
|
||||
# we were able to connect to an arbiter
|
||||
logger.info(
|
||||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||
)
|
||||
assert loglevel
|
||||
_log = log.get_console_log(loglevel)
|
||||
assert _log
|
||||
|
||||
actor = Actor(
|
||||
name=name or 'anonymous',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=ponged_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
)
|
||||
# DO NOT use the registry_addrs as the transport server
|
||||
# addrs for this new non-registar, root-actor.
|
||||
for addr in ponged_addrs:
|
||||
waddr = wrap_address(addr)
|
||||
print(waddr)
|
||||
trans_bind_addrs.append(
|
||||
waddr.get_random(namespace=waddr.namespace)
|
||||
)
|
||||
|
||||
# Start this local actor as the "registrar", aka a regular
|
||||
# actor who manages the local registry of "mailboxes" of
|
||||
# other process-tree-local sub-actors.
|
||||
else:
|
||||
|
||||
# NOTE that if the current actor IS THE REGISTAR, the
|
||||
# following init steps are taken:
|
||||
# - the tranport layer server is bound to each addr
|
||||
# pair defined in provided registry_addrs, or the default.
|
||||
trans_bind_addrs = registry_addrs
|
||||
|
||||
# - it is normally desirable for any registrar to stay up
|
||||
# indefinitely until either all registered (child/sub)
|
||||
# actors are terminated (via SC supervision) or,
|
||||
# a re-election process has taken place.
|
||||
# NOTE: all of ^ which is not implemented yet - see:
|
||||
# https://github.com/goodboy/tractor/issues/216
|
||||
# https://github.com/goodboy/tractor/pull/348
|
||||
# https://github.com/goodboy/tractor/issues/296
|
||||
|
||||
actor = Arbiter(
|
||||
name=name or 'registrar',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=registry_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
)
|
||||
# XXX, in case the root actor runtime was actually run from
|
||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||
# `.trio.run()`.
|
||||
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||
|
||||
# Start up main task set via core actor-runtime nurseries.
|
||||
try:
|
||||
# assign process-local actor
|
||||
_state._current_actor = actor
|
||||
|
||||
# start local channel-server and fake the portal API
|
||||
# NOTE: this won't block since we provide the nursery
|
||||
ml_addrs_str: str = '\n'.join(
|
||||
f'@{addr}' for addr in trans_bind_addrs
|
||||
)
|
||||
logger.info(
|
||||
f'Starting local {actor.uid} on the following transport addrs:\n'
|
||||
f'{ml_addrs_str}'
|
||||
)
|
||||
|
||||
# start the actor runtime in a new task
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
) as nursery:
|
||||
|
||||
# ``_runtime.async_main()`` creates an internal nursery
|
||||
# and blocks here until any underlying actor(-process)
|
||||
# tree has terminated thereby conducting so called
|
||||
# "end-to-end" structured concurrency throughout an
|
||||
# entire hierarchical python sub-process set; all
|
||||
# "actor runtime" primitives are SC-compat and thus all
|
||||
# transitively spawned actors/processes must be as
|
||||
# well.
|
||||
await nursery.start(
|
||||
partial(
|
||||
async_main,
|
||||
actor,
|
||||
accept_addrs=trans_bind_addrs,
|
||||
parent_addr=None
|
||||
)
|
||||
)
|
||||
try:
|
||||
yield actor
|
||||
except (
|
||||
Exception,
|
||||
BaseExceptionGroup,
|
||||
) as err:
|
||||
|
||||
# TODO, in beginning to handle the subsubactor with
|
||||
# crashed grandparent cases..
|
||||
#
|
||||
# was_locked: bool = await _debug.maybe_wait_for_debugger(
|
||||
# child_in_debug=True,
|
||||
# )
|
||||
# XXX NOTE XXX see equiv note inside
|
||||
# `._runtime.Actor._stream_handler()` where in the
|
||||
# non-root or root-that-opened-this-mahually case we
|
||||
# wait for the local actor-nursery to exit before
|
||||
# exiting the transport channel handler.
|
||||
entered: bool = await _debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
debug_filter=debug_filter,
|
||||
)
|
||||
|
||||
if (
|
||||
not entered
|
||||
and
|
||||
not is_multi_cancelled(
|
||||
err,
|
||||
)
|
||||
):
|
||||
logger.exception('Root actor crashed\n')
|
||||
|
||||
# ALWAYS re-raise any error bubbled up from the
|
||||
# runtime!
|
||||
raise
|
||||
|
||||
finally:
|
||||
# NOTE: not sure if we'll ever need this but it's
|
||||
# possibly better for even more determinism?
|
||||
# logger.cancel(
|
||||
# f'Waiting on {len(nurseries)} nurseries in root..')
|
||||
# nurseries = actor._actoruid2nursery.values()
|
||||
# async with trio.open_nursery() as tempn:
|
||||
# for an in nurseries:
|
||||
# tempn.start_soon(an.exited.wait)
|
||||
|
||||
logger.info(
|
||||
'Closing down root actor'
|
||||
)
|
||||
await actor.cancel(None) # self cancel
|
||||
finally:
|
||||
_state._current_actor = None
|
||||
_state._last_actor_terminated = actor
|
||||
|
||||
# restore built-in `breakpoint()` hook state
|
||||
# TODO: factor this into `.devx._stackscope`!!
|
||||
if (
|
||||
debug_mode
|
||||
and
|
||||
maybe_enable_greenback
|
||||
enable_stack_on_sig
|
||||
):
|
||||
if builtin_bp_handler is not None:
|
||||
sys.breakpointhook = builtin_bp_handler
|
||||
from .devx._stackscope import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
|
||||
if orig_bp_path is not None:
|
||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
else:
|
||||
# clear env back to having no entry
|
||||
os.environ.pop('PYTHONBREAKPOINT', None)
|
||||
async def ping_tpt_socket(
|
||||
addr: UnwrappedAddress,
|
||||
timeout: float = 1,
|
||||
) -> None:
|
||||
'''
|
||||
Attempt temporary connection to see if a registry is
|
||||
listening at the requested address by a tranport layer
|
||||
ping.
|
||||
|
||||
logger.runtime("Root actor terminated")
|
||||
If a connection can't be made quickly we assume none no
|
||||
server is listening at that addr.
|
||||
|
||||
'''
|
||||
try:
|
||||
# TODO: this connect-and-bail forces us to have to
|
||||
# carefully rewrap TCP 104-connection-reset errors as
|
||||
# EOF so as to avoid propagating cancel-causing errors
|
||||
# to the channel-msg loop machinery. Likely it would
|
||||
# be better to eventually have a "discovery" protocol
|
||||
# with basic handshake instead?
|
||||
with trio.move_on_after(timeout):
|
||||
async with _connect_chan(addr):
|
||||
ponged_addrs.append(addr)
|
||||
|
||||
except OSError:
|
||||
# TODO: make this a "discovery" log level?
|
||||
logger.info(
|
||||
f'No actor registry found @ {addr}\n'
|
||||
)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
for addr in registry_addrs:
|
||||
tn.start_soon(
|
||||
ping_tpt_socket,
|
||||
addr,
|
||||
)
|
||||
|
||||
trans_bind_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
# Create a new local root-actor instance which IS NOT THE
|
||||
# REGISTRAR
|
||||
if ponged_addrs:
|
||||
if ensure_registry:
|
||||
raise RuntimeError(
|
||||
f'Failed to open `{name}`@{ponged_addrs}: '
|
||||
'registry socket(s) already bound'
|
||||
)
|
||||
|
||||
# we were able to connect to an arbiter
|
||||
logger.info(
|
||||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||
)
|
||||
|
||||
actor = Actor(
|
||||
name=name or 'anonymous',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=ponged_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
)
|
||||
# DO NOT use the registry_addrs as the transport server
|
||||
# addrs for this new non-registar, root-actor.
|
||||
for addr in ponged_addrs:
|
||||
waddr: Address = wrap_address(addr)
|
||||
trans_bind_addrs.append(
|
||||
waddr.get_random(bindspace=waddr.bindspace)
|
||||
)
|
||||
|
||||
# Start this local actor as the "registrar", aka a regular
|
||||
# actor who manages the local registry of "mailboxes" of
|
||||
# other process-tree-local sub-actors.
|
||||
else:
|
||||
|
||||
# NOTE that if the current actor IS THE REGISTAR, the
|
||||
# following init steps are taken:
|
||||
# - the tranport layer server is bound to each addr
|
||||
# pair defined in provided registry_addrs, or the default.
|
||||
trans_bind_addrs = registry_addrs
|
||||
|
||||
# - it is normally desirable for any registrar to stay up
|
||||
# indefinitely until either all registered (child/sub)
|
||||
# actors are terminated (via SC supervision) or,
|
||||
# a re-election process has taken place.
|
||||
# NOTE: all of ^ which is not implemented yet - see:
|
||||
# https://github.com/goodboy/tractor/issues/216
|
||||
# https://github.com/goodboy/tractor/pull/348
|
||||
# https://github.com/goodboy/tractor/issues/296
|
||||
|
||||
actor = Arbiter(
|
||||
name=name or 'registrar',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=registry_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
)
|
||||
# XXX, in case the root actor runtime was actually run from
|
||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||
# `.trio.run()`.
|
||||
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||
|
||||
# Start up main task set via core actor-runtime nurseries.
|
||||
try:
|
||||
# assign process-local actor
|
||||
_state._current_actor = actor
|
||||
|
||||
# start local channel-server and fake the portal API
|
||||
# NOTE: this won't block since we provide the nursery
|
||||
ml_addrs_str: str = '\n'.join(
|
||||
f'@{addr}' for addr in trans_bind_addrs
|
||||
)
|
||||
logger.info(
|
||||
f'Starting local {actor.uid} on the following transport addrs:\n'
|
||||
f'{ml_addrs_str}'
|
||||
)
|
||||
|
||||
# start the actor runtime in a new task
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
) as nursery:
|
||||
|
||||
# ``_runtime.async_main()`` creates an internal nursery
|
||||
# and blocks here until any underlying actor(-process)
|
||||
# tree has terminated thereby conducting so called
|
||||
# "end-to-end" structured concurrency throughout an
|
||||
# entire hierarchical python sub-process set; all
|
||||
# "actor runtime" primitives are SC-compat and thus all
|
||||
# transitively spawned actors/processes must be as
|
||||
# well.
|
||||
await nursery.start(
|
||||
partial(
|
||||
async_main,
|
||||
actor,
|
||||
accept_addrs=trans_bind_addrs,
|
||||
parent_addr=None
|
||||
)
|
||||
)
|
||||
try:
|
||||
yield actor
|
||||
except (
|
||||
Exception,
|
||||
BaseExceptionGroup,
|
||||
) as err:
|
||||
|
||||
# TODO, in beginning to handle the subsubactor with
|
||||
# crashed grandparent cases..
|
||||
#
|
||||
# was_locked: bool = await _debug.maybe_wait_for_debugger(
|
||||
# child_in_debug=True,
|
||||
# )
|
||||
# XXX NOTE XXX see equiv note inside
|
||||
# `._runtime.Actor._stream_handler()` where in the
|
||||
# non-root or root-that-opened-this-mahually case we
|
||||
# wait for the local actor-nursery to exit before
|
||||
# exiting the transport channel handler.
|
||||
entered: bool = await _debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
debug_filter=debug_filter,
|
||||
)
|
||||
|
||||
if (
|
||||
not entered
|
||||
and
|
||||
not is_multi_cancelled(
|
||||
err,
|
||||
)
|
||||
):
|
||||
logger.exception(
|
||||
'Root actor crashed\n'
|
||||
f'>x)\n'
|
||||
f' |_{actor}\n'
|
||||
)
|
||||
|
||||
# ALWAYS re-raise any error bubbled up from the
|
||||
# runtime!
|
||||
raise
|
||||
|
||||
finally:
|
||||
# NOTE: not sure if we'll ever need this but it's
|
||||
# possibly better for even more determinism?
|
||||
# logger.cancel(
|
||||
# f'Waiting on {len(nurseries)} nurseries in root..')
|
||||
# nurseries = actor._actoruid2nursery.values()
|
||||
# async with trio.open_nursery() as tempn:
|
||||
# for an in nurseries:
|
||||
# tempn.start_soon(an.exited.wait)
|
||||
|
||||
logger.info(
|
||||
f'Closing down root actor\n'
|
||||
f'>)\n'
|
||||
f'|_{actor}\n'
|
||||
)
|
||||
await actor.cancel(None) # self cancel
|
||||
finally:
|
||||
_state._current_actor = None
|
||||
_state._last_actor_terminated = actor
|
||||
logger.runtime(
|
||||
f'Root actor terminated\n'
|
||||
f')>\n'
|
||||
f' |_{actor}\n'
|
||||
)
|
||||
|
||||
|
||||
def run_daemon(
|
||||
|
|
Loading…
Reference in New Issue