Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 10f9b505ee Add `psutil` to `--dev` / testing deps 2025-04-01 22:26:56 -04:00
Tyler Goodlet 0c60914cc4 Factor `breakpoint()` blocking into `@acm`
Call it `maybe_block_bp()` can wrap the `open_root_actor()` body with
it. Main reason is to guarantee we can bp inside actor runtime bootup as
needed when debugging internals! Prolly should factor this to another
module tho?

ALSO, ensure we RTE on recurrent entries to `open_root_actor()` from
within an existing tree! There was actually `test_spawning` test somehow
getting away with this!? Should never be possible or allowed!
2025-04-01 22:25:29 -04:00
Tyler Goodlet 1cb2337c7c Add an `Actor.pformat()`
And map `.__repr__/__str__` to it and add various new fields to fill it
out,
- drop `self.uid` as var and instead add `Actor._aid: Aid` and proxy to
  it for the various `.name/.uid/.pid` properties as well as a new
  `.aid` field.
 |_ the `Aid.pid` addition is also included.

Other improvements,
- flip to a sync call to `Address.close_listener()`.
- track the `async_main()` parent task as `Actor._task`.
- add exception logging around failure to bind due to already-in-use
  when calling `add.open_listener()` in `._stream_forever()`; sometimes
  the error might be overridden by something else during the
  runtime-failure unwind..
2025-04-01 22:21:28 -04:00
5 changed files with 513 additions and 346 deletions

View File

@ -64,6 +64,7 @@ dev = [
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.19.2", "xonsh>=0.19.2",
"psutil>=7.0.0",
] ]
# TODO, add these with sane versions; were originally in # TODO, add these with sane versions; were originally in
# `requirements-docs.txt`.. # `requirements-docs.txt`..

View File

