diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25d07448..eab7cd7f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,7 +75,7 @@ jobs: testing: - name: '${{ matrix.os }} Python${{ matrix.python-version }} - spawn_backend=${{ matrix.spawn_backend }}' + name: '${{ matrix.os }} Python${{ matrix.python-version }} spawn_backend=${{ matrix.spawn_backend }} tpt_proto=${{ matrix.tpt_proto }}' timeout-minutes: 16 runs-on: ${{ matrix.os }} @@ -94,7 +94,22 @@ jobs: 'trio', # 'mp_spawn', # 'mp_forkserver', + # ?TODO^ is it worth it to get these running again? + # + # - [ ] next-gen backends, on 3.13+ + # https://github.com/goodboy/tractor/issues/379 + # 'subinterpreter', + # 'subint', ] + tpt_proto: [ + 'tcp', + 'uds', + ] + # https://github.com/orgs/community/discussions/26253#discussioncomment-3250989 + exclude: + # don't do UDS run on macOS (for now) + - os: macos-latest + tpt_proto: 'uds' steps: - uses: actions/checkout@v4 @@ -123,7 +138,11 @@ jobs: run: uv tree - name: Run tests - run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx + run: > + uv run + pytest tests/ -rsx + --spawn-backend=${{ matrix.spawn_backend }} + --tpt-proto=${{ matrix.tpt_proto }} # XXX legacy NOTE XXX # diff --git a/tests/conftest.py b/tests/conftest.py index 31787fe2..9dacb36a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,7 @@ pytest_plugins: list[str] = [ 'tractor._testing.pytest', ] +_ci_env: bool = os.environ.get('CI', False) _non_linux: bool = platform.system() != 'Linux' # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives @@ -36,9 +37,8 @@ else: _INT_SIGNAL = signal.SIGINT _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _PROC_SPAWN_WAIT = ( - 0.6 - if sys.version_info < (3, 7) - else 0.4 + 2 if _ci_env + else 1 ) @@ -115,9 +115,6 @@ def test_log( yield _log -_ci_env: bool = os.environ.get('CI', False) - - @pytest.fixture(scope='session') def ci_env() -> bool: ''' @@ -152,6 +149,7 @@ def daemon( reg_addr: tuple[str, int], tpt_proto: str, ci_env: bool, + test_log: tractor.log.StackLevelAdapter, ) -> subprocess.Popen: ''' @@ -167,10 +165,12 @@ def daemon( "import tractor; " "tractor.run_daemon([], " "registry_addrs={reg_addrs}, " + "enable_transports={enable_tpts}, " "debug_mode={debug_mode}, " "loglevel={ll})" ).format( reg_addrs=str([reg_addr]), + enable_tpts=str([tpt_proto]), ll="'{}'".format(loglevel) if loglevel else None, debug_mode=debug_mode, ) @@ -201,7 +201,7 @@ def daemon( # so it's often required that we delay a bit more starting # the first actor-tree.. if tpt_proto == 'uds': - _PROC_SPAWN_WAIT = 0.6 + _PROC_SPAWN_WAIT += 1.6 if _non_linux and ci_env: _PROC_SPAWN_WAIT += 1 @@ -240,7 +240,7 @@ def daemon( if rc < 0: raise RuntimeError(msg) - log.error(msg) + test_log.error(msg) # @pytest.fixture(autouse=True) diff --git a/tests/devx/conftest.py b/tests/devx/conftest.py index fbb4ff6b..eb56d74c 100644 --- a/tests/devx/conftest.py +++ b/tests/devx/conftest.py @@ -89,6 +89,7 @@ def spawn( def _spawn( cmd: str, + expect_timeout: float = 4, **mkcmd_kwargs, ) -> pty_spawn.spawn: nonlocal spawned @@ -98,14 +99,17 @@ def spawn( cmd, **mkcmd_kwargs, ), - expect_timeout=( - 10 if _non_linux and _ci_env - else 3 - ), + expect_timeout=(timeout:=( + expect_timeout + 6 + if _non_linux and _ci_env + else expect_timeout + )), # preexec_fn=unset_colors, # ^TODO? get `pytest` core to expose underlying # `pexpect.spawn()` stuff? ) + # sanity + assert spawned.timeout == timeout return spawned # such that test-dep can pass input script name. diff --git a/tests/ipc/test_multi_tpt.py b/tests/ipc/test_multi_tpt.py index 353385e1..289f5135 100644 --- a/tests/ipc/test_multi_tpt.py +++ b/tests/ipc/test_multi_tpt.py @@ -62,6 +62,13 @@ def test_root_passes_tpt_to_sub( reg_addr: tuple, debug_mode: bool, ): + # XXX NOTE, the `reg_addr` addr won't be the same type as the + # `tpt_proto_key` would deliver here unless you pass `--tpt-proto + # ` on the CLI. + # + # if tpt_proto_key == 'uds': + # breakpoint() + async def main(): async with tractor.open_nursery( enable_transports=[tpt_proto_key], diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 2103f627..009e25b0 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -146,9 +146,6 @@ def test_ipc_channel_break_during_stream( # a user sending ctl-c by raising a KBI. if pre_aclose_msgstream: expect_final_exc = KeyboardInterrupt - if tpt_proto == 'uds': - expect_final_exc = TransportClosed - expect_final_cause = trio.BrokenResourceError # XXX OLD XXX # if child calls `MsgStream.aclose()` then expect EoC. @@ -169,10 +166,6 @@ def test_ipc_channel_break_during_stream( if pre_aclose_msgstream: expect_final_exc = KeyboardInterrupt - if tpt_proto == 'uds': - expect_final_exc = TransportClosed - expect_final_cause = trio.BrokenResourceError - # NOTE when the parent IPC side dies (even if the child does as well # but the child fails BEFORE the parent) we always expect the # IPC layer to raise a closed-resource, NEVER do we expect diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 7d14df12..d3555d37 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -649,7 +649,11 @@ def test_cancel_while_childs_child_in_sync_sleep( # # delay = 1 # no AssertionError in eg, TooSlowError raised. # delay = 2 # is AssertionError in eg AND no TooSlowError !? - delay = 4 # is AssertionError in eg AND no _cs cancellation. + # is AssertionError in eg AND no _cs cancellation. + delay = ( + 6 if _non_linux + else 4 + ) with trio.fail_after(delay) as _cs: # with trio.CancelScope() as cs: diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 603b2eb4..308a92a2 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -11,7 +11,6 @@ MESSAGE = 'tractoring at full speed' def test_empty_mngrs_input_raises() -> None: - async def main(): with trio.fail_after(3): async with ( @@ -56,25 +55,39 @@ async def worker( print(msg) assert msg == MESSAGE - # TODO: does this ever cause a hang + # ?TODO, does this ever cause a hang? # assert 0 +# ?TODO, but needs a fn-scoped tpt_proto fixture.. +# @pytest.mark.no_tpt('uds') @tractor_test -async def test_streaming_to_actor_cluster() -> None: +async def test_streaming_to_actor_cluster( + tpt_proto: str, +): + ''' + Open an actor "cluster" using the (experimental) `._clustering` + API and conduct standard inter-task-ctx streaming. - async with ( - open_actor_cluster(modules=[__name__]) as portals, + ''' + if tpt_proto == 'uds': + pytest.skip( + f'Test currently fails with tpt-proto={tpt_proto!r}\n' + ) - gather_contexts( - mngrs=[p.open_context(worker) for p in portals.values()], - ) as contexts, + with trio.fail_after(6): + async with ( + open_actor_cluster(modules=[__name__]) as portals, - gather_contexts( - mngrs=[ctx[0].open_stream() for ctx in contexts], - ) as streams, + gather_contexts( + mngrs=[p.open_context(worker) for p in portals.values()], + ) as contexts, - ): - with trio.move_on_after(1): - for stream in itertools.cycle(streams): - await stream.send(MESSAGE) + gather_contexts( + mngrs=[ctx[0].open_stream() for ctx in contexts], + ) as streams, + + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 3e5964ec..b76e3706 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -7,6 +7,7 @@ import signal import platform from functools import partial import itertools +import time from typing import Callable import psutil @@ -31,7 +32,7 @@ async def test_reg_then_unreg( ) as n: portal = await n.start_actor('actor', enable_modules=[__name__]) - uid = portal.channel.uid + uid = portal.channel.aid.uid async with tractor.get_registry(reg_addr) as aportal: # this local actor should be the arbiter @@ -205,7 +206,7 @@ async def spawn_and_check_registry( # ensure current actor is registered registry: dict = await get_reg() - assert actor.uid in registry + assert actor.aid.uid in registry try: async with tractor.open_nursery() as an: @@ -253,8 +254,21 @@ async def spawn_and_check_registry( # all subactors should have de-registered registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + start: float = time.time() + while ( + not (len(registry) == extra) + and + (time.time() - start) < 5 + ): + print( + f'Waiting for remaining subs to dereg..\n' + f'{registry!r}\n' + ) + await trio.sleep(0.3) + else: + assert len(registry) == extra + + assert actor.aid.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -384,8 +398,8 @@ async def close_chans_before_nursery( # all subactors should have de-registered registry = await get_reg() - assert portal1.channel.uid not in registry - assert portal2.channel.uid not in registry + assert portal1.channel.aid.uid not in registry + assert portal2.channel.aid.uid not in registry assert len(registry) == entries_at_end diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 4df705b1..42e30345 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -581,7 +581,7 @@ def test_peer_canceller( assert ( re.canceller == - root.uid + root.aid.uid ) else: # the other 2 ctxs @@ -590,7 +590,7 @@ def test_peer_canceller( and ( re.canceller == - canceller.channel.uid + canceller.channel.aid.uid ) ) @@ -745,7 +745,7 @@ def test_peer_canceller( # -> each context should have received # a silently absorbed context cancellation # in its remote nursery scope. - # assert ctx.chan.uid == ctx.canceller + # assert ctx.chan.aid.uid == ctx.canceller # NOTE: when an inter-peer cancellation # occurred, we DO NOT expect this @@ -802,7 +802,7 @@ async def basic_echo_server( ''' actor: Actor = tractor.current_actor() - uid: tuple = actor.uid + uid: tuple = actor.aid.uid await ctx.started(uid) async with ctx.open_stream() as ipc: async for msg in ipc: @@ -857,7 +857,7 @@ async def serve_subactors( f'|_{peer}\n' ) await ipc.send(( - peer.chan.uid, + peer.chan.aid.uid, peer.chan.raddr.unwrap(), )) @@ -992,7 +992,7 @@ async def tell_little_bro( sub_ctx.open_stream() as echo_ipc, ): actor: Actor = current_actor() - uid: tuple = actor.uid + uid: tuple = actor.aid.uid for i in range(rng_seed): msg: tuple = ( uid, @@ -1097,7 +1097,7 @@ def test_peer_spawns_and_cancels_service_subactor( ) as (client_ctx, client_says), ): root: Actor = current_actor() - spawner_uid: tuple = spawn_ctx.chan.uid + spawner_uid: tuple = spawn_ctx.chan.aid.uid print( f'Server says: {first}\n' f'Client says: {client_says}\n' @@ -1116,7 +1116,7 @@ def test_peer_spawns_and_cancels_service_subactor( print( 'Sub-spawn came online\n' f'portal: {sub}\n' - f'.uid: {sub.actor.uid}\n' + f'.uid: {sub.actor.aid.uid}\n' f'chan.raddr: {sub.chan.raddr}\n' ) @@ -1150,7 +1150,7 @@ def test_peer_spawns_and_cancels_service_subactor( assert isinstance(res, ContextCancelled) assert client_ctx.cancel_acked - assert res.canceller == root.uid + assert res.canceller == root.aid.uid assert not raise_sub_spawn_error_after # cancelling the spawner sub should @@ -1184,8 +1184,8 @@ def test_peer_spawns_and_cancels_service_subactor( # little_bro: a `RuntimeError`. # check_inner_rte(rae) - assert rae.relay_uid == client.chan.uid - assert rae.src_uid == sub.chan.uid + assert rae.relay_uid == client.chan.aid.uid + assert rae.src_uid == sub.chan.aid.uid assert not client_ctx.cancel_acked assert ( @@ -1214,12 +1214,12 @@ def test_peer_spawns_and_cancels_service_subactor( except ContextCancelled as ctxc: _ctxc = ctxc print( - f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n' + f'{root.aid.uid} caught ctxc from ctx with {client_ctx.chan.aid.uid}\n' f'{repr(ctxc)}\n' ) if not raise_sub_spawn_error_after: - assert ctxc.canceller == root.uid + assert ctxc.canceller == root.aid.uid else: assert ctxc.canceller == spawner_uid diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index e7821661..76c95e9f 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -190,7 +190,10 @@ async def aggregate(seed: int): # leverage trio's built-in backpressure await send_chan.send(value) - print(f"FINISHED ITERATING {portal.channel.uid}") + print( + f'FINISHED ITERATING!\n' + f'peer: {portal.channel.aid.uid}' + ) # spawn 2 trio tasks to collect streams and push to a local queue async with trio.open_nursery() as tn: diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 55553dd9..782af81e 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -22,6 +22,10 @@ def unlink_file(): async def crash_and_clean_tmpdir( tmp_file_path: str, error: bool = True, + rent_cancel: bool = True, + + # XXX unused, but do we really need to test these cases? + self_cancel: bool = False, ): global _file_path _file_path = tmp_file_path @@ -32,43 +36,75 @@ async def crash_and_clean_tmpdir( assert os.path.isfile(tmp_file_path) await trio.sleep(0.1) if error: + print('erroring in subactor!') assert 0 - else: + + elif self_cancel: + print('SELF-cancelling subactor!') actor.cancel_soon() + elif rent_cancel: + await trio.sleep_forever() + + print('subactor exiting task!') + @pytest.mark.parametrize( 'error_in_child', [True, False], + ids='error_in_child={}'.format, ) @tractor_test async def test_lifetime_stack_wipes_tmpfile( tmp_path, error_in_child: bool, + loglevel: str, + # log: tractor.log.StackLevelAdapter, + # ^TODO, once landed via macos support! ): child_tmp_file = tmp_path / "child.txt" child_tmp_file.touch() assert child_tmp_file.exists() path = str(child_tmp_file) + # NOTE, this is expected to cancel the sub + # in the `error_in_child=False` case! + timeout: float = ( + 1.6 if error_in_child + else 1 + ) try: - with trio.move_on_after(0.5): - async with tractor.open_nursery() as n: - await ( # inlined portal - await n.run_in_actor( - crash_and_clean_tmpdir, - tmp_file_path=path, - error=error_in_child, - ) - ).result() - + with trio.move_on_after(timeout) as cs: + async with tractor.open_nursery( + loglevel=loglevel, + ) as an: + await ( # inlined `tractor.Portal` + await an.run_in_actor( + crash_and_clean_tmpdir, + tmp_file_path=path, + error=error_in_child, + ) + ).result() except ( tractor.RemoteActorError, - # tractor.BaseExceptionGroup, BaseExceptionGroup, - ): - pass + ) as _exc: + exc = _exc + from tractor.log import get_console_log + log = get_console_log( + level=loglevel, + name=__name__, + ) + log.exception( + f'Subactor failed as expected with {type(exc)!r}\n' + ) # tmp file should have been wiped by # teardown stack. assert not child_tmp_file.exists() + + if error_in_child: + assert not cs.cancel_called + else: + # expect timeout in some cases? + assert cs.cancel_called diff --git a/tractor/_clustering.py b/tractor/_clustering.py index dbb50304..4f639239 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -66,7 +66,7 @@ async def open_actor_cluster( trio.open_nursery() as tn, tractor.trionics.maybe_raise_from_masking_exc() ): - uid = tractor.current_actor().uid + uid = tractor.current_actor().aid.uid async def _start(name: str) -> None: name = f'{uid[0]}.{name}' diff --git a/tractor/_context.py b/tractor/_context.py index 5a69077e..fa90759b 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -463,10 +463,11 @@ class Context: # self._cancel_called = val + # TODO, use the `Actor.aid: Aid` instead! @property def canceller(self) -> tuple[str, str]|None: ''' - `Actor.uid: tuple[str, str]` of the (remote) + `Actor.aid.uid: tuple[str, str]` of the (remote) actor-process who's task was cancelled thus causing this (side of the) context to also be cancelled. @@ -499,12 +500,12 @@ class Context: if from_uid := re.src_uid: from_uid: tuple = tuple(from_uid) - our_uid: tuple = self._actor.uid + our_uid: tuple = self._actor.aid.uid our_canceller = self.canceller return bool( isinstance((ctxc := re), ContextCancelled) - and from_uid == self.chan.uid + and from_uid == self.chan.aid.uid and ctxc.canceller == our_uid and our_canceller == our_uid ) @@ -515,7 +516,7 @@ class Context: Records whether the task on the remote side of this IPC context acknowledged a cancel request via a relayed `ContextCancelled` with the `.canceller` attr set to the - `Actor.uid` of the local actor who's task entered + `Actor.aid.uid` of the local actor who's task entered `Portal.open_context()`. This will only be `True` when `.cancel()` is called and @@ -789,8 +790,8 @@ class Context: # appropriately. log.runtime( 'Setting remote error for ctx\n\n' - f'<= {self.peer_side!r}: {self.chan.uid}\n' - f'=> {self.side!r}: {self._actor.uid}\n\n' + f'<= {self.peer_side!r}: {self.chan.aid.reprol()}\n' + f'=> {self.side!r}: {self._actor.aid.reprol()}\n\n' f'{error!r}' ) self._remote_error: BaseException = error @@ -811,7 +812,7 @@ class Context: # cancelled. # # !TODO, switching to `Actor.aid` here! - if (canc := error.canceller) == self._actor.uid: + if (canc := error.canceller) == self._actor.aid.uid: whom: str = 'us' self._canceller = canc else: @@ -1036,7 +1037,7 @@ class Context: --------- - after the far end cancels, the `.cancel()` calling side should receive a `ContextCancelled` with the - `.canceller: tuple` uid set to the current `Actor.uid`. + `.canceller: tuple` uid set to the current `Actor.aid.uid`. - timeout (quickly) on failure to rx this ACK error-msg in an attempt to sidestep 2-generals when the transport @@ -1065,9 +1066,9 @@ class Context: ) reminfo: str = ( # ' =>\n' - # f'Context.cancel() => {self.chan.uid}\n' + # f'Context.cancel() => {self.chan.aid.uid}\n' f'\n' - f'c)=> {self.chan.uid}\n' + f'c)=> {self.chan.aid.reprol()}\n' f' |_[{self.dst_maddr}\n' f' >> {self.repr_rpc}\n' # f' >> {self._nsf}() -> {codec}[dict]:\n\n' @@ -1211,7 +1212,7 @@ class Context: ''' __tracebackhide__: bool = hide_tb - peer_uid: tuple = self.chan.uid + peer_uid: tuple = self.chan.aid.uid # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # for "graceful cancellation" case(s): @@ -1228,7 +1229,7 @@ class Context: # (`ContextCancelled`) as an expected # error-msg-is-cancellation-ack IFF said # `remote_error: ContextCancelled` has `.canceller` - # set to the `Actor.uid` of THIS task (i.e. the + # set to the `Actor.aid.uid` of THIS task (i.e. the # cancellation requesting task's actor is the actor # checking whether it should absorb the ctxc). self_ctxc: bool = self._is_self_cancelled(remote_error) @@ -1679,7 +1680,7 @@ class Context: elif self._started_called: raise RuntimeError( - f'called `.started()` twice on context with {self.chan.uid}' + f'called `.started()` twice on context with {self.chan.aid.uid}' ) started_msg = Started( @@ -1812,7 +1813,7 @@ class Context: ''' cid: str = self.cid chan: Channel = self.chan - from_uid: tuple[str, str] = chan.uid + from_uid: tuple[str, str] = chan.aid.uid send_chan: trio.MemorySendChannel = self._send_chan nsf: NamespacePath = self._nsf @@ -1953,20 +1954,22 @@ class Context: # overrun state and that msg isn't stuck in an # overflow queue what happens?!? - local_uid = self._actor.uid + local_aid = self._actor.aid txt: str = ( 'on IPC context:\n' f'<= sender: {from_uid}\n' f' |_ {self._nsf}()\n\n' - f'=> overrun: {local_uid}\n' + f'=> overrun: {local_aid.reprol()!r}\n' f' |_cid: {cid}\n' f' |_task: {self._task}\n' ) if not self._stream_opened: txt += ( - f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n' + f'\n' + f'*** No stream open on `{local_aid.name}` side! ***\n' + f'\n' f'{msg}\n' ) @@ -2115,7 +2118,11 @@ async def open_context_from_portal( # XXX NOTE XXX: currenly we do NOT allow opening a contex # with "self" since the local feeder mem-chan processing # is not built for it. - if (uid := portal.channel.uid) == portal.actor.uid: + if ( + (uid := portal.channel.aid.uid) + == + portal.actor.aid.uid + ): raise RuntimeError( '** !! Invalid Operation !! **\n' 'Can not open an IPC ctx with the local actor!\n' @@ -2329,7 +2336,7 @@ async def open_context_from_portal( and ctxc is ctx._remote_error and - ctxc.canceller == portal.actor.uid + ctxc.canceller == portal.actor.aid.uid ): log.cancel( f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' @@ -2406,7 +2413,7 @@ async def open_context_from_portal( logmeth(msg) if debug_mode(): - # async with debug.acquire_debug_lock(portal.actor.uid): + # async with debug.acquire_debug_lock(portal.actor.aid.uid): # pass # TODO: factor ^ into below for non-root cases? # diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 418accc3..15b785fa 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -982,6 +982,7 @@ class TransportClosed(Exception): ''' __tracebackhide__: bool = hide_tb message: str = message or self.message + # when a cause is set, slap it onto the log emission. if cause := self.src_exc: cause_tb_str: str = ''.join( @@ -989,7 +990,7 @@ class TransportClosed(Exception): ) message += ( f'{cause_tb_str}\n' # tb - f' {cause}\n' # exc repr + f'{cause!r}\n' # exc repr ) getattr( diff --git a/tractor/_rpc.py b/tractor/_rpc.py index ac658cb2..9bd1c475 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -252,8 +252,8 @@ async def _invoke_non_context( ): log.warning( 'Failed to send RPC result?\n' - f'|_{func}@{actor.uid}() -> {ret_msg}\n\n' - f'x=> peer: {chan.uid}\n' + f'|_{func}@{actor.aid.reprol()}() -> {ret_msg}\n\n' + f'x=> peer: {chan.aid.reprol()}\n' ) @acm @@ -698,7 +698,7 @@ async def _invoke( # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. if rpc_ctx_cs.cancelled_caught: - our_uid: tuple = actor.uid + our_uid: tuple = actor.aid.uid # first check for and raise any remote error # before raising any context cancelled case @@ -730,7 +730,7 @@ async def _invoke( # TODO: determine if the ctx peer task was the # exact task which cancelled, vs. some other # task in the same actor. - elif canceller == ctx.chan.uid: + elif canceller == ctx.chan.aid.uid: explain += f'its {ctx.peer_side!r}-side peer' elif canceller == our_uid: @@ -825,7 +825,7 @@ async def _invoke( # associated child isn't in debug any more await debug.maybe_wait_for_debugger() ctx: Context = actor._contexts.pop(( - chan.uid, + chan.aid.uid, cid, )) @@ -927,7 +927,7 @@ async def try_ship_error_to_remote( log.critical( 'IPC transport failure -> ' f'failed to ship error to {remote_descr}!\n\n' - f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n' + f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.aid.uid}\n' f'\n' # TODO: use `.msg.preetty_struct` for this! f'{msg}\n' @@ -1005,7 +1005,7 @@ async def process_messages( async for msg in chan: log.transport( # type: ignore f'IPC msg from peer\n' - f'<= {chan.uid}\n\n' + f'<= {chan.aid.reprol()}\n\n' # TODO: use of the pprinting of structs is # FRAGILE and should prolly not be @@ -1109,7 +1109,7 @@ async def process_messages( except BaseException: log.exception( 'Failed to cancel task?\n' - f'<= canceller: {chan.uid}\n' + f'<= canceller: {chan.aid.reprol()}\n' f' |_{chan}\n\n' f'=> {actor}\n' f' |_cid: {target_cid}\n' @@ -1264,7 +1264,7 @@ async def process_messages( log.transport( 'Waiting on next IPC msg from\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.reprol()}\n' f'|_{chan}\n' ) @@ -1313,12 +1313,10 @@ async def process_messages( f'peer IPC channel closed abruptly?\n' f'\n' f'<=x[\n' - f' {chan}\n' - f' |_{chan.raddr}\n\n' + f'{chan}\n' ) + tc.message - ) # transport **WAS** disconnected @@ -1341,8 +1339,8 @@ async def process_messages( match err: case ContextCancelled(): log.cancel( - f'Actor: {actor.uid} was context-cancelled with,\n' - f'str(err)' + f'Actor: {actor.aid.reprol()!r} is ctxc with,\n' + f'{str(err)}' ) case _: log.exception("Actor errored:") diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f77c69c1..93fb68fd 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -691,7 +691,7 @@ class Actor: ''' # ?TODO, use Aid here as well? - actor_uid = chan.uid + actor_uid = chan.aid.uid assert actor_uid try: ctx = self._contexts[( @@ -701,7 +701,7 @@ class Actor: )] log.debug( f'Retreived cached IPC ctx for\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.uid}\n' f'cid:{cid}\n' ) ctx._allow_overruns: bool = allow_overruns @@ -718,7 +718,7 @@ class Actor: except KeyError: log.debug( f'Allocate new IPC ctx for\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.uid}\n' f'cid: {cid}\n' ) ctx = mk_context( @@ -764,7 +764,7 @@ class Actor: ''' cid: str = str(uuid.uuid4()) - assert chan.uid + assert chan.aid.uid ctx = self.get_context( chan=chan, cid=cid, @@ -791,12 +791,12 @@ class Actor: ns=ns, func=func, kwargs=kwargs, - uid=self.uid, + uid=self.aid.uid, # <- !TODO use .aid! cid=cid, ) log.runtime( 'Sending RPC `Start`\n\n' - f'=> peer: {chan.uid}\n' + f'=> peer: {chan.aid.uid}\n' f' |_ {ns}.{func}({kwargs})\n\n' f'{pretty_struct.pformat(msg)}' @@ -1244,7 +1244,7 @@ class Actor: 'Cancel request for invalid RPC task.\n' 'The task likely already completed or was never started!\n\n' f'<= canceller: {requesting_aid}\n' - f'=> {cid}@{parent_chan.uid}\n' + f'=> {cid}@{parent_chan.aid.uid}\n' f' |_{parent_chan}\n' ) return True @@ -1381,7 +1381,7 @@ class Actor: f'Cancelling {descr} RPC tasks\n\n' f'<=c) {req_aid} [canceller]\n' f'{rent_chan_repr}' - f'c)=> {self.uid} [cancellee]\n' + f'c)=> {self.aid.uid} [cancellee]\n' f' |_{self} [with {len(tasks)} tasks]\n' # f' |_tasks: {len(tasks)}\n' # f'{tasks_str}' @@ -1687,7 +1687,7 @@ async def async_main( await reg_portal.run_from_ns( 'self', 'register_actor', - uid=actor.uid, + uid=actor.aid.uid, addr=accept_addr.unwrap(), ) @@ -1758,9 +1758,11 @@ async def async_main( # always! match internal_err: case ContextCancelled(): + reprol: str = actor.aid.reprol() log.cancel( - f'Actor: {actor.uid} was task-context-cancelled with,\n' - f'str(internal_err)' + f'Actor {reprol!r} was task-ctx-cancelled with,\n' + f'\n' + f'{internal_err!r}' ) case _: log.exception( @@ -1832,7 +1834,7 @@ async def async_main( await reg_portal.run_from_ns( 'self', 'unregister_actor', - uid=actor.uid + uid=actor.aid.uid, ) except OSError: failed = True diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8d3c2cf6..01026ad9 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -151,7 +151,7 @@ async def exhaust_portal( __tracebackhide__ = True try: log.debug( - f'Waiting on final result from {actor.uid}' + f'Waiting on final result from {actor.aid.uid}' ) # XXX: streams should never be reaped here since they should @@ -210,17 +210,17 @@ async def cancel_on_completion( actor, ) if isinstance(result, Exception): - errors[actor.uid]: Exception = result + errors[actor.aid.uid]: Exception = result log.cancel( 'Cancelling subactor runtime due to error:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' + f'Portal.cancel_actor() => {portal.channel.aid}\n\n' f'error: {result}\n' ) else: log.runtime( 'Cancelling subactor gracefully:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' + f'Portal.cancel_actor() => {portal.channel.aid}\n\n' f'result: {result}\n' ) @@ -308,7 +308,7 @@ async def hard_kill( # ) # with trio.CancelScope(shield=True): # async with debug.acquire_debug_lock( - # subactor_uid=current_actor().uid, + # subactor_uid=current_actor().aid.uid, # ) as _ctx: # log.warning( # 'Acquired debug lock, child ready to be killed ??\n' @@ -483,7 +483,7 @@ async def trio_proc( # TODO, how to pass this over "wire" encodings like # cmdline args? # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? - str(subactor.uid), + str(subactor.aid.uid), # Address the child must connect to on startup "--parent_addr", str(parent_addr) @@ -514,7 +514,7 @@ async def trio_proc( # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await ipc_server.wait_for_peer( - subactor.uid + subactor.aid.uid ) except trio.Cancelled: @@ -528,7 +528,9 @@ async def trio_proc( await debug.maybe_wait_for_debugger() elif proc is not None: - async with debug.acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -538,7 +540,7 @@ async def trio_proc( assert proc portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( + actor_nursery._children[subactor.aid.uid] = ( subactor, proc, portal, @@ -563,7 +565,7 @@ async def trio_proc( # track subactor in current nursery curr_actor: Actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery # resume caller at next checkpoint now that child is up task_status.started(portal) @@ -616,7 +618,9 @@ async def trio_proc( # don't clobber an ongoing pdb if cancelled_during_spawn: # Try again to avoid TTY clobbering. - async with debug.acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): with trio.move_on_after(0.5): await proc.wait() @@ -662,7 +666,7 @@ async def trio_proc( if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this # subactor - actor_nursery._children.pop(subactor.uid) + actor_nursery._children.pop(subactor.aid.uid) async def mp_proc( @@ -744,7 +748,7 @@ async def mp_proc( # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.uid] = (subactor, proc, None) + actor_nursery._children[subactor.aid.uid] = (subactor, proc, None) proc.start() if not proc.is_alive(): @@ -758,7 +762,7 @@ async def mp_proc( # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await ipc_server.wait_for_peer( - subactor.uid, + subactor.aid.uid, ) # XXX: monkey patch poll API to match the ``subprocess`` API.. @@ -771,7 +775,7 @@ async def mp_proc( # any process we may have started. portal = Portal(chan) - actor_nursery._children[subactor.uid] = (subactor, proc, portal) + actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal) # unblock parent task task_status.started(portal) @@ -810,7 +814,7 @@ async def mp_proc( # tandem if not done already log.warning( "Cancelling existing result waiter task for " - f"{subactor.uid}") + f"{subactor.aid.uid}") nursery.cancel_scope.cancel() finally: @@ -828,7 +832,7 @@ async def mp_proc( log.debug(f"Joined {proc}") # pop child entry to indicate we are no longer managing subactor - actor_nursery._children.pop(subactor.uid) + actor_nursery._children.pop(subactor.aid.uid) # TODO: prolly report to ``mypy`` how this causes all sorts of # false errors.. diff --git a/tractor/_state.py b/tractor/_state.py index 86e3ea12..7a4e1242 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -104,7 +104,7 @@ def current_actor( msg += ( f'Apparently the lact active actor was\n' f'|_{last}\n' - f'|_{last.uid}\n' + f'|_{last.aid.uid}\n' ) # no actor runtime has (as of yet) ever been started for # this process. diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 301c44e8..e1f8a62d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -391,15 +391,17 @@ class ActorNursery: else: if portal is None: # actor hasn't fully spawned yet - event: trio.Event = server._peer_connected[subactor.uid] + event: trio.Event = server._peer_connected[ + subactor.aid.uid + ] log.warning( - f"{subactor.uid} never 't finished spawning?" + f"{subactor.aid.uid} never 't finished spawning?" ) await event.wait() # channel/portal should now be up - _, _, portal = children[subactor.uid] + _, _, portal = children[subactor.aid.uid] # XXX should be impossible to get here # unless method was called from within @@ -407,7 +409,7 @@ class ActorNursery: if portal is None: # cancelled while waiting on the event # to arrive - chan = server._peers[subactor.uid][-1] + chan = server._peers[subactor.aid.uid][-1] if chan: portal = Portal(chan) else: # there's no other choice left @@ -506,7 +508,7 @@ async def _open_and_supervise_one_cancels_all_nursery( except BaseException as _inner_err: inner_err = _inner_err - errors[actor.uid] = inner_err + errors[actor.aid.uid] = inner_err # If we error in the root but the debugger is # engaged we don't want to prematurely kill (and @@ -539,7 +541,7 @@ async def _open_and_supervise_one_cancels_all_nursery( log.cancel( f'Actor-nursery cancelled by {etype}\n\n' - f'{current_actor().uid}\n' + f'{current_actor().aid.uid}\n' f' |_{an}\n\n' # TODO: show tb str? @@ -630,7 +632,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # show frame on any (likely) internal error if ( - not an.cancelled + not an.cancel_called and an._scope_error ): __tracebackhide__: bool = False @@ -726,7 +728,7 @@ async def open_nursery( if ( an and - not an.cancelled + not an.cancel_called and an._scope_error ): diff --git a/tractor/_testing/addr.py b/tractor/_testing/addr.py index 1b066336..b1ccf005 100644 --- a/tractor/_testing/addr.py +++ b/tractor/_testing/addr.py @@ -61,7 +61,11 @@ def get_rando_addr( # NOTE, file-name uniqueness (no-collisions) will be based on # the runtime-directory and root (pytest-proc's) pid. case 'uds': - testrun_reg_addr = addr_type.get_random().unwrap() + from tractor.ipc._uds import UDSAddress + addr: UDSAddress = addr_type.get_random() + assert addr.is_valid + assert addr.sockpath.resolve() + testrun_reg_addr = addr.unwrap() # XXX, as sanity it should never the same as the default for the # host-singleton registry actor. diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py index 1a2f63ab..a0d0d0d5 100644 --- a/tractor/_testing/pytest.py +++ b/tractor/_testing/pytest.py @@ -74,6 +74,7 @@ def tractor_test(fn): reg_addr=None, start_method: str|None = None, debug_mode: bool = False, + tpt_proto: str|None=None, **kwargs ): # __tracebackhide__ = True @@ -102,6 +103,9 @@ def tractor_test(fn): # set of subprocess spawning backends kwargs['debug_mode'] = debug_mode + if 'tpt_proto' in inspect.signature(fn).parameters: + # set of subprocess spawning backends + kwargs['tpt_proto'] = tpt_proto if kwargs: @@ -177,6 +181,13 @@ def pytest_configure(config): backend = config.option.spawn_backend tractor._spawn.try_set_start_method(backend) + # register custom marks to avoid warnings see, + # https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers + config.addinivalue_line( + 'markers', + 'no_tpt(proto_key): test will (likely) not behave with tpt backend' + ) + @pytest.fixture(scope='session') def debug_mode(request) -> bool: @@ -225,13 +236,32 @@ def tpt_protos(request) -> list[str]: autouse=True, ) def tpt_proto( + request, tpt_protos: list[str], ) -> str: proto_key: str = tpt_protos[0] + # ?TODO, but needs a fn-scoped tpt_proto fixture.. + # @pytest.mark.no_tpt('uds') + # node = request.node + # markers = node.own_markers + # for mark in markers: + # if ( + # mark.name == 'no_tpt' + # and + # proto_key in mark.args + # ): + # pytest.skip( + # f'Test {node} normally fails with ' + # f'tpt-proto={proto_key!r}\n' + # ) + from tractor import _state if _state._def_tpt_proto != proto_key: _state._def_tpt_proto = proto_key + _state._runtime_vars['_enable_tpts'] = [ + proto_key, + ] yield proto_key diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 9d109f3f..932fc075 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -94,7 +94,7 @@ class Channel: self._transport: MsgTransport|None = transport # set after handshake - always info from peer end - self.aid: Aid|None = None + self._aid: Aid|None = None self._aiter_msgs = self._iter_msgs() self._exc: Exception|None = None @@ -122,6 +122,14 @@ class Channel: ''' return self._cancel_called + @property + def aid(self) -> Aid: + ''' + Peer actor's ID. + + ''' + return self._aid + @property def uid(self) -> tuple[str, str]: ''' @@ -505,7 +513,7 @@ class Channel: f'<= {peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! - self.aid = peer_aid + self._aid = peer_aid return peer_aid diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 4b393fb6..a7d450a6 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -197,9 +197,11 @@ class UDSAddress( # sockname: str = '.'.join(actor.uid) + f'@{pid}' # -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho.. else: - prefix: str = '' if is_root_process(): - prefix: str = 'root' + prefix: str = 'no_runtime_root' + else: + prefix: str = 'no_runtime_actor' + sockname: str = f'{prefix}@{pid}' sockpath: Path = Path(f'{sockname}.sock') diff --git a/tractor/log.py b/tractor/log.py index 3ff5384b..71708224 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -293,7 +293,7 @@ _conc_name_getters = { 'task': pformat_task_uid, 'actor': lambda: _curr_actor_no_exc(), 'actor_name': lambda: current_actor().name, - 'actor_uid': lambda: current_actor().uid[1][:6], + 'actor_uid': lambda: current_actor().aid.uuid[:6], } diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 6176ca90..ea314f00 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -324,6 +324,8 @@ class Start( # => SEE ABOVE <= kwargs: dict[str, Any] uid: tuple[str, str] # (calling) actor-id + # aid: Aid + # ^TODO, convert stat! # TODO: enforcing a msg-spec in terms `Msg.pld` # parameterizable msgs to be used in the appls IPC dialog.