Merge pull request #422 from goodboy/global_uds_in_test_harness

Run (some) test suites in CI with `--tpt-proto uds`
ns_aware
Bd 2026-03-13 21:50:45 -04:00 committed by GitHub
commit e77198bb64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 304 additions and 151 deletions

View File

@ -75,7 +75,7 @@ jobs:
testing: testing:
name: '${{ matrix.os }} Python${{ matrix.python-version }} - spawn_backend=${{ matrix.spawn_backend }}' name: '${{ matrix.os }} Python${{ matrix.python-version }} spawn_backend=${{ matrix.spawn_backend }} tpt_proto=${{ matrix.tpt_proto }}'
timeout-minutes: 16 timeout-minutes: 16
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
@ -94,7 +94,22 @@ jobs:
'trio', 'trio',
# 'mp_spawn', # 'mp_spawn',
# 'mp_forkserver', # 'mp_forkserver',
# ?TODO^ is it worth it to get these running again?
#
# - [ ] next-gen backends, on 3.13+
# https://github.com/goodboy/tractor/issues/379
# 'subinterpreter',
# 'subint',
] ]
tpt_proto: [
'tcp',
'uds',
]
# https://github.com/orgs/community/discussions/26253#discussioncomment-3250989
exclude:
# don't do UDS run on macOS (for now)
- os: macos-latest
tpt_proto: 'uds'
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@ -123,7 +138,11 @@ jobs:
run: uv tree run: uv tree
- name: Run tests - name: Run tests
run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: >
uv run
pytest tests/ -rsx
--spawn-backend=${{ matrix.spawn_backend }}
--tpt-proto=${{ matrix.tpt_proto }}
# XXX legacy NOTE XXX # XXX legacy NOTE XXX
# #

View File

@ -23,6 +23,7 @@ pytest_plugins: list[str] = [
'tractor._testing.pytest', 'tractor._testing.pytest',
] ]
_ci_env: bool = os.environ.get('CI', False)
_non_linux: bool = platform.system() != 'Linux' _non_linux: bool = platform.system() != 'Linux'
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
@ -36,9 +37,8 @@ else:
_INT_SIGNAL = signal.SIGINT _INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = ( _PROC_SPAWN_WAIT = (
0.6 2 if _ci_env
if sys.version_info < (3, 7) else 1
else 0.4
) )
@ -115,9 +115,6 @@ def test_log(
yield _log yield _log
_ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def ci_env() -> bool: def ci_env() -> bool:
''' '''
@ -152,6 +149,7 @@ def daemon(
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
tpt_proto: str, tpt_proto: str,
ci_env: bool, ci_env: bool,
test_log: tractor.log.StackLevelAdapter,
) -> subprocess.Popen: ) -> subprocess.Popen:
''' '''
@ -167,10 +165,12 @@ def daemon(
"import tractor; " "import tractor; "
"tractor.run_daemon([], " "tractor.run_daemon([], "
"registry_addrs={reg_addrs}, " "registry_addrs={reg_addrs}, "
"enable_transports={enable_tpts}, "
"debug_mode={debug_mode}, " "debug_mode={debug_mode}, "
"loglevel={ll})" "loglevel={ll})"
).format( ).format(
reg_addrs=str([reg_addr]), reg_addrs=str([reg_addr]),
enable_tpts=str([tpt_proto]),
ll="'{}'".format(loglevel) if loglevel else None, ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode, debug_mode=debug_mode,
) )
@ -201,7 +201,7 @@ def daemon(
# so it's often required that we delay a bit more starting # so it's often required that we delay a bit more starting
# the first actor-tree.. # the first actor-tree..
if tpt_proto == 'uds': if tpt_proto == 'uds':
_PROC_SPAWN_WAIT = 0.6 _PROC_SPAWN_WAIT += 1.6
if _non_linux and ci_env: if _non_linux and ci_env:
_PROC_SPAWN_WAIT += 1 _PROC_SPAWN_WAIT += 1
@ -240,7 +240,7 @@ def daemon(
if rc < 0: if rc < 0:
raise RuntimeError(msg) raise RuntimeError(msg)
log.error(msg) test_log.error(msg)
# @pytest.fixture(autouse=True) # @pytest.fixture(autouse=True)

View File

@ -89,6 +89,7 @@ def spawn(
def _spawn( def _spawn(
cmd: str, cmd: str,
expect_timeout: float = 4,
**mkcmd_kwargs, **mkcmd_kwargs,
) -> pty_spawn.spawn: ) -> pty_spawn.spawn:
nonlocal spawned nonlocal spawned
@ -98,14 +99,17 @@ def spawn(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
), ),
expect_timeout=( expect_timeout=(timeout:=(
10 if _non_linux and _ci_env expect_timeout + 6
else 3 if _non_linux and _ci_env
), else expect_timeout
)),
# preexec_fn=unset_colors, # preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
) )
# sanity
assert spawned.timeout == timeout
return spawned return spawned
# such that test-dep can pass input script name. # such that test-dep can pass input script name.

View File

