Further formalize `greenback` integration

Since we more or less require it for `tractor.pause_from_sync()` this
refines enable toggles and their relay down the actor tree as well as
more explicit logging around init and activation.

Tweaks summary:
- `.info()` report the module if discovered during root boot.
- use a `._state._runtime_vars['use_greenback']: bool` activation flag
  inside `Actor._from_parent()` to determine if the sub should try to
  use it and set to `False` if mod-loading fails / not installed.
- expose `maybe_init_greenback()` from `.devx` sugpkg.
- comment out RTE in `._pause()` for now since we already have it in
  `.pause_from_sync()`.
- always `.exception()` on `maybe_init_greenback()` import errors to
  clarify the underlying failure deats.
- always explicitly report if `._state._runtime_vars['use_greenback']`
  was NOT set when `.pause_from_sync()` is called.

Other `._runtime.async_main()` adjustments:
- combine the "internal error call ur parents" message and the failed
  registry contact status into one new `err_report: str`.
- drop the final exception handler's call to
  `Actor.lifetime_stack.close()` since we're already doing it in the
  `finally:` block and the earlier call has no currently known benefit.
- only report on the `.lifetime_stack()` callbacks if any are detected
  as registered.
aio_abandons
Tyler Goodlet 2024-06-28 14:25:53 -04:00
parent b72a025d0f
commit 5e009a8229
5 changed files with 175 additions and 82 deletions

View File

