From 284fa0340e79f056df083950854d8d64ac41eda8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 24 Jun 2024 13:52:19 -0400
Subject: [PATCH] Hack `asyncio` to not abandon a guest-mode run?

Took me a while to figure out what the heck was going on, but it turns
out `asyncio` changed their SIGINT handling in 3.11 as per:

https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption

I'm not entirely sure if it's the 3.11 changes or possibly whatever
further updates were made in 3.12, but more or less, due to the way our
current main task was written, the `trio` guest-run was getting
abandoned on SIGINTs sent from the OS to the infected child proc..

Note that much of the bug and solution cases are laid out in very
detailed comment-notes both in the new test and
`run_as_asyncio_guest()`, right above the final "fix" lines.

Add a new `test_infected_aio.test_sigint_closes_lifetime_stack()` test
suite which reliably triggers all abandonment issues with multiple
cases of different parent behaviour post-sending-SIGINT-to-child:

1. briefly sleep then raise a KBI in the parent, which originally
   demonstrated the file leak (i.e. `Actor.lifetime_stack.close()`
   never running) and simulates a ctrl-c from the console (relayed in
   tandem by the OS to the parent and child processes).
2. do `Context.wait_for_result()` on the child context, which would
   hang and time out since the actor runtime would never complete and
   thus never relay a `ContextCancelled`.
3. both with and without running an `asyncio` task in the `manage_file`
   child actor; originally it seemed that with an aio task scheduled in
   the child actor the guest-run abandonment was always the "loud" case
   where there seemed to be some actor teardown but with tracebacks
   from Python failing to gracefully exit the `trio` runtime..

The (seemingly working) "fix" required 2 lines of code to be run inside
an `asyncio.CancelledError` handler around the call to
`await trio_done_fut`:

- `Actor.cancel_soon()` which schedules the actor runtime to cancel on
  the next `trio` runner cycle and results in a "self cancellation" of
  the actor.
- "pumping the `asyncio` event loop" with a non-zero `.sleep(0.1)` XD
  |_ seems that a "shielded" pump with some actual `delay: float > 0`
     did the trick to get `asyncio` to allow the `trio` runner/loop to
     fully complete its guest-run without abandonment.

Other supporting changes:

- move `._exceptions.AsyncioCancelled`, our renamed
  `asyncio.CancelledError` error-sub-type-wrapper, to `.to_asyncio` and
  make it derive from `CancelledError` so as to be sure that, when it
  is raised by our `asyncio` x-> `trio` exception relay machinery,
  `asyncio` gets the specific type it expects during cancellation.
- do "summary status" style logging in `run_as_asyncio_guest()` wherein
  we compile the eventual `startup_msg: str` emitted just before
  waiting on the `trio_done_fut`.
- shield-wait with `out: Outcome = await asyncio.shield(trio_done_fut)`
  even though it seems to do nothing in the SIGINT handling case..
  (I presume it might help avoid abandonment in an
  `asyncio.Task.cancel()` case maybe?)
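In sketch form the fixed guest-run wait looks roughly like the below;
the `actor` and `trio_done_fut` names match those used in
`run_as_asyncio_guest()` (see the `tractor/to_asyncio.py` hunks
further down) but the logging, re-raise and final outcome-unwrap
details are elided here, so read it as an approximation and NOT the
verbatim patched code (`Outcome` is from the `outcome` lib that `trio`
uses for guest-run completion):

    # asyncio-side "done" waiter inside `run_as_asyncio_guest()`
    try:
        # shield-wait so an `asyncio.Task.cancel()` can't (also)
        # drop the eventual `trio`-side outcome.
        out: Outcome = await asyncio.shield(trio_done_fut)

    except asyncio.CancelledError:
        # fix line 1: schedule a runtime "self cancellation" on
        # the next `trio` runner cycle.
        actor.cancel_soon()

        # fix line 2: "pump" the `asyncio` event loop with a
        # non-zero delay, giving the `trio` guest-run time to
        # fully complete instead of being abandoned.
        await asyncio.sleep(0.1)

        # by now the `trio` side should have completed and set
        # the outcome on the future.
        out: Outcome = trio_done_fut.result()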
--- tests/conftest.py | 16 +++ tests/test_caps_based_msging.py | 41 +++--- tests/test_debugger.py | 17 +++ tests/test_docs_examples.py | 2 +- tests/test_infected_asyncio.py | 225 ++++++++++++++++++++++++++++++-- tractor/_context.py | 99 ++++++++++---- tractor/_discovery.py | 12 +- tractor/_entry.py | 83 +++++++++--- tractor/_exceptions.py | 13 +- tractor/_ipc.py | 53 ++++++-- tractor/_portal.py | 42 +++++- tractor/_root.py | 30 +++-- tractor/_rpc.py | 59 +++++---- tractor/_runtime.py | 87 +++++++++--- tractor/_state.py | 2 +- tractor/_streaming.py | 2 +- tractor/_supervise.py | 44 +++++-- tractor/devx/__init__.py | 1 + tractor/devx/_debug.py | 69 ++++++---- tractor/devx/_frame_stack.py | 17 ++- tractor/hilevel/__init__.py | 0 tractor/hilevel/_service.py | 134 +++++++++++++++++++ tractor/log.py | 5 +- tractor/msg/_codec.py | 91 +++++++++---- tractor/msg/_ops.py | 3 + tractor/msg/pretty_struct.py | 21 ++- tractor/to_asyncio.py | 153 ++++++++++++++++++---- tractor/trionics/_broadcast.py | 9 +- tractor/trionics/_mngrs.py | 53 +++++--- 29 files changed, 1098 insertions(+), 285 deletions(-) create mode 100644 tractor/hilevel/__init__.py create mode 100644 tractor/hilevel/_service.py diff --git a/tests/conftest.py b/tests/conftest.py index 5ce8442..7f53cbd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ """ ``tractor`` testing!! """ +from functools import partial import sys import subprocess import os @@ -8,6 +9,9 @@ import random import signal import platform import time +from typing import ( + AsyncContextManager, +) import pytest import tractor @@ -150,6 +154,18 @@ def pytest_generate_tests(metafunc): metafunc.parametrize("start_method", [spawn_backend], scope='module') +# TODO: a way to let test scripts (like from `examples/`) +# guarantee they won't registry addr collide! +@pytest.fixture +def open_test_runtime( + reg_addr: tuple, +) -> AsyncContextManager: + return partial( + tractor.open_nursery, + registry_addrs=[reg_addr], + ) + + def sig_prog(proc, sig): "Kill the actor-process with ``sig``." proc.send_signal(sig) diff --git a/tests/test_caps_based_msging.py b/tests/test_caps_based_msging.py index 9a73ba8..ea626a5 100644 --- a/tests/test_caps_based_msging.py +++ b/tests/test_caps_based_msging.py @@ -41,7 +41,7 @@ from tractor.msg import ( from tractor.msg.types import ( _payload_msgs, log, - Msg, + PayloadMsg, Started, mk_msg_spec, ) @@ -61,7 +61,7 @@ def mk_custom_codec( uid: tuple[str, str] = tractor.current_actor().uid # XXX NOTE XXX: despite defining `NamespacePath` as a type - # field on our `Msg.pld`, we still need a enc/dec_hook() pair + # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # to cast to/from that type on the wire. See the docs: # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types @@ -321,12 +321,12 @@ def dec_type_union( import importlib types: list[Type] = [] for type_name in type_names: - for ns in [ + for mod in [ typing, importlib.import_module(__name__), ]: if type_ref := getattr( - ns, + mod, type_name, False, ): @@ -744,7 +744,7 @@ def chk_pld_type( # 'Error', .pld: ErrorData codec: MsgCodec = mk_codec( - # NOTE: this ONLY accepts `Msg.pld` fields of a specified + # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified # type union. 
ipc_pld_spec=payload_spec, ) @@ -752,7 +752,7 @@ def chk_pld_type( # make a one-off dec to compare with our `MsgCodec` instance # which does the below `mk_msg_spec()` call internally ipc_msg_spec: Union[Type[Struct]] - msg_types: list[Msg[payload_spec]] + msg_types: list[PayloadMsg[payload_spec]] ( ipc_msg_spec, msg_types, @@ -761,7 +761,7 @@ def chk_pld_type( ) _enc = msgpack.Encoder() _dec = msgpack.Decoder( - type=ipc_msg_spec or Any, # like `Msg[Any]` + type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` ) assert ( @@ -806,7 +806,7 @@ def chk_pld_type( 'cid': '666', 'pld': pld, } - enc_msg: Msg = typedef(**kwargs) + enc_msg: PayloadMsg = typedef(**kwargs) _wire_bytes: bytes = _enc.encode(enc_msg) wire_bytes: bytes = codec.enc.encode(enc_msg) @@ -883,25 +883,16 @@ def test_limit_msgspec(): debug_mode=True ): - # ensure we can round-trip a boxing `Msg` + # ensure we can round-trip a boxing `PayloadMsg` assert chk_pld_type( - # Msg, - Any, - None, + payload_spec=Any, + pld=None, expect_roundtrip=True, ) - # TODO: don't need this any more right since - # `msgspec>=0.15` has the nice generics stuff yah?? - # - # manually override the type annot of the payload - # field and ensure it propagates to all msg-subtypes. - # Msg.__annotations__['pld'] = Any - # verify that a mis-typed payload value won't decode assert not chk_pld_type( - # Msg, - int, + payload_spec=int, pld='doggy', ) @@ -913,18 +904,16 @@ def test_limit_msgspec(): value: Any assert not chk_pld_type( - # Msg, - CustomPayload, + payload_spec=CustomPayload, pld='doggy', ) assert chk_pld_type( - # Msg, - CustomPayload, + payload_spec=CustomPayload, pld=CustomPayload(name='doggy', value='urmom') ) - # uhh bc we can `.pause_from_sync()` now! :surfer: + # yah, we can `.pause_from_sync()` now! # breakpoint() trio.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 43dadbb..71c691a 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -1336,6 +1336,23 @@ def test_shield_pause( child.expect(pexpect.EOF) +# TODO: better error for "non-ideal" usage from the root actor. +# -[ ] if called from an async scope emit a message that suggests +# using `await tractor.pause()` instead since it's less overhead +# (in terms of `greenback` and/or extra threads) and if it's from +# a sync scope suggest that usage must first call +# `ensure_portal()` in the (eventual parent) async calling scope? +def test_sync_pause_from_bg_task_in_root_actor_(): + ''' + When used from the root actor, normally we can only implicitly + support `.pause_from_sync()` from the main-parent-task (that + opens the runtime via `open_root_actor()`) since `greenback` + requires a `.ensure_portal()` call per `trio.Task` where it is + used. + + ''' + ... + # TODO: needs ANSI code stripping tho, see `assert_before()` # above! def test_correct_frames_below_hidden(): ''' diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 79a2200..3a1d2f2 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -19,7 +19,7 @@ from tractor._testing import ( @pytest.fixture def run_example_in_subproc( loglevel: str, - testdir, + testdir: pytest.Testdir, reg_addr: tuple[str, int], ): diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 45722a6..8d4697f 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,16 +2,25 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. 
''' -from typing import Optional, Iterable, Union import asyncio import builtins +from contextlib import ExitStack import itertools import importlib +import os +from pathlib import Path +import signal +from typing import ( + Callable, + Iterable, + Union, +) import pytest import trio import tractor from tractor import ( + current_actor, to_asyncio, RemoteActorError, ContextCancelled, @@ -25,8 +34,8 @@ async def sleep_and_err( # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` - to_trio: Optional[trio.MemorySendChannel] = None, - from_trio: Optional[asyncio.Queue] = None, + to_trio: trio.MemorySendChannel|None = None, + from_trio: asyncio.Queue|None = None, ): if to_trio: @@ -36,7 +45,7 @@ async def sleep_and_err( assert 0 -async def sleep_forever(): +async def aio_sleep_forever(): await asyncio.sleep(float('inf')) @@ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): # spawn an ``asyncio`` task to run a func and return result with trio.move_on_after(.2): - await tractor.to_asyncio.run_task(sleep_forever) + await tractor.to_asyncio.run_task(aio_sleep_forever) def test_trio_cancels_aio_on_actor_side(reg_addr): @@ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): async def asyncio_actor( - target: str, expect_err: Exception|None = None ) -> None: assert tractor.current_actor().is_infected_aio() - target = globals()[target] + target: Callable = globals()[target] if '.' in expect_err: modpath, _, name = expect_err.rpartition('.') @@ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): async with tractor.open_nursery() as n: portal = await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): async with tractor.open_nursery() as n: await n.run_in_actor( asyncio_actor, - target='sleep_forever', + target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) @@ -195,7 +203,7 @@ async def trio_ctx( # spawn another asyncio task for the cuck of it. n.start_soon( tractor.to_asyncio.run_task, - sleep_forever, + aio_sleep_forever, ) await trio.sleep_forever() @@ -285,7 +293,7 @@ async def aio_cancel(): # cancel and enter sleep task.cancel() - await sleep_forever() + await aio_sleep_forever() def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): @@ -355,7 +363,6 @@ async def push_from_aio_task( async def stream_from_aio( - exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, @@ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( trio.run(main) + +@tractor.context +async def manage_file( + ctx: tractor.Context, + tmp_path_str: str, + bg_aio_task: bool = False, +): + ''' + Start an `asyncio` task that just sleeps after registering a context + with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree + and ensure the stack is closed in the infected mode child. + + To verify the teardown state just write a tmpfile to the `testdir` + and delete it on actor close. + + ''' + + tmp_path: Path = Path(tmp_path_str) + tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' + + # create a the tmp file and tell the parent where it's at + assert not tmp_file.is_file() + tmp_file.touch() + + stack: ExitStack = current_actor().lifetime_stack + stack.callback(tmp_file.unlink) + + await ctx.started(( + str(tmp_file), + os.getpid(), + )) + + # expect to be cancelled from here! 
+ try: + + # NOTE: turns out you don't even need to sched an aio task + # since the original issue, even though seemingly was due to + # the guest-run being abandoned + a `._debug.pause()` inside + # `._runtime._async_main()` (which was originally trying to + # debug the `.lifetime_stack` not closing), IS NOT actually + # the core issue? + # + # further notes: + # + # - `trio` only issues the " RuntimeWarning: Trio guest run + # got abandoned without properly finishing... weird stuff + # might happen" IFF you DO run a asyncio task here, BUT + # - the original issue of the `.lifetime_stack` not closing + # will still happen even if you don't run an `asyncio` task + # here even though the "abandon" messgage won't be shown.. + # + # => ????? honestly i'm lost but it seems to be some issue + # with `asyncio` and SIGINT.. + # + # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and + # `run_as_asyncio_guest()` is written WITHOUT THE + # `.cancel_soon()` soln, both of these tests will pass ?? + # so maybe it has something to do with `asyncio` loop init + # state?!? + # honestly, this REALLY reminds me why i haven't used + # `asyncio` by choice in years.. XD + # + # await tractor.to_asyncio.run_task(aio_sleep_forever) + if bg_aio_task: + async with trio.open_nursery() as tn: + tn.start_soon( + tractor.to_asyncio.run_task, + aio_sleep_forever, + ) + + await trio.sleep_forever() + + # signalled manually at the OS level (aka KBI) by the parent actor. + except KeyboardInterrupt: + print('child raised KBI..') + assert tmp_file.exists() + raise + else: + raise RuntimeError('shoulda received a KBI?') + + +@pytest.mark.parametrize( + 'bg_aio_task', + [ + False, + + # NOTE: (and see notes in `manage_file()` above as well) if + # we FOR SURE SPAWN AN AIO TASK in the child it seems the + # "silent-abandon" case (as is described in detail in + # `to_asyncio.run_as_asyncio_guest()`) does not happen and + # `asyncio`'s loop will at least abandon the `trio` side + # loudly? .. prolly the state-spot to start looking for + # a soln that results in NO ABANDONMENT.. XD + True, + ], + ids=[ + 'bg_aio_task', + 'just_trio_slee', + ], +) +@pytest.mark.parametrize( + 'wait_for_ctx', + [ + False, + True, + ], + ids=[ + 'raise_KBI_in_rent', + 'wait_for_ctx', + ], +) +def test_sigint_closes_lifetime_stack( + tmp_path: Path, + wait_for_ctx: bool, + bg_aio_task: bool, +): + ''' + Ensure that an infected child can use the `Actor.lifetime_stack` + to make a file on boot and it's automatically cleaned up by the + actor-lifetime-linked exit stack closure. + + ''' + async def main(): + try: + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'file_mngr', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + manage_file, + tmp_path_str=str(tmp_path), + bg_aio_task=bg_aio_task, + ) as (ctx, first): + + path_str, cpid = first + tmp_file: Path = Path(path_str) + assert tmp_file.exists() + + # XXX originally to simulate what (hopefully) + # the below now triggers.. had to manually + # trigger a SIGINT from a ctl-c in the root. + # await trio.sleep_forever() + + # XXX NOTE XXX signal infected-`asyncio` child to + # OS-cancel with SIGINT; this should trigger the + # bad `asyncio` cancel behaviour that can cause + # a guest-run abandon as was seen causing + # shm-buffer leaks in `piker`'s live quote stream + # susbys! 
+ # + # await trio.sleep(.5) + await trio.sleep(.2) + os.kill( + cpid, + signal.SIGINT, + ) + + # XXX CASE 1: without the bug fixed, in + # the non-KBI-raised-in-parent case, this + # timeout should trigger! + if wait_for_ctx: + print('waiting for ctx outcome in parent..') + try: + with trio.fail_after(.7): + await ctx.wait_for_result() + except tractor.ContextCancelled as ctxc: + assert ctxc.canceller == ctx.chan.uid + raise + + # XXX CASE 2: this seems to be the source of the + # original issue which exhibited BEFORE we put + # a `Actor.cancel_soon()` inside + # `run_as_asyncio_guest()`.. + else: + raise KeyboardInterrupt + + pytest.fail('should have raised some kinda error?!?') + + except ( + KeyboardInterrupt, + ContextCancelled, + ): + # XXX CASE 2: without the bug fixed, in the + # KBI-raised-in-parent case, the actor teardown should + # never get run (silently abaondoned by `asyncio`..) and + # thus the file should leak! + assert not tmp_file.exists() + assert ctx.maybe_error + + trio.run(main) + + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: diff --git a/tractor/_context.py b/tractor/_context.py index 32acf83..9504d6f 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -121,10 +121,19 @@ class Unresolved: @dataclass class Context: ''' - An inter-actor, SC transitive, `Task` communication context. + An inter-actor, SC transitive, `trio.Task` (pair) + communication context. - NB: This class should **never be instatiated directly**, it is allocated - by the runtime in 2 ways: + (We've also considered other names and ideas: + - "communicating tasks scope": cts + - "distributed task scope": dts + - "communicating tasks context": ctc + + **Got a better idea for naming? Make an issue dawg!** + ) + + NB: This class should **never be instatiated directly**, it is + allocated by the runtime in 2 ways: - by entering `Portal.open_context()` which is the primary public API for any "parent" task or, - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg @@ -210,6 +219,16 @@ class Context: # more the the `Context` is needed? _portal: Portal | None = None + @property + def portal(self) -> Portal|None: + ''' + Return any wrapping memory-`Portal` if this is + a 'parent'-side task which called `Portal.open_context()`, + otherwise `None`. + + ''' + return self._portal + # NOTE: each side of the context has its own cancel scope # which is exactly the primitive that allows for # cross-actor-task-supervision and thus SC. @@ -299,6 +318,8 @@ class Context: # boxed exception. NOW, it's used for spawning overrun queuing # tasks when `.allow_overruns == True` !!! _scope_nursery: trio.Nursery|None = None + # ^-TODO-^ change name? + # -> `._scope_tn` "scope task nursery" # streaming overrun state tracking _in_overrun: bool = False @@ -408,10 +429,23 @@ class Context: ''' return self._cancel_called + @cancel_called.setter + def cancel_called(self, val: bool) -> None: + ''' + Set the self-cancelled request `bool` value. + + ''' + # to debug who frickin sets it.. + # if val: + # from .devx import pause_from_sync + # pause_from_sync() + + self._cancel_called = val + @property def canceller(self) -> tuple[str, str]|None: ''' - ``Actor.uid: tuple[str, str]`` of the (remote) + `Actor.uid: tuple[str, str]` of the (remote) actor-process who's task was cancelled thus causing this (side of the) context to also be cancelled. 
@@ -515,7 +549,7 @@ class Context: # the local scope was never cancelled # and instead likely we received a remote side - # # cancellation that was raised inside `.result()` + # # cancellation that was raised inside `.wait_for_result()` # or ( # (se := self._local_error) # and se is re @@ -585,6 +619,8 @@ class Context: self, error: BaseException, + set_cancel_called: bool = False, + ) -> None: ''' (Maybe) cancel this local scope due to a received remote @@ -603,7 +639,7 @@ class Context: - `Portal.open_context()` - `Portal.result()` - `Context.open_stream()` - - `Context.result()` + - `Context.wait_for_result()` when called/closed by actor local task(s). @@ -729,7 +765,7 @@ class Context: # Cancel the local `._scope`, catch that # `._scope.cancelled_caught` and re-raise any remote error - # once exiting (or manually calling `.result()`) the + # once exiting (or manually calling `.wait_for_result()`) the # `.open_context()` block. cs: trio.CancelScope = self._scope if ( @@ -764,8 +800,9 @@ class Context: # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 message: str = 'Cancelling `Context._scope` !\n\n' + # from .devx import pause_from_sync + # pause_from_sync() self._scope.cancel() - else: message: str = 'NOT cancelling `Context._scope` !\n\n' # from .devx import mk_pdb @@ -889,7 +926,7 @@ class Context: ''' side: str = self.side - self._cancel_called: bool = True + self.cancel_called: bool = True header: str = ( f'Cancelling ctx with peer from {side.upper()} side\n\n' @@ -912,7 +949,7 @@ class Context: # `._scope.cancel()` since we expect the eventual # `ContextCancelled` from the other side to trigger this # when the runtime finally receives it during teardown - # (normally in `.result()` called from + # (normally in `.wait_for_result()` called from # `Portal.open_context().__aexit__()`) if side == 'parent': if not self._portal: @@ -1025,10 +1062,10 @@ class Context: ''' __tracebackhide__: bool = hide_tb - our_uid: tuple = self.chan.uid + peer_uid: tuple = self.chan.uid # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption - # for "graceful cancellation" case: + # for "graceful cancellation" case(s): # # Whenever a "side" of a context (a `Task` running in # an actor) **is** the side which requested ctx @@ -1045,9 +1082,11 @@ class Context: # set to the `Actor.uid` of THIS task (i.e. the # cancellation requesting task's actor is the actor # checking whether it should absorb the ctxc). + self_ctxc: bool = self._is_self_cancelled(remote_error) if ( + self_ctxc + and not raise_ctxc_from_self_call - and self._is_self_cancelled(remote_error) # TODO: ?potentially it is useful to emit certain # warning/cancel logs for the cases where the @@ -1077,8 +1116,8 @@ class Context: and isinstance(remote_error, RemoteActorError) and remote_error.boxed_type is StreamOverrun - # and tuple(remote_error.msgdata['sender']) == our_uid - and tuple(remote_error.sender) == our_uid + # and tuple(remote_error.msgdata['sender']) == peer_uid + and tuple(remote_error.sender) == peer_uid ): # NOTE: we set the local scope error to any "self # cancellation" error-response thus "absorbing" @@ -1140,9 +1179,9 @@ class Context: of the remote cancellation. ''' - __tracebackhide__ = hide_tb + __tracebackhide__: bool = False assert self._portal, ( - "Context.result() can not be called from callee side!" + '`Context.wait_for_result()` can not be called from callee side!' 
) if self._final_result_is_set(): return self._result @@ -1169,7 +1208,8 @@ class Context: drained_msgs, ) = await msgops.drain_to_final_msg( ctx=self, - hide_tb=hide_tb, + # hide_tb=hide_tb, + hide_tb=False, ) drained_status: str = ( @@ -1185,6 +1225,8 @@ class Context: log.cancel(drained_status) + # __tracebackhide__: bool = hide_tb + self.maybe_raise( # NOTE: obvi we don't care if we # overran the far end if we're already @@ -1197,7 +1239,8 @@ class Context: # raising something we know might happen # during cancellation ;) (not self._cancel_called) - ) + ), + hide_tb=hide_tb, ) # TODO: eventually make `.outcome: Outcome` and thus return # `self.outcome.unwrap()` here! @@ -1583,7 +1626,7 @@ class Context: - NEVER `return` early before delivering the msg! bc if the error is a ctxc and there is a task waiting on - `.result()` we need the msg to be + `.wait_for_result()` we need the msg to be `send_chan.send_nowait()`-ed over the `._rx_chan` so that the error is relayed to that waiter task and thus raised in user code! @@ -1828,7 +1871,7 @@ async def open_context_from_portal( When the "callee" (side that is "called"/started by a call to *this* method) returns, the caller side (this) unblocks and any final value delivered from the other end can be - retrieved using the `Contex.result()` api. + retrieved using the `Contex.wait_for_result()` api. The yielded ``Context`` instance further allows for opening bidirectional streams, explicit cancellation and @@ -1965,14 +2008,14 @@ async def open_context_from_portal( yield ctx, first # ??TODO??: do we still want to consider this or is - # the `else:` block handling via a `.result()` + # the `else:` block handling via a `.wait_for_result()` # call below enough?? # - # -[ ] pretty sure `.result()` internals do the + # -[ ] pretty sure `.wait_for_result()` internals do the # same as our ctxc handler below so it ended up # being same (repeated?) behaviour, but ideally we # wouldn't have that duplication either by somehow - # factoring the `.result()` handler impl in a way + # factoring the `.wait_for_result()` handler impl in a way # that we can re-use it around the `yield` ^ here # or vice versa? # @@ -2110,7 +2153,7 @@ async def open_context_from_portal( # AND a group-exc is only raised if there was > 1 # tasks started *here* in the "caller" / opener # block. If any one of those tasks calls - # `.result()` or `MsgStream.receive()` + # `.wait_for_result()` or `MsgStream.receive()` # `._maybe_raise_remote_err()` will be transitively # called and the remote error raised causing all # tasks to be cancelled. @@ -2180,7 +2223,7 @@ async def open_context_from_portal( f'|_{ctx._task}\n' ) # XXX NOTE XXX: the below call to - # `Context.result()` will ALWAYS raise + # `Context.wait_for_result()` will ALWAYS raise # a `ContextCancelled` (via an embedded call to # `Context._maybe_raise_remote_err()`) IFF # a `Context._remote_error` was set by the runtime @@ -2190,10 +2233,10 @@ async def open_context_from_portal( # ALWAYS SET any time "callee" side fails and causes "caller # side" cancellation via a `ContextCancelled` here. 
try: - result_or_err: Exception|Any = await ctx.result() + result_or_err: Exception|Any = await ctx.wait_for_result() except BaseException as berr: # on normal teardown, if we get some error - # raised in `Context.result()` we still want to + # raised in `Context.wait_for_result()` we still want to # save that error on the ctx's state to # determine things like `.cancelled_caught` for # cases where there was remote cancellation but diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 99a4dd6..522d4b8 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -56,14 +56,12 @@ async def get_registry( ]: ''' Return a portal instance connected to a local or remote - arbiter. + registry-service actor; if a connection already exists re-use it + (presumably to call a `.register_actor()` registry runtime RPC + ep). ''' - actor = current_actor() - - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - + actor: Actor = current_actor() if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) @@ -72,6 +70,8 @@ async def get_registry( Channel((host, port)) ) else: + # TODO: try to look pre-existing connection from + # `Actor._peers` and use it instead? async with ( _connect_chan(host, port) as chan, open_portal(chan) as regstr_ptl, diff --git a/tractor/_entry.py b/tractor/_entry.py index e22a4f1..4199a92 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -20,7 +20,8 @@ Sub-process entry points. """ from __future__ import annotations from functools import partial -# import textwrap +import os +import textwrap from typing import ( Any, TYPE_CHECKING, @@ -58,7 +59,7 @@ def _mp_main( ) -> None: ''' - The routine called *after fork* which invokes a fresh ``trio.run`` + The routine called *after fork* which invokes a fresh `trio.run()` ''' actor._forkserver_info = forkserver_info @@ -96,6 +97,35 @@ def _mp_main( log.info(f"Subactor {actor.uid} terminated") +# TODO: move this to some kinda `.devx._conc_lang.py` eventually +# as we work out our multi-domain state-flow-syntax! +def nest_from_op( + input_op: str, + tree_str: str, + + back_from_op: int = 1, +) -> str: + ''' + Depth-increment the input (presumably hierarchy/supervision) + input "tree string" below the provided `input_op` execution + operator, so injecting a `"\n|_{input_op}\n"`and indenting the + `tree_str` to nest content aligned with the ops last char. 
+ + ''' + return ( + f'{input_op}\n' + + + textwrap.indent( + tree_str, + prefix=( + len(input_op) + - + back_from_op + ) *' ', + ) + ) + + def _trio_main( actor: Actor, *, @@ -119,7 +149,6 @@ def _trio_main( if actor.loglevel is not None: get_console_log(actor.loglevel) - import os actor_info: str = ( f'|_{actor}\n' f' uid: {actor.uid}\n' @@ -128,13 +157,29 @@ def _trio_main( f' loglevel: {actor.loglevel}\n' ) log.info( - 'Started new trio subactor:\n' + 'Started new `trio` subactor:\n' + - '>\n' # like a "started/play"-icon from super perspective - + - actor_info, + nest_from_op( + input_op='(>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) + # '>(\n' # like a "started/play"-icon from super perspective + # + + # actor_info, ) - + logmeth = log.info + message: str = ( + # log.info( + 'Subactor terminated\n' + + + nest_from_op( + input_op=')>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) + # 'x\n' # like a "crossed-out/killed" from super perspective + # + + # actor_info + ) try: if infect_asyncio: actor._infected_aio = True @@ -143,16 +188,18 @@ def _trio_main( trio.run(trio_main) except KeyboardInterrupt: - log.cancel( - 'Actor received KBI\n' + logmeth = log.cancel + message: str = ( + 'Actor received KBI (aka an OS-cancel)\n' + - actor_info + nest_from_op( + input_op='c)>', # like a "started/play"-icon from super perspective + tree_str=actor_info, + ) ) + except BaseException: + log.exception('Actor crashed exit?') + raise + finally: - log.info( - 'Subactor terminated\n' - + - 'x\n' # like a "crossed-out/killed" from super perspective - + - actor_info - ) + logmeth(message) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 7164d6a..46b64c1 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -922,15 +922,6 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" - -class AsyncioCancelled(Exception): - ''' - Asyncio cancelled translation (non-base) error - for use with the ``to_asyncio`` module - to be raised in the ``trio`` side task - - ''' - class MessagingError(Exception): ''' IPC related msg (typing), transaction (ordering) or dialog @@ -1324,7 +1315,9 @@ def _mk_recv_mte( any_pld: Any = msgpack.decode(msg.pld) message: str = ( f'invalid `{msg_type.__qualname__}` msg payload\n\n' - f'value: `{any_pld!r}` does not match type-spec: ' + f'{any_pld!r}\n\n' + f'has type {type(any_pld)!r}\n\n' + f'and does not match type-spec ' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' ) bad_msg = msg diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e5e3d10..492602b 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -40,6 +40,7 @@ from typing import ( TypeVar, ) +# import pdbp import msgspec from tricycle import BufferedReceiveStream import trio @@ -290,12 +291,14 @@ class MsgpackTCPStream(MsgTransport): else: raise + # @pdbp.hideframe async def send( self, msg: msgtypes.MsgType, strict_types: bool = True, - # hide_tb: bool = False, + hide_tb: bool = False, + ) -> None: ''' Send a msgpack encoded py-object-blob-as-msg over TCP. @@ -304,7 +307,10 @@ class MsgpackTCPStream(MsgTransport): invalid msg type ''' - # __tracebackhide__: bool = hide_tb + __tracebackhide__: bool = hide_tb + # try: + # XXX see `trio._sync.AsyncContextManagerMixin` for details + # on the `.acquire()`/`.release()` sequencing.. 
async with self._send_lock: # NOTE: lookup the `trio.Task.context`'s var for @@ -352,6 +358,14 @@ class MsgpackTCPStream(MsgTransport): size: bytes = struct.pack(" tuple[str, int]: return self._laddr @@ -560,27 +574,40 @@ class Channel: ) return transport + # TODO: something like, + # `pdbp.hideframe_on(errors=[MsgTypeError])` + # @pdbp.hideframe async def send( self, payload: Any, - # hide_tb: bool = False, + hide_tb: bool = False, ) -> None: ''' Send a coded msg-blob over the transport. ''' - # __tracebackhide__: bool = hide_tb - log.transport( - '=> send IPC msg:\n\n' - f'{pformat(payload)}\n' - ) # type: ignore - assert self._transport - await self._transport.send( - payload, - # hide_tb=hide_tb, - ) + __tracebackhide__: bool = hide_tb + try: + log.transport( + '=> send IPC msg:\n\n' + f'{pformat(payload)}\n' + ) + # assert self._transport # but why typing? + await self._transport.send( + payload, + hide_tb=hide_tb, + ) + except BaseException as _err: + err = _err # bind for introspection + if not isinstance(_err, MsgTypeError): + # assert err + __tracebackhide__: bool = False + else: + assert err.cid + + raise async def recv(self) -> Any: assert self._transport diff --git a/tractor/_portal.py b/tractor/_portal.py index 2c676e1..38ce8dc 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -121,7 +121,8 @@ class Portal: ) return self.chan - # TODO: factor this out into an `ActorNursery` wrapper + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. async def _submit_for_result( self, ns: str, @@ -141,13 +142,22 @@ class Portal: portal=self, ) + # TODO: we should deprecate this API right? since if we remove + # `.run_in_actor()` (and instead move it to a `.highlevel` + # wrapper api (around a single `.open_context()` call) we don't + # really have any notion of a "main" remote task any more? + # # @api_frame - async def result(self) -> Any: + async def wait_for_result( + self, + hide_tb: bool = True, + ) -> Any: ''' - Return the result(s) from the remote actor's "main" task. + Return the final result delivered by a `Return`-msg from the + remote peer actor's "main" task's `return` statement. ''' - __tracebackhide__ = True + __tracebackhide__: bool = hide_tb # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -182,6 +192,23 @@ class Portal: return self._final_result_pld + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. + async def result( + self, + *args, + **kwargs, + ) -> Any|Exception: + typname: str = type(self).__name__ + log.warning( + f'`{typname}.result()` is DEPRECATED!\n' + 'Use `{typname.wait_for_result()` instead!\n' + ) + return await self.wait_for_result( + *args, + **kwargs, + ) + async def _cancel_streams(self): # terminate all locally running async generator # IPC calls @@ -240,6 +267,7 @@ class Portal: f'{reminfo}' ) + # XXX the one spot we set it? self.channel._cancel_called: bool = True try: # send cancel cmd - might not get response @@ -279,6 +307,8 @@ class Portal: ) return False + # TODO: do we still need this for low level `Actor`-runtime + # method calls or can we also remove it? async def run_from_ns( self, namespace_path: str, @@ -316,6 +346,8 @@ class Portal: expect_msg=Return, ) + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. 
async def run( self, func: str, @@ -370,6 +402,8 @@ class Portal: expect_msg=Return, ) + # TODO: factor this out into a `.highlevel` API-wrapper that uses + # a single `.open_context()` call underneath. @acm async def open_stream_from( self, diff --git a/tractor/_root.py b/tractor/_root.py index 7cdef60..3cfa253 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -21,6 +21,7 @@ Root actor runtime ignition(s). from contextlib import asynccontextmanager as acm from functools import partial import importlib +import inspect import logging import os import signal @@ -115,10 +116,16 @@ async def open_root_actor( if ( debug_mode and maybe_enable_greenback - and await _debug.maybe_init_greenback( - raise_not_found=False, + and ( + maybe_mod := await _debug.maybe_init_greenback( + raise_not_found=False, + ) ) ): + logger.info( + f'Found `greenback` installed @ {maybe_mod}\n' + 'Enabling `tractor.pause_from_sync()` support!\n' + ) os.environ['PYTHONBREAKPOINT'] = ( 'tractor.devx._debug._sync_pause_from_builtin' ) @@ -264,7 +271,10 @@ async def open_root_actor( except OSError: # TODO: make this a "discovery" log level? - logger.warning(f'No actor registry found @ {addr}') + logger.info( + f'No actor registry found @ {addr}\n' + # 'Registry will be initialized in local actor..' + ) async with trio.open_nursery() as tn: for addr in registry_addrs: @@ -365,23 +375,25 @@ async def open_root_actor( ) try: yield actor - except ( Exception, BaseExceptionGroup, ) as err: - - import inspect + # XXX NOTE XXX see equiv note inside + # `._runtime.Actor._stream_handler()` where in the + # non-root or root-that-opened-this-mahually case we + # wait for the local actor-nursery to exit before + # exiting the transport channel handler. entered: bool = await _debug._maybe_enter_pm( err, api_frame=inspect.currentframe(), ) - if ( not entered - and not is_multi_cancelled(err) + and + not is_multi_cancelled(err) ): - logger.exception('Root actor crashed:\n') + logger.exception('Root actor crashed\n') # ALWAYS re-raise any error bubbled up from the # runtime! diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 166ee96..f901150 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -89,6 +89,15 @@ if TYPE_CHECKING: log = get_logger('tractor') +# TODO: move to a `tractor.lowlevel.rpc` with the below +# func-type-cases implemented "on top of" `@context` defs. +# -[ ] std async func +# -[ ] `Portal.open_stream_from()` with async-gens? +# |_ possibly a duplex form of this with a +# `sent_from_peer = yield send_to_peer` form, which would require +# syncing the send/recv side with possibly `.receive_nowait()` +# on each `yield`? +# -[ ] async def _invoke_non_context( actor: Actor, cancel_scope: CancelScope, @@ -108,6 +117,7 @@ async def _invoke_non_context( ] = trio.TASK_STATUS_IGNORED, ): __tracebackhide__: bool = True + cs: CancelScope|None = None # ref when activated # TODO: can we unify this with the `context=True` impl below? 
if inspect.isasyncgen(coro): @@ -160,10 +170,6 @@ async def _invoke_non_context( functype='asyncgen', ) ) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above with cancel_scope as cs: ctx._scope = cs task_status.started(ctx) @@ -175,15 +181,13 @@ async def _invoke_non_context( await chan.send( Stop(cid=cid) ) + + # simplest function/method request-response pattern + # XXX: in the most minimally used case, just a scheduled internal runtime + # call to `Actor._cancel_task()` from the ctx-peer task since we + # don't (yet) have a dedicated IPC msg. + # ------ - ------ else: - # regular async function/method - # XXX: possibly just a scheduled `Actor._cancel_task()` - # from a remote request to cancel some `Context`. - # ------ - ------ - # TODO: ideally we unify this with the above `context=True` - # block such that for any remote invocation ftype, we - # always invoke the far end RPC task scheduling the same - # way: using the linked IPC context machinery. failed_resp: bool = False try: ack = StartAck( @@ -354,8 +358,14 @@ async def _errors_relayed_via_ipc( # channel. task_status.started(err) - # always reraise KBIs so they propagate at the sys-process level. - if isinstance(err, KeyboardInterrupt): + # always reraise KBIs so they propagate at the sys-process + # level. + # XXX LOL, except when running in asyncio mode XD + # cmon guys, wtf.. + if ( + isinstance(err, KeyboardInterrupt) + # and not actor.is_infected_aio() + ): raise # RPC task bookeeping. @@ -458,7 +468,6 @@ async def _invoke( # tb: TracebackType = None cancel_scope = CancelScope() - cs: CancelScope|None = None # ref when activated ctx = actor.get_context( chan=chan, cid=cid, @@ -607,6 +616,8 @@ async def _invoke( # `@context` marked RPC function. # - `._portal` is never set. try: + tn: trio.Nursery + rpc_ctx_cs: CancelScope async with ( trio.open_nursery() as tn, msgops.maybe_limit_plds( @@ -616,7 +627,7 @@ async def _invoke( ), ): ctx._scope_nursery = tn - ctx._scope = tn.cancel_scope + rpc_ctx_cs = ctx._scope = tn.cancel_scope task_status.started(ctx) # TODO: better `trionics` tooling: @@ -642,7 +653,7 @@ async def _invoke( # itself calls `ctx._maybe_cancel_and_set_remote_error()` # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. - if ctx._scope.cancelled_caught: + if rpc_ctx_cs.cancelled_caught: our_uid: tuple = actor.uid # first check for and raise any remote error @@ -652,9 +663,7 @@ async def _invoke( if re := ctx._remote_error: ctx._maybe_raise_remote_err(re) - cs: CancelScope = ctx._scope - - if cs.cancel_called: + if rpc_ctx_cs.cancel_called: canceller: tuple = ctx.canceller explain: str = f'{ctx.side!r}-side task was cancelled by ' @@ -680,9 +689,15 @@ async def _invoke( elif canceller == ctx.chan.uid: explain += f'its {ctx.peer_side!r}-side peer' - else: + elif canceller == our_uid: + explain += 'itself' + + elif canceller: explain += 'a remote peer' + else: + explain += 'an unknown cause?' 
+ explain += ( add_div(message=explain) + @@ -1238,7 +1253,7 @@ async def process_messages( 'Exiting IPC msg loop with final msg\n\n' f'<= peer: {chan.uid}\n' f' |_{chan}\n\n' - f'{pretty_struct.pformat(msg)}' + # f'{pretty_struct.pformat(msg)}' ) log.runtime(message) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 3cf35ff..604d1e6 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1046,6 +1046,10 @@ class Actor: # TODO: another `Struct` for rtvs.. rvs: dict[str, Any] = spawnspec._runtime_vars if rvs['_debug_mode']: + from .devx import ( + enable_stack_on_sig, + maybe_init_greenback, + ) try: # TODO: maybe return some status msgs upward # to that we can emit them in `con_status` @@ -1053,13 +1057,27 @@ class Actor: log.devx( 'Enabling `stackscope` traces on SIGUSR1' ) - from .devx import enable_stack_on_sig enable_stack_on_sig() + except ImportError: log.warning( '`stackscope` not installed for use in debug mode!' ) + if rvs.get('use_greenback', False): + maybe_mod: ModuleType|None = await maybe_init_greenback() + if maybe_mod: + log.devx( + 'Activated `greenback` ' + 'for `tractor.pause_from_sync()` support!' + ) + else: + rvs['use_greenback'] = False + log.warning( + '`greenback` not installed for use in debug mode!\n' + '`tractor.pause_from_sync()` not available!' + ) + rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -1717,8 +1735,8 @@ async def async_main( # Register with the arbiter if we're told its addr log.runtime( - f'Registering `{actor.name}` ->\n' - f'{pformat(accept_addrs)}' + f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' + # ^-TODO-^ we should instead show the maddr here^^ ) # TODO: ideally we don't fan out to all registrars @@ -1776,9 +1794,15 @@ async def async_main( # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception as err: - log.runtime("Closing all actor lifetime contexts") - actor.lifetime_stack.close() + except Exception as internal_err: + # ls: ExitStack = actor.lifetime_stack + # log.cancel( + # 'Closing all actor-lifetime exec scopes\n\n' + # f'|_{ls}\n' + # ) + # # _debug.pause_from_sync() + # # await _debug.pause(shield=True) + # ls.close() if not is_registered: # TODO: I guess we could try to connect back @@ -1786,7 +1810,8 @@ async def async_main( # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {actor.reg_addrs[0]}?") + f"@ {actor.reg_addrs[0]}?" + ) log.error( "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" @@ -1799,25 +1824,44 @@ async def async_main( if actor._parent_chan: await try_ship_error_to_remote( actor._parent_chan, - err, + internal_err, ) # always! - match err: + match internal_err: case ContextCancelled(): log.cancel( f'Actor: {actor.uid} was task-context-cancelled with,\n' - f'str(err)' + f'str(internal_err)' ) case _: log.exception("Actor errored:") raise finally: - log.runtime( + teardown_msg: str = ( 'Runtime nursery complete' - '-> Closing all actor lifetime contexts..' 
) + + ls: ExitStack = actor.lifetime_stack + cbs: list[Callable] = [ + repr(tup[1].__wrapped__) + for tup in ls._exit_callbacks + ] + if cbs: + cbs_str: str = '\n'.join(cbs) + teardown_msg += ( + '-> Closing all actor-lifetime callbacks\n\n' + f'|_{cbs_str}\n' + ) + # XXX NOTE XXX this will cause an error which + # prevents any `infected_aio` actor from continuing + # and any callbacks in the `ls` here WILL NOT be + # called!! + # await _debug.pause(shield=True) + + ls.close() + # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? actor.lifetime_stack.close() @@ -1856,23 +1900,28 @@ async def async_main( failed = True if failed: - log.warning( - f'Failed to unregister {actor.name} from ' - f'registar @ {addr}' + teardown_msg += ( + f'-> Failed to unregister {actor.name} from ' + f'registar @ {addr}\n' ) + # log.warning( # Ensure all peers (actors connected to us as clients) are finished if not actor._no_more_peers.is_set(): if any( chan.connected() for chan in chain(*actor._peers.values()) ): - log.runtime( - f"Waiting for remaining peers {actor._peers} to clear") + teardown_msg += ( + f'-> Waiting for remaining peers {actor._peers} to clear..\n' + ) + log.runtime(teardown_msg) with CancelScope(shield=True): await actor._no_more_peers.wait() - log.runtime("All peer channels are complete") - log.runtime("Runtime completed") + teardown_msg += ('-> All peer channels are complete\n') + + teardown_msg += ('Actor runtime completed') + log.info(teardown_msg) # TODO: rename to `Registry` and move to `._discovery`! diff --git a/tractor/_state.py b/tractor/_state.py index 8c5cca1..9f89600 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = { '_root_mailbox': (None, None), '_registry_addrs': [], - # for `breakpoint()` support + # for `tractor.pause_from_sync()` & `breakpoint()` support 'use_greenback': False, } diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 314a93b..5e1a6bf 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): @property def ctx(self) -> Context: ''' - This stream's IPC `Context` ref. + A read-only ref to this stream's inter-actor-task `Context`. ''' return self._ctx diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 8f3574b..fb737c1 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -80,6 +80,7 @@ class ActorNursery: ''' def __init__( self, + # TODO: maybe def these as fields of a struct looking type? actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, @@ -88,8 +89,10 @@ class ActorNursery: ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor - self._ria_nursery = ria_nursery + + # TODO: rename to `._tn` for our conventional "task-nursery" self._da_nursery = da_nursery + self._children: dict[ tuple[str, str], tuple[ @@ -98,15 +101,13 @@ class ActorNursery: Portal | None, ] ] = {} - # portals spawned with ``run_in_actor()`` are - # cancelled when their "main" result arrives - self._cancel_after_result_on_exit: set = set() + self.cancelled: bool = False self._join_procs = trio.Event() self._at_least_one_child_in_debug: bool = False self.errors = errors - self.exited = trio.Event() self._scope_error: BaseException|None = None + self.exited = trio.Event() # NOTE: when no explicit call is made to # `.open_root_actor()` by application code, @@ -116,6 +117,13 @@ class ActorNursery: # and syncing purposes to any actor opened nurseries. 
self._implicit_runtime_started: bool = False + # TODO: remove the `.run_in_actor()` API and thus this 2ndary + # nursery when that API get's moved outside this primitive! + self._ria_nursery = ria_nursery + # portals spawned with ``run_in_actor()`` are + # cancelled when their "main" result arrives + self._cancel_after_result_on_exit: set = set() + async def start_actor( self, name: str, @@ -126,10 +134,14 @@ class ActorNursery: rpc_module_paths: list[str]|None = None, enable_modules: list[str]|None = None, loglevel: str|None = None, # set log level per subactor - nursery: trio.Nursery|None = None, debug_mode: bool|None = None, infect_asyncio: bool = False, + # TODO: ideally we can rm this once we no longer have + # a `._ria_nursery` since the dependent APIs have been + # removed! + nursery: trio.Nursery|None = None, + ) -> Portal: ''' Start a (daemon) actor: an process that has no designated @@ -200,6 +212,7 @@ class ActorNursery: # |_ dynamic @context decoration on child side # |_ implicit `Portal.open_context() as (ctx, first):` # and `return first` on parent side. + # |_ mention how it's similar to `trio-parallel` API? # -[ ] use @api_frame on the wrapper async def run_in_actor( self, @@ -269,11 +282,14 @@ class ActorNursery: ) -> None: ''' - Cancel this nursery by instructing each subactor to cancel - itself and wait for all subactors to terminate. + Cancel this actor-nursery by instructing each subactor's + runtime to cancel and wait for all underlying sub-processes + to terminate. - If ``hard_killl`` is set to ``True`` then kill the processes - directly without any far end graceful ``trio`` cancellation. + If `hard_kill` is set then kill the processes directly using + the spawning-backend's API/OS-machinery without any attempt + at (graceful) `trio`-style cancellation using our + `Actor.cancel()`. ''' __runtimeframe__: int = 1 # noqa @@ -629,8 +645,12 @@ async def open_nursery( f'|_{an}\n' ) - # shutdown runtime if it was started if implicit_runtime: + # shutdown runtime if it was started and report noisly + # that we're did so. msg += '=> Shutting down actor runtime <=\n' + log.info(msg) - log.info(msg) + else: + # keep noise low during std operation. + log.runtime(msg) diff --git a/tractor/devx/__init__.py b/tractor/devx/__init__.py index ab9d2d1..86dd128 100644 --- a/tractor/devx/__init__.py +++ b/tractor/devx/__init__.py @@ -29,6 +29,7 @@ from ._debug import ( shield_sigint_handler as shield_sigint_handler, open_crash_handler as open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler, + maybe_init_greenback as maybe_init_greenback, post_mortem as post_mortem, mk_pdb as mk_pdb, ) diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index ccf57d6..b067275 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -69,6 +69,7 @@ from trio import ( import tractor from tractor.log import get_logger from tractor._context import Context +from tractor import _state from tractor._state import ( current_actor, is_root_process, @@ -87,9 +88,6 @@ if TYPE_CHECKING: from tractor._runtime import ( Actor, ) - from tractor.msg import ( - _codec, - ) log = get_logger(__name__) @@ -1599,11 +1597,13 @@ async def _pause( try: task: Task = current_task() except RuntimeError as rte: + __tracebackhide__: bool = False log.exception('Failed to get current task?') if actor.is_infected_aio(): + # mk_pdb().set_trace() raise RuntimeError( '`tractor.pause[_from_sync]()` not yet supported ' - 'for infected `asyncio` mode!' + 'directly (infected) `asyncio` tasks!' 
) from rte raise @@ -2163,22 +2163,22 @@ def maybe_import_greenback( return False -async def maybe_init_greenback( - **kwargs, -) -> None|ModuleType: - - if mod := maybe_import_greenback(**kwargs): - await mod.ensure_portal() - log.devx( - '`greenback` portal opened!\n' - 'Sync debug support activated!\n' - ) - return mod +async def maybe_init_greenback(**kwargs) -> None|ModuleType: + try: + if mod := maybe_import_greenback(**kwargs): + await mod.ensure_portal() + log.devx( + '`greenback` portal opened!\n' + 'Sync debug support activated!\n' + ) + return mod + except BaseException: + log.exception('Failed to init `greenback`..') + raise return None - async def _pause_from_bg_root_thread( behalf_of_thread: Thread, repl: PdbREPL, @@ -2399,18 +2399,37 @@ def pause_from_sync( else: # we are presumably the `trio.run()` + main thread # raises on not-found by default greenback: ModuleType = maybe_import_greenback() + + # TODO: how to ensure this is either dynamically (if + # needed) called here (in some bg tn??) or that the + # subactor always already called it? + # greenback: ModuleType = await maybe_init_greenback() + message += f'-> imported {greenback}\n' repl_owner: Task = current_task() message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' - out = greenback.await_( - _pause( - debug_func=None, - repl=repl, - hide_tb=hide_tb, - called_from_sync=True, - **_pause_kwargs, + try: + out = greenback.await_( + _pause( + debug_func=None, + repl=repl, + hide_tb=hide_tb, + called_from_sync=True, + **_pause_kwargs, + ) ) - ) + except RuntimeError as rte: + if not _state._runtime_vars.get( + 'use_greenback', + False, + ): + raise RuntimeError( + '`greenback` was never initialized in this actor!?\n\n' + f'{_state._runtime_vars}\n' + ) from rte + + raise + if out: bg_task, repl = out assert repl is repl @@ -2801,10 +2820,10 @@ def open_crash_handler( `trio.run()`. ''' + err: BaseException try: yield except tuple(catch) as err: - if type(err) not in ignore: pdbp.xpm() diff --git a/tractor/devx/_frame_stack.py b/tractor/devx/_frame_stack.py index 89a9e84..8e9bf46 100644 --- a/tractor/devx/_frame_stack.py +++ b/tractor/devx/_frame_stack.py @@ -234,7 +234,7 @@ def find_caller_info( _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} -# TODO: -[x] move all this into new `.devx._code`! +# TODO: -[x] move all this into new `.devx._frame_stack`! # -[ ] consider rename to _callstack? # -[ ] prolly create a `@runtime_api` dec? # |_ @api_frame seems better? @@ -286,3 +286,18 @@ def api_frame( wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache wrapped.__api_func__: bool = True return wrapper(wrapped) + + +# TODO: something like this instead of the adhoc frame-unhiding +# blocks all over the runtime!! XD +# -[ ] ideally we can expect a certain error (set) and if something +# else is raised then all frames below the wrapped one will be +# un-hidden via `__tracebackhide__: bool = False`. +# |_ might need to dynamically mutate the code objs like +# `pdbp.hideframe()` does? +# -[ ] use this as a `@acm` decorator as introed in 3.10? +# @acm +# async def unhide_frame_when_not( +# error_set: set[BaseException], +# ) -> TracebackType: +# ... 
diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py new file mode 100644 index 0000000..3ec7535 --- /dev/null +++ b/tractor/hilevel/_service.py @@ -0,0 +1,134 @@ +# tractor: structured concurrent "actors". +# Copyright 2024-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Daemon subactor as service(s) management and supervision primitives +and API. + +''' +from __future__ import annotations +from contextlib import ( + # asynccontextmanager as acm, + contextmanager as cm, +) +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio import TaskStatus +from tractor import ( + ActorNursery, + current_actor, + ContextCancelled, + Context, + Portal, +) + +from ._util import ( + log, # sub-sys logger +) + + +# TODO: implement a `@singleton` deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# -[ ] go through the options peeps on SO did? +# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python +# * including @mikenerone's answer +# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 +# +# -[ ] put it in `tractor.lowlevel._globals` ? +# * fits with our oustanding actor-local/global feat req? +# |_ https://github.com/goodboy/tractor/issues/55 +# * how can it relate to the `Actor.lifetime_stack` that was +# silently patched in? +# |_ we could implicitly call both of these in the same +# spot in the runtime using the lifetime stack? +# - `open_singleton_cm().__exit__()` +# -`del_singleton()` +# |_ gives SC fixtue semantics to sync code oriented around +# sub-process lifetime? +# * what about with `trio.RunVar`? +# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar +# - which we'll need for no-GIL cpython (right?) presuming +# multiple `trio.run()` calls in process? +# +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + +# a deletion API for explicit instance de-allocation? 
+# TODO: singleton factory API instead of a class API
+@cm
+def open_service_mngr(
+    *,
+    _singleton: list[ServiceMngr|None] = [None],
+    # NOTE: since default values for keyword-args are effectively
+    # module-vars/globals as per the note from,
+    # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
+    #
+    # > "The default value is evaluated only once. This makes
+    #    a difference when the default is a mutable object such as
+    #    a list, dictionary, or instances of most classes"
+    #
+    **init_kwargs,
+
+) -> ServiceMngr:
+    '''
+    Open a multi-subactor-as-service-daemon tree supervisor.
+
+    The delivered `ServiceMngr` is a singleton instance for each
+    actor-process and is allocated on first open and never
+    de-allocated unless explicitly deleted by a call to
+    `del_service_mngr()`.
+
+    '''
+    mngr: ServiceMngr|None
+    if (mngr := _singleton[0]) is None:
+        log.info('Allocating a new service mngr!')
+        mngr = _singleton[0] = ServiceMngr(**init_kwargs)
+    else:
+        log.info(
+            'Using extant service mngr!\n\n'
+            f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state
+        )
+
+    with mngr:
+        yield mngr
diff --git a/tractor/log.py b/tractor/log.py
index edb058e..47f1f25 100644
--- a/tractor/log.py
+++ b/tractor/log.py
@@ -54,11 +54,12 @@ LOG_FORMAT = (
 DATE_FORMAT = '%b %d %H:%M:%S'
 # FYI, ERROR is 40
+# TODO: use a `bidict` to avoid the :155 check?
 CUSTOM_LEVELS: dict[str, int] = {
     'TRANSPORT': 5,
     'RUNTIME': 15,
     'DEVX': 17,
-    'CANCEL': 18,
+    'CANCEL': 22,
     'PDB': 500,
 }
 STD_PALETTE = {
@@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
         Delegate a log call to the underlying logger, after adding
         contextual information from this adapter instance.
 
+        NOTE: all custom level methods (above) delegate to this!
+
         '''
         if self.isEnabledFor(level):
             stacklevel: int = 3
diff --git a/tractor/msg/_codec.py b/tractor/msg/_codec.py
index c1301bd..6d0e3b5 100644
--- a/tractor/msg/_codec.py
+++ b/tractor/msg/_codec.py
@@ -41,8 +41,10 @@ import textwrap
 from typing import (
     Any,
     Callable,
+    Protocol,
     Type,
     TYPE_CHECKING,
+    TypeVar,
     Union,
 )
 from types import ModuleType
@@ -181,7 +183,11 @@ def mk_dec(
     dec_hook: Callable|None = None,
 
 ) -> MsgDec:
+    '''
+    Create an IPC msg decoder, normally used as the
+    `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
 
+    '''
     return MsgDec(
         _dec=msgpack.Decoder(
             type=spec,  # like `MsgType[Any]`
@@ -227,6 +233,13 @@ def pformat_msgspec(
     join_char: str = '\n',
 
 ) -> str:
+    '''
+    Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute
+    for display in log messages as a nice (maybe multiline)
+    presentation of all the supported `Struct`s availed for typed
+    decoding.
+
+    '''
     dec: msgpack.Decoder = getattr(codec, 'dec', codec)
     return join_char.join(
         mk_msgspec_table(
@@ -630,31 +643,57 @@ def limit_msg_spec(
 #     # import pdbp; pdbp.set_trace()
 #     assert ext_codec.pld_spec == extended_spec
 #     yield ext_codec
+#
+# ^-TODO-^ is it impossible to make something like this orr!?

+# TODO: make an auto-custom hook generator from a set of input custom
+# types?
+# -[ ] below is a proto design using a `TypeCodec` idea?
+#
+# type var for the expected interchange-lib's
+# IPC-transport type when not available as a built-in
+# serialization output.
+WireT = TypeVar('WireT')
 
 
-# TODO: make something similar to this inside `._codec` such that
-# user can just pass a type table of some sort?
-# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
-#     and then call `.to_dict()` on them?
-#  -[x] we're going to need to re-impl all the stuff changed in the
-#     runtime port such that it can handle dicts or `Msg`s?
-#
-# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
-#     '''
-#     Deliver a `enc_hook()`/`dec_hook()` pair which does
-#     manual convertion from our above native `Msg` set
-#     to `dict` equivalent (wire msgs) in order to keep legacy compat
-#     with the original runtime implementation.
-#
-#     Note: this is is/was primarly used while moving the core
-#     runtime over to using native `Msg`-struct types wherein we
-#     start with the send side emitting without loading
-#     a typed-decoder and then later flipping the switch over to
-#     load to the native struct types once all runtime usage has
-#     been adjusted appropriately.
-#
-#     '''
-#     return (
-#         # enc_to_dict,
-#         dec_from_dict,
-#     )
+# TODO: some kinda (decorator) API for built-in subtypes
+# that builds this implicitly by inspecting the `mro()`?
+class TypeCodec(Protocol):
+    '''
+    A per-custom-type wire-transport serialization translator
+    description type.
+
+    '''
+    src_type: Type
+    wire_type: WireT
+
+    def encode(obj: Type) -> WireT:
+        ...
+
+    def decode(
+        obj_type: Type[WireT],
+        obj: WireT,
+    ) -> Type:
+        ...
+
+
+class MsgpackTypeCodec(TypeCodec):
+    ...
+
+
+def mk_codec_hooks(
+    type_codecs: list[TypeCodec],
+
+) -> tuple[Callable, Callable]:
+    '''
+    Deliver an `enc_hook()`/`dec_hook()` pair which handles
+    manual conversion for an input `Type` set such that whenever
+    the `TypeCodec.filter()` predicate matches, `TypeCodec.decode()`
+    is called on the input wire object by the `dec_hook()`, and
+    whenever `isinstance(obj, TypeCodec.src_type)` matches,
+    the `enc_hook(obj=obj)` return value is taken from a
+    `TypeCodec.encode(obj)` callback.
+
+    '''
+    ...
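To make the proto-design concrete, here is a hypothetical hand-rolled instance (none of this exists in `._codec` yet): a `Decimal`-over-`str` translator plus the kind of hook pair an eventual `mk_codec_hooks()` would generate. The hook signatures follow `msgspec`'s extension API (https://jcristharif.com/msgspec/extending.html) and the `dec_hook` could be fed to `mk_dec(dec_hook=...)` from above:

    from decimal import Decimal
    from typing import Any, Type

    class DecimalCodec:
        # round-trip `Decimal` <-> `str` since msgpack has no
        # arbitrary-precision decimal wire-type.
        src_type: Type = Decimal
        wire_type: type = str

        @staticmethod
        def encode(obj: Decimal) -> str:
            return str(obj)

        @staticmethod
        def decode(
            obj_type: Type,
            obj: str,
        ) -> Decimal:
            return Decimal(obj)

    def enc_hook(obj: Any) -> Any:
        # generated dispatch: first codec whose `.src_type` matches wins
        if isinstance(obj, DecimalCodec.src_type):
            return DecimalCodec.encode(obj)
        raise NotImplementedError(
            f'No codec for objects of type {type(obj)!r}'
        )

    def dec_hook(obj_type: Type, obj: Any) -> Any:
        if obj_type is DecimalCodec.src_type:
            return DecimalCodec.decode(obj_type, obj)
        return obj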
diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py
index 91c0dde..d397ee4 100644
--- a/tractor/msg/_ops.py
+++ b/tractor/msg/_ops.py
@@ -580,12 +580,15 @@ async def drain_to_final_msg(
         # 2. WE DID NOT REQUEST that cancel and thus
         #    SHOULD RAISE HERE!
         except trio.Cancelled as taskc:
+            # from tractor.devx._debug import pause
+            # await pause(shield=True)
 
             # CASE 2: mask the local cancelled-error(s)
             # only when we are sure the remote error is
             # the source cause of this local task's
             # cancellation.
             ctx.maybe_raise(
+                hide_tb=hide_tb,
                 # TODO: when use this/
                 # from_src_exc=taskc,
             )
diff --git a/tractor/msg/pretty_struct.py b/tractor/msg/pretty_struct.py
index f27fb89..15e469e 100644
--- a/tractor/msg/pretty_struct.py
+++ b/tractor/msg/pretty_struct.py
@@ -34,6 +34,9 @@ from pprint import (
     saferepr,
 )
 
+from tractor.log import get_logger
+
+log = get_logger()
 # TODO: auto-gen type sig for input func both for
 # type-msgs and logging of RPC tasks?
 # taken and modified from:
@@ -143,7 +146,13 @@ def pformat(
         else:
             # the `pprint` recursion-safe format:
             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
-            val_str: str = saferepr(v)
+            try:
+                val_str: str = saferepr(v)
+            except Exception:
+                log.exception(
+                    f'Failed to `saferepr({type(struct)})` !?\n'
+                )
+                return _Struct.__repr__(struct)
 
         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@@ -194,12 +203,20 @@ class Struct(
                 return sin_props
 
     pformat = pformat
+
+    # __repr__ = pformat
     # __str__ = __repr__ = pformat
     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
     # inside a known tty?
     # def __repr__(self) -> str:
     #     ...
-    __repr__ = pformat
+    def __repr__(self) -> str:
+        try:
+            return pformat(self)
+        except Exception:
+            log.exception(
+                f'Failed to `pformat({type(self)})` !?\n'
+            )
+            return _Struct.__repr__(self)
 
     def copy(
         self,
diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index d1451b4..e041721 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -18,11 +18,13 @@
 Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
 
 '''
+from __future__ import annotations
 import asyncio
 from asyncio.exceptions import CancelledError
 from contextlib import asynccontextmanager as acm
 from dataclasses import dataclass
 import inspect
+import traceback
 from typing import (
     Any,
     Callable,
@@ -30,20 +32,21 @@ from typing import (
     Awaitable,
 )
 
-import trio
-from outcome import Error
-
-from tractor.log import get_logger
+import tractor
 from tractor._state import (
-    current_actor,
     debug_mode,
 )
+from tractor.log import get_logger
 from tractor.devx import _debug
-from tractor._exceptions import AsyncioCancelled
 from tractor.trionics._broadcast import (
     broadcast_receiver,
     BroadcastReceiver,
 )
+import trio
+from outcome import (
+    Error,
+    Outcome,
+)
 
 log = get_logger(__name__)
 
@@ -161,7 +164,7 @@ def _run_asyncio_task(
 
     '''
     __tracebackhide__ = True
-    if not current_actor().is_infected_aio():
+    if not tractor.current_actor().is_infected_aio():
         raise RuntimeError(
             "`infect_asyncio` mode is not enabled!?"
         )
@@ -172,7 +175,6 @@ def _run_asyncio_task(
     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore
     args = tuple(inspect.getfullargspec(func).args)
-
     if getattr(func, '_tractor_steam_function', None):
         # the assumption is that the target async routine accepts the
         # send channel then it intends to yield more then one return
@@ -346,13 +348,22 @@ def _run_asyncio_task(
             # on a checkpoint.
             cancel_scope.cancel()
 
-        # raise any ``asyncio`` side error.
+        # raise any `asyncio` side error.
         raise aio_err
 
     task.add_done_callback(cancel_trio)
     return chan
 
 
+class AsyncioCancelled(CancelledError):
+    '''
+    Asyncio-cancelled translation (non-base) error
+    for use with the ``to_asyncio`` module,
+    to be raised in the ``trio``-side task.
+
+    '''
+
+
 @acm
 async def translate_aio_errors(
@@ -516,7 +527,6 @@ async def open_channel_from(
 
 
 def run_as_asyncio_guest(
-
     trio_main: Callable,
 
 ) -> None:
@@ -548,6 +558,11 @@ def run_as_asyncio_guest(
 
         loop = asyncio.get_running_loop()
         trio_done_fut = asyncio.Future()
+        startup_msg: str = (
+            'Starting `asyncio` guest-loop-run\n'
+            '-> got running loop\n'
+            '-> built a `trio`-done future\n'
+        )
 
         if debug_mode():
             # XXX make it obvi we know this isn't supported yet!
@@ -562,34 +577,120 @@ def run_as_asyncio_guest(
 
         def trio_done_callback(main_outcome):
             if isinstance(main_outcome, Error):
-                error = main_outcome.error
+                error: BaseException = main_outcome.error
+
+                # show a dedicated `asyncio`-side tb from the error
+                tb_str: str = ''.join(traceback.format_exception(error))
+                log.exception(
+                    'Guest-run errored!?\n\n'
+                    f'{main_outcome}\n'
+                    f'{error}\n\n'
+                    f'{tb_str}\n'
+                )
                 trio_done_fut.set_exception(error)
 
-                # TODO: explicit asyncio tb?
-                # traceback.print_exception(error)
-
-                # XXX: do we need this?
- # actor.cancel_soon() - + # raise inline main_outcome.unwrap() + else: trio_done_fut.set_result(main_outcome) - log.runtime(f"trio_main finished: {main_outcome!r}") + log.runtime(f'trio_main finished: {main_outcome!r}') + + startup_msg += ( + f'-> created {trio_done_callback!r}\n' + f'-> scheduling `trio_main`: {trio_main!r}\n' + ) # start the infection: run trio on the asyncio loop in "guest mode" log.runtime( - 'Infecting `asyncio`-process with a `trio` guest-run of\n\n' - f'{trio_main!r}\n\n' - - f'{trio_done_callback}\n' + f'{startup_msg}\n\n' + + + 'Infecting `asyncio`-process with a `trio` guest-run!\n' ) + trio.lowlevel.start_guest_run( trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - # NOTE `.unwrap()` will raise on error - return (await trio_done_fut).unwrap() + try: + # TODO: better SIGINT handling since shielding seems to + # make NO DIFFERENCE XD + # -[ ] maybe this is due to 3.11's recent SIGINT handling + # changes and we can better work with/around it? + # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption + out: Outcome = await asyncio.shield(trio_done_fut) + # NOTE `Error.unwrap()` will raise + return out.unwrap() + + except asyncio.CancelledError: + actor: tractor.Actor = tractor.current_actor() + log.exception( + '`asyncio`-side main task was cancelled!\n' + 'Cancelling actor-runtime..\n' + f'c)>\n' + f' |_{actor}.cancel_soon()\n' + + ) + + # XXX NOTE XXX the next LOC is super important!!! + # => without it, we can get a guest-run abandonment case + # where asyncio will not trigger `trio` in a final event + # loop cycle! + # + # our test, + # `test_infected_asyncio.test_sigint_closes_lifetime_stack()` + # demonstrates how if when we raise a SIGINT-signal in an infected + # child we get a variable race condition outcome where + # either of the following can indeterminately happen, + # + # - "silent-abandon": `asyncio` abandons the `trio` + # guest-run task silently and no `trio`-guest-run or + # `tractor`-actor-runtime teardown happens whatsoever.. + # this is the WORST (race) case outcome. + # + # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with + # `trio` reporting a console traceback and further tbs of all + # the failed shutdown routines also show on console.. + # + # our test can thus fail and (has been parametrized for) + # the 2 cases: + # + # - when the parent raises a KBI just after + # signalling the child, + # |_silent-abandon => the `Actor.lifetime_stack` will + # never be closed thus leaking a resource! + # -> FAIL! + # |_loud-abandon => despite the abandonment at least the + # stack will be closed out.. + # -> PASS + # + # - when the parent instead simply waits on `ctx.wait_for_result()` + # (i.e. DOES not raise a KBI itself), + # |_silent-abandon => test will just hang and thus the ctx + # and actor will never be closed/cancelled/shutdown + # resulting in leaking a (file) resource since the + # `trio`/`tractor` runtime never relays a ctxc back to + # the parent; the test's timeout will trigger.. + # -> FAIL! + # |_loud-abandon => this case seems to never happen?? + # + # XXX FIRST PART XXX, SO, this is a fix to the + # "silent-abandon" case, NOT the `trio`-guest-run + # abandonment issue in general, for which the NEXT LOC + # is apparently a working fix! + actor.cancel_soon() + + # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to + # `trio`-guest-run to complete and teardown !! 
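Distilled, the fix shape is just the following (a standalone, runnable sketch: `trio_main()` and `request_runtime_cancel()` are hypothetical stand-ins for the runtime's actual root task and `Actor.cancel_soon()`):

    import asyncio

    import trio
    from outcome import Outcome

    async def trio_main() -> None:
        ...  # the root `trio` task (hypothetical)

    def request_runtime_cancel() -> None:
        ...  # e.g. `Actor.cancel_soon()`: schedule a self-cancel

    async def aio_main() -> None:
        loop = asyncio.get_running_loop()
        done_fut: asyncio.Future = asyncio.Future()

        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=done_fut.set_result,  # stores the `Outcome`
        )
        try:
            out: Outcome = await asyncio.shield(done_fut)
            out.unwrap()
        except asyncio.CancelledError:
            # 1. schedule the `trio`-side teardown..
            request_runtime_cancel()
            # 2. ..then pump the loop with a shielded non-0 sleep so
            # `asyncio` runs the final `trio` cycles instead of
            # abandoning the guest-run.
            await asyncio.shield(asyncio.sleep(0.1))
            raise

    if __name__ == '__main__':
        asyncio.run(aio_main())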
 
     # might as well if it's installed.
     try:
@@ -599,4 +700,6 @@
     except ImportError:
         pass
 
-    return asyncio.run(aio_main(trio_main))
+    return asyncio.run(
+        aio_main(trio_main),
+    )
diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py
index a5d3187..977b682 100644
--- a/tractor/trionics/_broadcast.py
+++ b/tractor/trionics/_broadcast.py
@@ -156,11 +156,12 @@ class BroadcastState(Struct):
 
 class BroadcastReceiver(ReceiveChannel):
     '''
-    A memory receive channel broadcaster which is non-lossy for the
-    fastest consumer.
+    A memory receive channel broadcaster which is non-lossy for
+    the fastest consumer.
 
-    Additional consumer tasks can receive all produced values by registering
-    with ``.subscribe()`` and receiving from the new instance it delivers.
+    Additional consumer tasks can receive all produced values by
+    registering with ``.subscribe()`` and receiving from the new
+    instance it delivers.
 
     '''
     def __init__(
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 08e70ad..fd224d6 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -18,8 +18,12 @@
 Async context manager primitives with hard ``trio``-aware semantics
 
 '''
-from contextlib import asynccontextmanager as acm
+from __future__ import annotations
+from contextlib import (
+    asynccontextmanager as acm,
+)
 import inspect
+from types import ModuleType
 from typing import (
     Any,
     AsyncContextManager,
@@ -30,13 +34,16 @@ from typing import (
     Optional,
     Sequence,
     TypeVar,
+    TYPE_CHECKING,
 )
 
 import trio
-
 from tractor._state import current_actor
 from tractor.log import get_logger
 
+if TYPE_CHECKING:
+    from tractor import ActorNursery
+
 log = get_logger(__name__)
 
 
@@ -46,8 +53,10 @@ T = TypeVar("T")
 
 @acm
 async def maybe_open_nursery(
-    nursery: trio.Nursery | None = None,
+    nursery: trio.Nursery|ActorNursery|None = None,
     shield: bool = False,
+    lib: ModuleType = trio,
+
 ) -> AsyncGenerator[trio.Nursery, Any]:
     '''
     Create a new nursery if None provided.
@@ -58,13 +67,12 @@ async def maybe_open_nursery(
     if nursery is not None:
         yield nursery
     else:
-        async with trio.open_nursery() as nursery:
+        async with lib.open_nursery() as nursery:
             nursery.cancel_scope.shield = shield
             yield nursery
 
 
 async def _enter_and_wait(
-
     mngr: AsyncContextManager[T],
     unwrapped: dict[int, T],
     all_entered: trio.Event,
@@ -91,7 +99,6 @@ async def _enter_and_wait(
 
 @acm
 async def gather_contexts(
-
     mngrs: Sequence[AsyncContextManager[T]],
 
 ) -> AsyncGenerator[
@@ -102,15 +109,17 @@
     None,
 ]:
     '''
-    Concurrently enter a sequence of async context managers, each in
-    a separate ``trio`` task and deliver the unwrapped values in the
-    same order once all managers have entered. On exit all contexts are
-    subsequently and concurrently exited.
+    Concurrently enter a sequence of async context managers (acms),
+    each from a separate `trio` task and deliver the unwrapped
+    `yield`-ed values in the same order once all managers have entered.
 
-    This function is somewhat similar to common usage of
-    ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
-    combo with ``asyncio.gather()`` except the managers are concurrently
-    entered and exited, and cancellation just works.
+    On exit, all acms are subsequently and concurrently exited.
+
+    This function is somewhat similar to a batch of non-blocking
+    calls to `contextlib.AsyncExitStack.enter_async_context()`
+    (inside a loop) *in combo with* an `asyncio.gather()` to get the
+    `.__aenter__()`-ed values, except the managers are both
+    concurrently entered and exited and *cancellation just works*(R).
 
     '''
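By way of usage (`open_conn()` being a hypothetical acm, not part of `tractor`):

    async with gather_contexts([
        open_conn(host)
        for host in ('hostA', 'hostB', 'hostC')
    ]) as conns:
        # all 3 acms were entered concurrently and their values
        # are delivered in the same order as the input sequence.
        for conn in conns:
            ...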
     seed: int = id(mngrs)
@@ -210,9 +219,10 @@ async def maybe_open_context(
 
 ) -> AsyncIterator[tuple[bool, T]]:
     '''
-    Maybe open a context manager if there is not already a _Cached
-    version for the provided ``key`` for *this* actor. Return the
-    _Cached instance on a _Cache hit.
+    Maybe open an async-context-manager (acm) if there is not already
+    a `_Cached` version for the provided (input) `key` for *this* actor.
+
+    Return the `_Cached` instance on a _Cache hit.
 
     '''
     fid = id(acm_func)
@@ -273,8 +283,13 @@ async def maybe_open_context(
     else:
         _Cache.users += 1
         log.runtime(
-            f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
-            f'{ctx_key!r} -> {yielded!r}\n'
+            f'Re-using cached resource for user {_Cache.users}\n\n'
+            f'{ctx_key!r} -> {type(yielded)}\n'
+
+            # TODO: make this work with values but without
+            # `msgspec.Struct` causing frickin crashes on field-type
+            # lookups..
+            # f'{ctx_key!r} -> {yielded!r}\n'
        )
        lock.release()
        yield True, yielded
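And a quick usage sketch of the caching behaviour (again `open_conn()` is a hypothetical acm, and the assumption here is that the acm's input kwargs form part of the cache key alongside `id(acm_func)` as shown in the hunk above):

    async with maybe_open_context(
        acm_func=open_conn,
        kwargs={'host': 'hostA'},
    ) as (cache_hit, conn):
        if cache_hit:
            # a second overlapping entry with the same inputs
            # got the already-`yield`-ed value back.
            log.info(f'Re-using already-opened conn: {conn!r}')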