@ -62,6 +62,13 @@ def test_root_passes_tpt_to_sub(
reg_addr: tuple, reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
# XXX NOTE, the `reg_addr` addr won't be the same type as the
# `tpt_proto_key` would deliver here unless you pass `--tpt-proto
# <tpt_proto_key>` on the CLI.
#
# if tpt_proto_key == 'uds':
# breakpoint()
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
enable_transports=[tpt_proto_key], enable_transports=[tpt_proto_key],

View File

@ -146,9 +146,6 @@ def test_ipc_channel_break_during_stream(
# a user sending ctl-c by raising a KBI. # a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# XXX OLD XXX # XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC. # if child calls `MsgStream.aclose()` then expect EoC.
@ -169,10 +166,6 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# NOTE when the parent IPC side dies (even if the child does as well # NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the # but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect # IPC layer to raise a closed-resource, NEVER do we expect

View File

@ -649,7 +649,11 @@ def test_cancel_while_childs_child_in_sync_sleep(
# #
# delay = 1 # no AssertionError in eg, TooSlowError raised. # delay = 1 # no AssertionError in eg, TooSlowError raised.
# delay = 2 # is AssertionError in eg AND no TooSlowError !? # delay = 2 # is AssertionError in eg AND no TooSlowError !?
delay = 4 # is AssertionError in eg AND no _cs cancellation. # is AssertionError in eg AND no _cs cancellation.
delay = (
6 if _non_linux
else 4
)
with trio.fail_after(delay) as _cs: with trio.fail_after(delay) as _cs:
# with trio.CancelScope() as cs: # with trio.CancelScope() as cs:

View File

@ -11,7 +11,6 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None: def test_empty_mngrs_input_raises() -> None:
async def main(): async def main():
with trio.fail_after(3): with trio.fail_after(3):
async with ( async with (
@ -56,25 +55,39 @@ async def worker(
print(msg) print(msg)
assert msg == MESSAGE assert msg == MESSAGE
# TODO: does this ever cause a hang # ?TODO, does this ever cause a hang?
# assert 0 # assert 0
# ?TODO, but needs a fn-scoped tpt_proto fixture..
# @pytest.mark.no_tpt('uds')
@tractor_test @tractor_test
async def test_streaming_to_actor_cluster() -> None: async def test_streaming_to_actor_cluster(
tpt_proto: str,
):
'''
Open an actor "cluster" using the (experimental) `._clustering`
API and conduct standard inter-task-ctx streaming.
async with ( '''
open_actor_cluster(modules=[__name__]) as portals, if tpt_proto == 'uds':
pytest.skip(
f'Test currently fails with tpt-proto={tpt_proto!r}\n'
)
gather_contexts( with trio.fail_after(6):
mngrs=[p.open_context(worker) for p in portals.values()], async with (
) as contexts, open_actor_cluster(modules=[__name__]) as portals,
gather_contexts( gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts], mngrs=[p.open_context(worker) for p in portals.values()],
) as streams, ) as contexts,
): gather_contexts(
with trio.move_on_after(1): mngrs=[ctx[0].open_stream() for ctx in contexts],
for stream in itertools.cycle(streams): ) as streams,
await stream.send(MESSAGE)
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)

View File

@ -7,6 +7,7 @@ import signal
import platform import platform
from functools import partial from functools import partial
import itertools import itertools
import time
from typing import Callable from typing import Callable
import psutil import psutil
@ -31,7 +32,7 @@ async def test_reg_then_unreg(
) as n: ) as n:
portal = await n.start_actor('actor', enable_modules=[__name__]) portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid uid = portal.channel.aid.uid
async with tractor.get_registry(reg_addr) as aportal: async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter # this local actor should be the arbiter
@ -205,7 +206,7 @@ async def spawn_and_check_registry(
# ensure current actor is registered # ensure current actor is registered
registry: dict = await get_reg() registry: dict = await get_reg()
assert actor.uid in registry assert actor.aid.uid in registry
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
@ -253,8 +254,21 @@ async def spawn_and_check_registry(
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert len(registry) == extra start: float = time.time()
assert actor.uid in registry while (
not (len(registry) == extra)
and
(time.time() - start) < 5
):
print(
f'Waiting for remaining subs to dereg..\n'
f'{registry!r}\n'
)
await trio.sleep(0.3)
else:
assert len(registry) == extra
assert actor.aid.uid in registry
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@ -384,8 +398,8 @@ async def close_chans_before_nursery(
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert portal1.channel.uid not in registry assert portal1.channel.aid.uid not in registry
assert portal2.channel.uid not in registry assert portal2.channel.aid.uid not in registry
assert len(registry) == entries_at_end assert len(registry) == entries_at_end

View File

@ -581,7 +581,7 @@ def test_peer_canceller(
assert ( assert (
re.canceller re.canceller
== ==
root.uid root.aid.uid
) )
else: # the other 2 ctxs else: # the other 2 ctxs
@ -590,7 +590,7 @@ def test_peer_canceller(
and ( and (
re.canceller re.canceller
== ==
canceller.channel.uid canceller.channel.aid.uid
) )
) )
@ -745,7 +745,7 @@ def test_peer_canceller(
# -> each context should have received # -> each context should have received
# a silently absorbed context cancellation # a silently absorbed context cancellation
# in its remote nursery scope. # in its remote nursery scope.
# assert ctx.chan.uid == ctx.canceller # assert ctx.chan.aid.uid == ctx.canceller
# NOTE: when an inter-peer cancellation # NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this # occurred, we DO NOT expect this
@ -802,7 +802,7 @@ async def basic_echo_server(
''' '''
actor: Actor = tractor.current_actor() actor: Actor = tractor.current_actor()
uid: tuple = actor.uid uid: tuple = actor.aid.uid
await ctx.started(uid) await ctx.started(uid)
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
async for msg in ipc: async for msg in ipc:
@ -857,7 +857,7 @@ async def serve_subactors(
f'|_{peer}\n' f'|_{peer}\n'
) )
await ipc.send(( await ipc.send((
peer.chan.uid, peer.chan.aid.uid,
peer.chan.raddr.unwrap(), peer.chan.raddr.unwrap(),
)) ))
@ -992,7 +992,7 @@ async def tell_little_bro(
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
uid: tuple = actor.uid uid: tuple = actor.aid.uid
for i in range(rng_seed): for i in range(rng_seed):
msg: tuple = ( msg: tuple = (
uid, uid,
@ -1097,7 +1097,7 @@ def test_peer_spawns_and_cancels_service_subactor(
) as (client_ctx, client_says), ) as (client_ctx, client_says),
): ):
root: Actor = current_actor() root: Actor = current_actor()
spawner_uid: tuple = spawn_ctx.chan.uid spawner_uid: tuple = spawn_ctx.chan.aid.uid
print( print(
f'Server says: {first}\n' f'Server says: {first}\n'
f'Client says: {client_says}\n' f'Client says: {client_says}\n'
@ -1116,7 +1116,7 @@ def test_peer_spawns_and_cancels_service_subactor(
print( print(
'Sub-spawn came online\n' 'Sub-spawn came online\n'
f'portal: {sub}\n' f'portal: {sub}\n'
f'.uid: {sub.actor.uid}\n' f'.uid: {sub.actor.aid.uid}\n'
f'chan.raddr: {sub.chan.raddr}\n' f'chan.raddr: {sub.chan.raddr}\n'
) )
@ -1150,7 +1150,7 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked assert client_ctx.cancel_acked
assert res.canceller == root.uid assert res.canceller == root.aid.uid
assert not raise_sub_spawn_error_after assert not raise_sub_spawn_error_after
# cancelling the spawner sub should # cancelling the spawner sub should
@ -1184,8 +1184,8 @@ def test_peer_spawns_and_cancels_service_subactor(
# little_bro: a `RuntimeError`. # little_bro: a `RuntimeError`.
# #
check_inner_rte(rae) check_inner_rte(rae)
assert rae.relay_uid == client.chan.uid assert rae.relay_uid == client.chan.aid.uid
assert rae.src_uid == sub.chan.uid assert rae.src_uid == sub.chan.aid.uid
assert not client_ctx.cancel_acked assert not client_ctx.cancel_acked
assert ( assert (
@ -1214,12 +1214,12 @@ def test_peer_spawns_and_cancels_service_subactor(
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
_ctxc = ctxc _ctxc = ctxc
print( print(
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n' f'{root.aid.uid} caught ctxc from ctx with {client_ctx.chan.aid.uid}\n'
f'{repr(ctxc)}\n' f'{repr(ctxc)}\n'
) )
if not raise_sub_spawn_error_after: if not raise_sub_spawn_error_after:
assert ctxc.canceller == root.uid assert ctxc.canceller == root.aid.uid
else: else:
assert ctxc.canceller == spawner_uid assert ctxc.canceller == spawner_uid

View File

@ -190,7 +190,10 @@ async def aggregate(seed: int):
# leverage trio's built-in backpressure # leverage trio's built-in backpressure
await send_chan.send(value) await send_chan.send(value)
print(f"FINISHED ITERATING {portal.channel.uid}") print(
f'FINISHED ITERATING!\n'
f'peer: {portal.channel.aid.uid}'
)
# spawn 2 trio tasks to collect streams and push to a local queue # spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:

View File

@ -22,6 +22,10 @@ def unlink_file():
async def crash_and_clean_tmpdir( async def crash_and_clean_tmpdir(
tmp_file_path: str, tmp_file_path: str,
error: bool = True, error: bool = True,
rent_cancel: bool = True,
# XXX unused, but do we really need to test these cases?
self_cancel: bool = False,
): ):
global _file_path global _file_path
_file_path = tmp_file_path _file_path = tmp_file_path
@ -32,43 +36,75 @@ async def crash_and_clean_tmpdir(
assert os.path.isfile(tmp_file_path) assert os.path.isfile(tmp_file_path)
await trio.sleep(0.1) await trio.sleep(0.1)
if error: if error:
print('erroring in subactor!')
assert 0 assert 0
else:
elif self_cancel:
print('SELF-cancelling subactor!')
actor.cancel_soon() actor.cancel_soon()
elif rent_cancel:
await trio.sleep_forever()
print('subactor exiting task!')
@pytest.mark.parametrize( @pytest.mark.parametrize(
'error_in_child', 'error_in_child',
[True, False], [True, False],
ids='error_in_child={}'.format,
) )
@tractor_test @tractor_test
async def test_lifetime_stack_wipes_tmpfile( async def test_lifetime_stack_wipes_tmpfile(
tmp_path, tmp_path,
error_in_child: bool, error_in_child: bool,
loglevel: str,
# log: tractor.log.StackLevelAdapter,
# ^TODO, once landed via macos support!
): ):
child_tmp_file = tmp_path / "child.txt" child_tmp_file = tmp_path / "child.txt"
child_tmp_file.touch() child_tmp_file.touch()
assert child_tmp_file.exists() assert child_tmp_file.exists()
path = str(child_tmp_file) path = str(child_tmp_file)
# NOTE, this is expected to cancel the sub
# in the `error_in_child=False` case!
timeout: float = (
1.6 if error_in_child
else 1
)
try: try:
with trio.move_on_after(0.5): with trio.move_on_after(timeout) as cs:
async with tractor.open_nursery() as n: async with tractor.open_nursery(
await ( # inlined portal loglevel=loglevel,
await n.run_in_actor( ) as an:
crash_and_clean_tmpdir, await ( # inlined `tractor.Portal`
tmp_file_path=path, await an.run_in_actor(
error=error_in_child, crash_and_clean_tmpdir,
) tmp_file_path=path,
).result() error=error_in_child,
)
).result()
except ( except (
tractor.RemoteActorError, tractor.RemoteActorError,
# tractor.BaseExceptionGroup,
BaseExceptionGroup, BaseExceptionGroup,
): ) as _exc:
pass exc = _exc
from tractor.log import get_console_log
log = get_console_log(
level=loglevel,
name=__name__,
)
log.exception(
f'Subactor failed as expected with {type(exc)!r}\n'
)
# tmp file should have been wiped by # tmp file should have been wiped by
# teardown stack. # teardown stack.
assert not child_tmp_file.exists() assert not child_tmp_file.exists()
if error_in_child:
assert not cs.cancel_called
else:
# expect timeout in some cases?
assert cs.cancel_called

View File

@ -66,7 +66,7 @@ async def open_actor_cluster(
trio.open_nursery() as tn, trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc() tractor.trionics.maybe_raise_from_masking_exc()
): ):
uid = tractor.current_actor().uid uid = tractor.current_actor().aid.uid
async def _start(name: str) -> None: async def _start(name: str) -> None:
name = f'{uid[0]}.{name}' name = f'{uid[0]}.{name}'

View File

@ -463,10 +463,11 @@ class Context:
# self._cancel_called = val # self._cancel_called = val
# TODO, use the `Actor.aid: Aid` instead!
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
''' '''
`Actor.uid: tuple[str, str]` of the (remote) `Actor.aid.uid: tuple[str, str]` of the (remote)
actor-process who's task was cancelled thus causing this actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled. (side of the) context to also be cancelled.
@ -499,12 +500,12 @@ class Context:
if from_uid := re.src_uid: if from_uid := re.src_uid:
from_uid: tuple = tuple(from_uid) from_uid: tuple = tuple(from_uid)
our_uid: tuple = self._actor.uid our_uid: tuple = self._actor.aid.uid
our_canceller = self.canceller our_canceller = self.canceller
return bool( return bool(
isinstance((ctxc := re), ContextCancelled) isinstance((ctxc := re), ContextCancelled)
and from_uid == self.chan.uid and from_uid == self.chan.aid.uid
and ctxc.canceller == our_uid and ctxc.canceller == our_uid
and our_canceller == our_uid and our_canceller == our_uid
) )
@ -515,7 +516,7 @@ class Context:
Records whether the task on the remote side of this IPC Records whether the task on the remote side of this IPC
context acknowledged a cancel request via a relayed context acknowledged a cancel request via a relayed
`ContextCancelled` with the `.canceller` attr set to the `ContextCancelled` with the `.canceller` attr set to the
`Actor.uid` of the local actor who's task entered `Actor.aid.uid` of the local actor who's task entered
`Portal.open_context()`. `Portal.open_context()`.
This will only be `True` when `.cancel()` is called and This will only be `True` when `.cancel()` is called and
@ -789,8 +790,8 @@ class Context:
# appropriately. # appropriately.
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n' f'<= {self.peer_side!r}: {self.chan.aid.reprol()}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n' f'=> {self.side!r}: {self._actor.aid.reprol()}\n\n'
f'{error!r}' f'{error!r}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
@ -811,7 +812,7 @@ class Context:
# cancelled. # cancelled.
# #
# !TODO, switching to `Actor.aid` here! # !TODO, switching to `Actor.aid` here!
if (canc := error.canceller) == self._actor.uid: if (canc := error.canceller) == self._actor.aid.uid:
whom: str = 'us' whom: str = 'us'
self._canceller = canc self._canceller = canc
else: else:
@ -1036,7 +1037,7 @@ class Context:
--------- ---------
- after the far end cancels, the `.cancel()` calling side - after the far end cancels, the `.cancel()` calling side
should receive a `ContextCancelled` with the should receive a `ContextCancelled` with the
`.canceller: tuple` uid set to the current `Actor.uid`. `.canceller: tuple` uid set to the current `Actor.aid.uid`.
- timeout (quickly) on failure to rx this ACK error-msg in - timeout (quickly) on failure to rx this ACK error-msg in
an attempt to sidestep 2-generals when the transport an attempt to sidestep 2-generals when the transport
@ -1065,9 +1066,9 @@ class Context:
) )
reminfo: str = ( reminfo: str = (
# ' =>\n' # ' =>\n'
# f'Context.cancel() => {self.chan.uid}\n' # f'Context.cancel() => {self.chan.aid.uid}\n'
f'\n' f'\n'
f'c)=> {self.chan.uid}\n' f'c)=> {self.chan.aid.reprol()}\n'
f' |_[{self.dst_maddr}\n' f' |_[{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n' f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n' # f' >> {self._nsf}() -> {codec}[dict]:\n\n'
@ -1211,7 +1212,7 @@ class Context:
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
peer_uid: tuple = self.chan.uid peer_uid: tuple = self.chan.aid.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case(s): # for "graceful cancellation" case(s):
@ -1228,7 +1229,7 @@ class Context:
# (`ContextCancelled`) as an expected # (`ContextCancelled`) as an expected
# error-msg-is-cancellation-ack IFF said # error-msg-is-cancellation-ack IFF said
# `remote_error: ContextCancelled` has `.canceller` # `remote_error: ContextCancelled` has `.canceller`
# set to the `Actor.uid` of THIS task (i.e. the # set to the `Actor.aid.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor # cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc). # checking whether it should absorb the ctxc).
self_ctxc: bool = self._is_self_cancelled(remote_error) self_ctxc: bool = self._is_self_cancelled(remote_error)
@ -1679,7 +1680,7 @@ class Context:
elif self._started_called: elif self._started_called:
raise RuntimeError( raise RuntimeError(
f'called `.started()` twice on context with {self.chan.uid}' f'called `.started()` twice on context with {self.chan.aid.uid}'
) )
started_msg = Started( started_msg = Started(
@ -1812,7 +1813,7 @@ class Context:
''' '''
cid: str = self.cid cid: str = self.cid
chan: Channel = self.chan chan: Channel = self.chan
from_uid: tuple[str, str] = chan.uid from_uid: tuple[str, str] = chan.aid.uid
send_chan: trio.MemorySendChannel = self._send_chan send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf nsf: NamespacePath = self._nsf
@ -1953,20 +1954,22 @@ class Context:
# overrun state and that msg isn't stuck in an # overrun state and that msg isn't stuck in an
# overflow queue what happens?!? # overflow queue what happens?!?
local_uid = self._actor.uid local_aid = self._actor.aid
txt: str = ( txt: str = (
'on IPC context:\n' 'on IPC context:\n'
f'<= sender: {from_uid}\n' f'<= sender: {from_uid}\n'
f' |_ {self._nsf}()\n\n' f' |_ {self._nsf}()\n\n'
f'=> overrun: {local_uid}\n' f'=> overrun: {local_aid.reprol()!r}\n'
f' |_cid: {cid}\n' f' |_cid: {cid}\n'
f' |_task: {self._task}\n' f' |_task: {self._task}\n'
) )
if not self._stream_opened: if not self._stream_opened:
txt += ( txt += (
f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n' f'\n'
f'*** No stream open on `{local_aid.name}` side! ***\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
@ -2115,7 +2118,11 @@ async def open_context_from_portal(
# XXX NOTE XXX: currenly we do NOT allow opening a contex # XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing # with "self" since the local feeder mem-chan processing
# is not built for it. # is not built for it.
if (uid := portal.channel.uid) == portal.actor.uid: if (
(uid := portal.channel.aid.uid)
==
portal.actor.aid.uid
):
raise RuntimeError( raise RuntimeError(
'** !! Invalid Operation !! **\n' '** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n' 'Can not open an IPC ctx with the local actor!\n'
@ -2329,7 +2336,7 @@ async def open_context_from_portal(
and and
ctxc is ctx._remote_error ctxc is ctx._remote_error
and and
ctxc.canceller == portal.actor.uid ctxc.canceller == portal.actor.aid.uid
): ):
log.cancel( log.cancel(
f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
@ -2406,7 +2413,7 @@ async def open_context_from_portal(
logmeth(msg) logmeth(msg)
if debug_mode(): if debug_mode():
# async with debug.acquire_debug_lock(portal.actor.uid): # async with debug.acquire_debug_lock(portal.actor.aid.uid):
# pass # pass
# TODO: factor ^ into below for non-root cases? # TODO: factor ^ into below for non-root cases?
# #

View File

@ -982,6 +982,7 @@ class TransportClosed(Exception):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
message: str = message or self.message message: str = message or self.message
# when a cause is set, slap it onto the log emission. # when a cause is set, slap it onto the log emission.
if cause := self.src_exc: if cause := self.src_exc:
cause_tb_str: str = ''.join( cause_tb_str: str = ''.join(
@ -989,7 +990,7 @@ class TransportClosed(Exception):
) )
message += ( message += (
f'{cause_tb_str}\n' # tb f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr f'{cause!r}\n' # exc repr
) )
getattr( getattr(

View File

@ -252,8 +252,8 @@ async def _invoke_non_context(
): ):
log.warning( log.warning(
'Failed to send RPC result?\n' 'Failed to send RPC result?\n'
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n' f'|_{func}@{actor.aid.reprol()}() -> {ret_msg}\n\n'
f'x=> peer: {chan.uid}\n' f'x=> peer: {chan.aid.reprol()}\n'
) )
@acm @acm
@ -698,7 +698,7 @@ async def _invoke(
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
if rpc_ctx_cs.cancelled_caught: if rpc_ctx_cs.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.aid.uid
# first check for and raise any remote error # first check for and raise any remote error
# before raising any context cancelled case # before raising any context cancelled case
@ -730,7 +730,7 @@ async def _invoke(
# TODO: determine if the ctx peer task was the # TODO: determine if the ctx peer task was the
# exact task which cancelled, vs. some other # exact task which cancelled, vs. some other
# task in the same actor. # task in the same actor.
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.aid.uid:
explain += f'its {ctx.peer_side!r}-side peer' explain += f'its {ctx.peer_side!r}-side peer'
elif canceller == our_uid: elif canceller == our_uid:
@ -825,7 +825,7 @@ async def _invoke(
# associated child isn't in debug any more # associated child isn't in debug any more
await debug.maybe_wait_for_debugger() await debug.maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop(( ctx: Context = actor._contexts.pop((
chan.uid, chan.aid.uid,
cid, cid,
)) ))
@ -927,7 +927,7 @@ async def try_ship_error_to_remote(
log.critical( log.critical(
'IPC transport failure -> ' 'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n' f'failed to ship error to {remote_descr}!\n\n'
f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n' f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.aid.uid}\n'
f'\n' f'\n'
# TODO: use `.msg.preetty_struct` for this! # TODO: use `.msg.preetty_struct` for this!
f'{msg}\n' f'{msg}\n'
@ -1005,7 +1005,7 @@ async def process_messages(
async for msg in chan: async for msg in chan:
log.transport( # type: ignore log.transport( # type: ignore
f'IPC msg from peer\n' f'IPC msg from peer\n'
f'<= {chan.uid}\n\n' f'<= {chan.aid.reprol()}\n\n'
# TODO: use of the pprinting of structs is # TODO: use of the pprinting of structs is
# FRAGILE and should prolly not be # FRAGILE and should prolly not be
@ -1109,7 +1109,7 @@ async def process_messages(
except BaseException: except BaseException:
log.exception( log.exception(
'Failed to cancel task?\n' 'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n' f'<= canceller: {chan.aid.reprol()}\n'
f' |_{chan}\n\n' f' |_{chan}\n\n'
f'=> {actor}\n' f'=> {actor}\n'
f' |_cid: {target_cid}\n' f' |_cid: {target_cid}\n'
@ -1264,7 +1264,7 @@ async def process_messages(
log.transport( log.transport(
'Waiting on next IPC msg from\n' 'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n' f'peer: {chan.aid.reprol()}\n'
f'|_{chan}\n' f'|_{chan}\n'
) )
@ -1313,12 +1313,10 @@ async def process_messages(
f'peer IPC channel closed abruptly?\n' f'peer IPC channel closed abruptly?\n'
f'\n' f'\n'
f'<=x[\n' f'<=x[\n'
f' {chan}\n' f'{chan}\n'
f' |_{chan.raddr}\n\n'
) )
+ +
tc.message tc.message
) )
# transport **WAS** disconnected # transport **WAS** disconnected
@ -1341,8 +1339,8 @@ async def process_messages(
match err: match err:
case ContextCancelled(): case ContextCancelled():
log.cancel( log.cancel(
f'Actor: {actor.uid} was context-cancelled with,\n' f'Actor: {actor.aid.reprol()!r} is ctxc with,\n'
f'str(err)' f'{str(err)}'
) )
case _: case _:
log.exception("Actor errored:") log.exception("Actor errored:")

View File

@ -691,7 +691,7 @@ class Actor:
''' '''
# ?TODO, use Aid here as well? # ?TODO, use Aid here as well?
actor_uid = chan.uid actor_uid = chan.aid.uid
assert actor_uid assert actor_uid
try: try:
ctx = self._contexts[( ctx = self._contexts[(
@ -701,7 +701,7 @@ class Actor:
)] )]
log.debug( log.debug(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.aid.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
) )
ctx._allow_overruns: bool = allow_overruns ctx._allow_overruns: bool = allow_overruns
@ -718,7 +718,7 @@ class Actor:
except KeyError: except KeyError:
log.debug( log.debug(
f'Allocate new IPC ctx for\n' f'Allocate new IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.aid.uid}\n'
f'cid: {cid}\n' f'cid: {cid}\n'
) )
ctx = mk_context( ctx = mk_context(
@ -764,7 +764,7 @@ class Actor:
''' '''
cid: str = str(uuid.uuid4()) cid: str = str(uuid.uuid4())
assert chan.uid assert chan.aid.uid
ctx = self.get_context( ctx = self.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
@ -791,12 +791,12 @@ class Actor:
ns=ns, ns=ns,
func=func, func=func,
kwargs=kwargs, kwargs=kwargs,
uid=self.uid, uid=self.aid.uid, # <- !TODO use .aid!
cid=cid, cid=cid,
) )
log.runtime( log.runtime(
'Sending RPC `Start`\n\n' 'Sending RPC `Start`\n\n'
f'=> peer: {chan.uid}\n' f'=> peer: {chan.aid.uid}\n'
f' |_ {ns}.{func}({kwargs})\n\n' f' |_ {ns}.{func}({kwargs})\n\n'
f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
@ -1244,7 +1244,7 @@ class Actor:
'Cancel request for invalid RPC task.\n' 'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_aid}\n' f'<= canceller: {requesting_aid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.aid.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )
return True return True
@ -1381,7 +1381,7 @@ class Actor:
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_aid} [canceller]\n' f'<=c) {req_aid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'c)=> {self.aid.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self} [with {len(tasks)} tasks]\n'
# f' |_tasks: {len(tasks)}\n' # f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}' # f'{tasks_str}'
@ -1687,7 +1687,7 @@ async def async_main(
await reg_portal.run_from_ns( await reg_portal.run_from_ns(
'self', 'self',
'register_actor', 'register_actor',
uid=actor.uid, uid=actor.aid.uid,
addr=accept_addr.unwrap(), addr=accept_addr.unwrap(),
) )
@ -1758,9 +1758,11 @@ async def async_main(
# always! # always!
match internal_err: match internal_err:
case ContextCancelled(): case ContextCancelled():
reprol: str = actor.aid.reprol()
log.cancel( log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n' f'Actor {reprol!r} was task-ctx-cancelled with,\n'
f'str(internal_err)' f'\n'
f'{internal_err!r}'
) )
case _: case _:
log.exception( log.exception(
@ -1832,7 +1834,7 @@ async def async_main(
await reg_portal.run_from_ns( await reg_portal.run_from_ns(
'self', 'self',
'unregister_actor', 'unregister_actor',
uid=actor.uid uid=actor.aid.uid,
) )
except OSError: except OSError:
failed = True failed = True

View File

@ -151,7 +151,7 @@ async def exhaust_portal(
__tracebackhide__ = True __tracebackhide__ = True
try: try:
log.debug( log.debug(
f'Waiting on final result from {actor.uid}' f'Waiting on final result from {actor.aid.uid}'
) )
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
@ -210,17 +210,17 @@ async def cancel_on_completion(
actor, actor,
) )
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid]: Exception = result errors[actor.aid.uid]: Exception = result
log.cancel( log.cancel(
'Cancelling subactor runtime due to error:\n\n' 'Cancelling subactor runtime due to error:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n' f'Portal.cancel_actor() => {portal.channel.aid}\n\n'
f'error: {result}\n' f'error: {result}\n'
) )
else: else:
log.runtime( log.runtime(
'Cancelling subactor gracefully:\n\n' 'Cancelling subactor gracefully:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n' f'Portal.cancel_actor() => {portal.channel.aid}\n\n'
f'result: {result}\n' f'result: {result}\n'
) )
@ -308,7 +308,7 @@ async def hard_kill(
# ) # )
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock( # async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid, # subactor_uid=current_actor().aid.uid,
# ) as _ctx: # ) as _ctx:
# log.warning( # log.warning(
# 'Acquired debug lock, child ready to be killed ??\n' # 'Acquired debug lock, child ready to be killed ??\n'
@ -483,7 +483,7 @@ async def trio_proc(
# TODO, how to pass this over "wire" encodings like # TODO, how to pass this over "wire" encodings like
# cmdline args? # cmdline args?
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.uid), str(subactor.aid.uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -514,7 +514,7 @@ async def trio_proc(
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer( event, chan = await ipc_server.wait_for_peer(
subactor.uid subactor.aid.uid
) )
except trio.Cancelled: except trio.Cancelled:
@ -528,7 +528,9 @@ async def trio_proc(
await debug.maybe_wait_for_debugger() await debug.maybe_wait_for_debugger()
elif proc is not None: elif proc is not None:
async with debug.acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
# soft wait on the proc to terminate # soft wait on the proc to terminate
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
@ -538,7 +540,7 @@ async def trio_proc(
assert proc assert proc
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.aid.uid] = (
subactor, subactor,
proc, proc,
portal, portal,
@ -563,7 +565,7 @@ async def trio_proc(
# track subactor in current nursery # track subactor in current nursery
curr_actor: Actor = current_actor() curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
# resume caller at next checkpoint now that child is up # resume caller at next checkpoint now that child is up
task_status.started(portal) task_status.started(portal)
@ -616,7 +618,9 @@ async def trio_proc(
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if cancelled_during_spawn:
# Try again to avoid TTY clobbering. # Try again to avoid TTY clobbering.
async with debug.acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(
subactor_uid=subactor.aid.uid
):
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
@ -662,7 +666,7 @@ async def trio_proc(
if not cancelled_during_spawn: if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this # pop child entry to indicate we no longer managing this
# subactor # subactor
actor_nursery._children.pop(subactor.uid) actor_nursery._children.pop(subactor.aid.uid)
async def mp_proc( async def mp_proc(
@ -744,7 +748,7 @@ async def mp_proc(
# register the process before start in case we get a cancel # register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait # request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request # for it to fully come up before sending a cancel request
actor_nursery._children[subactor.uid] = (subactor, proc, None) actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
proc.start() proc.start()
if not proc.is_alive(): if not proc.is_alive():
@ -758,7 +762,7 @@ async def mp_proc(
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await ipc_server.wait_for_peer( event, chan = await ipc_server.wait_for_peer(
subactor.uid, subactor.aid.uid,
) )
# XXX: monkey patch poll API to match the ``subprocess`` API.. # XXX: monkey patch poll API to match the ``subprocess`` API..
@ -771,7 +775,7 @@ async def mp_proc(
# any process we may have started. # any process we may have started.
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = (subactor, proc, portal) actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
# unblock parent task # unblock parent task
task_status.started(portal) task_status.started(portal)
@ -810,7 +814,7 @@ async def mp_proc(
# tandem if not done already # tandem if not done already
log.warning( log.warning(
"Cancelling existing result waiter task for " "Cancelling existing result waiter task for "
f"{subactor.uid}") f"{subactor.aid.uid}")
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
finally: finally:
@ -828,7 +832,7 @@ async def mp_proc(
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor # pop child entry to indicate we are no longer managing subactor
actor_nursery._children.pop(subactor.uid) actor_nursery._children.pop(subactor.aid.uid)
# TODO: prolly report to ``mypy`` how this causes all sorts of # TODO: prolly report to ``mypy`` how this causes all sorts of
# false errors.. # false errors..

View File

@ -104,7 +104,7 @@ def current_actor(
msg += ( msg += (
f'Apparently the lact active actor was\n' f'Apparently the lact active actor was\n'
f'|_{last}\n' f'|_{last}\n'
f'|_{last.uid}\n' f'|_{last.aid.uid}\n'
) )
# no actor runtime has (as of yet) ever been started for # no actor runtime has (as of yet) ever been started for
# this process. # this process.

View File

@ -391,15 +391,17 @@ class ActorNursery:
else: else:
if portal is None: # actor hasn't fully spawned yet if portal is None: # actor hasn't fully spawned yet
event: trio.Event = server._peer_connected[subactor.uid] event: trio.Event = server._peer_connected[
subactor.aid.uid
]
log.warning( log.warning(
f"{subactor.uid} never 't finished spawning?" f"{subactor.aid.uid} never 't finished spawning?"
) )
await event.wait() await event.wait()
# channel/portal should now be up # channel/portal should now be up
_, _, portal = children[subactor.uid] _, _, portal = children[subactor.aid.uid]
# XXX should be impossible to get here # XXX should be impossible to get here
# unless method was called from within # unless method was called from within
@ -407,7 +409,7 @@ class ActorNursery:
if portal is None: if portal is None:
# cancelled while waiting on the event # cancelled while waiting on the event
# to arrive # to arrive
chan = server._peers[subactor.uid][-1] chan = server._peers[subactor.aid.uid][-1]
if chan: if chan:
portal = Portal(chan) portal = Portal(chan)
else: # there's no other choice left else: # there's no other choice left
@ -506,7 +508,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
except BaseException as _inner_err: except BaseException as _inner_err:
inner_err = _inner_err inner_err = _inner_err
errors[actor.uid] = inner_err errors[actor.aid.uid] = inner_err
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
@ -539,7 +541,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
log.cancel( log.cancel(
f'Actor-nursery cancelled by {etype}\n\n' f'Actor-nursery cancelled by {etype}\n\n'
f'{current_actor().uid}\n' f'{current_actor().aid.uid}\n'
f' |_{an}\n\n' f' |_{an}\n\n'
# TODO: show tb str? # TODO: show tb str?
@ -630,7 +632,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# show frame on any (likely) internal error # show frame on any (likely) internal error
if ( if (
not an.cancelled not an.cancel_called
and an._scope_error and an._scope_error
): ):
__tracebackhide__: bool = False __tracebackhide__: bool = False
@ -726,7 +728,7 @@ async def open_nursery(
if ( if (
an an
and and
not an.cancelled not an.cancel_called
and and
an._scope_error an._scope_error
): ):

View File

@ -61,7 +61,11 @@ def get_rando_addr(
# NOTE, file-name uniqueness (no-collisions) will be based on # NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid. # the runtime-directory and root (pytest-proc's) pid.
case 'uds': case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap() from tractor.ipc._uds import UDSAddress
addr: UDSAddress = addr_type.get_random()
assert addr.is_valid
assert addr.sockpath.resolve()
testrun_reg_addr = addr.unwrap()
# XXX, as sanity it should never the same as the default for the # XXX, as sanity it should never the same as the default for the
# host-singleton registry actor. # host-singleton registry actor.

View File

@ -74,6 +74,7 @@ def tractor_test(fn):
reg_addr=None, reg_addr=None,
start_method: str|None = None, start_method: str|None = None,
debug_mode: bool = False, debug_mode: bool = False,
tpt_proto: str|None=None,
**kwargs **kwargs
): ):
# __tracebackhide__ = True # __tracebackhide__ = True
@ -102,6 +103,9 @@ def tractor_test(fn):
# set of subprocess spawning backends # set of subprocess spawning backends
kwargs['debug_mode'] = debug_mode kwargs['debug_mode'] = debug_mode
if 'tpt_proto' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['tpt_proto'] = tpt_proto
if kwargs: if kwargs:
@ -177,6 +181,13 @@ def pytest_configure(config):
backend = config.option.spawn_backend backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend) tractor._spawn.try_set_start_method(backend)
# register custom marks to avoid warnings see,
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
config.addinivalue_line(
'markers',
'no_tpt(proto_key): test will (likely) not behave with tpt backend'
)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode(request) -> bool: def debug_mode(request) -> bool:
@ -225,13 +236,32 @@ def tpt_protos(request) -> list[str]:
autouse=True, autouse=True,
) )
def tpt_proto( def tpt_proto(
request,
tpt_protos: list[str], tpt_protos: list[str],
) -> str: ) -> str:
proto_key: str = tpt_protos[0] proto_key: str = tpt_protos[0]
# ?TODO, but needs a fn-scoped tpt_proto fixture..
# @pytest.mark.no_tpt('uds')
# node = request.node
# markers = node.own_markers
# for mark in markers:
# if (
# mark.name == 'no_tpt'
# and
# proto_key in mark.args
# ):
# pytest.skip(
# f'Test {node} normally fails with '
# f'tpt-proto={proto_key!r}\n'
# )
from tractor import _state from tractor import _state
if _state._def_tpt_proto != proto_key: if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key _state._def_tpt_proto = proto_key
_state._runtime_vars['_enable_tpts'] = [
proto_key,
]
yield proto_key yield proto_key

View File

@ -94,7 +94,7 @@ class Channel:
self._transport: MsgTransport|None = transport self._transport: MsgTransport|None = transport
# set after handshake - always info from peer end # set after handshake - always info from peer end
self.aid: Aid|None = None self._aid: Aid|None = None
self._aiter_msgs = self._iter_msgs() self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None self._exc: Exception|None = None
@ -122,6 +122,14 @@ class Channel:
''' '''
return self._cancel_called return self._cancel_called
@property
def aid(self) -> Aid:
'''
Peer actor's ID.
'''
return self._aid
@property @property
def uid(self) -> tuple[str, str]: def uid(self) -> tuple[str, str]:
''' '''
@ -505,7 +513,7 @@ class Channel:
f'<= {peer_aid.reprol(sin_uuid=False)}\n' f'<= {peer_aid.reprol(sin_uuid=False)}\n'
) )
# NOTE, we always are referencing the remote peer! # NOTE, we always are referencing the remote peer!
self.aid = peer_aid self._aid = peer_aid
return peer_aid return peer_aid

View File

@ -197,9 +197,11 @@ class UDSAddress(
# sockname: str = '.'.join(actor.uid) + f'@{pid}' # sockname: str = '.'.join(actor.uid) + f'@{pid}'
# -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho.. # -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho..
else: else:
prefix: str = '<unknown-actor>'
if is_root_process(): if is_root_process():
prefix: str = 'root' prefix: str = 'no_runtime_root'
else:
prefix: str = 'no_runtime_actor'
sockname: str = f'{prefix}@{pid}' sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{sockname}.sock') sockpath: Path = Path(f'{sockname}.sock')

View File

@ -293,7 +293,7 @@ _conc_name_getters = {
'task': pformat_task_uid, 'task': pformat_task_uid,
'actor': lambda: _curr_actor_no_exc(), 'actor': lambda: _curr_actor_no_exc(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().aid.uuid[:6],
} }

View File

@ -324,6 +324,8 @@ class Start(
# => SEE ABOVE <= # => SEE ABOVE <=
kwargs: dict[str, Any] kwargs: dict[str, Any]
uid: tuple[str, str] # (calling) actor-id uid: tuple[str, str] # (calling) actor-id
# aid: Aid
# ^TODO, convert stat!
# TODO: enforcing a msg-spec in terms `Msg.pld` # TODO: enforcing a msg-spec in terms `Msg.pld`
# parameterizable msgs to be used in the appls IPC dialog. # parameterizable msgs to be used in the appls IPC dialog.