diff --git a/tests/conftest.py b/tests/conftest.py index 5ce8442..7f53cbd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ """ ``tractor`` testing!! """ +from functools import partial import sys import subprocess import os @@ -8,6 +9,9 @@ import random import signal import platform import time +from typing import ( + AsyncContextManager, +) import pytest import tractor @@ -150,6 +154,18 @@ def pytest_generate_tests(metafunc): metafunc.parametrize("start_method", [spawn_backend], scope='module') +# TODO: a way to let test scripts (like from `examples/`) +# guarantee they won't registry addr collide! +@pytest.fixture +def open_test_runtime( + reg_addr: tuple, +) -> AsyncContextManager: + return partial( + tractor.open_nursery, + registry_addrs=[reg_addr], + ) + + def sig_prog(proc, sig): "Kill the actor-process with ``sig``." proc.send_signal(sig) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index 9a73ba8..ea626a5 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -41,7 +41,7 @@ from tractor.msg import ( from tractor.msg.types import ( _payload_msgs, log, - Msg, + PayloadMsg, Started, mk_msg_spec, ) @@ -61,7 +61,7 @@ def mk_custom_codec( uid: tuple[str, str] = tractor.current_actor().uid # XXX NOTE XXX: despite defining `NamespacePath` as a type - # field on our `Msg.pld`, we still need a enc/dec_hook() pair + # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # to cast to/from that type on the wire. See the docs: # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types @@ -321,12 +321,12 @@ def dec_type_union( import importlib types: list[Type] = [] for type_name in type_names: - for ns in [ + for mod in [ typing, importlib.import_module(__name__), ]: if type_ref := getattr( - ns, + mod, type_name, False, ): @@ -744,7 +744,7 @@ def chk_pld_type( # 'Error', .pld: ErrorData codec: MsgCodec = mk_codec( - # NOTE: this ONLY accepts `Msg.pld` fields of a specified + # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified # type union. ipc_pld_spec=payload_spec, ) @@ -752,7 +752,7 @@ def chk_pld_type( # make a one-off dec to compare with our `MsgCodec` instance # which does the below `mk_msg_spec()` call internally ipc_msg_spec: Union[Type[Struct]] - msg_types: list[Msg[payload_spec]] + msg_types: list[PayloadMsg[payload_spec]] ( ipc_msg_spec, msg_types, @@ -761,7 +761,7 @@ def chk_pld_type( ) _enc = msgpack.Encoder() _dec = msgpack.Decoder( - type=ipc_msg_spec or Any, # like `Msg[Any]` + type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` ) assert ( @@ -806,7 +806,7 @@ def chk_pld_type( 'cid': '666', 'pld': pld, } - enc_msg: Msg = typedef(**kwargs) + enc_msg: PayloadMsg = typedef(**kwargs) _wire_bytes: bytes = _enc.encode(enc_msg) wire_bytes: bytes = codec.enc.encode(enc_msg) @@ -883,25 +883,16 @@ def test_limit_msgspec(): debug_mode=True ): - # ensure we can round-trip a boxing `Msg` + # ensure we can round-trip a boxing `PayloadMsg` assert chk_pld_type( - # Msg, - Any, - None, + payload_spec=Any, + pld=None, expect_roundtrip=True, ) - # TODO: don't need this any more right since - # `msgspec>=0.15` has the nice generics stuff yah?? - # - # manually override the type annot of the payload - # field and ensure it propagates to all msg-subtypes. - # Msg.__annotations__['pld'] = Any - # verify that a mis-typed payload value won't decode assert not chk_pld_type( - # Msg, - int, + payload_spec=int, pld='doggy', ) @@ -913,18 +904,16 @@ def test_limit_msgspec(): value: Any assert not chk_pld_type( - # Msg, - CustomPayload, + payload_spec=CustomPayload, pld='doggy', ) assert chk_pld_type( - # Msg, - CustomPayload, + payload_spec=CustomPayload, pld=CustomPayload(name='doggy', value='urmom') ) - # uhh bc we can `.pause_from_sync()` now! :surfer: + # yah, we can `.pause_from_sync()` now! # breakpoint() trio.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 43dadbb..71c691a 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -1336,6 +1336,23 @@ def test_shield_pause( child.expect(pexpect.EOF) +# TODO: better error for "non-ideal" usage from the root actor. +# -[ ] if called from an async scope emit a message that suggests +# using `await tractor.pause()` instead since it's less overhead +# (in terms of `greenback` and/or extra threads) and if it's from +# a sync scope suggest that usage must first call +# `ensure_portal()` in the (eventual parent) async calling scope? +def test_sync_pause_from_bg_task_in_root_actor_(): + ''' + When used from the root actor, normally we can only implicitly + support `.pause_from_sync()` from the main-parent-task (that + opens the runtime via `open_root_actor()`) since `greenback` + requires a `.ensure_portal()` call per `trio.Task` where it is + used. + + ''' + ... + # TODO: needs ANSI code stripping tho, see `assert_before()` # above! def test_correct_frames_below_hidden(): ''' diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 79a2200..3a1d2f2 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -19,7 +19,7 @@ from tractor._testing import ( @pytest.fixture def run_example_in_subproc( loglevel: str, - testdir, + testdir: pytest.Testdir, reg_addr: tuple[str, int], ): diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 45722a6..8d4697f 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,16 +2,25 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. ''' -from typing import Optional, Iterable, Union import asyncio import builtins +from contextlib import ExitStack import itertools import importlib +import os +from pathlib import Path +import signal +from typing import ( + Callable, + Iterable, + Union, +) import pytest import trio import tractor from tractor import ( + current_actor, to_asyncio, RemoteActorError, ContextCancelled, @@ -25,8 +34,8 @@ async def sleep_and_err( # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` - to_trio: Optional[trio.MemorySendChannel] = None, - from_trio: Optional[asyncio.Queue] = None, + to_trio: trio.MemorySendChannel|None = None, + from_trio: asyncio.Queue|None = None, ): if to_trio: @@ -36,7 +45,7 @@ async def sleep_and_err( assert 0 -async def sleep_forever(): +async def aio_sleep_forever(): await asyncio.sleep(float('inf')) @@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): # spawn an ``asyncio`` task to run a func and return result with trio.move_on_after(.2): - await tractor.to_asyncio.run_task(sleep_forever) + await tractor.to_asyncio.run_task(aio_sleep_forever) def test_trio_cancels_aio_on_actor_side(reg_addr): @@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): async def asyncio_actor( - target: str, expect_err: Exception|None = None ) -> None: assert tractor.current_actor().is_infected_aio() - target = globals()[target] + target: Callable = globals()[target] if '.' in expect_err: modpath, _, name = expect_err.rpartition('.') @@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): async with tractor.open_nursery() as n: portal = await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): async with tractor.open_nursery() as n: await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -195,7 +203,7 @@ async def trio_ctx( # spawn another asyncio task for the cuck of it. n.start_soon( tractor.to_asyncio.run_task, - sleep_forever, + aio_sleep_forever, ) await trio.sleep_forever() @@ -285,7 +293,7 @@ async def aio_cancel(): # cancel and enter sleep task.cancel() - await sleep_forever() + await aio_sleep_forever() def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): @@ -355,7 +363,6 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, @@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( trio.run(main) + +@tractor.context +async def manage_file( + ctx: tractor.Context, + tmp_path_str: str, + bg_aio_task: bool = False, +): + ''' + Start an `asyncio` task that just sleeps after registering a context + with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree + and ensure the stack is closed in the infected mode child. + + To verify the teardown state just write a tmpfile to the `testdir` + and delete it on actor close. + + ''' + + tmp_path: Path = Path(tmp_path_str) + tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' + + # create a the tmp file and tell the parent where it's at + assert not tmp_file.is_file() + tmp_file.touch() + + stack: ExitStack = current_actor().lifetime_stack + stack.callback(tmp_file.unlink) + + await ctx.started(( + str(tmp_file), + os.getpid(), + )) + + # expect to be cancelled from here! + try: + + # NOTE: turns out you don't even need to sched an aio task + # since the original issue, even though seemingly was due to + # the guest-run being abandoned + a `._debug.pause()` inside + # `._runtime._async_main()` (which was originally trying to + # debug the `.lifetime_stack` not closing), IS NOT actually + # the core issue? + # + # further notes: + # + # - `trio` only issues the " RuntimeWarning: Trio guest run + # got abandoned without properly finishing... weird stuff + # might happen" IFF you DO run a asyncio task here, BUT + # - the original issue of the `.lifetime_stack` not closing + # will still happen even if you don't run an `asyncio` task + # here even though the "abandon" messgage won't be shown.. + # + # => ????? honestly i'm lost but it seems to be some issue + # with `asyncio` and SIGINT.. + # + # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and + # `run_as_asyncio_guest()` is written WITHOUT THE + # `.cancel_soon()` soln, both of these tests will pass ?? + # so maybe it has something to do with `asyncio` loop init + # state?!? + # honestly, this REALLY reminds me why i haven't used + # `asyncio` by choice in years.. XD + # + # await tractor.to_asyncio.run_task(aio_sleep_forever) + if bg_aio_task: + async with trio.open_nursery() as tn: + tn.start_soon( + tractor.to_asyncio.run_task, + aio_sleep_forever, + ) + + await trio.sleep_forever() + + # signalled manually at the OS level (aka KBI) by the parent actor. + except KeyboardInterrupt: + print('child raised KBI..') + assert tmp_file.exists() + raise + else: + raise RuntimeError('shoulda received a KBI?') + + +@pytest.mark.parametrize( + 'bg_aio_task', + [ + False, + + # NOTE: (and see notes in `manage_file()` above as well) if + # we FOR SURE SPAWN AN AIO TASK in the child it seems the + # "silent-abandon" case (as is described in detail in + # `to_asyncio.run_as_asyncio_guest()`) does not happen and + # `asyncio`'s loop will at least abandon the `trio` side + # loudly? .. prolly the state-spot to start looking for + # a soln that results in NO ABANDONMENT.. XD + True, + ], + ids=[ + 'bg_aio_task', + 'just_trio_slee', + ], +) +@pytest.mark.parametrize( + 'wait_for_ctx', + [ + False, + True, + ], + ids=[ + 'raise_KBI_in_rent', + 'wait_for_ctx', + ], +) +def test_sigint_closes_lifetime_stack( + tmp_path: Path, + wait_for_ctx: bool, + bg_aio_task: bool, +): + ''' + Ensure that an infected child can use the `Actor.lifetime_stack` + to make a file on boot and it's automatically cleaned up by the + actor-lifetime-linked exit stack closure. + + ''' + async def main(): + try: + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'file_mngr', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + manage_file, + tmp_path_str=str(tmp_path), + bg_aio_task=bg_aio_task, + ) as (ctx, first): + + path_str, cpid = first + tmp_file: Path = Path(path_str) + assert tmp_file.exists() + + # XXX originally to simulate what (hopefully) + # the below now triggers.. had to manually + # trigger a SIGINT from a ctl-c in the root. + # await trio.sleep_forever() + + # XXX NOTE XXX signal infected-`asyncio` child to + # OS-cancel with SIGINT; this should trigger the + # bad `asyncio` cancel behaviour that can cause + # a guest-run abandon as was seen causing + # shm-buffer leaks in `piker`'s live quote stream + # susbys! + # + # await trio.sleep(.5) + await trio.sleep(.2) + os.kill( + cpid, + signal.SIGINT, + ) + + # XXX CASE 1: without the bug fixed, in + # the non-KBI-raised-in-parent case, this + # timeout should trigger! + if wait_for_ctx: + print('waiting for ctx outcome in parent..') + try: + with trio.fail_after(.7): + await ctx.wait_for_result() + except tractor.ContextCancelled as ctxc: + assert ctxc.canceller == ctx.chan.uid + raise + + # XXX CASE 2: this seems to be the source of the + # original issue which exhibited BEFORE we put + # a `Actor.cancel_soon()` inside + # `run_as_asyncio_guest()`.. + else: + raise KeyboardInterrupt + + pytest.fail('should have raised some kinda error?!?') + + except ( + KeyboardInterrupt, + ContextCancelled, + ): + # XXX CASE 2: without the bug fixed, in the + # KBI-raised-in-parent case, the actor teardown should + # never get run (silently abaondoned by `asyncio`..) and + # thus the file should leak! + assert not tmp_file.exists() + assert ctx.maybe_error + + trio.run(main) + + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: diff --git a/tractor/_context.py b/tractor/_context.py index 32acf83..9504d6f 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -121,10 +121,19 @@ class Unresolved: @dataclass class Context: ''' - An inter-actor, SC transitive, `Task` communication context. + An inter-actor, SC transitive, `trio.Task` (pair) + communication context. - NB: This class should **never be instatiated directly**, it is allocated - by the runtime in 2 ways: + (We've also considered other names and ideas: + - "communicating tasks scope": cts + - "distributed task scope": dts + - "communicating tasks context": ctc + + **Got a better idea for naming? Make an issue dawg!** + ) + + NB: This class should **never be instatiated directly**, it is + allocated by the runtime in 2 ways: - by entering `Portal.open_context()` which is the primary public API for any "parent" task or, - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg @@ -210,6 +219,16 @@ class Context: # more the the `Context` is needed? _portal: Portal | None = None + @property + def portal(self) -> Portal|None: + ''' + Return any wrapping memory-`Portal` if this is + a 'parent'-side task which called `Portal.open_context()`, + otherwise `None`. + + ''' + return self._portal + # NOTE: each side of the context has its own cancel scope # which is exactly the primitive that allows for # cross-actor-task-supervision and thus SC. @@ -299,6 +318,8 @@ class Context: # boxed exception. NOW, it's used for spawning overrun queuing # tasks when `.allow_overruns == True` !!! _scope_nursery: trio.Nursery|None = None + # ^-TODO-^ change name? + # -> `._scope_tn` "scope task nursery" # streaming overrun state tracking _in_overrun: bool = False @@ -408,10 +429,23 @@ class Context: ''' return self._cancel_called + @cancel_called.setter + def cancel_called(self, val: bool) -> None: + ''' + Set the self-cancelled request `bool` value. + + ''' + # to debug who frickin sets it.. + # if val: + # from .devx import pause_from_sync + # pause_from_sync() + + self._cancel_called = val + @property def canceller(self) -> tuple[str, str]|None: ''' - ``Actor.uid: tuple[str, str]`` of the (remote) + `Actor.uid: tuple[str, str]` of the (remote) actor-process who's task was cancelled thus causing this (side of the) context to also be cancelled. @@ -515,7 +549,7 @@ class Context: # the local scope was never cancelled # and instead likely we received a remote side - # # cancellation that was raised inside `.result()` + # # cancellation that was raised inside `.wait_for_result()` # or ( # (se := self._local_error) # and se is re @@ -585,6 +619,8 @@ class Context: self, error: BaseException, + set_cancel_called: bool = False, + ) -> None: ''' (Maybe) cancel this local scope due to a received remote @@ -603,7 +639,7 @@ class Context: - `Portal.open_context()` - `Portal.result()` - `Context.open_stream()` - - `Context.result()` + - `Context.wait_for_result()` when called/closed by actor local task(s). @@ -729,7 +765,7 @@ class Context: # Cancel the local `._scope`, catch that # `._scope.cancelled_caught` and re-raise any remote error - # once exiting (or manually calling `.result()`) the + # once exiting (or manually calling `.wait_for_result()`) the # `.open_context()` block. cs: trio.CancelScope = self._scope if ( @@ -764,8 +800,9 @@ class Context: # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 message: str = 'Cancelling `Context._scope` !\n\n' + # from .devx import pause_from_sync + # pause_from_sync() self._scope.cancel() - else: message: str = 'NOT cancelling `Context._scope` !\n\n' # from .devx import mk_pdb @@ -889,7 +926,7 @@ class Context: ''' side: str = self.side - self._cancel_called: bool = True + self.cancel_called: bool = True header: str = ( f'Cancelling ctx with peer from {side.upper()} side\n\n' @@ -912,7 +949,7 @@ class Context: # `._scope.cancel()` since we expect the eventual # `ContextCancelled` from the other side to trigger this # when the runtime finally receives it during teardown - # (normally in `.result()` called from + # (normally in `.wait_for_result()` called from # `Portal.open_context().__aexit__()`) if side == 'parent': if not self._portal: @@ -1025,10 +1062,10 @@ class Context: ''' __tracebackhide__: bool = hide_tb - our_uid: tuple = self.chan.uid + peer_uid: tuple = self.chan.uid # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption - # for "graceful cancellation" case: + # for "graceful cancellation" case(s): # # Whenever a "side" of a context (a `Task` running in # an actor) **is** the side which requested ctx @@ -1045,9 +1082,11 @@ class Context: # set to the `Actor.uid` of THIS task (i.e. the # cancellation requesting task's actor is the actor # checking whether it should absorb the ctxc). + self_ctxc: bool = self._is_self_cancelled(remote_error) if ( + self_ctxc + and not raise_ctxc_from_self_call - and self._is_self_cancelled(remote_error) # TODO: ?potentially it is useful to emit certain # warning/cancel logs for the cases where the @@ -1077,8 +1116,8 @@ class Context: and isinstance(remote_error, RemoteActorError) and remote_error.boxed_type is StreamOverrun - # and tuple(remote_error.msgdata['sender']) == our_uid - and tuple(remote_error.sender) == our_uid + # and tuple(remote_error.msgdata['sender']) == peer_uid + and tuple(remote_error.sender) == peer_uid ): # NOTE: we set the local scope error to any "self # cancellation" error-response thus "absorbing" @@ -1140,9 +1179,9 @@ class Context: of the remote cancellation. ''' - __tracebackhide__ = hide_tb + __tracebackhide__: bool = False assert self._portal, ( - "Context.result() can not be called from callee side!" + '`Context.wait_for_result()` can not be called from callee side!' ) if self._final_result_is_set(): return self._result @@ -1169,7 +1208,8 @@ class Context: drained_msgs, ) = await msgops.drain_to_final_msg( ctx=self, - hide_tb=hide_tb, + # hide_tb=hide_tb, + hide_tb=False, ) drained_status: str = ( @@ -1185,6 +1225,8 @@ class Context: log.cancel(drained_status) + # __tracebackhide__: bool = hide_tb + self.maybe_raise( # NOTE: obvi we don't care if we # overran the far end if we're already @@ -1197,7 +1239,8 @@ class Context: # raising something we know might happen # during cancellation ;) (not self._cancel_called) - ) + ), + hide_tb=hide_tb, ) # TODO: eventually make `.outcome: Outcome` and thus return # `self.outcome.unwrap()` here! @@ -1583,7 +1626,7 @@ class Context: - NEVER `return` early before delivering the msg! bc if the error is a ctxc and there is a task waiting on - `.result()` we need the msg to be + `.wait_for_result()` we need the msg to be `send_chan.send_nowait()`-ed over the `._rx_chan` so that the error is relayed to that waiter task and thus raised in user code! @@ -1828,7 +1871,7 @@ async def open_context_from_portal( When the "callee" (side that is "called"/started by a call to *this* method) returns, the caller side (this) unblocks and any final value delivered from the other end can be - retrieved using the `Contex.result()` api. + retrieved using the `Contex.wait_for_result()` api. The yielded ``Context`` instance further allows for opening bidirectional streams, explicit cancellation and @@ -1965,14 +2008,14 @@ async def open_context_from_portal( yield ctx, first # ??TODO??: do we still want to consider this or is - # the `else:` block handling via a `.result()` + # the `else:` block handling via a `.wait_for_result()` # call below enough?? # - # -[ ] pretty sure `.result()` internals do the + # -[ ] pretty sure `.wait_for_result()` internals do the # same as our ctxc handler below so it ended up # being same (repeated?) behaviour, but ideally we # wouldn't have that duplication either by somehow - # factoring the `.result()` handler impl in a way + # factoring the `.wait_for_result()` handler impl in a way # that we can re-use it around the `yield` ^ here # or vice versa? # @@ -2110,7 +2153,7 @@ async def open_context_from_portal( # AND a group-exc is only raised if there was > 1 # tasks started *here* in the "caller" / opener # block. If any one of those tasks calls - # `.result()` or `MsgStream.receive()` + # `.wait_for_result()` or `MsgStream.receive()` # `._maybe_raise_remote_err()` will be transitively # called and the remote error raised causing all # tasks to be cancelled. @@ -2180,7 +2223,7 @@ async def open_context_from_portal( f'|_{ctx._task}\n' ) # XXX NOTE XXX: the below call to - # `Context.result()` will ALWAYS raise + # `Context.wait_for_result()` will ALWAYS raise # a `ContextCancelled` (via an embedded call to # `Context._maybe_raise_remote_err()`) IFF # a `Context._remote_error` was set by the runtime @@ -2190,10 +2233,10 @@ async def open_context_from_portal( # ALWAYS SET any time "callee" side fails and causes "caller # side" cancellation via a `ContextCancelled` here. try: - result_or_err: Exception|Any = await ctx.result() + result_or_err: Exception|Any = await ctx.wait_for_result() except BaseException as berr: # on normal teardown, if we get some error - # raised in `Context.result()` we still want to + # raised in `Context.wait_for_result()` we still want to # save that error on the ctx's state to # determine things like `.cancelled_caught` for # cases where there was remote cancellation but diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 99a4dd6..522d4b8 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -56,14 +56,12 @@ async def get_registry( ]: ''' Return a portal instance connected to a local or remote - arbiter. + registry-service actor; if a connection already exists re-use it + (presumably to call a `.register_actor()` registry runtime RPC + ep). ''' - actor = current_actor() - - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - + actor: Actor = current_actor() if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) @@ -72,6 +70,8 @@ async def get_registry( Channel((host, port)) ) else: + # TODO: try to look pre-existing connection from + # `Actor._peers` and use it instead? async with ( _connect_chan(host, port) as chan, open_portal(chan) as regstr_ptl, diff --git a/tractor/_entry.py b/tractor/_entry.py index e22a4f1..4199a92 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -20,7 +20,8 @@ Sub-process entry points. """ from __future__ import annotations from functools import partial -# import textwrap +import os +import textwrap from typing import ( Any, TYPE_CHECKING, @@ -58,7 +59,7 @@ def _mp_main( ) -> None: ''' - The routine called *after fork* which invokes a fresh ``trio.run`` + The routine called *after fork* which invokes a fresh `trio.run()` ''' actor._forkserver_info = forkserver_info @@ -96,6 +97,35 @@ def _mp_main( log.info(f"Subactor {actor.uid} terminated") +# TODO: move this to some kinda `.devx._conc_lang.py` eventually +# as we work out our multi-domain state-flow-syntax! +def nest_from_op( + input_op: str, + tree_str: str, + + back_from_op: int = 1, +) -> str: + ''' + Depth-increment the input (presumably hierarchy/supervision) + input "tree string" below the provided `input_op` execution + operator, so injecting a `"\n|_{input_op}\n"`and indenting the + `tree_str` to nest content aligned with the ops last char. + + ''' + return ( + f'{input_op}\n' + + + textwrap.indent( + tree_str, + prefix=( + len(input_op) + - + back_from_op + ) *' ', + ) + ) + + def _trio_main( actor: Actor, *, @@ -119,7 +149,6 @@ def _trio_main( if actor.loglevel is not None: get_console_log(actor.loglevel) - import os actor_info: str = ( f'|_{actor}\n' f' uid: {actor.uid}\n' @@ -128,13 +157,29 @@ def _trio_main( f' loglevel: {actor.loglevel}\n' ) log.info( - 'Started new trio subactor:\n' + 'Started new `trio` subactor:\n' + - '>\n' # like a "started/play"-icon from super perspective - + - actor_info, + nest_from_op( + input_op='(>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) + # '>(\n' # like a "started/play"-icon from super perspective + # + + # actor_info, ) - + logmeth = log.info + message: str = ( + # log.info( + 'Subactor terminated\n' + + + nest_from_op( + input_op=')>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) + # 'x\n' # like a "crossed-out/killed" from super perspective + # + + # actor_info + ) try: if infect_asyncio: actor._infected_aio = True @@ -143,16 +188,18 @@ def _trio_main( trio.run(trio_main) except KeyboardInterrupt: - log.cancel( - 'Actor received KBI\n' + logmeth = log.cancel + message: str = ( + 'Actor received KBI (aka an OS-cancel)\n' + - actor_info + nest_from_op( + input_op='c)>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) ) + except BaseException: + log.exception('Actor crashed exit?') + raise + finally: - log.info( - 'Subactor terminated\n' - + - 'x\n' # like a "crossed-out/killed" from super perspective - + - actor_info - ) + logmeth(message) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 7164d6a..46b64c1 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -922,15 +922,6 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" - -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - ''' - class MessagingError(Exception): ''' IPC related msg (typing), transaction (ordering) or dialog @@ -1324,7 +1315,9 @@ def _mk_recv_mte( any_pld: Any = msgpack.decode(msg.pld) message: str = ( f'invalid `{msg_type.__qualname__}` msg payload\n\n' - f'value: `{any_pld!r}` does not match type-spec: ' + f'{any_pld!r}\n\n' + f'has type {type(any_pld)!r}\n\n' + f'and does not match type-spec ' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' ) bad_msg = msg diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e5e3d10..492602b 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -40,6 +40,7 @@ from typing import ( TypeVar, ) +# import pdbp import msgspec from tricycle import BufferedReceiveStream import trio @@ -290,12 +291,14 @@ class MsgpackTCPStream(MsgTransport): else: raise + # @pdbp.hideframe async def send( self, msg: msgtypes.MsgType, strict_types: bool = True, - # hide_tb: bool = False, + hide_tb: bool = False, + ) -> None: ''' Send a msgpack encoded py-object-blob-as-msg over TCP. @@ -304,7 +307,10 @@ class MsgpackTCPStream(MsgTransport): invalid msg type ''' - # __tracebackhide__: bool = hide_tb + __tracebackhide__: bool = hide_tb + # try: + # XXX see `trio._sync.AsyncContextManagerMixin` for details + # on the `.acquire()`/`.release()` sequencing.. async with self._send_lock: # NOTE: lookup the `trio.Task.context`'s var for @@ -352,6 +358,14 @@ class MsgpackTCPStream(MsgTransport): size: bytes = struct.pack(" tuple[str, int]: return self._laddr @@ -560,27 +574,40 @@ class Channel: ) return transport + # TODO: something like, + # `pdbp.hideframe_on(errors=[MsgTypeError])` + # @pdbp.hideframe async def send( self, payload: Any, - # hide_tb: bool = False, + hide_tb: bool = False, ) -> None: ''' Send a coded msg-blob over the transport. ''' - # __tracebackhide__: bool = hide_tb - log.transport( - '=> send IPC msg:\n\n' - f'{pformat(payload)}\n' - ) # type: ignore - assert self._transport - await self._transport.send( - payload, - # hide_tb=hide_tb, - ) + __tracebackhide__: bool = hide_tb + try: + log.transport( + '=> send IPC msg:\n\n' + f'{pformat(payload)}\n' + ) + # assert self._transport # but why typing? + await self._transport.send( + payload, + hide_tb=hide_tb, + ) + except BaseException as _err: + err = _err # bind for introspection + if not isinstance(_err, MsgTypeError): + # assert err + __tracebackhide__: bool = False + else: + assert err.cid + + raise async def recv(self) -> Any: assert self._transport diff --git a/tractor/_portal.py b/tractor/_portal.py index 2c676e1..38ce8dc 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -121,7 +121,8 @@ class Portal: ) return self.chan - # TODO: factor this out into an `ActorNursery` wrapper + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. async def _submit_for_result( self, ns: str, @@ -141,13 +142,22 @@ class Portal: portal=self, ) + # TODO: we should deprecate this API right? since if we remove + # `.run_in_actor()` (and instead move it to a `.highlevel` + # wrapper api (around a single `.open_context()` call) we don't + # really have any notion of a "main" remote task any more? + # # @api_frame - async def result(self) -> Any: + async def wait_for_result( + self, + hide_tb: bool = True, + ) -> Any: ''' - Return the result(s) from the remote actor's "main" task. + Return the final result delivered by a `Return`-msg from the + remote peer actor's "main" task's `return` statement. ''' - __tracebackhide__ = True + __tracebackhide__: bool = hide_tb # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -182,6 +192,23 @@ class Portal: return self._final_result_pld + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. + async def result( + self, + *args, + **kwargs, + ) -> Any|Exception: + typname: str = type(self).__name__ + log.warning( + f'`{typname}.result()` is DEPRECATED!\n' + 'Use `{typname.wait_for_result()` instead!\n' + ) + return await self.wait_for_result( + *args, + **kwargs, + ) + async def _cancel_streams(self): # terminate all locally running async generator # IPC calls @@ -240,6 +267,7 @@ class Portal: f'{reminfo}' ) + # XXX the one spot we set it? self.channel._cancel_called: bool = True try: # send cancel cmd - might not get response @@ -279,6 +307,8 @@ class Portal: ) return False + # TODO: do we still need this for low level `Actor`-runtime + # method calls or can we also remove it? async def run_from_ns( self, namespace_path: str, @@ -316,6 +346,8 @@ class Portal: expect_msg=Return, ) + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. async def run( self, func: str, @@ -370,6 +402,8 @@ class Portal: expect_msg=Return, ) + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. @acm async def open_stream_from( self, diff --git a/tractor/_root.py b/tractor/_root.py index 7cdef60..3cfa253 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -21,6 +21,7 @@ Root actor runtime ignition(s). from contextlib import asynccontextmanager as acm from functools import partial import importlib +import inspect import logging import os import signal @@ -115,10 +116,16 @@ async def open_root_actor( if ( debug_mode and maybe_enable_greenback - and await _debug.maybe_init_greenback( - raise_not_found=False, + and ( + maybe_mod := await _debug.maybe_init_greenback( + raise_not_found=False, + ) ) ): + logger.info( + f'Found `greenback` installed @ {maybe_mod}\n' + 'Enabling `tractor.pause_from_sync()` support!\n' + ) os.environ['PYTHONBREAKPOINT'] = ( 'tractor.devx._debug._sync_pause_from_builtin' ) @@ -264,7 +271,10 @@ async def open_root_actor( except OSError: # TODO: make this a "discovery" log level? - logger.warning(f'No actor registry found @ {addr}') + logger.info( + f'No actor registry found @ {addr}\n' + # 'Registry will be initialized in local actor..' + ) async with trio.open_nursery() as tn: for addr in registry_addrs: @@ -365,23 +375,25 @@ async def open_root_actor( ) try: yield actor - except ( Exception, BaseExceptionGroup, ) as err: - - import inspect + # XXX NOTE XXX see equiv note inside + # `._runtime.Actor._stream_handler()` where in the + # non-root or root-that-opened-this-mahually case we + # wait for the local actor-nursery to exit before + # exiting the transport channel handler. entered: bool = await _debug._maybe_enter_pm( err, api_frame=inspect.currentframe(), ) - if ( not entered - and not is_multi_cancelled(err) + and + not is_multi_cancelled(err) ): - logger.exception('Root actor crashed:\n') + logger.exception('Root actor crashed\n') # ALWAYS re-raise any error bubbled up from the # runtime! diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 166ee96..f901150 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -89,6 +89,15 @@ if TYPE_CHECKING: log = get_logger('tractor') +# TODO: move to a `tractor.lowlevel.rpc` with the below +# func-type-cases implemented "on top of" `@context` defs. +# -[ ] std async func +# -[ ] `Portal.open_stream_from()` with async-gens? +# |_ possibly a duplex form of this with a +# `sent_from_peer = yield send_to_peer` form, which would require +# syncing the send/recv side with possibly `.receive_nowait()` +# on each `yield`? +# -[ ] async def _invoke_non_context( actor: Actor, cancel_scope: CancelScope, @@ -108,6 +117,7 @@ async def _invoke_non_context( ] = trio.TASK_STATUS_IGNORED, ): __tracebackhide__: bool = True + cs: CancelScope|None = None # ref when activated # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): @@ -160,10 +170,6 @@ async def _invoke_non_context( functype='asyncgen', ) ) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above with cancel_scope as cs: ctx._scope = cs task_status.started(ctx) @@ -175,15 +181,13 @@ async def _invoke_non_context( await chan.send( Stop(cid=cid) ) + + # simplest function/method request-response pattern + # XXX: in the most minimally used case, just a scheduled internal runtime + # call to `Actor._cancel_task()` from the ctx-peer task since we + # don't (yet) have a dedicated IPC msg. + # ------ - ------ else: - # regular async function/method - # XXX: possibly just a scheduled `Actor._cancel_task()` - # from a remote request to cancel some `Context`. - # ------ - ------ - # TODO: ideally we unify this with the above `context=True` - # block such that for any remote invocation ftype, we - # always invoke the far end RPC task scheduling the same - # way: using the linked IPC context machinery. failed_resp: bool = False try: ack = StartAck( @@ -354,8 +358,14 @@ async def _errors_relayed_via_ipc( # channel. task_status.started(err) - # always reraise KBIs so they propagate at the sys-process level. - if isinstance(err, KeyboardInterrupt): + # always reraise KBIs so they propagate at the sys-process + # level. + # XXX LOL, except when running in asyncio mode XD + # cmon guys, wtf.. + if ( + isinstance(err, KeyboardInterrupt) + # and not actor.is_infected_aio() + ): raise # RPC task bookeeping. @@ -458,7 +468,6 @@ async def _invoke( # tb: TracebackType = None cancel_scope = CancelScope() - cs: CancelScope|None = None # ref when activated ctx = actor.get_context( chan=chan, cid=cid, @@ -607,6 +616,8 @@ async def _invoke( # `@context` marked RPC function. # - `._portal` is never set. try: + tn: trio.Nursery + rpc_ctx_cs: CancelScope async with ( trio.open_nursery() as tn, msgops.maybe_limit_plds( @@ -616,7 +627,7 @@ async def _invoke( ), ): ctx._scope_nursery = tn - ctx._scope = tn.cancel_scope + rpc_ctx_cs = ctx._scope = tn.cancel_scope task_status.started(ctx) # TODO: better `trionics` tooling: @@ -642,7 +653,7 @@ async def _invoke( # itself calls `ctx._maybe_cancel_and_set_remote_error()` # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. - if ctx._scope.cancelled_caught: + if rpc_ctx_cs.cancelled_caught: our_uid: tuple = actor.uid # first check for and raise any remote error @@ -652,9 +663,7 @@ async def _invoke( if re := ctx._remote_error: ctx._maybe_raise_remote_err(re) - cs: CancelScope = ctx._scope - - if cs.cancel_called: + if rpc_ctx_cs.cancel_called: canceller: tuple = ctx.canceller explain: str = f'{ctx.side!r}-side task was cancelled by ' @@ -680,9 +689,15 @@ async def _invoke( elif canceller == ctx.chan.uid: explain += f'its {ctx.peer_side!r}-side peer' - else: + elif canceller == our_uid: + explain += 'itself' + + elif canceller: explain += 'a remote peer' + else: + explain += 'an unknown cause?' + explain += ( add_div(message=explain) + @@ -1238,7 +1253,7 @@ async def process_messages( 'Exiting IPC msg loop with final msg\n\n' f'<= peer: {chan.uid}\n' f' |_{chan}\n\n' - f'{pretty_struct.pformat(msg)}' + # f'{pretty_struct.pformat(msg)}' ) log.runtime(message) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 3cf35ff..604d1e6 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1046,6 +1046,10 @@ class Actor: # TODO: another `Struct` for rtvs.. rvs: dict[str, Any] = spawnspec._runtime_vars if rvs['_debug_mode']: + from .devx import ( + enable_stack_on_sig, + maybe_init_greenback, + ) try: # TODO: maybe return some status msgs upward # to that we can emit them in `con_status` @@ -1053,13 +1057,27 @@ class Actor: log.devx( 'Enabling `stackscope` traces on SIGUSR1' ) - from .devx import enable_stack_on_sig enable_stack_on_sig() + except ImportError: log.warning( '`stackscope` not installed for use in debug mode!' ) + if rvs.get('use_greenback', False): + maybe_mod: ModuleType|None = await maybe_init_greenback() + if maybe_mod: + log.devx( + 'Activated `greenback` ' + 'for `tractor.pause_from_sync()` support!' + ) + else: + rvs['use_greenback'] = False + log.warning( + '`greenback` not installed for use in debug mode!\n' + '`tractor.pause_from_sync()` not available!' + ) + rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -1717,8 +1735,8 @@ async def async_main( # Register with the arbiter if we're told its addr log.runtime( - f'Registering `{actor.name}` ->\n' - f'{pformat(accept_addrs)}' + f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' + # ^-TODO-^ we should instead show the maddr here^^ ) # TODO: ideally we don't fan out to all registrars @@ -1776,9 +1794,15 @@ async def async_main( # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception as err: - log.runtime("Closing all actor lifetime contexts") - actor.lifetime_stack.close() + except Exception as internal_err: + # ls: ExitStack = actor.lifetime_stack + # log.cancel( + # 'Closing all actor-lifetime exec scopes\n\n' + # f'|_{ls}\n' + # ) + # # _debug.pause_from_sync() + # # await _debug.pause(shield=True) + # ls.close() if not is_registered: # TODO: I guess we could try to connect back @@ -1786,7 +1810,8 @@ async def async_main( # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor.reg_addrs[0]}?") + f"@ {actor.reg_addrs[0]}?" + ) log.error( "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" @@ -1799,25 +1824,44 @@ async def async_main( if actor._parent_chan: await try_ship_error_to_remote( actor._parent_chan, - err, + internal_err, ) # always! - match err: + match internal_err: case ContextCancelled(): log.cancel( f'Actor: {actor.uid} was task-context-cancelled with,\n' - f'str(err)' + f'str(internal_err)' ) case _: log.exception("Actor errored:") raise finally: - log.runtime( + teardown_msg: str = ( 'Runtime nursery complete' - '-> Closing all actor lifetime contexts..' ) + + ls: ExitStack = actor.lifetime_stack + cbs: list[Callable] = [ + repr(tup[1].__wrapped__) + for tup in ls._exit_callbacks + ] + if cbs: + cbs_str: str = '\n'.join(cbs) + teardown_msg += ( + '-> Closing all actor-lifetime callbacks\n\n' + f'|_{cbs_str}\n' + ) + # XXX NOTE XXX this will cause an error which + # prevents any `infected_aio` actor from continuing + # and any callbacks in the `ls` here WILL NOT be + # called!! + # await _debug.pause(shield=True) + + ls.close() + # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? actor.lifetime_stack.close() @@ -1856,23 +1900,28 @@ async def async_main( failed = True if failed: - log.warning( - f'Failed to unregister {actor.name} from ' - f'registar @ {addr}' + teardown_msg += ( + f'-> Failed to unregister {actor.name} from ' + f'registar @ {addr}\n' ) + # log.warning( # Ensure all peers (actors connected to us as clients) are finished if not actor._no_more_peers.is_set(): if any( chan.connected() for chan in chain(*actor._peers.values()) ): - log.runtime( - f"Waiting for remaining peers {actor._peers} to clear") + teardown_msg += ( + f'-> Waiting for remaining peers {actor._peers} to clear..\n' + ) + log.runtime(teardown_msg) with CancelScope(shield=True): await actor._no_more_peers.wait() - log.runtime("All peer channels are complete") - log.runtime("Runtime completed") + teardown_msg += ('-> All peer channels are complete\n') + + teardown_msg += ('Actor runtime completed') + log.info(teardown_msg) # TODO: rename to `Registry` and move to `._discovery`! diff --git a/tractor/_state.py b/tractor/_state.py index 8c5cca1..9f89600 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = { '_root_mailbox': (None, None), '_registry_addrs': [], - # for `breakpoint()` support + # for `tractor.pause_from_sync()` & `breakpoint()` support 'use_greenback': False, } diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 314a93b..5e1a6bf 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): @property def ctx(self) -> Context: ''' - This stream's IPC `Context` ref. + A read-only ref to this stream's inter-actor-task `Context`. ''' return self._ctx diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 8f3574b..fb737c1 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -80,6 +80,7 @@ class ActorNursery: ''' def __init__( self, + # TODO: maybe def these as fields of a struct looking type? actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, @@ -88,8 +89,10 @@ class ActorNursery: ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor - self._ria_nursery = ria_nursery + + # TODO: rename to `._tn` for our conventional "task-nursery" self._da_nursery = da_nursery + self._children: dict[ tuple[str, str], tuple[ @@ -98,15 +101,13 @@ class ActorNursery: Portal | None, ] ] = {} - # portals spawned with ``run_in_actor()`` are - # cancelled when their "main" result arrives - self._cancel_after_result_on_exit: set = set() + self.cancelled: bool = False self._join_procs = trio.Event() self._at_least_one_child_in_debug: bool = False self.errors = errors - self.exited = trio.Event() self._scope_error: BaseException|None = None + self.exited = trio.Event() # NOTE: when no explicit call is made to # `.open_root_actor()` by application code, @@ -116,6 +117,13 @@ class ActorNursery: # and syncing purposes to any actor opened nurseries. self._implicit_runtime_started: bool = False + # TODO: remove the `.run_in_actor()` API and thus this 2ndary + # nursery when that API get's moved outside this primitive! + self._ria_nursery = ria_nursery + # portals spawned with ``run_in_actor()`` are + # cancelled when their "main" result arrives + self._cancel_after_result_on_exit: set = set() + async def start_actor( self, name: str, @@ -126,10 +134,14 @@ class ActorNursery: rpc_module_paths: list[str]|None = None, enable_modules: list[str]|None = None, loglevel: str|None = None, # set log level per subactor - nursery: trio.Nursery|None = None, debug_mode: bool|None = None, infect_asyncio: bool = False, + # TODO: ideally we can rm this once we no longer have + # a `._ria_nursery` since the dependent APIs have been + # removed! + nursery: trio.Nursery|None = None, + ) -> Portal: ''' Start a (daemon) actor: an process that has no designated @@ -200,6 +212,7 @@ class ActorNursery: # |_ dynamic @context decoration on child side # |_ implicit `Portal.open_context() as (ctx, first):` # and `return first` on parent side. + # |_ mention how it's similar to `trio-parallel` API? # -[ ] use @api_frame on the wrapper async def run_in_actor( self, @@ -269,11 +282,14 @@ class ActorNursery: ) -> None: ''' - Cancel this nursery by instructing each subactor to cancel - itself and wait for all subactors to terminate. + Cancel this actor-nursery by instructing each subactor's + runtime to cancel and wait for all underlying sub-processes + to terminate. - If ``hard_killl`` is set to ``True`` then kill the processes - directly without any far end graceful ``trio`` cancellation. + If `hard_kill` is set then kill the processes directly using + the spawning-backend's API/OS-machinery without any attempt + at (graceful) `trio`-style cancellation using our + `Actor.cancel()`. ''' __runtimeframe__: int = 1 # noqa @@ -629,8 +645,12 @@ async def open_nursery( f'|_{an}\n' ) - # shutdown runtime if it was started if implicit_runtime: + # shutdown runtime if it was started and report noisly + # that we're did so. msg += '=> Shutting down actor runtime <=\n' + log.info(msg) - log.info(msg) + else: + # keep noise low during std operation. + log.runtime(msg) diff --git a/tractor/devx/__init__.py b/tractor/devx/__init__.py index ab9d2d1..86dd128 100644 --- a/tractor/devx/__init__.py +++ b/tractor/devx/__init__.py @@ -29,6 +29,7 @@ from ._debug import ( shield_sigint_handler as shield_sigint_handler, open_crash_handler as open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler, + maybe_init_greenback as maybe_init_greenback, post_mortem as post_mortem, mk_pdb as mk_pdb, ) diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index ccf57d6..b067275 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -69,6 +69,7 @@ from trio import ( import tractor from tractor.log import get_logger from tractor._context import Context +from tractor import _state from tractor._state import ( current_actor, is_root_process, @@ -87,9 +88,6 @@ if TYPE_CHECKING: from tractor._runtime import ( Actor, ) - from tractor.msg import ( - _codec, - ) log = get_logger(__name__) @@ -1599,11 +1597,13 @@ async def _pause( try: task: Task = current_task() except RuntimeError as rte: + __tracebackhide__: bool = False log.exception('Failed to get current task?') if actor.is_infected_aio(): + # mk_pdb().set_trace() raise RuntimeError( '`tractor.pause[_from_sync]()` not yet supported ' - 'for infected `asyncio` mode!' + 'directly (infected) `asyncio` tasks!' ) from rte raise @@ -2163,22 +2163,22 @@ def maybe_import_greenback( return False -async def maybe_init_greenback( - **kwargs, -) -> None|ModuleType: - - if mod := maybe_import_greenback(**kwargs): - await mod.ensure_portal() - log.devx( - '`greenback` portal opened!\n' - 'Sync debug support activated!\n' - ) - return mod +async def maybe_init_greenback(**kwargs) -> None|ModuleType: + try: + if mod := maybe_import_greenback(**kwargs): + await mod.ensure_portal() + log.devx( + '`greenback` portal opened!\n' + 'Sync debug support activated!\n' + ) + return mod + except BaseException: + log.exception('Failed to init `greenback`..') + raise return None - async def _pause_from_bg_root_thread( behalf_of_thread: Thread, repl: PdbREPL, @@ -2399,18 +2399,37 @@ def pause_from_sync( else: # we are presumably the `trio.run()` + main thread # raises on not-found by default greenback: ModuleType = maybe_import_greenback() + + # TODO: how to ensure this is either dynamically (if + # needed) called here (in some bg tn??) or that the + # subactor always already called it? + # greenback: ModuleType = await maybe_init_greenback() + message += f'-> imported {greenback}\n' repl_owner: Task = current_task() message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' - out = greenback.await_( - _pause( - debug_func=None, - repl=repl, - hide_tb=hide_tb, - called_from_sync=True, - **_pause_kwargs, + try: + out = greenback.await_( + _pause( + debug_func=None, + repl=repl, + hide_tb=hide_tb, + called_from_sync=True, + **_pause_kwargs, + ) ) - ) + except RuntimeError as rte: + if not _state._runtime_vars.get( + 'use_greenback', + False, + ): + raise RuntimeError( + '`greenback` was never initialized in this actor!?\n\n' + f'{_state._runtime_vars}\n' + ) from rte + + raise + if out: bg_task, repl = out assert repl is repl @@ -2801,10 +2820,10 @@ def open_crash_handler( `trio.run()`. ''' + err: BaseException try: yield except tuple(catch) as err: - if type(err) not in ignore: pdbp.xpm() diff --git a/tractor/devx/_frame_stack.py b/tractor/devx/_frame_stack.py index 89a9e84..8e9bf46 100644 --- a/tractor/devx/_frame_stack.py +++ b/tractor/devx/_frame_stack.py @@ -234,7 +234,7 @@ def find_caller_info( _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} -# TODO: -[x] move all this into new `.devx._code`! +# TODO: -[x] move all this into new `.devx._frame_stack`! # -[ ] consider rename to _callstack? # -[ ] prolly create a `@runtime_api` dec? # |_ @api_frame seems better? @@ -286,3 +286,18 @@ def api_frame( wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache wrapped.__api_func__: bool = True return wrapper(wrapped) + + +# TODO: something like this instead of the adhoc frame-unhiding +# blocks all over the runtime!! XD +# -[ ] ideally we can expect a certain error (set) and if something +# else is raised then all frames below the wrapped one will be +# un-hidden via `__tracebackhide__: bool = False`. +# |_ might need to dynamically mutate the code objs like +# `pdbp.hideframe()` does? +# -[ ] use this as a `@acm` decorator as introed in 3.10? +# @acm +# async def unhide_frame_when_not( +# error_set: set[BaseException], +# ) -> TracebackType: +# ... diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py new file mode 100644 index 0000000..3ec7535 --- /dev/null +++ b/tractor/hilevel/_service.py @@ -0,0 +1,134 @@ +# tractor: structured concurrent "actors". +# Copyright 2024-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Daemon subactor as service(s) management and supervision primitives +and API. + +''' +from __future__ import annotations +from contextlib import ( + # asynccontextmanager as acm, + contextmanager as cm, +) +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio import TaskStatus +from tractor import ( + ActorNursery, + current_actor, + ContextCancelled, + Context, + Portal, +) + +from ._util import ( + log, # sub-sys logger +) + + +# TODO: implement a `@singleton` deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# -[ ] go through the options peeps on SO did? +# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python +# * including @mikenerone's answer +# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 +# +# -[ ] put it in `tractor.lowlevel._globals` ? +# * fits with our oustanding actor-local/global feat req? +# |_ https://github.com/goodboy/tractor/issues/55 +# * how can it relate to the `Actor.lifetime_stack` that was +# silently patched in? +# |_ we could implicitly call both of these in the same +# spot in the runtime using the lifetime stack? +# - `open_singleton_cm().__exit__()` +# -`del_singleton()` +# |_ gives SC fixtue semantics to sync code oriented around +# sub-process lifetime? +# * what about with `trio.RunVar`? +# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar +# - which we'll need for no-GIL cpython (right?) presuming +# multiple `trio.run()` calls in process? +# +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + +# a deletion API for explicit instance de-allocation? +# @open_service_mngr.deleter +# def del_service_mngr() -> None: +# mngr = open_service_mngr._singleton[0] +# open_service_mngr._singleton[0] = None +# del mngr + + + +# TODO: singleton factory API instead of a class API +@cm +def open_service_mngr( + *, + _singleton: list[ServiceMngr|None] = [None], + # NOTE; since default values for keyword-args are effectively + # module-vars/globals as per the note from, + # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values + # + # > "The default value is evaluated only once. This makes + # a difference when the default is a mutable object such as + # a list, dictionary, or instances of most classes" + # + **init_kwargs, + +) -> ServiceMngr: + ''' + Open a multi-subactor-as-service-daemon tree supervisor. + + The delivered `ServiceMngr` is a singleton instance for each + actor-process and is allocated on first open and never + de-allocated unless explicitly deleted by al call to + `del_service_mngr()`. + + ''' + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + else: + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + with mngr: + yield mngr + + diff --git a/tractor/log.py b/tractor/log.py index edb058e..47f1f25 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -54,11 +54,12 @@ LOG_FORMAT = ( DATE_FORMAT = '%b %d %H:%M:%S' # FYI, ERROR is 40 +# TODO: use a `bidict` to avoid the :155 check? CUSTOM_LEVELS: dict[str, int] = { 'TRANSPORT': 5, 'RUNTIME': 15, 'DEVX': 17, - 'CANCEL': 18, + 'CANCEL': 22, 'PDB': 500, } STD_PALETTE = { @@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter): Delegate a log call to the underlying logger, after adding contextual information from this adapter instance. + NOTE: all custom level methods (above) delegate to this! + ''' if self.isEnabledFor(level): stacklevel: int = 3 diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py index c1301bd..6d0e3b5 100644 --- a/tractor/msg/_codec.py +++ b/tractor/msg/_codec.py @@ -41,8 +41,10 @@ import textwrap from typing import ( Any, Callable, + Protocol, Type, TYPE_CHECKING, + TypeVar, Union, ) from types import ModuleType @@ -181,7 +183,11 @@ def mk_dec( dec_hook: Callable|None = None, ) -> MsgDec: + ''' + Create an IPC msg decoder, normally used as the + `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. + ''' return MsgDec( _dec=msgpack.Decoder( type=spec, # like `MsgType[Any]` @@ -227,6 +233,13 @@ def pformat_msgspec( join_char: str = '\n', ) -> str: + ''' + Pretty `str` format the `msgspec.msgpack.Decoder.type` attributed + for display in log messages as a nice (maybe multiline) + presentation of all the supported `Struct`s availed for typed + decoding. + + ''' dec: msgpack.Decoder = getattr(codec, 'dec', codec) return join_char.join( mk_msgspec_table( @@ -630,31 +643,57 @@ def limit_msg_spec( # # import pdbp; pdbp.set_trace() # assert ext_codec.pld_spec == extended_spec # yield ext_codec +# +# ^-TODO-^ is it impossible to make something like this orr!? + +# TODO: make an auto-custom hook generator from a set of input custom +# types? +# -[ ] below is a proto design using a `TypeCodec` idea? +# +# type var for the expected interchange-lib's +# IPC-transport type when not available as a built-in +# serialization output. +WireT = TypeVar('WireT') -# TODO: make something similar to this inside `._codec` such that -# user can just pass a type table of some sort? -# -[ ] we would need to decode all msgs to `pretty_struct.Struct` -# and then call `.to_dict()` on them? -# -[x] we're going to need to re-impl all the stuff changed in the -# runtime port such that it can handle dicts or `Msg`s? -# -# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: -# ''' -# Deliver a `enc_hook()`/`dec_hook()` pair which does -# manual convertion from our above native `Msg` set -# to `dict` equivalent (wire msgs) in order to keep legacy compat -# with the original runtime implementation. -# -# Note: this is is/was primarly used while moving the core -# runtime over to using native `Msg`-struct types wherein we -# start with the send side emitting without loading -# a typed-decoder and then later flipping the switch over to -# load to the native struct types once all runtime usage has -# been adjusted appropriately. -# -# ''' -# return ( -# # enc_to_dict, -# dec_from_dict, -# ) +# TODO: some kinda (decorator) API for built-in subtypes +# that builds this implicitly by inspecting the `mro()`? +class TypeCodec(Protocol): + ''' + A per-custom-type wire-transport serialization translator + description type. + + ''' + src_type: Type + wire_type: WireT + + def encode(obj: Type) -> WireT: + ... + + def decode( + obj_type: Type[WireT], + obj: WireT, + ) -> Type: + ... + + +class MsgpackTypeCodec(TypeCodec): + ... + + +def mk_codec_hooks( + type_codecs: list[TypeCodec], + +) -> tuple[Callable, Callable]: + ''' + Deliver a `enc_hook()`/`dec_hook()` pair which handle + manual convertion from an input `Type` set such that whenever + the `TypeCodec.filter()` predicate matches the + `TypeCodec.decode()` is called on the input native object by + the `dec_hook()` and whenever the + `isiinstance(obj, TypeCodec.type)` matches against an + `enc_hook(obj=obj)` the return value is taken from a + `TypeCodec.encode(obj)` callback. + + ''' + ... diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 91c0dde..d397ee4 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -580,12 +580,15 @@ async def drain_to_final_msg( # 2. WE DID NOT REQUEST that cancel and thus # SHOULD RAISE HERE! except trio.Cancelled as taskc: + # from tractor.devx._debug import pause + # await pause(shield=True) # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is # the source cause of this local task's # cancellation. ctx.maybe_raise( + hide_tb=hide_tb, # TODO: when use this/ # from_src_exc=taskc, ) diff --git a/tractor/msg/pretty_struct.py b/tractor/msg/pretty_struct.py index f27fb89..15e469e 100644 --- a/tractor/msg/pretty_struct.py +++ b/tractor/msg/pretty_struct.py @@ -34,6 +34,9 @@ from pprint import ( saferepr, ) +from tractor.log import get_logger + +log = get_logger() # TODO: auto-gen type sig for input func both for # type-msgs and logging of RPC tasks? # taken and modified from: @@ -143,7 +146,13 @@ def pformat( else: # the `pprint` recursion-safe format: # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr - val_str: str = saferepr(v) + try: + val_str: str = saferepr(v) + except Exception: + log.exception( + 'Failed to `saferepr({type(struct)})` !?\n' + ) + return _Struct.__repr__(struct) # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') @@ -194,12 +203,20 @@ class Struct( return sin_props pformat = pformat + # __repr__ = pformat # __str__ = __repr__ = pformat # TODO: use a pprint.PrettyPrinter instance around ONLY rendering # inside a known tty? # def __repr__(self) -> str: # ... - __repr__ = pformat + def __repr__(self) -> str: + try: + return pformat(self) + except Exception: + log.exception( + f'Failed to `pformat({type(self)})` !?\n' + ) + return _Struct.__repr__(self) def copy( self, diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d1451b4..e041721 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -18,11 +18,13 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' +from __future__ import annotations import asyncio from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect +import traceback from typing import ( Any, Callable, @@ -30,20 +32,21 @@ from typing import ( Awaitable, ) -import trio -from outcome import Error - -from tractor.log import get_logger +import tractor from tractor._state import ( - current_actor, debug_mode, ) +from tractor.log import get_logger from tractor.devx import _debug -from tractor._exceptions import AsyncioCancelled from tractor.trionics._broadcast import ( broadcast_receiver, BroadcastReceiver, ) +import trio +from outcome import ( + Error, + Outcome, +) log = get_logger(__name__) @@ -161,7 +164,7 @@ def _run_asyncio_task( ''' __tracebackhide__ = True - if not current_actor().is_infected_aio(): + if not tractor.current_actor().is_infected_aio(): raise RuntimeError( "`infect_asyncio` mode is not enabled!?" ) @@ -172,7 +175,6 @@ def _run_asyncio_task( to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore args = tuple(inspect.getfullargspec(func).args) - if getattr(func, '_tractor_steam_function', None): # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return @@ -346,13 +348,22 @@ def _run_asyncio_task( # on a checkpoint. cancel_scope.cancel() - # raise any ``asyncio`` side error. + # raise any `asyncio` side error. raise aio_err task.add_done_callback(cancel_trio) return chan +class AsyncioCancelled(CancelledError): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + ''' + + @acm async def translate_aio_errors( @@ -516,7 +527,6 @@ async def open_channel_from( def run_as_asyncio_guest( - trio_main: Callable, ) -> None: @@ -548,6 +558,11 @@ def run_as_asyncio_guest( loop = asyncio.get_running_loop() trio_done_fut = asyncio.Future() + startup_msg: str = ( + 'Starting `asyncio` guest-loop-run\n' + '-> got running loop\n' + '-> built a `trio`-done future\n' + ) if debug_mode(): # XXX make it obvi we know this isn't supported yet! @@ -562,34 +577,120 @@ def run_as_asyncio_guest( def trio_done_callback(main_outcome): if isinstance(main_outcome, Error): - error = main_outcome.error + error: BaseException = main_outcome.error + + # show an dedicated `asyncio`-side tb from the error + tb_str: str = ''.join(traceback.format_exception(error)) + log.exception( + 'Guest-run errored!?\n\n' + f'{main_outcome}\n' + f'{error}\n\n' + f'{tb_str}\n' + ) trio_done_fut.set_exception(error) - # TODO: explicit asyncio tb? - # traceback.print_exception(error) - - # XXX: do we need this? - # actor.cancel_soon() - + # raise inline main_outcome.unwrap() + else: trio_done_fut.set_result(main_outcome) - log.runtime(f"trio_main finished: {main_outcome!r}") + log.runtime(f'trio_main finished: {main_outcome!r}') + + startup_msg += ( + f'-> created {trio_done_callback!r}\n' + f'-> scheduling `trio_main`: {trio_main!r}\n' + ) # start the infection: run trio on the asyncio loop in "guest mode" log.runtime( - 'Infecting `asyncio`-process with a `trio` guest-run of\n\n' - f'{trio_main!r}\n\n' - - f'{trio_done_callback}\n' + f'{startup_msg}\n\n' + + + 'Infecting `asyncio`-process with a `trio` guest-run!\n' ) + trio.lowlevel.start_guest_run( trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - # NOTE `.unwrap()` will raise on error - return (await trio_done_fut).unwrap() + try: + # TODO: better SIGINT handling since shielding seems to + # make NO DIFFERENCE XD + # -[ ] maybe this is due to 3.11's recent SIGINT handling + # changes and we can better work with/around it? + # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption + out: Outcome = await asyncio.shield(trio_done_fut) + # NOTE `Error.unwrap()` will raise + return out.unwrap() + + except asyncio.CancelledError: + actor: tractor.Actor = tractor.current_actor() + log.exception( + '`asyncio`-side main task was cancelled!\n' + 'Cancelling actor-runtime..\n' + f'c)>\n' + f' |_{actor}.cancel_soon()\n' + + ) + + # XXX NOTE XXX the next LOC is super important!!! + # => without it, we can get a guest-run abandonment case + # where asyncio will not trigger `trio` in a final event + # loop cycle! + # + # our test, + # `test_infected_asyncio.test_sigint_closes_lifetime_stack()` + # demonstrates how if when we raise a SIGINT-signal in an infected + # child we get a variable race condition outcome where + # either of the following can indeterminately happen, + # + # - "silent-abandon": `asyncio` abandons the `trio` + # guest-run task silently and no `trio`-guest-run or + # `tractor`-actor-runtime teardown happens whatsoever.. + # this is the WORST (race) case outcome. + # + # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with + # `trio` reporting a console traceback and further tbs of all + # the failed shutdown routines also show on console.. + # + # our test can thus fail and (has been parametrized for) + # the 2 cases: + # + # - when the parent raises a KBI just after + # signalling the child, + # |_silent-abandon => the `Actor.lifetime_stack` will + # never be closed thus leaking a resource! + # -> FAIL! + # |_loud-abandon => despite the abandonment at least the + # stack will be closed out.. + # -> PASS + # + # - when the parent instead simply waits on `ctx.wait_for_result()` + # (i.e. DOES not raise a KBI itself), + # |_silent-abandon => test will just hang and thus the ctx + # and actor will never be closed/cancelled/shutdown + # resulting in leaking a (file) resource since the + # `trio`/`tractor` runtime never relays a ctxc back to + # the parent; the test's timeout will trigger.. + # -> FAIL! + # |_loud-abandon => this case seems to never happen?? + # + # XXX FIRST PART XXX, SO, this is a fix to the + # "silent-abandon" case, NOT the `trio`-guest-run + # abandonment issue in general, for which the NEXT LOC + # is apparently a working fix! + actor.cancel_soon() + + # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to + # `trio`-guest-run to complete and teardown !! + # + # XXX WITHOUT THIS the guest-run gets race-conditionally + # abandoned by `asyncio`!! + # XD XD XD + await asyncio.shield( + asyncio.sleep(.1) # NOPE! it can't be 0 either XD + ) + raise # might as well if it's installed. try: @@ -599,4 +700,6 @@ def run_as_asyncio_guest( except ImportError: pass - return asyncio.run(aio_main(trio_main)) + return asyncio.run( + aio_main(trio_main), + ) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index a5d3187..977b682 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -156,11 +156,12 @@ class BroadcastState(Struct): class BroadcastReceiver(ReceiveChannel): ''' - A memory receive channel broadcaster which is non-lossy for the - fastest consumer. + A memory receive channel broadcaster which is non-lossy for + the fastest consumer. - Additional consumer tasks can receive all produced values by registering - with ``.subscribe()`` and receiving from the new instance it delivers. + Additional consumer tasks can receive all produced values by + registering with ``.subscribe()`` and receiving from the new + instance it delivers. ''' def __init__( diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 08e70ad..fd224d6 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -18,8 +18,12 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from contextlib import asynccontextmanager as acm +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, +) import inspect +from types import ModuleType from typing import ( Any, AsyncContextManager, @@ -30,13 +34,16 @@ from typing import ( Optional, Sequence, TypeVar, + TYPE_CHECKING, ) import trio - from tractor._state import current_actor from tractor.log import get_logger +if TYPE_CHECKING: + from tractor import ActorNursery + log = get_logger(__name__) @@ -46,8 +53,10 @@ T = TypeVar("T") @acm async def maybe_open_nursery( - nursery: trio.Nursery | None = None, + nursery: trio.Nursery|ActorNursery|None = None, shield: bool = False, + lib: ModuleType = trio, + ) -> AsyncGenerator[trio.Nursery, Any]: ''' Create a new nursery if None provided. @@ -58,13 +67,12 @@ async def maybe_open_nursery( if nursery is not None: yield nursery else: - async with trio.open_nursery() as nursery: + async with lib.open_nursery() as nursery: nursery.cancel_scope.shield = shield yield nursery async def _enter_and_wait( - mngr: AsyncContextManager[T], unwrapped: dict[int, T], all_entered: trio.Event, @@ -91,7 +99,6 @@ async def _enter_and_wait( @acm async def gather_contexts( - mngrs: Sequence[AsyncContextManager[T]], ) -> AsyncGenerator[ @@ -102,15 +109,17 @@ async def gather_contexts( None, ]: ''' - Concurrently enter a sequence of async context managers, each in - a separate ``trio`` task and deliver the unwrapped values in the - same order once all managers have entered. On exit all contexts are - subsequently and concurrently exited. + Concurrently enter a sequence of async context managers (acms), + each from a separate `trio` task and deliver the unwrapped + `yield`-ed values in the same order once all managers have entered. - This function is somewhat similar to common usage of - ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in - combo with ``asyncio.gather()`` except the managers are concurrently - entered and exited, and cancellation just works. + On exit, all acms are subsequently and concurrently exited. + + This function is somewhat similar to a batch of non-blocking + calls to `contextlib.AsyncExitStack.enter_async_context()` + (inside a loop) *in combo with* a `asyncio.gather()` to get the + `.__aenter__()`-ed values, except the managers are both + concurrently entered and exited and *cancellation just works*(R). ''' seed: int = id(mngrs) @@ -210,9 +219,10 @@ async def maybe_open_context( ) -> AsyncIterator[tuple[bool, T]]: ''' - Maybe open a context manager if there is not already a _Cached - version for the provided ``key`` for *this* actor. Return the - _Cached instance on a _Cache hit. + Maybe open an async-context-manager (acm) if there is not already + a `_Cached` version for the provided (input) `key` for *this* actor. + + Return the `_Cached` instance on a _Cache hit. ''' fid = id(acm_func) @@ -273,8 +283,13 @@ async def maybe_open_context( else: _Cache.users += 1 log.runtime( - f'Reusing resource for `_Cache` user {_Cache.users}\n\n' - f'{ctx_key!r} -> {yielded!r}\n' + f'Re-using cached resource for user {_Cache.users}\n\n' + f'{ctx_key!r} -> {type(yielded)}\n' + + # TODO: make this work with values but without + # `msgspec.Struct` causing frickin crashes on field-type + # lookups.. + # f'{ctx_key!r} -> {yielded!r}\n' ) lock.release() yield True, yielded