@ -18,7 +18,9 @@
Root actor runtime ignition(s). Root actor runtime ignition(s).
''' '''
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
)
from functools import partial from functools import partial
import importlib import importlib
import inspect import inspect
@ -26,7 +28,10 @@ import logging
import os import os
import signal import signal
import sys import sys
from typing import Callable from typing import (
Any,
Callable,
)
import warnings import warnings
@ -47,21 +52,95 @@ from .ipc import (
_connect_chan, _connect_chan,
) )
from ._addr import ( from ._addr import (
Address,
UnwrappedAddress, UnwrappedAddress,
default_lo_addrs, default_lo_addrs,
mk_uuid, mk_uuid,
preferred_transport, preferred_transport,
wrap_address, wrap_address,
) )
from ._exceptions import is_multi_cancelled from ._exceptions import (
ActorFailure,
is_multi_cancelled,
)
logger = log.get_logger('tractor') logger = log.get_logger('tractor')
# TODO: stick this in a `@acm` defined in `devx._debug`?
# -[ ] also maybe consider making this a `wrapt`-deco to
# save an indent level?
#
@acm
async def maybe_block_bp(
debug_mode: bool,
maybe_enable_greenback: bool,
) -> bool:
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
bp_blocked: bool
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
bp_blocked = False
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
bp_blocked = True
try:
yield bp_blocked
finally:
# restore any prior built-in `breakpoint()` hook state
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
@acm @acm
async def open_root_actor( async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
@ -111,55 +190,30 @@ async def open_root_actor(
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
''' '''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
raise ActorFailure(
f'A current actor already exists !?\n'
f'({already_actor}\n'
f'\n'
f'You can NOT open a second root actor from within '
f'an existing tree and the current root of this '
f'already exists !!\n'
f'\n'
f'_root_mailbox: {root_mailbox!r}\n'
f'_registry_addrs: {registry_addrs!r}\n'
)
async with maybe_block_bp(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
):
_debug.hide_runtime_frames() _debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@ -186,6 +240,7 @@ async def open_root_actor(
if start_method is not None: if start_method is not None:
_spawn.try_set_start_method(start_method) _spawn.try_set_start_method(start_method)
# TODO! remove this ASAP!
if arbiter_addr is not None: if arbiter_addr is not None:
warnings.warn( warnings.warn(
'`arbiter_addr` is now deprecated\n' '`arbiter_addr` is now deprecated\n'
@ -313,10 +368,9 @@ async def open_root_actor(
# DO NOT use the registry_addrs as the transport server # DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor. # addrs for this new non-registar, root-actor.
for addr in ponged_addrs: for addr in ponged_addrs:
waddr = wrap_address(addr) waddr: Address = wrap_address(addr)
print(waddr)
trans_bind_addrs.append( trans_bind_addrs.append(
waddr.get_random(namespace=waddr.namespace) waddr.get_random(bindspace=waddr.bindspace)
) )
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
@ -419,7 +473,11 @@ async def open_root_actor(
err, err,
) )
): ):
logger.exception('Root actor crashed\n') logger.exception(
'Root actor crashed\n'
f'>x)\n'
f' |_{actor}\n'
)
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!
@ -436,30 +494,19 @@ async def open_root_actor(
# tempn.start_soon(an.exited.wait) # tempn.start_soon(an.exited.wait)
logger.info( logger.info(
'Closing down root actor' f'Closing down root actor\n'
f'>)\n'
f'|_{actor}\n'
) )
await actor.cancel(None) # self cancel await actor.cancel(None) # self cancel
finally: finally:
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
logger.runtime(
# restore built-in `breakpoint()` hook state f'Root actor terminated\n'
if ( f')>\n'
debug_mode f' |_{actor}\n'
and )
maybe_enable_greenback
):
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
logger.runtime("Root actor terminated")
def run_daemon( def run_daemon(

View File

@ -200,9 +200,14 @@ class Actor:
phase (aka before a new process is executed). phase (aka before a new process is executed).
''' '''
self.name = name self._aid = msgtypes.Aid(
self.uid = (name, uuid) name=name,
uuid=uuid,
pid=os.getpid(),
)
self._task: trio.Task|None = None
# state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
@ -281,6 +286,77 @@ class Actor:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs self.reg_addrs: list[UnwrappedAddress] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs
@property
def aid(self) -> msgtypes.Aid:
'''
This process-singleton-actor's "unique ID" in struct form.
'''
return self._aid
@property
def name(self) -> str:
return self._aid.name
@property
def uid(self) -> tuple[str, str]:
'''
This process-singleton's "unique (cross-host) ID".
Delivered from the `.Aid.name/.uuid` fields as a `tuple` pair
and should be multi-host unique despite a large distributed
process plane.
'''
return (
self._aid.name,
self._aid.uuid,
)
@property
def pid(self) -> int:
return self._aid.pid
def pformat(self) -> str:
ds: str = '='
parent_uid: tuple|None = None
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
peers: list[tuple] = list(self._peer_connected)
listen_addrs: str = pformat(self._listen_addrs)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" _listen_addrs{ds}'{listen_addrs}'\n"
f" _listeners{ds}'{self._listeners}'\n"
f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
)
return (
'<Actor(\n'
+
fmtstr
+
')>\n'
)
__repr__ = pformat
@property @property
def reg_addrs(self) -> list[UnwrappedAddress]: def reg_addrs(self) -> list[UnwrappedAddress]:
''' '''
@ -421,12 +497,19 @@ class Actor:
try: try:
uid: tuple|None = await self._do_handshake(chan) uid: tuple|None = await self._do_handshake(chan)
except ( except (
# we need this for ``msgspec`` for some reason? TransportClosed,
# for now, it's been put in the stream backend. # ^XXX NOTE, the above wraps `trio` exc types raised
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# trio.BrokenResourceError, # trio.BrokenResourceError,
# trio.ClosedResourceError, # trio.ClosedResourceError,
#
TransportClosed, # Inside our `.ipc._transport` layer we absorb and
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code can only worry one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queires, DoS etc.
): ):
# XXX: This may propagate up from `Channel._aiter_recv()` # XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the # and `MsgpackStream._inter_packets()` on a read from the
@ -1205,7 +1288,8 @@ class Actor:
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Start the IPC transport server, begin listening for new connections. Start the IPC transport server, begin listening/accepting new
`trio.SocketStream` connections.
This will cause an actor to continue living (and thus This will cause an actor to continue living (and thus
blocking at the process/OS-thread level) until blocking at the process/OS-thread level) until
@ -1223,10 +1307,24 @@ class Actor:
self._server_down = trio.Event() self._server_down = trio.Event()
try: try:
async with trio.open_nursery() as server_n: async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = [
await addr.open_listener() listeners: list[trio.abc.Listener] = []
for addr in listen_addrs for addr in listen_addrs:
] try:
listener: trio.abc.Listener = await addr.open_listener()
except OSError as oserr:
if (
'[Errno 98] Address already in use'
in
oserr.args[0]
):
log.exception(
f'Address already in use?\n'
f'{addr}\n'
)
raise
listeners.append(listener)
await server_n.start( await server_n.start(
partial( partial(
trio.serve_listeners, trio.serve_listeners,
@ -1249,8 +1347,10 @@ class Actor:
task_status.started(server_n) task_status.started(server_n)
finally: finally:
addr: Address
for addr in listen_addrs: for addr in listen_addrs:
await addr.close_listener() addr.close_listener()
# signal the server is down since nursery above terminated # signal the server is down since nursery above terminated
self._server_down.set() self._server_down.set()
@ -1717,6 +1817,8 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks. the actor's "runtime" and all thus all ongoing RPC tasks.
''' '''
actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger state. # on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@ -1726,18 +1828,17 @@ async def async_main(
# establish primary connection with immediate parent # establish primary connection with immediate parent
actor._parent_chan: Channel|None = None actor._parent_chan: Channel|None = None
if parent_addr is not None:
if parent_addr is not None:
( (
actor._parent_chan, actor._parent_chan,
set_accept_addr_says_rent, set_accept_addr_says_rent,
maybe_preferred_transports_says_rent, maybe_preferred_transports_says_rent,
) = await actor._from_parent(parent_addr) ) = await actor._from_parent(parent_addr)
accept_addrs: list[UnwrappedAddress] = []
# either it's passed in because we're not a child or # either it's passed in because we're not a child or
# because we're running in mp mode # because we're running in mp mode
accept_addrs: list[UnwrappedAddress] = []
if ( if (
set_accept_addr_says_rent set_accept_addr_says_rent
and and

View File

@ -143,6 +143,7 @@ class Aid(
''' '''
name: str name: str
uuid: str uuid: str
pid: int|None = None
# TODO? can/should we extend this field set? # TODO? can/should we extend this field set?
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has # -[ ] use built-in support for UUIDs? `uuid.UUID` which has

17
uv.lock
View File

@ -257,6 +257,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e4/ea/d836f008d33151c7a1f62caf3d8dd782e4d15f6a43897f64480c2b8de2ad/prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198", size = 387816 }, { url = "https://files.pythonhosted.org/packages/e4/ea/d836f008d33151c7a1f62caf3d8dd782e4d15f6a43897f64480c2b8de2ad/prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198", size = 387816 },
] ]
[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051 },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535 },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004 },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986 },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544 },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053 },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885 },
]
[[package]] [[package]]
name = "ptyprocess" name = "ptyprocess"
version = "0.7.0" version = "0.7.0"
@ -373,6 +388,7 @@ dev = [
{ name = "greenback" }, { name = "greenback" },
{ name = "pexpect" }, { name = "pexpect" },
{ name = "prompt-toolkit" }, { name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" }, { name = "pyperclip" },
{ name = "pytest" }, { name = "pytest" },
{ name = "stackscope" }, { name = "stackscope" },
@ -396,6 +412,7 @@ dev = [
{ name = "greenback", specifier = ">=1.2.1,<2" }, { name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "pexpect", specifier = ">=4.9.0,<5" }, { name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" }, { name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" }, { name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" }, { name = "stackscope", specifier = ">=0.2.2,<0.3" },