Compare commits

..

No commits in common. "29db08b37076d453e1e7b04926129b37796b9d78" and "c0058024c2cc9dabf160173a21d9cbb42b27e349" have entirely different histories.

6 changed files with 102 additions and 180 deletions

View File

@ -743,8 +743,6 @@ class Context:
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
#
# !TODO, switching to `Actor.aid` here!
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc

View File

@ -202,9 +202,7 @@ async def open_root_actor(
'''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(
err_on_no_runtime=False,
):
if already_actor := _state.current_actor(err_on_no_runtime=False):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
@ -274,20 +272,14 @@ async def open_root_actor(
DeprecationWarning,
stacklevel=2,
)
uw_reg_addrs = [arbiter_addr]
registry_addrs = [arbiter_addr]
uw_reg_addrs = registry_addrs
if not uw_reg_addrs:
uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs(
if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports
)
# must exist by now since all below code is dependent
assert uw_reg_addrs
registry_addrs: list[Address] = [
wrap_address(uw_addr)
for uw_addr in uw_reg_addrs
]
assert registry_addrs
loglevel = (
loglevel
@ -336,10 +328,10 @@ async def open_root_actor(
enable_stack_on_sig()
# closed into below ping task-func
ponged_addrs: list[Address] = []
ponged_addrs: list[UnwrappedAddress] = []
async def ping_tpt_socket(
addr: Address,
addr: UnwrappedAddress,
timeout: float = 1,
) -> None:
'''
@ -359,22 +351,17 @@ async def open_root_actor(
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(addr.unwrap()):
async with _connect_chan(addr):
ponged_addrs.append(addr)
except OSError:
# ?TODO, make this a "discovery" log level?
# TODO: make this a "discovery" log level?
logger.info(
f'No root-actor registry found @ {addr!r}\n'
f'No actor registry found @ {addr}\n'
)
# !TODO, this is basically just another (abstract)
# happy-eyeballs, so we should try for formalize it somewhere
# in a `.[_]discovery` ya?
#
async with trio.open_nursery() as tn:
for uw_addr in uw_reg_addrs:
addr: Address = wrap_address(uw_addr)
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
addr,
@ -403,28 +390,24 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# **DO NOT** use the registry_addrs as the
# ipc-transport-server's bind-addrs as this is
# a new NON-registrar, ROOT-actor.
#
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for addr in ponged_addrs:
waddr: Address = wrap_address(addr)
trans_bind_addrs.append(
addr.get_random(
bindspace=addr.bindspace,
)
waddr.get_random(bindspace=waddr.bindspace)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = uw_reg_addrs
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
@ -448,16 +431,6 @@ async def open_root_actor(
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
@ -465,26 +438,20 @@ async def open_root_actor(
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n'
if reg_addrs := actor.registry_addrs:
report += (
'-> Opening new registry @ '
+
'\n'.join(
f'@{addr}' for addr in reg_addrs
)
)
logger.info(f'{report}\n')
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start runtime in a bg sub-task, yield to caller.
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
# XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@ -559,14 +526,9 @@ async def open_root_actor(
)
logger.info(
f'Closing down root actor\n'
f'{op_nested_actor_repr}'
f'{op_nested_actor_repr}\n'
)
# XXX, THIS IS A *finally-footgun*!
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?
with trio.CancelScope(shield=True):
await actor.cancel(None) # self cancel
await actor.cancel(None) # self cancel
finally:
# revert all process-global runtime state
if (
@ -579,16 +541,10 @@ async def open_root_actor(
_state._current_actor = None
_state._last_actor_terminated = actor
sclang_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1,
)
logger.info(
logger.runtime(
f'Root actor terminated\n'
f'{sclang_repr}'
f')>\n'
f' |_{actor}\n'
)

View File

@ -672,8 +672,7 @@ async def _invoke(
ctx._result = res
log.runtime(
f'Sending result msg and exiting {ctx.side!r}\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
f'{return_msg}\n'
)
await chan.send(return_msg)
@ -840,12 +839,12 @@ async def _invoke(
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
descr_str += f'\nand final result {ctx.outcome!r}\n'
logmeth(
f'{message}\n'
f'\n'
f'{descr_str}\n'
message
+
descr_str
)
@ -1012,6 +1011,8 @@ async def process_messages(
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
@ -1030,14 +1031,14 @@ async def process_messages(
cid,
chan,
actor.cancel,
kwargs | {'req_chan': chan},
kwargs,
is_rpc=False,
return_msg_type=CancelAck,
)
log.runtime(
'Cancelling RPC-msg-loop with peer\n'
f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n'
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
@ -1233,21 +1234,9 @@ async def process_messages(
# END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`.
chan_op_repr: str = '<=x] '
chan_repr: str = _pformat.nest_from_op(
input_op=chan_op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(chan_op_repr)-1,
rm_from_first_ln='<',
)
log.runtime(
f'IPC channel disconnected\n'
f'{chan_repr}\n'
f'\n'
f'->c) cancelling RPC tasks.\n'
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
)
await actor.cancel_rpc_tasks(
req_aid=actor.aid,

View File

@ -213,7 +213,7 @@ class Actor:
*,
enable_modules: list[str] = [],
loglevel: str|None = None,
registry_addrs: list[Address]|None = None,
registry_addrs: list[UnwrappedAddress]|None = None,
spawn_method: str|None = None,
# TODO: remove!
@ -256,12 +256,11 @@ class Actor:
if arbiter_addr is not None:
warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[Address]` instead.',
'Use `registry_addrs: list[tuple]` instead.',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -300,10 +299,8 @@ class Actor:
# input via the validator.
self._reg_addrs: list[UnwrappedAddress] = []
if registry_addrs:
_state._runtime_vars['_registry_addrs'] = self.reg_addrs = [
addr.unwrap()
for addr in registry_addrs
]
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property
def aid(self) -> msgtypes.Aid:
@ -475,11 +472,7 @@ class Actor:
def reg_addrs(self) -> list[UnwrappedAddress]:
'''
List of (socket) addresses for all known (and contactable)
registry-service actors in "unwrapped" (i.e. IPC interchange
wire-compat) form.
If you are looking for the "wrapped" address form, use
`.registry_addrs` instead.
registry actors.
'''
return self._reg_addrs
@ -498,14 +491,8 @@ class Actor:
self._reg_addrs = addrs
@property
def registry_addrs(self) -> list[Address]:
return [wrap_address(uw_addr)
for uw_addr in self.reg_addrs]
def load_modules(
self,
) -> None:
'''
Load explicitly enabled python modules from local fs after
@ -1378,7 +1365,7 @@ class Actor:
Return all IPC channels to the actor with provided `uid`.
'''
return self._ipc_server._peers[uid]
return self._peers[uid]
def is_infected_aio(self) -> bool:
'''
@ -1433,8 +1420,6 @@ async def async_main(
# establish primary connection with immediate parent
actor._parent_chan: Channel|None = None
# is this a sub-actor?
# get runtime info from parent.
if parent_addr is not None:
(
actor._parent_chan,
@ -1479,8 +1464,10 @@ async def async_main(
ipc_server: _server.IPCServer
async with (
collapse_eg(),
trio.open_nursery() as service_nursery,
trio.open_nursery(
strict_exception_groups=False,
) as service_nursery,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
@ -1531,6 +1518,9 @@ async def async_main(
# TODO: why is this not with the root nursery?
try:
log.runtime(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
@ -1562,6 +1552,18 @@ async def async_main(
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
for addr in accept_addrs:
waddr: Address = wrap_address(addr)
raddrs.append(addr)
else:
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Register with the arbiter if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
@ -1579,7 +1581,6 @@ async def async_main(
except AssertionError:
await debug.pause()
# !TODO, get rid of the local-portal crap XD
async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
@ -1618,7 +1619,7 @@ async def async_main(
log.runtime(
'Service nursery complete\n'
'\n'
'->} waiting on root nursery to complete..\n'
'-> Waiting on root nursery to complete'
)
# Blocks here as expected until the root nursery is
@ -1673,7 +1674,6 @@ async def async_main(
finally:
teardown_report: str = (
'Main actor-runtime task completed\n'
'\n'
)
# ?TODO? should this be in `._entry`/`._root` mods instead?
@ -1715,8 +1715,7 @@ async def async_main(
# Unregister actor from the registry-sys / registrar.
if (
is_registered
and
not actor.is_registrar
and not actor.is_registrar
):
failed: bool = False
for addr in actor.reg_addrs:
@ -1751,8 +1750,7 @@ async def async_main(
ipc_server.has_peers(check_chans=True)
):
teardown_report += (
f'-> Waiting for remaining peers to clear..\n'
f' {pformat(ipc_server._peers)}'
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
)
log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers(
@ -1760,23 +1758,20 @@ async def async_main(
)
teardown_report += (
'-]> all peer channels are complete.\n'
'-> All peer channels are complete\n'
)
# op_nested_actor_repr: str = _pformat.nest_from_op(
# input_op=')>',
# text=actor.pformat(),
# nest_prefix='|_',
# nest_indent=1, # under >
# )
teardown_report += (
'-)> actor runtime main task exit.\n'
# f'{op_nested_actor_repr}'
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1, # under >
)
# if _state._runtime_vars['_is_root']:
# log.info(teardown_report)
# else:
log.runtime(teardown_report)
teardown_report += (
'Actor runtime exited\n'
f'{op_nested_actor_repr}'
)
log.info(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!

View File

@ -21,6 +21,7 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import (
TYPE_CHECKING,
)
@ -30,10 +31,7 @@ import warnings
import trio
from .devx import (
debug,
pformat as _pformat,
)
from .devx.debug import maybe_wait_for_debugger
from ._addr import (
UnwrappedAddress,
mk_uuid,
@ -202,7 +200,7 @@ class ActorNursery:
loglevel=loglevel,
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().registry_addrs,
registry_addrs=current_actor().reg_addrs,
)
parent_addr: UnwrappedAddress = self._actor.accept_addr
assert parent_addr
@ -456,7 +454,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'>}} {len(an._children)}\n'
f'{pformat(an._children)}\n'
)
an._join_procs.set()
@ -470,7 +468,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await debug.maybe_wait_for_debugger(
await maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
@ -546,7 +544,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await debug.maybe_wait_for_debugger(
await maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
@ -595,10 +593,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
# final exit
_shutdown_msg: str = (
'Actor-runtime-shutdown'
)
# @api_frame
@acm
async def open_nursery(
@ -687,26 +681,17 @@ async def open_nursery(
):
__tracebackhide__: bool = False
op_nested_an_repr: str = _pformat.nest_from_op(
input_op=')>',
text=f'{an}',
# nest_prefix='|_',
nest_indent=1, # under >
msg: str = (
'Actor-nursery exited\n'
f'|_{an}\n'
)
an_msg: str = (
f'Actor-nursery exited\n'
f'{op_nested_an_repr}\n'
)
# keep noise low during std operation.
log.runtime(an_msg)
if implicit_runtime:
# shutdown runtime if it was started and report noisly
# that we're did so.
msg: str = (
'\n'
'\n'
f'{_shutdown_msg} )>\n'
)
msg += '=> Shutting down actor runtime <=\n'
log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)

View File

@ -31,7 +31,7 @@ import trio
def maybe_collapse_eg(
beg: BaseExceptionGroup,
) -> BaseException|bool:
) -> BaseException:
'''
If the input beg can collapse to a single non-eg sub-exception,
return it instead.
@ -40,12 +40,13 @@ def maybe_collapse_eg(
if len(excs := beg.exceptions) == 1:
return excs[0]
return False
return beg
@acm
async def collapse_eg(
hide_tb: bool = True,
raise_from_src: bool = False,
):
'''
If `BaseExceptionGroup` raised in the body scope is
@ -60,11 +61,9 @@ async def collapse_eg(
except* BaseException as beg:
if (
exc := maybe_collapse_eg(beg)
):
if cause := exc.__cause__:
raise exc from cause
raise exc
) is not beg:
from_exc = beg if raise_from_src else None
raise exc from from_exc
raise beg