# tractor: structured concurrent "actors". # Copyright 2018-eternity Tyler Goodlet. # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . ''' Root actor runtime ignition(s). ''' from contextlib import ( asynccontextmanager as acm, ) from functools import partial import importlib import inspect import logging import os import signal import sys from typing import ( Any, Callable, ) import warnings import trio from .runtime import _runtime from .discovery._registry import Registrar from .devx import ( debug, _frame_stack, pformat as _pformat, ) from .spawn import _spawn from .runtime import _state from . import log from .ipc import ( _connect_chan, ) from .discovery._addr import ( Address, UnwrappedAddress, default_lo_addrs, mk_uuid, wrap_address, ) from .trionics import ( is_multi_cancelled, collapse_eg, ) from ._exceptions import ( RuntimeFailure, ) logger = log.get_logger('tractor') # TODO: stick this in a `@acm` defined in `devx.debug`? # -[ ] also maybe consider making this a `wrapt`-deco to # save an indent level? 
@acm
async def maybe_block_bp(
    debug_mode: bool,
    maybe_enable_greenback: bool,
) -> bool:
    '''
    Scope the behaviour of the built-in `breakpoint()` to the body
    of the `async with` block.

    When both `debug_mode` and `maybe_enable_greenback` are set and
    `debug.maybe_init_greenback()` returns a truthy module, the
    `PYTHONBREAKPOINT` env-var is pointed at
    `tractor.devx.debug._sync_pause_from_builtin`, the
    `use_greenback` runtime-var is flagged and `False` is yielded.

    In every other case a `sys.breakpointhook` which raises
    `RuntimeError` is installed, `PYTHONBREAKPOINT` is set to `"0"`
    and `True` is yielded.

    On exit the hook and env-var value captured at entry are put
    back (the env-var is removed if it was unset at entry).

    '''
    # Override the global debugger hook to make it play nice with
    # ``trio``, see much discussion in:
    # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
    #
    # snapshot the pre-existing hook + env state for later restore.
    prior_hook: Callable = sys.breakpointhook
    prior_env_value: str|None = os.environ.get('PYTHONBREAKPOINT', None)

    # only probe for `greenback` when the caller opted in on both
    # flags; otherwise the init helper is never awaited.
    greenback_mod = None
    if debug_mode and maybe_enable_greenback:
        greenback_mod = await debug.maybe_init_greenback(
            raise_not_found=False,
        )

    bp_blocked: bool
    if greenback_mod:
        logger.info(
            f'Found `greenback` installed @ {greenback_mod}\n'
            f'Enabling `tractor.pause_from_sync()` support!\n'
        )
        os.environ['PYTHONBREAKPOINT'] = (
            'tractor.devx.debug._sync_pause_from_builtin'
        )
        _state._runtime_vars['use_greenback'] = True
        bp_blocked = False

    else:
        # TODO: disable `breakpoint()` by default (without
        # `greenback`) since it will break any multi-actor
        # usage by a clobbered TTY's stdstreams!
        def block_bps(*args, **kwargs):
            # replacement hook: any `breakpoint()` call errors out.
            raise RuntimeError(
                'Trying to use `breakpoint()` eh?\n\n'
                'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
                'If you need to use it please install `greenback` and set '
                '`debug_mode=True` when opening the runtime '
                '(either via `.open_nursery()` or `open_root_actor()`)\n'
            )

        sys.breakpointhook = block_bps
        # lol ok,
        # https://docs.python.org/3/library/sys.html#sys.breakpointhook
        os.environ['PYTHONBREAKPOINT'] = "0"
        bp_blocked = True

    try:
        yield bp_blocked
    finally:
        # restore any prior built-in `breakpoint()` hook state
        if prior_hook is not None:
            sys.breakpointhook = prior_hook

        if prior_env_value is None:
            # clear env back to having no entry
            os.environ.pop('PYTHONBREAKPOINT', None)
        else:
            os.environ['PYTHONBREAKPOINT'] = prior_env_value