@ -21,6 +21,7 @@ Root actor runtime ignition(s).
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import importlib import importlib
import inspect
import logging import logging
import os import os
import signal import signal
@ -115,10 +116,16 @@ async def open_root_actor(
if ( if (
debug_mode debug_mode
and maybe_enable_greenback and maybe_enable_greenback
and await _debug.maybe_init_greenback( and (
raise_not_found=False, maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
) )
): ):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin' 'tractor.devx._debug._sync_pause_from_builtin'
) )
@ -264,7 +271,9 @@ async def open_root_actor(
except OSError: except OSError:
# TODO: make this a "discovery" log level? # TODO: make this a "discovery" log level?
logger.warning(f'No actor registry found @ {addr}') logger.info(
f'No actor registry found @ {addr}\n'
)
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
for addr in registry_addrs: for addr in registry_addrs:
@ -278,7 +287,6 @@ async def open_root_actor(
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: ' f'Failed to open `{name}`@{ponged_addrs}: '
@ -365,23 +373,25 @@ async def open_root_actor(
) )
try: try:
yield actor yield actor
except ( except (
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
) as err: ) as err:
# XXX NOTE XXX see equiv note inside
import inspect # `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
if ( if (
not entered not entered
and not is_multi_cancelled(err) and
not is_multi_cancelled(err)
): ):
logger.exception('Root actor crashed:\n') logger.exception('Root actor crashed\n')
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!

View File

@ -1046,6 +1046,10 @@ class Actor:
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']: if rvs['_debug_mode']:
from .devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
try: try:
# TODO: maybe return some status msgs upward # TODO: maybe return some status msgs upward
# to that we can emit them in `con_status` # to that we can emit them in `con_status`
@ -1053,13 +1057,27 @@ class Actor:
log.devx( log.devx(
'Enabling `stackscope` traces on SIGUSR1' 'Enabling `stackscope` traces on SIGUSR1'
) )
from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError: except ImportError:
log.warning( log.warning(
'`stackscope` not installed for use in debug mode!' '`stackscope` not installed for use in debug mode!'
) )
if rvs.get('use_greenback', False):
maybe_mod: ModuleType|None = await maybe_init_greenback()
if maybe_mod:
log.devx(
'Activated `greenback` '
'for `tractor.pause_from_sync()` support!'
)
else:
rvs['use_greenback'] = False
log.warning(
'`greenback` not installed for use in debug mode!\n'
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False rvs['_is_root'] = False
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
@ -1717,8 +1735,8 @@ async def async_main(
# Register with the arbiter if we're told its addr # Register with the arbiter if we're told its addr
log.runtime( log.runtime(
f'Registering `{actor.name}` ->\n' f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
f'{pformat(accept_addrs)}' # ^-TODO-^ we should instead show the maddr here^^
) )
# TODO: ideally we don't fan out to all registrars # TODO: ideally we don't fan out to all registrars
@ -1776,57 +1794,90 @@ async def async_main(
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err: except Exception as internal_err:
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not is_registered: if not is_registered:
err_report: str = (
'\n'
"Actor runtime (internally) failed BEFORE contacting the registry?\n"
f'registrars -> {actor.reg_addrs} ?!?!\n\n'
'^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n'
'\t>> CALMLY CANCEL YOUR CHILDREN AND CALL YOUR PARENTS <<\n\n'
'\tIf this is a sub-actor hopefully its parent will keep running '
'and cancel/reap this sub-process..\n'
'(well, presuming this error was propagated upward)\n\n'
'\t---------------------------------------------\n'
'\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT @ ' # oneline
'https://github.com/goodboy/tractor/issues\n'
'\t---------------------------------------------\n'
)
# TODO: I guess we could try to connect back # TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger # to the parent through a channel and engage a debugger
# once we have that all working with std streams locking? # once we have that all working with std streams locking?
log.exception( log.exception(err_report)
f"Actor errored and failed to register with arbiter "
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_remote( await try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
err, internal_err,
) )
# always! # always!
match err: match internal_err:
case ContextCancelled(): case ContextCancelled():
log.cancel( log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n' f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(err)' f'str(internal_err)'
) )
case _: case _:
log.exception("Actor errored:") log.exception(
raise 'Main actor-runtime task errored\n'
f'<x)\n'
f' |_{actor}\n'
)
raise internal_err
finally: finally:
log.runtime( teardown_report: str = (
'Runtime nursery complete' 'Main actor-runtime task completed\n'
'-> Closing all actor lifetime contexts..'
) )
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
actor.lifetime_stack.close()
# TODO: we can't actually do this bc the debugger # ?TODO? should this be in `._entry`/`._root` mods instead?
# uses the _service_n to spawn the lock task, BUT, #
# in theory if we had the root nursery surround this finally # teardown any actor-lifetime-bound contexts
# block it might be actually possible to debug THIS ls: ExitStack = actor.lifetime_stack
# machinery in the same way as user task code? # only report if there are any registered
cbs: list[Callable] = [
repr(tup[1].__wrapped__)
for tup in ls._exit_callbacks
]
if cbs:
cbs_str: str = '\n'.join(cbs)
teardown_report += (
'-> Closing actor-lifetime-bound callbacks\n\n'
f'}}>\n'
f' |_{ls}\n'
f' |_{cbs_str}\n'
)
# XXX NOTE XXX this will cause an error which
# prevents any `infected_aio` actor from continuing
# and any callbacks in the `ls` here WILL NOT be
# called!!
# await _debug.pause(shield=True)
ls.close()
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?
#
# if actor.name == 'brokerd.ib': # if actor.name == 'brokerd.ib':
# with CancelScope(shield=True): # with CancelScope(shield=True):
# await _debug.breakpoint() # await _debug.breakpoint()
@ -1856,9 +1907,9 @@ async def async_main(
failed = True failed = True
if failed: if failed:
log.warning( teardown_report += (
f'Failed to unregister {actor.name} from ' f'-> Failed to unregister {actor.name} from '
f'registar @ {addr}' f'registar @ {addr}\n'
) )
# Ensure all peers (actors connected to us as clients) are finished # Ensure all peers (actors connected to us as clients) are finished
@ -1866,13 +1917,17 @@ async def async_main(
if any( if any(
chan.connected() for chan in chain(*actor._peers.values()) chan.connected() for chan in chain(*actor._peers.values())
): ):
log.runtime( teardown_report += (
f"Waiting for remaining peers {actor._peers} to clear") f'-> Waiting for remaining peers {actor._peers} to clear..\n'
)
log.runtime(teardown_report)
with CancelScope(shield=True): with CancelScope(shield=True):
await actor._no_more_peers.wait() await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
log.runtime("Runtime completed") teardown_report += ('-> All peer channels are complete\n')
teardown_report += ('Actor runtime exited')
log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`! # TODO: rename to `Registry` and move to `._discovery`!

View File

@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
# for `breakpoint()` support # for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }

View File

@ -29,6 +29,7 @@ from ._debug import (
shield_sigint_handler as shield_sigint_handler, shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,
post_mortem as post_mortem, post_mortem as post_mortem,
mk_pdb as mk_pdb, mk_pdb as mk_pdb,
) )

View File

@ -69,6 +69,7 @@ from trio import (
import tractor import tractor
from tractor.log import get_logger from tractor.log import get_logger
from tractor._context import Context from tractor._context import Context
from tractor import _state
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
is_root_process, is_root_process,
@ -87,9 +88,6 @@ if TYPE_CHECKING:
from tractor._runtime import ( from tractor._runtime import (
Actor, Actor,
) )
from tractor.msg import (
_codec,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -1599,12 +1597,16 @@ async def _pause(
try: try:
task: Task = current_task() task: Task = current_task()
except RuntimeError as rte: except RuntimeError as rte:
log.exception('Failed to get current task?') __tracebackhide__: bool = False
if actor.is_infected_aio(): log.exception(
raise RuntimeError( 'Failed to get current `trio`-task?'
'`tractor.pause[_from_sync]()` not yet supported ' )
'for infected `asyncio` mode!' # if actor.is_infected_aio():
) from rte # mk_pdb().set_trace()
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise raise
@ -2163,22 +2165,22 @@ def maybe_import_greenback(
return False return False
async def maybe_init_greenback( async def maybe_init_greenback(**kwargs) -> None|ModuleType:
**kwargs, try:
) -> None|ModuleType: if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
if mod := maybe_import_greenback(**kwargs): log.devx(
await mod.ensure_portal() '`greenback` portal opened!\n'
log.devx( 'Sync debug support activated!\n'
'`greenback` portal opened!\n' )
'Sync debug support activated!\n' return mod
) except BaseException:
return mod log.exception('Failed to init `greenback`..')
raise
return None return None
async def _pause_from_bg_root_thread( async def _pause_from_bg_root_thread(
behalf_of_thread: Thread, behalf_of_thread: Thread,
repl: PdbREPL, repl: PdbREPL,
@ -2324,6 +2326,12 @@ def pause_from_sync(
# TODO: once supported, remove this AND the one # TODO: once supported, remove this AND the one
# inside `._pause()`! # inside `._pause()`!
# outstanding impl fixes:
# -[ ] need to make `.shield_sigint()` below work here!
# -[ ] how to handle `asyncio`'s new SIGINT-handler
# injection?
# -[ ] should `breakpoint()` work and what does it normally
# do in `asyncio` ctxs?
if actor.is_infected_aio(): if actor.is_infected_aio():
raise RuntimeError( raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported ' '`tractor.pause[_from_sync]()` not yet supported '
@ -2399,18 +2407,37 @@ def pause_from_sync(
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default # raises on not-found by default
greenback: ModuleType = maybe_import_greenback() greenback: ModuleType = maybe_import_greenback()
# TODO: how to ensure this is either dynamically (if
# needed) called here (in some bg tn??) or that the
# subactor always already called it?
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n' message += f'-> imported {greenback}\n'
repl_owner: Task = current_task() repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
out = greenback.await_( try:
_pause( out = greenback.await_(
debug_func=None, _pause(
repl=repl, debug_func=None,
hide_tb=hide_tb, repl=repl,
called_from_sync=True, hide_tb=hide_tb,
**_pause_kwargs, called_from_sync=True,
**_pause_kwargs,
)
) )
) except RuntimeError as rte:
if not _state._runtime_vars.get(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise
if out: if out:
bg_task, repl = out bg_task, repl = out
assert repl is repl assert repl is repl
@ -2801,10 +2828,10 @@ def open_crash_handler(
`trio.run()`. `trio.run()`.
''' '''
err: BaseException
try: try:
yield yield
except tuple(catch) as err: except tuple(catch) as err:
if type(err) not in ignore: if type(err) not in ignore:
pdbp.xpm() pdbp.xpm()