enable_transports: list[ # TODO, this should eventually be the pairs as # defined by (codec, proto) as on `MsgTransport. _state.TransportProtocolKey, ]|None = None, name: str|None = 'root', # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # OR `trio` (the new default). start_method: _spawn.SpawnMethodKey|None = None, # enables the multi-process debugger support debug_mode: bool = False, maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support # ^XXX NOTE^ the perf implications of use, # https://greenback.readthedocs.io/en/latest/principle.html#performance enable_stack_on_sig: bool = False, # internal logging loglevel: str|None = None, enable_modules: list|None = None, rpc_module_paths: list|None = None, # NOTE: allow caller to ensure that only one registry exists # and that this call creates it. ensure_registry: bool = False, hide_tb: bool = True, # XXX, proxied directly to `.devx.debug._maybe_enter_pm()` # for REPL-entry logic. debug_filter: Callable[ [BaseException|BaseExceptionGroup], bool, ] = lambda err: not is_multi_cancelled(err), # TODO, a way for actors to augment passing derived # read-only state to sublayers? # extra_rt_vars: dict|None = None, ) -> _runtime.Actor: ''' Initialize the `tractor` runtime by starting a "root actor" in a parent-most Python process. All (disjoint) actor-process-trees-as-programs are created via this entrypoint. ''' # XXX NEVER allow nested actor-trees! 
if already_actor := _state.current_actor( err_on_no_runtime=False, ): rtvs: dict[str, Any] = _state._runtime_vars root_mailbox: list[str, int] = rtvs['_root_mailbox'] registry_addrs: list[list[str, int]] = rtvs['_registry_addrs'] raise RuntimeFailure( f'A current actor already exists !?\n' f'({already_actor}\n' f'\n' f'You can NOT open a second root actor from within ' f'an existing tree and the current root of this ' f'already exists !!\n' f'\n' f'_root_mailbox: {root_mailbox!r}\n' f'_registry_addrs: {registry_addrs!r}\n' ) # debug.mk_pdb().set_trace() async with maybe_block_bp( debug_mode=debug_mode, maybe_enable_greenback=maybe_enable_greenback, ): if enable_transports is None: enable_transports: list[str] = _state.current_ipc_protos() else: _state._runtime_vars['_enable_tpts'] = enable_transports # TODO! support multi-tpts per actor! # Bo if not len(enable_transports) == 1: raise RuntimeError( f'No multi-tpt support yet!\n' f'enable_transports={enable_transports!r}\n' ) _frame_stack.hide_runtime_frames() __tracebackhide__: bool = hide_tb # attempt to retreive ``trio``'s sigint handler and stash it # on our debugger lock state. debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) # mark top most level process as root actor _state._runtime_vars['_is_root'] = True # caps based rpc list enable_modules = ( enable_modules or [] ) if rpc_module_paths: warnings.warn( "`rpc_module_paths` is now deprecated, use " " `enable_modules` instead.", DeprecationWarning, stacklevel=2, ) enable_modules.extend(rpc_module_paths) # `TRACTOR_LOGLEVEL` env-var wins over any caller-passed # `loglevel` so devs/test-runs can crank (or silence) # console verbosity without touching application code. 
env_ll_report: str = '' if env_ll := os.environ.get('TRACTOR_LOGLEVEL'): loglevel = env_ll env_ll_report: str = ( f'Detected env-var setting,\n' f'TRACTOR_LOGLEVEL={env_ll!r}\n' f'\n' f'Setting console loglevel per,\n' f'loglevel={loglevel!r}\n' ) if ( loglevel and loglevel.upper() != env_ll.upper() ): env_ll_report += ( f'\n' f'NOTE env-var OVERRIDES caller-passed,\n' f'loglevel={loglevel!r}\n' ) loglevel: str = ( loglevel or log._default_loglevel ) loglevel: str = loglevel.upper() assert loglevel _log = log.get_console_log( level=loglevel, name='tractor', logger=logger, ) assert _log if env_ll_report: _log.info(env_ll_report) # `TRACTOR_SPAWN_METHOD` env-var wins over any caller-passed # `start_method` so devs/test-runs can swap the actor spawn # backend without touching application code (e.g. driving # the `examples/debugging/