Compare commits: 5ed30dec40...284fa0340e (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | 284fa0340e |  |
|  | @ -1,6 +1,7 @@ | |||
| """ | ||||
| ``tractor`` testing!! | ||||
| """ | ||||
| from functools import partial | ||||
| import sys | ||||
| import subprocess | ||||
| import os | ||||
|  | @ -8,6 +9,9 @@ import random | |||
| import signal | ||||
| import platform | ||||
| import time | ||||
| from typing import ( | ||||
|     AsyncContextManager, | ||||
| ) | ||||
| 
 | ||||
| import pytest | ||||
| import tractor | ||||
|  | @ -150,6 +154,18 @@ def pytest_generate_tests(metafunc): | |||
|         metafunc.parametrize("start_method", [spawn_backend], scope='module') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: a way to let test scripts (like from `examples/`) | ||||
| # guarantee they won't collide on registry addrs! | ||||
| @pytest.fixture | ||||
| def open_test_runtime( | ||||
|     reg_addr: tuple, | ||||
| ) -> AsyncContextManager: | ||||
|     return partial( | ||||
|         tractor.open_nursery, | ||||
|         registry_addrs=[reg_addr], | ||||
|     ) | ||||
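
For orientation, a hedged sketch of how a test might consume the new fixture (the test body is hypothetical, not part of this commit): since the fixture returns a `partial`, each test opens its own actor-runtime bound to the per-test `reg_addr`.

```python
import trio

def test_uses_isolated_registry(open_test_runtime):
    # hypothetical test func; `open_test_runtime` forwards
    # `registry_addrs=[reg_addr]` into `tractor.open_nursery()`
    async def main():
        async with open_test_runtime() as an:
            portal = await an.start_actor(
                'sub',
                enable_modules=[__name__],
            )
            await portal.cancel_actor()

    trio.run(main)
```
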
| 
 | ||||
| 
 | ||||
| def sig_prog(proc, sig): | ||||
|     "Kill the actor-process with ``sig``." | ||||
|     proc.send_signal(sig) | ||||
|  |  | |||
|  | @ -41,7 +41,7 @@ from tractor.msg import ( | |||
| from tractor.msg.types import ( | ||||
|     _payload_msgs, | ||||
|     log, | ||||
|     Msg, | ||||
|     PayloadMsg, | ||||
|     Started, | ||||
|     mk_msg_spec, | ||||
| ) | ||||
|  | @ -61,7 +61,7 @@ def mk_custom_codec( | |||
|     uid: tuple[str, str] = tractor.current_actor().uid | ||||
| 
 | ||||
|     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||
| # field on our `Msg.pld`, we still need an enc/dec_hook() pair | ||||
| # field on our `PayloadMsg.pld`, we still need an enc/dec_hook() pair | ||||
|     # to cast to/from that type on the wire. See the docs: | ||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
| 
 | ||||
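
As a reference for the comment above, a minimal enc/dec-hook pair in the style of the linked `msgspec` docs; the `NamespacePath` stand-in here is a deliberately simplified assumption, not tractor's actual type:

```python
from msgspec import msgpack

class NamespacePath:
    # simplified stand-in for `tractor.msg.NamespacePath`
    def __init__(self, path: str):
        self.path = path

def enc_hook(obj: object) -> object:
    if isinstance(obj, NamespacePath):
        return obj.path  # cast to a native `str` on the wire
    raise NotImplementedError(f'cannot encode {type(obj)}')

def dec_hook(type_: type, obj: object) -> object:
    if type_ is NamespacePath:
        return NamespacePath(obj)  # cast back from the wire `str`
    raise NotImplementedError(f'cannot decode {type_}')

enc = msgpack.Encoder(enc_hook=enc_hook)
dec = msgpack.Decoder(type=NamespacePath, dec_hook=dec_hook)
assert dec.decode(enc.encode(NamespacePath('a.b.c'))).path == 'a.b.c'
```
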
|  | @ -321,12 +321,12 @@ def dec_type_union( | |||
|     import importlib | ||||
|     types: list[Type] = [] | ||||
|     for type_name in type_names: | ||||
|         for ns in [ | ||||
|         for mod in [ | ||||
|             typing, | ||||
|             importlib.import_module(__name__), | ||||
|         ]: | ||||
|             if type_ref := getattr( | ||||
|                 ns, | ||||
|                 mod, | ||||
|                 type_name, | ||||
|                 False, | ||||
|             ): | ||||
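
The `ns` → `mod` rename lands inside `dec_type_union()`, which resolves type *names* back to live types by probing `typing` and then the defining module; a simplified standalone sketch of that lookup pattern:

```python
import importlib
import typing
from typing import Type, Union

class Point:  # a local type resolvable by name
    ...

def resolve_union(type_names: list[str]) -> Type:
    types: list[Type] = []
    for type_name in type_names:
        for mod in (typing, importlib.import_module(__name__)):
            # `getattr(..., False)` lets missing names fall through
            if type_ref := getattr(mod, type_name, False):
                types.append(type_ref)
    return Union[tuple(types)]  # type: ignore[valid-type]

spec = resolve_union(['Any', 'Point'])
print(spec)  # typing.Union[typing.Any, Point]
```
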
|  | @ -744,7 +744,7 @@ def chk_pld_type( | |||
|     # 'Error',  .pld: ErrorData | ||||
| 
 | ||||
|     codec: MsgCodec = mk_codec( | ||||
|         # NOTE: this ONLY accepts `Msg.pld` fields of a specified | ||||
|         # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified | ||||
|         # type union. | ||||
|         ipc_pld_spec=payload_spec, | ||||
|     ) | ||||
|  | @ -752,7 +752,7 @@ def chk_pld_type( | |||
|     # make a one-off dec to compare with our `MsgCodec` instance | ||||
|     # which does the below `mk_msg_spec()` call internally | ||||
|     ipc_msg_spec: Union[Type[Struct]] | ||||
|     msg_types: list[Msg[payload_spec]] | ||||
|     msg_types: list[PayloadMsg[payload_spec]] | ||||
|     ( | ||||
|         ipc_msg_spec, | ||||
|         msg_types, | ||||
|  | @ -761,7 +761,7 @@ def chk_pld_type( | |||
|     ) | ||||
|     _enc = msgpack.Encoder() | ||||
|     _dec = msgpack.Decoder( | ||||
|         type=ipc_msg_spec or Any,  # like `Msg[Any]` | ||||
|         type=ipc_msg_spec or Any,  # like `PayloadMsg[Any]` | ||||
|     ) | ||||
| 
 | ||||
|     assert ( | ||||
|  | @ -806,7 +806,7 @@ def chk_pld_type( | |||
|             'cid': '666', | ||||
|             'pld': pld, | ||||
|         } | ||||
|         enc_msg: Msg = typedef(**kwargs) | ||||
|         enc_msg: PayloadMsg = typedef(**kwargs) | ||||
| 
 | ||||
|         _wire_bytes: bytes = _enc.encode(enc_msg) | ||||
|         wire_bytes: bytes = codec.enc.encode(enc_msg) | ||||
|  | @ -883,25 +883,16 @@ def test_limit_msgspec(): | |||
|             debug_mode=True | ||||
|         ): | ||||
| 
 | ||||
|             # ensure we can round-trip a boxing `Msg` | ||||
|             # ensure we can round-trip a boxing `PayloadMsg` | ||||
|             assert chk_pld_type( | ||||
|                 # Msg, | ||||
|                 Any, | ||||
|                 None, | ||||
|                 payload_spec=Any, | ||||
|                 pld=None, | ||||
|                 expect_roundtrip=True, | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: don't need this any more right since | ||||
|             # `msgspec>=0.15` has the nice generics stuff yah?? | ||||
|             # | ||||
|             # manually override the type annot of the payload | ||||
|             # field and ensure it propagates to all msg-subtypes. | ||||
|             # Msg.__annotations__['pld'] = Any | ||||
| 
 | ||||
|             # verify that a mis-typed payload value won't decode | ||||
|             assert not chk_pld_type( | ||||
|                 # Msg, | ||||
|                 int, | ||||
|                 payload_spec=int, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|  | @ -913,18 +904,16 @@ def test_limit_msgspec(): | |||
|                 value: Any | ||||
| 
 | ||||
|             assert not chk_pld_type( | ||||
|                 # Msg, | ||||
|                 CustomPayload, | ||||
|                 payload_spec=CustomPayload, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|             assert chk_pld_type( | ||||
|                 # Msg, | ||||
|                 CustomPayload, | ||||
|                 payload_spec=CustomPayload, | ||||
|                 pld=CustomPayload(name='doggy', value='urmom') | ||||
|             ) | ||||
| 
 | ||||
|             # uhh bc we can `.pause_from_sync()` now! :surfer: | ||||
|             # yah, we can `.pause_from_sync()` now! | ||||
|             # breakpoint() | ||||
| 
 | ||||
|     trio.run(main) | ||||
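
For orientation, the `chk_pld_type()` assertions above reduce to a `msgspec` encode/decode round-trip of a payload struct; a standalone sketch (the `CustomPayload` mirrors the test's own struct):

```python
from typing import Any

import msgspec
from msgspec import Struct, msgpack

class CustomPayload(Struct):
    name: str
    value: Any

enc = msgpack.Encoder()
dec = msgpack.Decoder(type=CustomPayload)

# a well-typed payload round-trips,
wire: bytes = enc.encode(CustomPayload(name='doggy', value='urmom'))
assert dec.decode(wire) == CustomPayload(name='doggy', value='urmom')

# while a mis-typed payload fails to decode, like the
# `assert not chk_pld_type(...)` cases above.
try:
    dec.decode(enc.encode('doggy'))
except msgspec.ValidationError:
    pass
```
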
|  |  | |||
|  | @ -1336,6 +1336,23 @@ def test_shield_pause( | |||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: better error for "non-ideal" usage from the root actor. | ||||
| # -[ ] if called from an async scope emit a message that suggests | ||||
| #    using `await tractor.pause()` instead since it's less overhead | ||||
| #    (in terms of `greenback` and/or extra threads) and if it's from | ||||
| #    a sync scope suggest that usage must first call | ||||
| #    `ensure_portal()` in the (eventual parent) async calling scope? | ||||
| def test_sync_pause_from_bg_task_in_root_actor_(): | ||||
|     ''' | ||||
|     When used from the root actor, normally we can only implicitly | ||||
|     support `.pause_from_sync()` from the main-parent-task (that | ||||
|     opens the runtime via `open_root_actor()`) since `greenback` | ||||
|     requires a `.ensure_portal()` call per `trio.Task` where it is | ||||
|     used. | ||||
| 
 | ||||
|     ''' | ||||
|     ... | ||||
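
A sketch of the (currently unsupported) usage this stub presumably wants to exercise, per its docstring: a background `trio.Task` in the root actor must make its own `greenback` portal before sync-pausing. Untested assumption, not from this commit:

```python
import greenback
import tractor
import trio

async def bg_task():
    # each `trio.Task` needs its own portal before sync-pausing,
    # which is exactly what the TODO above wants a better error for
    await greenback.ensure_portal()
    tractor.pause_from_sync()

async def main():
    async with tractor.open_root_actor(debug_mode=True):
        async with trio.open_nursery() as tn:
            tn.start_soon(bg_task)
```
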
| 
 | ||||
| # TODO: needs ANSI code stripping tho, see `assert_before()` above! | ||||
| def test_correct_frames_below_hidden(): | ||||
|     ''' | ||||
|  |  | |||
|  | @ -19,7 +19,7 @@ from tractor._testing import ( | |||
| @pytest.fixture | ||||
| def run_example_in_subproc( | ||||
|     loglevel: str, | ||||
|     testdir, | ||||
|     testdir: pytest.Testdir, | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,16 +2,25 @@ | |||
| The hipster way to force SC onto the stdlib's "async": 'infection mode'. | ||||
| 
 | ||||
| ''' | ||||
| from typing import Optional, Iterable, Union | ||||
| import asyncio | ||||
| import builtins | ||||
| from contextlib import ExitStack | ||||
| import itertools | ||||
| import importlib | ||||
| import os | ||||
| from pathlib import Path | ||||
| import signal | ||||
| from typing import ( | ||||
|     Callable, | ||||
|     Iterable, | ||||
|     Union, | ||||
| ) | ||||
| 
 | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
| from tractor import ( | ||||
|     current_actor, | ||||
|     to_asyncio, | ||||
|     RemoteActorError, | ||||
|     ContextCancelled, | ||||
|  | @ -25,8 +34,8 @@ async def sleep_and_err( | |||
| 
 | ||||
|     # just signature placeholders for compat with | ||||
|     # ``to_asyncio.open_channel_from()`` | ||||
|     to_trio: Optional[trio.MemorySendChannel] = None, | ||||
|     from_trio: Optional[asyncio.Queue] = None, | ||||
|     to_trio: trio.MemorySendChannel|None = None, | ||||
|     from_trio: asyncio.Queue|None = None, | ||||
| 
 | ||||
| ): | ||||
|     if to_trio: | ||||
|  | @ -36,7 +45,7 @@ async def sleep_and_err( | |||
|     assert 0 | ||||
| 
 | ||||
| 
 | ||||
| async def sleep_forever(): | ||||
| async def aio_sleep_forever(): | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): | |||
| 
 | ||||
|     # spawn an ``asyncio`` task to run a func and return result | ||||
|     with trio.move_on_after(.2): | ||||
|         await tractor.to_asyncio.run_task(sleep_forever) | ||||
|         await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||
|  | @ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): | |||
| 
 | ||||
| 
 | ||||
| async def asyncio_actor( | ||||
| 
 | ||||
|     target: str, | ||||
|     expect_err: Exception|None = None | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     assert tractor.current_actor().is_infected_aio() | ||||
|     target = globals()[target] | ||||
|     target: Callable = globals()[target] | ||||
| 
 | ||||
|     if '.' in expect_err: | ||||
|         modpath, _, name = expect_err.rpartition('.') | ||||
|  | @ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): | |||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|                 asyncio_actor, | ||||
|                 target='sleep_forever', | ||||
|                 target='aio_sleep_forever', | ||||
|                 expect_err='trio.Cancelled', | ||||
|                 infect_asyncio=True, | ||||
|             ) | ||||
|  | @ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): | |||
|             async with tractor.open_nursery() as n: | ||||
|                 await n.run_in_actor( | ||||
|                     asyncio_actor, | ||||
|                     target='sleep_forever', | ||||
|                     target='aio_sleep_forever', | ||||
|                     expect_err='trio.Cancelled', | ||||
|                     infect_asyncio=True, | ||||
|                 ) | ||||
|  | @ -195,7 +203,7 @@ async def trio_ctx( | |||
|             # spawn another asyncio task for the cuck of it. | ||||
|             n.start_soon( | ||||
|                 tractor.to_asyncio.run_task, | ||||
|                 sleep_forever, | ||||
|                 aio_sleep_forever, | ||||
|             ) | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
|  | @ -285,7 +293,7 @@ async def aio_cancel(): | |||
| 
 | ||||
|     # cancel and enter sleep | ||||
|     task.cancel() | ||||
|     await sleep_forever() | ||||
|     await aio_sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | ||||
|  | @ -355,7 +363,6 @@ async def push_from_aio_task( | |||
| 
 | ||||
| 
 | ||||
| async def stream_from_aio( | ||||
| 
 | ||||
|     exit_early: bool = False, | ||||
|     raise_err: bool = False, | ||||
|     aio_raise_err: bool = False, | ||||
|  | @ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( | |||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def manage_file( | ||||
|     ctx: tractor.Context, | ||||
|     tmp_path_str: str, | ||||
|     bg_aio_task: bool = False, | ||||
| ): | ||||
|     ''' | ||||
|     Start an `asyncio` task that just sleeps after registering a context | ||||
|     with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree | ||||
|     and ensure the stack is closed in the infected mode child. | ||||
| 
 | ||||
|     To verify the teardown state just write a tmpfile to the `testdir` | ||||
|     and delete it on actor close. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
|     tmp_path: Path = Path(tmp_path_str) | ||||
|     tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' | ||||
| 
 | ||||
|     # create the tmp file and tell the parent where it's at | ||||
|     assert not tmp_file.is_file() | ||||
|     tmp_file.touch() | ||||
| 
 | ||||
|     stack: ExitStack = current_actor().lifetime_stack | ||||
|     stack.callback(tmp_file.unlink) | ||||
| 
 | ||||
|     await ctx.started(( | ||||
|         str(tmp_file), | ||||
|         os.getpid(), | ||||
|     )) | ||||
| 
 | ||||
|     # expect to be cancelled from here! | ||||
|     try: | ||||
| 
 | ||||
|         # NOTE: turns out you don't even need to sched an aio task | ||||
|         # since the original issue, even though seemingly was due to | ||||
|         # the guest-run being abandoned + a `._debug.pause()` inside | ||||
|         # `._runtime._async_main()` (which was originally trying to | ||||
|         # debug the `.lifetime_stack` not closing), IS NOT actually | ||||
|         # the core issue? | ||||
|         # | ||||
|         # further notes: | ||||
|         # | ||||
|         # - `trio` only issues the " RuntimeWarning: Trio guest run | ||||
|         #   got abandoned without properly finishing... weird stuff | ||||
|         #   might happen" IFF you DO run an asyncio task here, BUT | ||||
|         # - the original issue of the `.lifetime_stack` not closing | ||||
|         #   will still happen even if you don't run an `asyncio` task | ||||
|         #   here even though the "abandon" message won't be shown.. | ||||
|         # | ||||
|         # => ????? honestly i'm lost but it seems to be some issue | ||||
|         #   with `asyncio` and SIGINT.. | ||||
|         # | ||||
|         # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and | ||||
|         # `run_as_asyncio_guest()` is written WITHOUT THE | ||||
|         # `.cancel_soon()` soln, both of these tests will pass ?? | ||||
|         # so maybe it has something to do with `asyncio` loop init | ||||
|         # state?!? | ||||
|         # honestly, this REALLY reminds me why i haven't used | ||||
|         # `asyncio` by choice in years.. XD | ||||
|         # | ||||
|         # await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||
|         if bg_aio_task: | ||||
|             async with trio.open_nursery() as tn: | ||||
|                 tn.start_soon( | ||||
|                     tractor.to_asyncio.run_task, | ||||
|                     aio_sleep_forever, | ||||
|                 ) | ||||
| 
 | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
|     # signalled manually at the OS level (aka KBI) by the parent actor. | ||||
|     except KeyboardInterrupt: | ||||
|         print('child raised KBI..') | ||||
|         assert tmp_file.exists() | ||||
|         raise | ||||
|     else: | ||||
|         raise RuntimeError('shoulda received a KBI?') | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'bg_aio_task', | ||||
|     [ | ||||
|         False, | ||||
| 
 | ||||
|         # NOTE: (and see notes in `manage_file()` above as well) if | ||||
|         # we FOR SURE SPAWN AN AIO TASK in the child it seems the | ||||
|         # "silent-abandon" case (as is described in detail in | ||||
|         # `to_asyncio.run_as_asyncio_guest()`) does not happen and | ||||
|         # `asyncio`'s loop will at least abandon the `trio` side | ||||
|         # loudly? .. prolly the state-spot to start looking for | ||||
|         # a soln that results in NO ABANDONMENT.. XD | ||||
|         True, | ||||
|     ], | ||||
|     ids=[ | ||||
|         'just_trio_sleep', | ||||
|         'bg_aio_task', | ||||
|     ], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'wait_for_ctx', | ||||
|     [ | ||||
|         False, | ||||
|         True, | ||||
|     ], | ||||
|     ids=[ | ||||
|         'raise_KBI_in_rent', | ||||
|         'wait_for_ctx', | ||||
|     ], | ||||
| ) | ||||
| def test_sigint_closes_lifetime_stack( | ||||
|     tmp_path: Path, | ||||
|     wait_for_ctx: bool, | ||||
|     bg_aio_task: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Ensure that an infected child can use the `Actor.lifetime_stack` | ||||
|     to make a file on boot and it's automatically cleaned up by the | ||||
|     actor-lifetime-linked exit stack closure. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         try: | ||||
|             async with tractor.open_nursery() as n: | ||||
|                 p = await n.start_actor( | ||||
|                     'file_mngr', | ||||
|                     enable_modules=[__name__], | ||||
|                     infect_asyncio=True, | ||||
|                 ) | ||||
|                 async with p.open_context( | ||||
|                     manage_file, | ||||
|                     tmp_path_str=str(tmp_path), | ||||
|                     bg_aio_task=bg_aio_task, | ||||
|                 ) as (ctx, first): | ||||
| 
 | ||||
|                     path_str, cpid = first | ||||
|                     tmp_file: Path = Path(path_str) | ||||
|                     assert tmp_file.exists() | ||||
| 
 | ||||
|                     # XXX originally to simulate what (hopefully) | ||||
|                     # the below now triggers.. had to manually | ||||
|                     # trigger a SIGINT from a ctl-c in the root. | ||||
|                     # await trio.sleep_forever() | ||||
| 
 | ||||
|                     # XXX NOTE XXX signal infected-`asyncio` child to | ||||
|                     # OS-cancel with SIGINT; this should trigger the | ||||
|                     # bad `asyncio` cancel behaviour that can cause | ||||
|                     # a guest-run abandon as was seen causing | ||||
|                     # shm-buffer leaks in `piker`'s live quote stream | ||||
|                     # subsystems! | ||||
|                     # | ||||
|                     # await trio.sleep(.5) | ||||
|                     await trio.sleep(.2) | ||||
|                     os.kill( | ||||
|                         cpid, | ||||
|                         signal.SIGINT, | ||||
|                     ) | ||||
| 
 | ||||
|                     # XXX CASE 1: without the bug fixed, in | ||||
|                     # the non-KBI-raised-in-parent case, this | ||||
|                     # timeout should trigger! | ||||
|                     if wait_for_ctx: | ||||
|                         print('waiting for ctx outcome in parent..') | ||||
|                         try: | ||||
|                             with trio.fail_after(.7): | ||||
|                                 await ctx.wait_for_result() | ||||
|                         except tractor.ContextCancelled as ctxc: | ||||
|                             assert ctxc.canceller == ctx.chan.uid | ||||
|                             raise | ||||
| 
 | ||||
|                     # XXX CASE 2: this seems to be the source of the | ||||
|                     # original issue which exhibited BEFORE we put | ||||
|                     # a `Actor.cancel_soon()` inside | ||||
|                     # `run_as_asyncio_guest()`.. | ||||
|                     else: | ||||
|                         raise KeyboardInterrupt | ||||
| 
 | ||||
|                 pytest.fail('should have raised some kinda error?!?') | ||||
| 
 | ||||
|         except ( | ||||
|             KeyboardInterrupt, | ||||
|             ContextCancelled, | ||||
|         ): | ||||
|             # XXX CASE 2: without the bug fixed, in the | ||||
|             # KBI-raised-in-parent case, the actor teardown should | ||||
|             # never get run (silently abandoned by `asyncio`..) and | ||||
|             # thus the file should leak! | ||||
|             assert not tmp_file.exists() | ||||
|             assert ctx.maybe_error | ||||
| 
 | ||||
|     trio.run(main) | ||||
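
The cleanup mechanism under test is plain `contextlib.ExitStack` semantics; the same pattern standalone:

```python
from contextlib import ExitStack
from pathlib import Path
import tempfile

with ExitStack() as stack:
    tmp_file = Path(tempfile.mkdtemp()) / 'marker.file'
    tmp_file.touch()
    # runs when the stack closes, just like on actor teardown
    stack.callback(tmp_file.unlink)
    assert tmp_file.exists()

assert not tmp_file.exists()
```
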
| 
 | ||||
| 
 | ||||
| # TODO: debug_mode tests once we get support for `asyncio`! | ||||
| # | ||||
| # -[ ] need tests to wrap both scripts: | ||||
|  |  | |||
|  | @ -121,10 +121,19 @@ class Unresolved: | |||
| @dataclass | ||||
| class Context: | ||||
|     ''' | ||||
|     An inter-actor, SC transitive, `Task` communication context. | ||||
|     An inter-actor, SC transitive, `trio.Task` (pair) | ||||
|     communication context. | ||||
| 
 | ||||
|     NB: This class should **never be instantiated directly**, it is allocated | ||||
|     by the runtime in 2 ways: | ||||
|     (We've also considered other names and ideas: | ||||
|      - "communicating tasks scope": cts | ||||
|      - "distributed task scope": dts | ||||
|      - "communicating tasks context": ctc | ||||
| 
 | ||||
|      **Got a better idea for naming? Make an issue dawg!** | ||||
|     ) | ||||
| 
 | ||||
|     NB: This class should **never be instantiated directly**, it is | ||||
|     allocated by the runtime in 2 ways: | ||||
|      - by entering `Portal.open_context()` which is the primary | ||||
|        public API for any "parent" task or, | ||||
|      - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg | ||||
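
A hedged parent-side usage sketch of the first allocation path (`Portal.open_context()`); the `echo` endpoint is hypothetical:

```python
import tractor
import trio

@tractor.context
async def echo(ctx: tractor.Context, msg: str) -> str:
    await ctx.started(msg)  # first value delivered to the parent
    return msg  # final result for `.wait_for_result()`

async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor('echoer', enable_modules=[__name__])
        async with portal.open_context(echo, msg='yo') as (ctx, first):
            assert first == 'yo'
        await portal.cancel_actor()

trio.run(main)
```
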
|  | @ -210,6 +219,16 @@ class Context: | |||
|     # more than the `Context` is needed? | ||||
|     _portal: Portal | None = None | ||||
| 
 | ||||
|     @property | ||||
|     def portal(self) -> Portal|None: | ||||
|         ''' | ||||
|         Return any wrapping memory-`Portal` if this is | ||||
|         a 'parent'-side task which called `Portal.open_context()`, | ||||
|         otherwise `None`. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._portal | ||||
| 
 | ||||
|     # NOTE: each side of the context has its own cancel scope | ||||
|     # which is exactly the primitive that allows for | ||||
|     # cross-actor-task-supervision and thus SC. | ||||
|  | @ -299,6 +318,8 @@ class Context: | |||
|     # boxed exception. NOW, it's used for spawning overrun queuing | ||||
|     # tasks when `.allow_overruns ==  True` !!! | ||||
|     _scope_nursery: trio.Nursery|None = None | ||||
|     # ^-TODO-^ change name? | ||||
|     # -> `._scope_tn` "scope task nursery" | ||||
| 
 | ||||
|     # streaming overrun state tracking | ||||
|     _in_overrun: bool = False | ||||
|  | @ -408,10 +429,23 @@ class Context: | |||
|         ''' | ||||
|         return self._cancel_called | ||||
| 
 | ||||
|     @cancel_called.setter | ||||
|     def cancel_called(self, val: bool) -> None: | ||||
|         ''' | ||||
|         Set the self-cancelled request `bool` value. | ||||
| 
 | ||||
|         ''' | ||||
|         # to debug who frickin sets it.. | ||||
|         # if val: | ||||
|         #     from .devx import pause_from_sync | ||||
|         #     pause_from_sync() | ||||
| 
 | ||||
|         self._cancel_called = val | ||||
| 
 | ||||
|     @property | ||||
|     def canceller(self) -> tuple[str, str]|None: | ||||
|         ''' | ||||
|         ``Actor.uid: tuple[str, str]`` of the (remote) | ||||
|         `Actor.uid: tuple[str, str]` of the (remote) | ||||
|         actor-process whose task was cancelled thus causing this | ||||
|         (side of the) context to also be cancelled. | ||||
| 
 | ||||
|  | @ -515,7 +549,7 @@ class Context: | |||
| 
 | ||||
|             # the local scope was never cancelled | ||||
|             # and instead likely we received a remote side | ||||
|             # # cancellation that was raised inside `.result()` | ||||
|             # # cancellation that was raised inside `.wait_for_result()` | ||||
|             # or ( | ||||
|             #     (se := self._local_error) | ||||
|             #     and se is re | ||||
|  | @ -585,6 +619,8 @@ class Context: | |||
|         self, | ||||
|         error: BaseException, | ||||
| 
 | ||||
|         set_cancel_called: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         (Maybe) cancel this local scope due to a received remote | ||||
|  | @ -603,7 +639,7 @@ class Context: | |||
|         - `Portal.open_context()` | ||||
|         - `Portal.result()` | ||||
|         - `Context.open_stream()` | ||||
|         - `Context.result()` | ||||
|         - `Context.wait_for_result()` | ||||
| 
 | ||||
|         when called/closed by actor local task(s). | ||||
| 
 | ||||
|  | @ -729,7 +765,7 @@ class Context: | |||
| 
 | ||||
|         # Cancel the local `._scope`, catch that | ||||
|         # `._scope.cancelled_caught` and re-raise any remote error | ||||
|         # once exiting (or manually calling `.result()`) the | ||||
|         # once exiting (or manually calling `.wait_for_result()`) the | ||||
|         # `.open_context()`  block. | ||||
|         cs: trio.CancelScope = self._scope | ||||
|         if ( | ||||
|  | @ -764,8 +800,9 @@ class Context: | |||
|                 # `trio.Cancelled` subtype here ;) | ||||
|                 # https://github.com/goodboy/tractor/issues/368 | ||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||
|                 # from .devx import pause_from_sync | ||||
|                 # pause_from_sync() | ||||
|                 self._scope.cancel() | ||||
| 
 | ||||
|         else: | ||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||
|             # from .devx import mk_pdb | ||||
|  | @ -889,7 +926,7 @@ class Context: | |||
| 
 | ||||
|         ''' | ||||
|         side: str = self.side | ||||
|         self._cancel_called: bool = True | ||||
|         self.cancel_called: bool = True | ||||
| 
 | ||||
|         header: str = ( | ||||
|             f'Cancelling ctx with peer from {side.upper()} side\n\n' | ||||
|  | @ -912,7 +949,7 @@ class Context: | |||
|         # `._scope.cancel()` since we expect the eventual | ||||
|         # `ContextCancelled` from the other side to trigger this | ||||
|         # when the runtime finally receives it during teardown | ||||
|         # (normally in `.result()` called from | ||||
|         # (normally in `.wait_for_result()` called from | ||||
|         # `Portal.open_context().__aexit__()`) | ||||
|         if side == 'parent': | ||||
|             if not self._portal: | ||||
|  | @ -1025,10 +1062,10 @@ class Context: | |||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         our_uid: tuple = self.chan.uid | ||||
|         peer_uid: tuple = self.chan.uid | ||||
| 
 | ||||
|         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption | ||||
|         # for "graceful cancellation" case: | ||||
|         # for "graceful cancellation" case(s): | ||||
|         # | ||||
|         # Whenever a "side" of a context (a `Task` running in | ||||
|         # an actor) **is** the side which requested ctx | ||||
|  | @ -1045,9 +1082,11 @@ class Context: | |||
|         # set to the `Actor.uid` of THIS task (i.e. the | ||||
|         # cancellation requesting task's actor is the actor | ||||
|         # checking whether it should absorb the ctxc). | ||||
|         self_ctxc: bool = self._is_self_cancelled(remote_error) | ||||
|         if ( | ||||
|             self_ctxc | ||||
|             and | ||||
|             not raise_ctxc_from_self_call | ||||
|             and self._is_self_cancelled(remote_error) | ||||
| 
 | ||||
|             # TODO: ?potentially it is useful to emit certain | ||||
|             # warning/cancel logs for the cases where the | ||||
|  | @ -1077,8 +1116,8 @@ class Context: | |||
|             and isinstance(remote_error, RemoteActorError) | ||||
|             and remote_error.boxed_type is StreamOverrun | ||||
| 
 | ||||
|             # and tuple(remote_error.msgdata['sender']) == our_uid | ||||
|             and tuple(remote_error.sender) == our_uid | ||||
|             # and tuple(remote_error.msgdata['sender']) == peer_uid | ||||
|             and tuple(remote_error.sender) == peer_uid | ||||
|         ): | ||||
|             # NOTE: we set the local scope error to any "self | ||||
|             # cancellation" error-response thus "absorbing" | ||||
|  | @ -1140,9 +1179,9 @@ class Context: | |||
|         of the remote cancellation. | ||||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__ = hide_tb | ||||
|         __tracebackhide__: bool = False | ||||
|         assert self._portal, ( | ||||
|             "Context.result() can not be called from callee side!" | ||||
|             '`Context.wait_for_result()` can not be called from callee side!' | ||||
|         ) | ||||
|         if self._final_result_is_set(): | ||||
|             return self._result | ||||
|  | @ -1169,7 +1208,8 @@ class Context: | |||
|                 drained_msgs, | ||||
|             ) = await msgops.drain_to_final_msg( | ||||
|                 ctx=self, | ||||
|                 hide_tb=hide_tb, | ||||
|                 # hide_tb=hide_tb, | ||||
|                 hide_tb=False, | ||||
|             ) | ||||
| 
 | ||||
|             drained_status: str = ( | ||||
|  | @ -1185,6 +1225,8 @@ class Context: | |||
| 
 | ||||
|             log.cancel(drained_status) | ||||
| 
 | ||||
|         # __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|         self.maybe_raise( | ||||
|             # NOTE: obvi we don't care if we | ||||
|             # overran the far end if we're already | ||||
|  | @ -1197,7 +1239,8 @@ class Context: | |||
|                 # raising something we know might happen | ||||
|                 # during cancellation ;) | ||||
|                 (not self._cancel_called) | ||||
|             ) | ||||
|             ), | ||||
|             hide_tb=hide_tb, | ||||
|         ) | ||||
|         # TODO: eventually make `.outcome: Outcome` and thus return | ||||
|         # `self.outcome.unwrap()` here! | ||||
|  | @ -1583,7 +1626,7 @@ class Context: | |||
| 
 | ||||
|         - NEVER `return` early before delivering the msg! | ||||
|           bc if the error is a ctxc and there is a task waiting on | ||||
|           `.result()` we need the msg to be | ||||
|           `.wait_for_result()` we need the msg to be | ||||
|           `send_chan.send_nowait()`-ed over the `._rx_chan` so | ||||
|           that the error is relayed to that waiter task and thus | ||||
|           raised in user code! | ||||
|  | @ -1828,7 +1871,7 @@ async def open_context_from_portal( | |||
|     When the "callee" (side that is "called"/started by a call | ||||
|     to *this* method) returns, the caller side (this) unblocks | ||||
|     and any final value delivered from the other end can be | ||||
|     retrieved using the `Contex.result()` api. | ||||
|     retrieved using the `Contex.wait_for_result()` api. | ||||
| 
 | ||||
|     The yielded ``Context`` instance further allows for opening | ||||
|     bidirectional streams, explicit cancellation and | ||||
|  | @ -1965,14 +2008,14 @@ async def open_context_from_portal( | |||
|             yield ctx, first | ||||
| 
 | ||||
|             # ??TODO??: do we still want to consider this or is | ||||
|             # the `else:` block handling via a `.result()` | ||||
|             # the `else:` block handling via a `.wait_for_result()` | ||||
|             # call below enough?? | ||||
|             # | ||||
|             # -[ ] pretty sure `.result()` internals do the | ||||
|             # -[ ] pretty sure `.wait_for_result()` internals do the | ||||
|             # same as our ctxc handler below so it ended up | ||||
|             # being same (repeated?) behaviour, but ideally we | ||||
|             # wouldn't have that duplication either by somehow | ||||
|             # factoring the `.result()` handler impl in a way | ||||
|             # factoring the `.wait_for_result()` handler impl in a way | ||||
|             # that we can re-use it around the `yield` ^ here | ||||
|             # or vice versa? | ||||
|             # | ||||
|  | @ -2110,7 +2153,7 @@ async def open_context_from_portal( | |||
|         #    AND a group-exc is only raised if there was > 1 | ||||
|         #    tasks started *here* in the "caller" / opener | ||||
|         #    block. If any one of those tasks calls | ||||
|         #    `.result()` or `MsgStream.receive()` | ||||
|         #    `.wait_for_result()` or `MsgStream.receive()` | ||||
|         #    `._maybe_raise_remote_err()` will be transitively | ||||
|         #    called and the remote error raised causing all | ||||
|         #    tasks to be cancelled. | ||||
|  | @ -2180,7 +2223,7 @@ async def open_context_from_portal( | |||
|                 f'|_{ctx._task}\n' | ||||
|             ) | ||||
|             # XXX NOTE XXX: the below call to | ||||
|             # `Context.result()` will ALWAYS raise | ||||
|             # `Context.wait_for_result()` will ALWAYS raise | ||||
|             # a `ContextCancelled` (via an embedded call to | ||||
|             # `Context._maybe_raise_remote_err()`) IFF | ||||
|             # a `Context._remote_error` was set by the runtime | ||||
|  | @ -2190,10 +2233,10 @@ async def open_context_from_portal( | |||
|             # ALWAYS SET any time "callee" side fails and causes "caller | ||||
|             # side" cancellation via a `ContextCancelled` here. | ||||
|             try: | ||||
|                 result_or_err: Exception|Any = await ctx.result() | ||||
|                 result_or_err: Exception|Any = await ctx.wait_for_result() | ||||
|             except BaseException as berr: | ||||
|                 # on normal teardown, if we get some error | ||||
|                 # raised in `Context.result()` we still want to | ||||
|                 # raised in `Context.wait_for_result()` we still want to | ||||
|                 # save that error on the ctx's state to | ||||
|                 # determine things like `.cancelled_caught` for | ||||
|                 # cases where there was remote cancellation but | ||||
|  |  | |||
|  | @ -56,14 +56,12 @@ async def get_registry( | |||
| ]: | ||||
|     ''' | ||||
|     Return a portal instance connected to a local or remote | ||||
|     arbiter. | ||||
|     registry-service actor; if a connection already exists re-use it | ||||
|     (presumably to call a `.register_actor()` registry runtime RPC | ||||
|     ep). | ||||
| 
 | ||||
|     ''' | ||||
|     actor = current_actor() | ||||
| 
 | ||||
|     if not actor: | ||||
|         raise RuntimeError("No actor instance has been defined yet?") | ||||
| 
 | ||||
|     actor: Actor = current_actor() | ||||
|     if actor.is_registrar: | ||||
|         # we're already the arbiter | ||||
|         # (likely a re-entrant call from the arbiter actor) | ||||
|  | @ -72,6 +70,8 @@ async def get_registry( | |||
|             Channel((host, port)) | ||||
|         ) | ||||
|     else: | ||||
|         # TODO: try to look pre-existing connection from | ||||
|         # `Actor._peers` and use it instead? | ||||
|         async with ( | ||||
|             _connect_chan(host, port) as chan, | ||||
|             open_portal(chan) as regstr_ptl, | ||||
|  |  | |||
|  | @ -20,7 +20,8 @@ Sub-process entry points. | |||
| """ | ||||
| from __future__ import annotations | ||||
| from functools import partial | ||||
| # import textwrap | ||||
| import os | ||||
| import textwrap | ||||
| from typing import ( | ||||
|     Any, | ||||
|     TYPE_CHECKING, | ||||
|  | @ -58,7 +59,7 @@ def _mp_main( | |||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     The routine called *after fork* which invokes a fresh ``trio.run`` | ||||
|     The routine called *after fork* which invokes a fresh `trio.run()` | ||||
| 
 | ||||
|     ''' | ||||
|     actor._forkserver_info = forkserver_info | ||||
|  | @ -96,6 +97,35 @@ def _mp_main( | |||
|         log.info(f"Subactor {actor.uid} terminated") | ||||
| 
 | ||||
| 
 | ||||
| # TODO: move this to some kinda `.devx._conc_lang.py` eventually | ||||
| # as we work out our multi-domain state-flow-syntax! | ||||
| def nest_from_op( | ||||
|     input_op: str, | ||||
|     tree_str: str, | ||||
| 
 | ||||
|     back_from_op: int = 1, | ||||
| ) -> str: | ||||
|     ''' | ||||
|     Depth-increment the provided (presumably hierarchy/supervision) | ||||
|     "tree string" below the given `input_op` execution operator, | ||||
|     injecting a `"\n|_{input_op}\n"` and indenting the `tree_str` | ||||
|     so nested content aligns with the op's last char. | ||||
| 
 | ||||
|     ''' | ||||
|     return ( | ||||
|         f'{input_op}\n' | ||||
|         + | ||||
|         textwrap.indent( | ||||
|             tree_str, | ||||
|             prefix=( | ||||
|                 len(input_op) | ||||
|                 - | ||||
|                 back_from_op | ||||
|             ) * ' ', | ||||
|         ) | ||||
|     ) | ||||
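
Example rendering of the new helper (given the indent math above, the tree nests one space past a 2-char op with `back_from_op=1`):

```python
print(nest_from_op(
    input_op='(>',
    tree_str='|_Actor: foo\n  uid: (foo, 1234)\n',
))
# (>
#  |_Actor: foo
#    uid: (foo, 1234)
```
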
| 
 | ||||
| 
 | ||||
| def _trio_main( | ||||
|     actor: Actor, | ||||
|     *, | ||||
|  | @ -119,7 +149,6 @@ def _trio_main( | |||
| 
 | ||||
|     if actor.loglevel is not None: | ||||
|         get_console_log(actor.loglevel) | ||||
|         import os | ||||
|         actor_info: str = ( | ||||
|             f'|_{actor}\n' | ||||
|             f'  uid: {actor.uid}\n' | ||||
|  | @ -128,13 +157,29 @@ def _trio_main( | |||
|             f'  loglevel: {actor.loglevel}\n' | ||||
|         ) | ||||
|         log.info( | ||||
|             'Started new trio subactor:\n' | ||||
|             'Started new `trio` subactor:\n' | ||||
|             + | ||||
|             '>\n'  # like a "started/play"-icon from super perspective | ||||
|             + | ||||
|             actor_info, | ||||
|             nest_from_op( | ||||
|                 input_op='(>',  # like a "started/play"-icon from super perspective | ||||
|                 tree_str=actor_info, | ||||
|             ) | ||||
|             # '>(\n'  # like a "started/play"-icon from super perspective | ||||
|             # + | ||||
|             # actor_info, | ||||
|         ) | ||||
|     logmeth = log.info | ||||
|     message: str = ( | ||||
|     # log.info( | ||||
|         'Subactor terminated\n' | ||||
|         + | ||||
|         nest_from_op( | ||||
|             input_op=')>',  # like a "terminated/stopped"-icon from super perspective | ||||
|             tree_str=actor_info, | ||||
|         ) | ||||
|         # 'x\n'  # like a "crossed-out/killed" from super perspective | ||||
|         # + | ||||
|         # actor_info | ||||
|     ) | ||||
| 
 | ||||
|     try: | ||||
|         if infect_asyncio: | ||||
|             actor._infected_aio = True | ||||
|  | @ -143,16 +188,18 @@ def _trio_main( | |||
|             trio.run(trio_main) | ||||
| 
 | ||||
|     except KeyboardInterrupt: | ||||
|         log.cancel( | ||||
|             'Actor received KBI\n' | ||||
|         logmeth = log.cancel | ||||
|         message: str = ( | ||||
|             'Actor received KBI (aka an OS-cancel)\n' | ||||
|             + | ||||
|             actor_info | ||||
|             nest_from_op( | ||||
|                 input_op='c)>',  # like a "cancelled"-icon from super perspective | ||||
|                 tree_str=actor_info, | ||||
|             ) | ||||
|         ) | ||||
|     except BaseException: | ||||
|         log.exception('Actor crashed exit?') | ||||
|         raise | ||||
| 
 | ||||
|     finally: | ||||
|         log.info( | ||||
|             'Subactor terminated\n' | ||||
|             + | ||||
|             'x\n'  # like a "crossed-out/killed" from super perspective | ||||
|             + | ||||
|             actor_info | ||||
|         ) | ||||
|         logmeth(message) | ||||
|  |  | |||
|  | @ -922,15 +922,6 @@ class NoRuntime(RuntimeError): | |||
|     "The root actor has not been initialized yet" | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| class AsyncioCancelled(Exception): | ||||
|     ''' | ||||
|     Asyncio cancelled translation (non-base) error | ||||
|     for use with the ``to_asyncio`` module | ||||
|     to be raised in the ``trio`` side task | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| class MessagingError(Exception): | ||||
|     ''' | ||||
|     IPC related msg (typing), transaction (ordering) or dialog | ||||
|  | @ -1324,7 +1315,9 @@ def _mk_recv_mte( | |||
|         any_pld: Any = msgpack.decode(msg.pld) | ||||
|         message: str = ( | ||||
|             f'invalid `{msg_type.__qualname__}` msg payload\n\n' | ||||
|             f'value: `{any_pld!r}` does not match type-spec: ' | ||||
|             f'{any_pld!r}\n\n' | ||||
|             f'has type {type(any_pld)!r}\n\n' | ||||
|             f'and does not match type-spec ' | ||||
|             f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' | ||||
|         ) | ||||
|         bad_msg = msg | ||||
|  |  | |||
|  | @ -40,6 +40,7 @@ from typing import ( | |||
|     TypeVar, | ||||
| ) | ||||
| 
 | ||||
| # import pdbp | ||||
| import msgspec | ||||
| from tricycle import BufferedReceiveStream | ||||
| import trio | ||||
|  | @ -290,12 +291,14 @@ class MsgpackTCPStream(MsgTransport): | |||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|     # @pdbp.hideframe | ||||
|     async def send( | ||||
|         self, | ||||
|         msg: msgtypes.MsgType, | ||||
| 
 | ||||
|         strict_types: bool = True, | ||||
|         # hide_tb: bool = False, | ||||
|         hide_tb: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Send a msgpack encoded py-object-blob-as-msg over TCP. | ||||
|  | @ -304,7 +307,10 @@ class MsgpackTCPStream(MsgTransport): | |||
|         invalid msg type | ||||
| 
 | ||||
|         ''' | ||||
|         # __tracebackhide__: bool = hide_tb | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         # try: | ||||
|         # XXX see `trio._sync.AsyncContextManagerMixin` for details | ||||
|         # on the `.acquire()`/`.release()` sequencing.. | ||||
|         async with self._send_lock: | ||||
| 
 | ||||
|             # NOTE: lookup the `trio.Task.context`'s var for | ||||
|  | @ -352,6 +358,14 @@ class MsgpackTCPStream(MsgTransport): | |||
|             size: bytes = struct.pack("<I", len(bytes_data)) | ||||
|             return await self.stream.send_all(size + bytes_data) | ||||
| 
 | ||||
|         # TODO: does it help ever to dynamically show this | ||||
|         # frame? | ||||
|         # except BaseException as _err: | ||||
|         #     err = _err | ||||
|         #     if not isinstance(err, MsgTypeError): | ||||
|         #         __tracebackhide__: bool = False | ||||
|         #     raise | ||||
| 
 | ||||
|     @property | ||||
|     def laddr(self) -> tuple[str, int]: | ||||
|         return self._laddr | ||||
|  | @ -560,27 +574,40 @@ class Channel: | |||
|         ) | ||||
|         return transport | ||||
| 
 | ||||
|     # TODO: something like, | ||||
|     # `pdbp.hideframe_on(errors=[MsgTypeError])` | ||||
|     # @pdbp.hideframe | ||||
|     async def send( | ||||
|         self, | ||||
|         payload: Any, | ||||
| 
 | ||||
|         # hide_tb: bool = False, | ||||
|         hide_tb: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Send a coded msg-blob over the transport. | ||||
| 
 | ||||
|         ''' | ||||
|         # __tracebackhide__: bool = hide_tb | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         try: | ||||
|             log.transport( | ||||
|                 '=> send IPC msg:\n\n' | ||||
|                 f'{pformat(payload)}\n' | ||||
|         )  # type: ignore | ||||
|         assert self._transport | ||||
|             ) | ||||
|             # assert self._transport  # but why typing? | ||||
|             await self._transport.send( | ||||
|                 payload, | ||||
|             # hide_tb=hide_tb, | ||||
|                 hide_tb=hide_tb, | ||||
|             ) | ||||
|         except BaseException as _err: | ||||
|             err = _err  # bind for introspection | ||||
|             if not isinstance(_err, MsgTypeError): | ||||
|                 # assert err | ||||
|                 __tracebackhide__: bool = False | ||||
|             else: | ||||
|                 assert err.cid | ||||
| 
 | ||||
|             raise | ||||
| 
 | ||||
|     async def recv(self) -> Any: | ||||
|         assert self._transport | ||||
|  |  | |||
|  | @ -121,7 +121,8 @@ class Portal: | |||
|         ) | ||||
|         return self.chan | ||||
| 
 | ||||
|     # TODO: factor this out into an `ActorNursery` wrapper | ||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||
|     # a single `.open_context()` call underneath. | ||||
|     async def _submit_for_result( | ||||
|         self, | ||||
|         ns: str, | ||||
|  | @ -141,13 +142,22 @@ class Portal: | |||
|             portal=self, | ||||
|         ) | ||||
| 
 | ||||
|     # TODO: we should deprecate this API right? since if we remove | ||||
|     # `.run_in_actor()` (and instead move it to a `.highlevel` | ||||
|     # wrapper api (around a single `.open_context()` call) we don't | ||||
|     # really have any notion of a "main" remote task any more? | ||||
|     # | ||||
|     # @api_frame | ||||
|     async def result(self) -> Any: | ||||
|     async def wait_for_result( | ||||
|         self, | ||||
|         hide_tb: bool = True, | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Return the result(s) from the remote actor's "main" task. | ||||
|         Return the final result delivered by a `Return`-msg from the | ||||
|         remote peer actor's "main" task's `return` statement. | ||||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__ = True | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         # Check for non-rpc errors slapped on the | ||||
|         # channel for which we always raise | ||||
|         exc = self.channel._exc | ||||
|  | @ -182,6 +192,23 @@ class Portal: | |||
| 
 | ||||
|         return self._final_result_pld | ||||
| 
 | ||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||
|     # a single `.open_context()` call underneath. | ||||
|     async def result( | ||||
|         self, | ||||
|         *args, | ||||
|         **kwargs, | ||||
|     ) -> Any|Exception: | ||||
|         typname: str = type(self).__name__ | ||||
|         log.warning( | ||||
|             f'`{typname}.result()` is DEPRECATED!\n' | ||||
|             f'Use `{typname}.wait_for_result()` instead!\n' | ||||
|         ) | ||||
|         return await self.wait_for_result( | ||||
|             *args, | ||||
|             **kwargs, | ||||
|         ) | ||||
| 
 | ||||
|     async def _cancel_streams(self): | ||||
|         # terminate all locally running async generator | ||||
|         # IPC calls | ||||
|  | @ -240,6 +267,7 @@ class Portal: | |||
|             f'{reminfo}' | ||||
|         ) | ||||
| 
 | ||||
|         # XXX the one spot we set it? | ||||
|         self.channel._cancel_called: bool = True | ||||
|         try: | ||||
|             # send cancel cmd - might not get response | ||||
|  | @ -279,6 +307,8 @@ class Portal: | |||
|             ) | ||||
|             return False | ||||
| 
 | ||||
|     # TODO: do we still need this for low level `Actor`-runtime | ||||
|     # method calls or can we also remove it? | ||||
|     async def run_from_ns( | ||||
|         self, | ||||
|         namespace_path: str, | ||||
|  | @ -316,6 +346,8 @@ class Portal: | |||
|             expect_msg=Return, | ||||
|         ) | ||||
| 
 | ||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||
|     # a single `.open_context()` call underneath. | ||||
|     async def run( | ||||
|         self, | ||||
|         func: str, | ||||
|  | @ -370,6 +402,8 @@ class Portal: | |||
|             expect_msg=Return, | ||||
|         ) | ||||
| 
 | ||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||
|     # a single `.open_context()` call underneath. | ||||
|     @acm | ||||
|     async def open_stream_from( | ||||
|         self, | ||||
|  |  | |||
|  | @ -21,6 +21,7 @@ Root actor runtime ignition(s). | |||
| from contextlib import asynccontextmanager as acm | ||||
| from functools import partial | ||||
| import importlib | ||||
| import inspect | ||||
| import logging | ||||
| import os | ||||
| import signal | ||||
|  | @ -115,10 +116,16 @@ async def open_root_actor( | |||
|     if ( | ||||
|         debug_mode | ||||
|         and maybe_enable_greenback | ||||
|         and await _debug.maybe_init_greenback( | ||||
|         and ( | ||||
|             maybe_mod := await _debug.maybe_init_greenback( | ||||
|                 raise_not_found=False, | ||||
|             ) | ||||
|         ) | ||||
|     ): | ||||
|         logger.info( | ||||
|             f'Found `greenback` installed @ {maybe_mod}\n' | ||||
|             'Enabling `tractor.pause_from_sync()` support!\n' | ||||
|         ) | ||||
|         os.environ['PYTHONBREAKPOINT'] = ( | ||||
|             'tractor.devx._debug._sync_pause_from_builtin' | ||||
|         ) | ||||
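
The env-var hook leans on PEP 553's `breakpoint()` dispatch, which consults `PYTHONBREAKPOINT` on every call; a standalone demo using `builtins.print` as a stand-in hook:

```python
import os

os.environ['PYTHONBREAKPOINT'] = 'builtins.print'
breakpoint('not dropping into pdb!')  # dispatches to `print()` instead
```
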
|  | @ -264,7 +271,10 @@ async def open_root_actor( | |||
| 
 | ||||
|         except OSError: | ||||
|             # TODO: make this a "discovery" log level? | ||||
|             logger.warning(f'No actor registry found @ {addr}') | ||||
|             logger.info( | ||||
|                 f'No actor registry found @ {addr}\n' | ||||
|                 # 'Registry will be initialized in local actor..' | ||||
|             ) | ||||
| 
 | ||||
|     async with trio.open_nursery() as tn: | ||||
|         for addr in registry_addrs: | ||||
|  | @ -365,23 +375,25 @@ async def open_root_actor( | |||
|             ) | ||||
|             try: | ||||
|                 yield actor | ||||
| 
 | ||||
|             except ( | ||||
|                 Exception, | ||||
|                 BaseExceptionGroup, | ||||
|             ) as err: | ||||
| 
 | ||||
|                 import inspect | ||||
|                 # XXX NOTE XXX see equiv note inside | ||||
|                 # `._runtime.Actor._stream_handler()` where in the | ||||
|                 # non-root or root-that-opened-this-manually case we | ||||
|                 # wait for the local actor-nursery to exit before | ||||
|                 # exiting the transport channel handler. | ||||
|                 entered: bool = await _debug._maybe_enter_pm( | ||||
|                     err, | ||||
|                     api_frame=inspect.currentframe(), | ||||
|                 ) | ||||
| 
 | ||||
|                 if ( | ||||
|                     not entered | ||||
|                     and not is_multi_cancelled(err) | ||||
|                     and | ||||
|                     not is_multi_cancelled(err) | ||||
|                 ): | ||||
|                     logger.exception('Root actor crashed:\n') | ||||
|                     logger.exception('Root actor crashed\n') | ||||
| 
 | ||||
|                 # ALWAYS re-raise any error bubbled up from the | ||||
|                 # runtime! | ||||
|  |  | |||
|  | @ -89,6 +89,15 @@ if TYPE_CHECKING: | |||
| log = get_logger('tractor') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: move to a `tractor.lowlevel.rpc` with the below | ||||
| # func-type-cases implemented "on top of" `@context` defs. | ||||
| # -[ ] std async func | ||||
| # -[ ] `Portal.open_stream_from()` with async-gens? | ||||
| #  |_ possibly a duplex form of this with a  | ||||
| #    `sent_from_peer = yield send_to_peer` form, which would require | ||||
| #    syncing the send/recv side with possibly `.receive_nowait()` | ||||
| #    on each `yield`? | ||||
| # -[ ]  | ||||
| async def _invoke_non_context( | ||||
|     actor: Actor, | ||||
|     cancel_scope: CancelScope, | ||||
|  | @ -108,6 +117,7 @@ async def _invoke_non_context( | |||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
| ): | ||||
|     __tracebackhide__: bool = True | ||||
|     cs: CancelScope|None = None  # ref when activated | ||||
| 
 | ||||
|     # TODO: can we unify this with the `context=True` impl below? | ||||
|     if inspect.isasyncgen(coro): | ||||
|  | @ -160,10 +170,6 @@ async def _invoke_non_context( | |||
|                 functype='asyncgen', | ||||
|             ) | ||||
|         ) | ||||
|         # XXX: the async-func may spawn further tasks which push | ||||
|         # back values like an async-generator would but must | ||||
|         # manualy construct the response dict-packet-responses as | ||||
|         # above | ||||
|         with cancel_scope as cs: | ||||
|             ctx._scope = cs | ||||
|             task_status.started(ctx) | ||||
|  | @ -175,15 +181,13 @@ async def _invoke_non_context( | |||
|             await chan.send( | ||||
|                 Stop(cid=cid) | ||||
|             ) | ||||
|     else: | ||||
|         # regular async function/method | ||||
|         # XXX: possibly just a scheduled `Actor._cancel_task()` | ||||
|         # from a remote request to cancel some `Context`. | ||||
| 
 | ||||
|     # simplest function/method request-response pattern | ||||
|     # XXX: in the most minimally used case, just a scheduled internal runtime | ||||
|     # call to `Actor._cancel_task()` from the ctx-peer task since we | ||||
|     # don't (yet) have a dedicated IPC msg. | ||||
|     # ------ - ------ | ||||
|         # TODO: ideally we unify this with the above `context=True` | ||||
|         # block such that for any remote invocation ftype, we | ||||
|         # always invoke the far end RPC task scheduling the same | ||||
|         # way: using the linked IPC context machinery. | ||||
|     else: | ||||
|         failed_resp: bool = False | ||||
|         try: | ||||
|             ack = StartAck( | ||||
|  | @ -354,8 +358,14 @@ async def _errors_relayed_via_ipc( | |||
|             # channel. | ||||
|             task_status.started(err) | ||||
| 
 | ||||
|         # always reraise KBIs so they propagate at the sys-process level. | ||||
|         if isinstance(err, KeyboardInterrupt): | ||||
|         # always reraise KBIs so they propagate at the sys-process | ||||
|         # level. | ||||
|         # XXX LOL, except when running in asyncio mode XD | ||||
|         # cmon guys, wtf.. | ||||
|         if ( | ||||
|             isinstance(err, KeyboardInterrupt) | ||||
|             # and not actor.is_infected_aio() | ||||
|         ): | ||||
|             raise | ||||
| 
 | ||||
|     # RPC task bookkeeping. | ||||
|  | @ -458,7 +468,6 @@ async def _invoke( | |||
|     # tb: TracebackType = None | ||||
| 
 | ||||
|     cancel_scope = CancelScope() | ||||
|     cs: CancelScope|None = None  # ref when activated | ||||
|     ctx = actor.get_context( | ||||
|         chan=chan, | ||||
|         cid=cid, | ||||
|  | @ -607,6 +616,8 @@ async def _invoke( | |||
|         #     `@context` marked RPC function. | ||||
|         # - `._portal` is never set. | ||||
|         try: | ||||
|             tn: trio.Nursery | ||||
|             rpc_ctx_cs: CancelScope | ||||
|             async with ( | ||||
|                 trio.open_nursery() as tn, | ||||
|                 msgops.maybe_limit_plds( | ||||
|  | @ -616,7 +627,7 @@ async def _invoke( | |||
|                 ), | ||||
|             ): | ||||
|                 ctx._scope_nursery = tn | ||||
|                 ctx._scope = tn.cancel_scope | ||||
|                 rpc_ctx_cs = ctx._scope = tn.cancel_scope | ||||
|                 task_status.started(ctx) | ||||
| 
 | ||||
|                 # TODO: better `trionics` tooling: | ||||
|  | @ -642,7 +653,7 @@ async def _invoke( | |||
|             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` | ||||
|             #   which cancels the scope presuming the input error | ||||
|             #   is not a `.cancel_acked` pleaser. | ||||
|             if ctx._scope.cancelled_caught: | ||||
|             if rpc_ctx_cs.cancelled_caught: | ||||
|                 our_uid: tuple = actor.uid | ||||
| 
 | ||||
|                 # first check for and raise any remote error | ||||
|  | @ -652,9 +663,7 @@ async def _invoke( | |||
|                 if re := ctx._remote_error: | ||||
|                     ctx._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|                 cs: CancelScope = ctx._scope | ||||
| 
 | ||||
|                 if cs.cancel_called: | ||||
|                 if rpc_ctx_cs.cancel_called: | ||||
|                     canceller: tuple = ctx.canceller | ||||
|                     explain: str = f'{ctx.side!r}-side task was cancelled by ' | ||||
| 
 | ||||
|  | @ -680,9 +689,15 @@ async def _invoke( | |||
|                     elif canceller == ctx.chan.uid: | ||||
|                         explain += f'its {ctx.peer_side!r}-side peer' | ||||
| 
 | ||||
|                     else: | ||||
|                     elif canceller == our_uid: | ||||
|                         explain += 'itself' | ||||
| 
 | ||||
|                     elif canceller: | ||||
|                         explain += 'a remote peer' | ||||
| 
 | ||||
|                     else: | ||||
|                         explain += 'an unknown cause?' | ||||
| 
 | ||||
|                     explain += ( | ||||
|                         add_div(message=explain) | ||||
|                         + | ||||
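| # NOTE: a minimal standalone sketch (not part of this patch) of the | ||||
| # `trio.CancelScope` flags the branching above keys off of: | ||||
| # `.cancel_called` is set as soon as `.cancel()` is requested while | ||||
| # `.cancelled_caught` only flips once the scope body has actually | ||||
| # absorbed the resulting `trio.Cancelled`. | ||||
| import trio | ||||
|  | ||||
| async def demo_scope_flags() -> None: | ||||
|     with trio.CancelScope() as cs: | ||||
|         cs.cancel()          # request cancellation of this scope | ||||
|         await trio.sleep(1)  # checkpoint where `Cancelled` is raised | ||||
|     # the request was made AND the scope absorbed the exception | ||||
|     assert cs.cancel_called and cs.cancelled_caught | ||||
|  | ||||
| trio.run(demo_scope_flags) | ||||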
|  | @ -1238,7 +1253,7 @@ async def process_messages( | |||
|                 'Exiting IPC msg loop with final msg\n\n' | ||||
|                 f'<= peer: {chan.uid}\n' | ||||
|                 f'  |_{chan}\n\n' | ||||
|                 f'{pretty_struct.pformat(msg)}' | ||||
|                 # f'{pretty_struct.pformat(msg)}' | ||||
|             ) | ||||
| 
 | ||||
|         log.runtime(message) | ||||
|  |  | |||
|  | @ -1046,6 +1046,10 @@ class Actor: | |||
|                 # TODO: another `Struct` for rtvs.. | ||||
|                 rvs: dict[str, Any] = spawnspec._runtime_vars | ||||
|                 if rvs['_debug_mode']: | ||||
|                     from .devx import ( | ||||
|                         enable_stack_on_sig, | ||||
|                         maybe_init_greenback, | ||||
|                     ) | ||||
|                     try: | ||||
|                         # TODO: maybe return some status msgs upward | ||||
|                         # to that we can emit them in `con_status` | ||||
|  | @ -1053,13 +1057,27 @@ class Actor: | |||
|                         log.devx( | ||||
|                             'Enabling `stackscope` traces on SIGUSR1' | ||||
|                         ) | ||||
|                         from .devx import enable_stack_on_sig | ||||
|                         enable_stack_on_sig() | ||||
| 
 | ||||
|                     except ImportError: | ||||
|                         log.warning( | ||||
|                             '`stackscope` not installed for use in debug mode!' | ||||
|                         ) | ||||
| 
 | ||||
|                     if rvs.get('use_greenback', False): | ||||
|                         maybe_mod: ModuleType|None = await maybe_init_greenback() | ||||
|                         if maybe_mod: | ||||
|                             log.devx( | ||||
|                                 'Activated `greenback` ' | ||||
|                                 'for `tractor.pause_from_sync()` support!' | ||||
|                             ) | ||||
|                         else: | ||||
|                             rvs['use_greenback'] = False | ||||
|                             log.warning( | ||||
|                                 '`greenback` not installed for use in debug mode!\n' | ||||
|                                 '`tractor.pause_from_sync()` not available!' | ||||
|                             ) | ||||
| 
 | ||||
|                 rvs['_is_root'] = False | ||||
|                 _state._runtime_vars.update(rvs) | ||||
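| # NOTE: a hedged sketch (not from this patch) of the `greenback` | ||||
| # mechanics gated by `use_greenback` above: once a "portal" is | ||||
| # ensured for the current task, any sync code it calls may re-enter | ||||
| # the async world via `greenback.await_()`, which is what | ||||
| # `tractor.pause_from_sync()` relies on. | ||||
| import trio | ||||
| import greenback  # assumed installed | ||||
|  | ||||
| def sync_fn() -> None: | ||||
|     # legal ONLY because a portal was ensured for this task | ||||
|     greenback.await_(trio.sleep(0.1)) | ||||
|  | ||||
| async def main() -> None: | ||||
|     await greenback.ensure_portal()  # what `maybe_init_greenback()` wraps | ||||
|     sync_fn() | ||||
|  | ||||
| trio.run(main) | ||||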
| 
 | ||||
|  | @ -1717,8 +1735,8 @@ async def async_main( | |||
| 
 | ||||
|                 # Register with the arbiter if we're told its addr | ||||
|                 log.runtime( | ||||
|                     f'Registering `{actor.name}` ->\n' | ||||
|                     f'{pformat(accept_addrs)}' | ||||
|                     f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' | ||||
|                     # ^-TODO-^ we should instead show the maddr here^^ | ||||
|                 ) | ||||
| 
 | ||||
|                 # TODO: ideally we don't fan out to all registrars | ||||
|  | @ -1776,9 +1794,15 @@ async def async_main( | |||
| 
 | ||||
|         # Blocks here as expected until the root nursery is | ||||
|         # killed (i.e. this actor is cancelled or signalled by the parent) | ||||
|     except Exception as err: | ||||
|         log.runtime("Closing all actor lifetime contexts") | ||||
|         actor.lifetime_stack.close() | ||||
|     except Exception as internal_err: | ||||
|         # ls: ExitStack = actor.lifetime_stack | ||||
|         # log.cancel( | ||||
|         #     'Closing all actor-lifetime exec scopes\n\n' | ||||
|         #     f'|_{ls}\n' | ||||
|         # ) | ||||
|         # # _debug.pause_from_sync() | ||||
|         # # await _debug.pause(shield=True) | ||||
|         # ls.close() | ||||
| 
 | ||||
|         if not is_registered: | ||||
|             # TODO: I guess we could try to connect back | ||||
|  | @ -1786,7 +1810,8 @@ async def async_main( | |||
|             # once we have that all working with std streams locking? | ||||
|             log.exception( | ||||
|                 f"Actor errored and failed to register with arbiter " | ||||
|                 f"@ {actor.reg_addrs[0]}?") | ||||
|                 f"@ {actor.reg_addrs[0]}?" | ||||
|             ) | ||||
|             log.error( | ||||
|                 "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" | ||||
|                 "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" | ||||
|  | @ -1799,25 +1824,44 @@ async def async_main( | |||
|         if actor._parent_chan: | ||||
|             await try_ship_error_to_remote( | ||||
|                 actor._parent_chan, | ||||
|                 err, | ||||
|                 internal_err, | ||||
|             ) | ||||
| 
 | ||||
|         # always! | ||||
|         match err: | ||||
|         match internal_err: | ||||
|             case ContextCancelled(): | ||||
|                 log.cancel( | ||||
|                     f'Actor: {actor.uid} was task-context-cancelled with,\n' | ||||
|                     f'str(err)' | ||||
|                     f'{internal_err}' | ||||
|                 ) | ||||
|             case _: | ||||
|                 log.exception("Actor errored:") | ||||
|         raise | ||||
| 
 | ||||
|     finally: | ||||
|         log.runtime( | ||||
|         teardown_msg: str = ( | ||||
|             'Runtime nursery complete\n' | ||||
|             '-> Closing all actor lifetime contexts..\n' | ||||
|         ) | ||||
| 
 | ||||
|         ls: ExitStack = actor.lifetime_stack | ||||
|         cbs: list[str] = [ | ||||
|             repr(tup[1].__wrapped__) | ||||
|             for tup in ls._exit_callbacks | ||||
|         ] | ||||
|         if cbs: | ||||
|             cbs_str: str = '\n'.join(cbs) | ||||
|             teardown_msg += ( | ||||
|                 '-> Closing all actor-lifetime callbacks\n\n' | ||||
|                 f'|_{cbs_str}\n' | ||||
|             ) | ||||
|             # XXX NOTE XXX this will cause an error which | ||||
|             # prevents any `infected_aio` actor from continuing | ||||
|             # and any callbacks in the `ls` here WILL NOT be | ||||
|             # called!! | ||||
|             # await _debug.pause(shield=True) | ||||
| 
 | ||||
|         ls.close() | ||||
| 
 | ||||
|         # tear down all lifetime contexts if not in guest mode | ||||
|         # XXX: should this just be in the entrypoint? | ||||
|         actor.lifetime_stack.close() | ||||
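| # NOTE: a sketch (not from this patch) of the `ExitStack` usage the | ||||
| # teardown report above relies on; `._exit_callbacks` and | ||||
| # `.__wrapped__` are CPython implementation details: each | ||||
| # `.callback()`-registered fn is stored as an `(is_sync, wrapper)` | ||||
| # tuple with the original fn kept on `wrapper.__wrapped__`. | ||||
| from contextlib import ExitStack | ||||
|  | ||||
| def on_teardown() -> None: | ||||
|     print('actor lifetime ended') | ||||
|  | ||||
| ls = ExitStack() | ||||
| ls.callback(on_teardown) | ||||
| for is_sync, cb in ls._exit_callbacks: | ||||
|     print(repr(cb.__wrapped__))  # -> <function on_teardown at ..> | ||||
| ls.close()  # runs all registered callbacks LIFO | ||||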
|  | @ -1856,23 +1900,28 @@ async def async_main( | |||
|                     failed = True | ||||
| 
 | ||||
|                 if failed: | ||||
|                     log.warning( | ||||
|                         f'Failed to unregister {actor.name} from ' | ||||
|                         f'registar @ {addr}' | ||||
|                     teardown_msg += ( | ||||
|                         f'-> Failed to unregister {actor.name} from ' | ||||
|                         f'registrar @ {addr}\n' | ||||
|                     ) | ||||
|                     # log.warning( | ||||
| 
 | ||||
|         # Ensure all peers (actors connected to us as clients) are finished | ||||
|         if not actor._no_more_peers.is_set(): | ||||
|             if any( | ||||
|                 chan.connected() for chan in chain(*actor._peers.values()) | ||||
|             ): | ||||
|                 log.runtime( | ||||
|                     f"Waiting for remaining peers {actor._peers} to clear") | ||||
|                 teardown_msg += ( | ||||
|                     f'-> Waiting for remaining peers {actor._peers} to clear..\n' | ||||
|                 ) | ||||
|                 log.runtime(teardown_msg) | ||||
|                 with CancelScope(shield=True): | ||||
|                     await actor._no_more_peers.wait() | ||||
|         log.runtime("All peer channels are complete") | ||||
| 
 | ||||
|     log.runtime("Runtime completed") | ||||
|         teardown_msg += ('-> All peer channels are complete\n') | ||||
| 
 | ||||
|     teardown_msg += ('Actor runtime completed') | ||||
|     log.info(teardown_msg) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: rename to `Registry` and move to `._discovery`! | ||||
|  |  | |||
|  | @ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = { | |||
|     '_root_mailbox': (None, None), | ||||
|     '_registry_addrs': [], | ||||
| 
 | ||||
|     # for `breakpoint()` support | ||||
|     # for `tractor.pause_from_sync()` & `breakpoint()` support | ||||
|     'use_greenback': False, | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): | |||
|     @property | ||||
|     def ctx(self) -> Context: | ||||
|         ''' | ||||
|         This stream's IPC `Context` ref. | ||||
|         A read-only ref to this stream's inter-actor-task `Context`. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._ctx | ||||
|  |  | |||
|  | @ -80,6 +80,7 @@ class ActorNursery: | |||
|     ''' | ||||
|     def __init__( | ||||
|         self, | ||||
|         # TODO: maybe def these as fields of a struct looking type? | ||||
|         actor: Actor, | ||||
|         ria_nursery: trio.Nursery, | ||||
|         da_nursery: trio.Nursery, | ||||
|  | @ -88,8 +89,10 @@ class ActorNursery: | |||
|     ) -> None: | ||||
|         # self.supervisor = supervisor  # TODO | ||||
|         self._actor: Actor = actor | ||||
|         self._ria_nursery = ria_nursery | ||||
| 
 | ||||
|         # TODO: rename to `._tn` for our conventional "task-nursery" | ||||
|         self._da_nursery = da_nursery | ||||
| 
 | ||||
|         self._children: dict[ | ||||
|             tuple[str, str], | ||||
|             tuple[ | ||||
|  | @ -98,15 +101,13 @@ class ActorNursery: | |||
|                 Portal | None, | ||||
|             ] | ||||
|         ] = {} | ||||
|         # portals spawned with ``run_in_actor()`` are | ||||
|         # cancelled when their "main" result arrives | ||||
|         self._cancel_after_result_on_exit: set = set() | ||||
| 
 | ||||
|         self.cancelled: bool = False | ||||
|         self._join_procs = trio.Event() | ||||
|         self._at_least_one_child_in_debug: bool = False | ||||
|         self.errors = errors | ||||
|         self.exited = trio.Event() | ||||
|         self._scope_error: BaseException|None = None | ||||
|         self.exited = trio.Event() | ||||
| 
 | ||||
|         # NOTE: when no explicit call is made to | ||||
|         # `.open_root_actor()` by application code, | ||||
|  | @ -116,6 +117,13 @@ class ActorNursery: | |||
|         # and syncing purposes to any actor opened nurseries. | ||||
|         self._implicit_runtime_started: bool = False | ||||
| 
 | ||||
|         # TODO: remove the `.run_in_actor()` API and thus this 2ndary | ||||
|         # nursery when that API gets moved outside this primitive! | ||||
|         self._ria_nursery = ria_nursery | ||||
|         # portals spawned with ``run_in_actor()`` are | ||||
|         # cancelled when their "main" result arrives | ||||
|         self._cancel_after_result_on_exit: set = set() | ||||
| 
 | ||||
|     async def start_actor( | ||||
|         self, | ||||
|         name: str, | ||||
|  | @ -126,10 +134,14 @@ class ActorNursery: | |||
|         rpc_module_paths: list[str]|None = None, | ||||
|         enable_modules: list[str]|None = None, | ||||
|         loglevel: str|None = None,  # set log level per subactor | ||||
|         nursery: trio.Nursery|None = None, | ||||
|         debug_mode: bool|None = None, | ||||
|         infect_asyncio: bool = False, | ||||
| 
 | ||||
|         # TODO: ideally we can rm this once we no longer have | ||||
|         # a `._ria_nursery` since the dependent APIs have been | ||||
|         # removed! | ||||
|         nursery: trio.Nursery|None = None, | ||||
| 
 | ||||
|     ) -> Portal: | ||||
|         ''' | ||||
|         Start a (daemon) actor: a process that has no designated | ||||
|  | @ -200,6 +212,7 @@ class ActorNursery: | |||
|     #  |_ dynamic @context decoration on child side | ||||
|     #  |_ implicit `Portal.open_context() as (ctx, first):` | ||||
|     #    and `return first` on parent side. | ||||
|     #  |_ mention how it's similar to `trio-parallel` API? | ||||
|     # -[ ] use @api_frame on the wrapper | ||||
|     async def run_in_actor( | ||||
|         self, | ||||
|  | @ -269,11 +282,14 @@ class ActorNursery: | |||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Cancel this nursery by instructing each subactor to cancel | ||||
|         itself and wait for all subactors to terminate. | ||||
|         Cancel this actor-nursery by instructing each subactor's | ||||
|         runtime to cancel and wait for all underlying sub-processes | ||||
|         to terminate. | ||||
| 
 | ||||
|         If ``hard_killl`` is set to ``True`` then kill the processes | ||||
|         directly without any far end graceful ``trio`` cancellation. | ||||
|         If `hard_kill` is set then kill the processes directly using | ||||
|         the spawning-backend's API/OS-machinery without any attempt | ||||
|         at (graceful) `trio`-style cancellation using our | ||||
|         `Actor.cancel()`. | ||||
| 
 | ||||
|         ''' | ||||
|         __runtimeframe__: int = 1  # noqa | ||||
|  | @ -629,8 +645,12 @@ async def open_nursery( | |||
|             f'|_{an}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # shutdown runtime if it was started | ||||
|         if implicit_runtime: | ||||
|             # shutdown runtime if it was started and report noisily | ||||
|             # that we did so. | ||||
|             msg += '=> Shutting down actor runtime <=\n' | ||||
| 
 | ||||
|             log.info(msg) | ||||
| 
 | ||||
|         else: | ||||
|             # keep noise low during std operation. | ||||
|             log.runtime(msg) | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ from ._debug import ( | |||
|     shield_sigint_handler as shield_sigint_handler, | ||||
|     open_crash_handler as open_crash_handler, | ||||
|     maybe_open_crash_handler as maybe_open_crash_handler, | ||||
|     maybe_init_greenback as maybe_init_greenback, | ||||
|     post_mortem as post_mortem, | ||||
|     mk_pdb as mk_pdb, | ||||
| ) | ||||
|  |  | |||
|  | @ -69,6 +69,7 @@ from trio import ( | |||
| import tractor | ||||
| from tractor.log import get_logger | ||||
| from tractor._context import Context | ||||
| from tractor import _state | ||||
| from tractor._state import ( | ||||
|     current_actor, | ||||
|     is_root_process, | ||||
|  | @ -87,9 +88,6 @@ if TYPE_CHECKING: | |||
|     from tractor._runtime import ( | ||||
|         Actor, | ||||
|     ) | ||||
|     from tractor.msg import ( | ||||
|         _codec, | ||||
|     ) | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -1599,11 +1597,13 @@ async def _pause( | |||
|     try: | ||||
|         task: Task = current_task() | ||||
|     except RuntimeError as rte: | ||||
|         __tracebackhide__: bool = False | ||||
|         log.exception('Failed to get current task?') | ||||
|         if actor.is_infected_aio(): | ||||
|             # mk_pdb().set_trace() | ||||
|             raise RuntimeError( | ||||
|                 '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|                 'for infected `asyncio` mode!' | ||||
|                 'directly from (infected) `asyncio` tasks!' | ||||
|             ) from rte | ||||
| 
 | ||||
|         raise | ||||
|  | @ -2163,10 +2163,8 @@ def maybe_import_greenback( | |||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| async def maybe_init_greenback( | ||||
|     **kwargs, | ||||
| ) -> None|ModuleType: | ||||
| 
 | ||||
| async def maybe_init_greenback(**kwargs) -> None|ModuleType: | ||||
|     try: | ||||
|         if mod := maybe_import_greenback(**kwargs): | ||||
|             await mod.ensure_portal() | ||||
|             log.devx( | ||||
|  | @ -2174,11 +2172,13 @@ async def maybe_init_greenback( | |||
|                 'Sync debug support activated!\n' | ||||
|             ) | ||||
|             return mod | ||||
|     except BaseException: | ||||
|         log.exception('Failed to init `greenback`..') | ||||
|         raise | ||||
| 
 | ||||
|     return None | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def _pause_from_bg_root_thread( | ||||
|     behalf_of_thread: Thread, | ||||
|     repl: PdbREPL, | ||||
|  | @ -2399,9 +2399,16 @@ def pause_from_sync( | |||
|         else:  # we are presumably the `trio.run()` + main thread | ||||
|             # raises on not-found by default | ||||
|             greenback: ModuleType = maybe_import_greenback() | ||||
| 
 | ||||
|             # TODO: how to ensure this is either dynamically (if | ||||
|             # needed) called here (in some bg tn??) or that the | ||||
|             # subactor always already called it? | ||||
|             # greenback: ModuleType = await maybe_init_greenback() | ||||
| 
 | ||||
|             message += f'-> imported {greenback}\n' | ||||
|             repl_owner: Task = current_task() | ||||
|             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' | ||||
|             try: | ||||
|                 out = greenback.await_( | ||||
|                     _pause( | ||||
|                         debug_func=None, | ||||
|  | @ -2411,6 +2418,18 @@ def pause_from_sync( | |||
|                         **_pause_kwargs, | ||||
|                     ) | ||||
|                 ) | ||||
|             except RuntimeError as rte: | ||||
|                 if not _state._runtime_vars.get( | ||||
|                         'use_greenback', | ||||
|                         False, | ||||
|                 ): | ||||
|                     raise RuntimeError( | ||||
|                         '`greenback` was never initialized in this actor!?\n\n' | ||||
|                         f'{_state._runtime_vars}\n' | ||||
|                     ) from rte | ||||
| 
 | ||||
|                 raise | ||||
| 
 | ||||
|             if out: | ||||
|                 bg_task, repl = out | ||||
|                 assert repl is repl | ||||
|  | @ -2801,10 +2820,10 @@ def open_crash_handler( | |||
|       `trio.run()`. | ||||
| 
 | ||||
|     ''' | ||||
|     err: BaseException | ||||
|     try: | ||||
|         yield | ||||
|     except tuple(catch) as err: | ||||
| 
 | ||||
|         if type(err) not in ignore: | ||||
|             pdbp.xpm() | ||||
| 
 | ||||
|  |  | |||
|  | @ -234,7 +234,7 @@ def find_caller_info( | |||
| _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} | ||||
| 
 | ||||
| 
 | ||||
| # TODO: -[x] move all this into new `.devx._code`! | ||||
| # TODO: -[x] move all this into new `.devx._frame_stack`! | ||||
| # -[ ] consider rename to _callstack? | ||||
| # -[ ] prolly create a `@runtime_api` dec? | ||||
| #   |_ @api_frame seems better? | ||||
|  | @ -286,3 +286,18 @@ def api_frame( | |||
|     wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache | ||||
|     wrapped.__api_func__: bool = True | ||||
|     return wrapper(wrapped) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: something like this instead of the adhoc frame-unhiding | ||||
| # blocks all over the runtime!! XD | ||||
| # -[ ] ideally we can expect a certain error (set) and if something | ||||
| #     else is raised then all frames below the wrapped one will be | ||||
| #     un-hidden via `__tracebackhide__: bool = False`. | ||||
| # |_ might need to dynamically mutate the code objs like | ||||
| #    `pdbp.hideframe()` does? | ||||
| # -[ ] use this as a `@acm` decorator as introed in 3.10? | ||||
| # @acm | ||||
| # async def unhide_frame_when_not( | ||||
| #     error_set: set[BaseException], | ||||
| # ) -> TracebackType: | ||||
| #     ... | ||||
|  |  | |||
|  | @ -0,0 +1,134 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2024-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| Daemon-subactor-as-service(s) management and supervision primitives | ||||
| and API. | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     # asynccontextmanager as acm, | ||||
|     contextmanager as cm, | ||||
| ) | ||||
| from collections import defaultdict | ||||
| from typing import ( | ||||
|     Callable, | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from trio import TaskStatus | ||||
| from tractor import ( | ||||
|     ActorNursery, | ||||
|     current_actor, | ||||
|     ContextCancelled, | ||||
|     Context, | ||||
|     Portal, | ||||
| ) | ||||
| 
 | ||||
| from ._util import ( | ||||
|     log,  # sub-sys logger | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: implement a `@singleton` deco-API for wrapping the below | ||||
| # factory's impl for general actor-singleton use? | ||||
| # | ||||
| # -[ ] go through the options peeps on SO did? | ||||
| #  * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python | ||||
| #  * including @mikenerone's answer | ||||
| #   |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 | ||||
| # | ||||
| # -[ ] put it in `tractor.lowlevel._globals` ? | ||||
| #  * fits with our outstanding actor-local/global feat req? | ||||
| #   |_ https://github.com/goodboy/tractor/issues/55 | ||||
| #  * how can it relate to the `Actor.lifetime_stack` that was | ||||
| #    silently patched in? | ||||
| #   |_ we could implicitly call both of these in the same | ||||
| #     spot in the runtime using the lifetime stack? | ||||
| #    - `open_singleton_cm().__exit__()` | ||||
| #    - `del_singleton()` | ||||
| #   |_ gives SC fixture semantics to sync code oriented around | ||||
| #     sub-process lifetime? | ||||
| #  * what about with `trio.RunVar`? | ||||
| #   |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar | ||||
| #    - which we'll need for no-GIL cpython (right?) presuming | ||||
| #      multiple `trio.run()` calls in process? | ||||
| # | ||||
| # | ||||
| # @singleton | ||||
| # async def open_service_mngr( | ||||
| #     **init_kwargs, | ||||
| # ) -> ServiceMngr: | ||||
| #     ''' | ||||
| #     Note this function body is invoked IFF no existing singleton instance already | ||||
| #     exists in this proc's memory. | ||||
| 
 | ||||
| #     ''' | ||||
| #     # setup | ||||
| #     yield ServiceMngr(**init_kwargs) | ||||
| #     # teardown | ||||
| 
 | ||||
| 
 | ||||
| # a deletion API for explicit instance de-allocation? | ||||
| # @open_service_mngr.deleter | ||||
| # def del_service_mngr() -> None: | ||||
| #     mngr = open_service_mngr._singleton[0] | ||||
| #     open_service_mngr._singleton[0] = None | ||||
| #     del mngr | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TODO: singleton factory API instead of a class API | ||||
| @cm | ||||
| def open_service_mngr( | ||||
|     *, | ||||
|     _singleton: list[ServiceMngr|None] = [None], | ||||
|     # NOTE: since default values for keyword-args are effectively | ||||
|     # module-vars/globals as per the note from, | ||||
|     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values | ||||
|     # | ||||
|     # > "The default value is evaluated only once. This makes | ||||
|     #   a difference when the default is a mutable object such as | ||||
|     #   a list, dictionary, or instances of most classes" | ||||
|     # | ||||
|     **init_kwargs, | ||||
| 
 | ||||
| ) -> ServiceMngr: | ||||
|     ''' | ||||
|     Open a multi-subactor-as-service-daemon tree supervisor. | ||||
| 
 | ||||
|     The delivered `ServiceMngr` is a singleton instance for each | ||||
|     actor-process and is allocated on first open and never | ||||
|     de-allocated unless explicitly deleted by a call to | ||||
|     `del_service_mngr()`. | ||||
| 
 | ||||
|     ''' | ||||
|     mngr: ServiceMngr|None | ||||
|     if (mngr := _singleton[0]) is None: | ||||
|         log.info('Allocating a new service mngr!') | ||||
|         mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||
|     else: | ||||
|         log.info( | ||||
|             'Using extant service mngr!\n\n' | ||||
|             f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||
|         ) | ||||
| 
 | ||||
|     with mngr: | ||||
|         yield mngr | ||||
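| # NOTE: a hypothetical usage sketch (not in this file); the mutable | ||||
| # default-arg list is evaluated once at `def` time so it persists | ||||
| # across calls, which is the whole singleton trick: | ||||
| def _counter(_store: list[int] = [0]) -> int: | ||||
|     _store[0] += 1  # same list object on every call | ||||
|     return _store[0] | ||||
|  | ||||
| assert _counter() == 1 and _counter() == 2 | ||||
|  | ||||
| # thus, presuming the (unshown) `ServiceMngr` supports re-entrant | ||||
| # `with` blocks, nested opens deliver the same instance: | ||||
| with open_service_mngr() as mngr: | ||||
|     with open_service_mngr() as mngr2: | ||||
|         assert mngr is mngr2 | ||||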
| 
 | ||||
| 
 | ||||
|  | @ -54,11 +54,12 @@ LOG_FORMAT = ( | |||
| DATE_FORMAT = '%b %d %H:%M:%S' | ||||
| 
 | ||||
| # FYI, ERROR is 40 | ||||
| # TODO: use a `bidict` to avoid the :155 check? | ||||
| CUSTOM_LEVELS: dict[str, int] = { | ||||
|     'TRANSPORT': 5, | ||||
|     'RUNTIME': 15, | ||||
|     'DEVX': 17, | ||||
|     'CANCEL': 18, | ||||
|     'CANCEL': 22, | ||||
|     'PDB': 500, | ||||
| } | ||||
| STD_PALETTE = { | ||||
|  | @ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter): | |||
|         Delegate a log call to the underlying logger, after adding | ||||
|         contextual information from this adapter instance. | ||||
| 
 | ||||
|         NOTE: all custom level methods (above) delegate to this! | ||||
| 
 | ||||
|         ''' | ||||
|         if self.isEnabledFor(level): | ||||
|             stacklevel: int = 3 | ||||
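| # NOTE: a standalone sketch (not this module's impl) of the stdlib | ||||
| # mechanics behind such custom levels: register a name for the | ||||
| # numeric level then emit records at it via `Logger.log()`, which | ||||
| # is (roughly) what each generated adapter method delegates to. | ||||
| import logging | ||||
|  | ||||
| CANCEL: int = 22 | ||||
| logging.addLevelName(CANCEL, 'CANCEL') | ||||
| logging.basicConfig(level=CANCEL, format='%(levelname)s %(message)s') | ||||
|  | ||||
| log = logging.getLogger('demo') | ||||
| log.log(CANCEL, 'graceful cancel request sent')  # CANCEL graceful .. | ||||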
|  |  | |||
|  | @ -41,8 +41,10 @@ import textwrap | |||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|     Protocol, | ||||
|     Type, | ||||
|     TYPE_CHECKING, | ||||
|     TypeVar, | ||||
|     Union, | ||||
| ) | ||||
| from types import ModuleType | ||||
|  | @ -181,7 +183,11 @@ def mk_dec( | |||
|     dec_hook: Callable|None = None, | ||||
| 
 | ||||
| ) -> MsgDec: | ||||
|     ''' | ||||
|     Create an IPC msg decoder, normally used as the | ||||
|     `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. | ||||
| 
 | ||||
|     ''' | ||||
|     return MsgDec( | ||||
|         _dec=msgpack.Decoder( | ||||
|             type=spec,  # like `MsgType[Any]` | ||||
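| # NOTE: a minimal sketch (the `Ping` type is made up) of what a | ||||
| # typed `msgspec.msgpack.Decoder` spec buys you: wire bytes are | ||||
| # validated against the struct type on decode instead of landing | ||||
| # as raw `dict`s. | ||||
| import msgspec | ||||
|  | ||||
| class Ping(msgspec.Struct, tag=True): | ||||
|     pld: int | ||||
|  | ||||
| dec = msgspec.msgpack.Decoder(type=Ping) | ||||
| wire: bytes = msgspec.msgpack.encode(Ping(pld=1)) | ||||
| assert dec.decode(wire) == Ping(pld=1) | ||||
| # dec.decode(msgspec.msgpack.encode('bad')) -> raises ValidationError | ||||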
|  | @ -227,6 +233,13 @@ def pformat_msgspec( | |||
|     join_char: str = '\n', | ||||
| 
 | ||||
| ) -> str: | ||||
|     ''' | ||||
|     Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute | ||||
|     for display in log messages as a nice (maybe multiline) | ||||
|     presentation of all the supported `Struct`s available for typed | ||||
|     decoding. | ||||
| 
 | ||||
|     ''' | ||||
|     dec: msgpack.Decoder = getattr(codec, 'dec', codec) | ||||
|     return join_char.join( | ||||
|         mk_msgspec_table( | ||||
|  | @ -630,31 +643,57 @@ def limit_msg_spec( | |||
| #         # import pdbp; pdbp.set_trace() | ||||
| #         assert ext_codec.pld_spec == extended_spec | ||||
| #         yield ext_codec | ||||
| # | ||||
| # ^-TODO-^ is it impossible to make something like this orr!? | ||||
| 
 | ||||
| # TODO: make an auto-custom hook generator from a set of input custom | ||||
| # types? | ||||
| # -[ ] below is a proto design using a `TypeCodec` idea? | ||||
| # | ||||
| # type var for the expected interchange-lib's | ||||
| # IPC-transport type when not available as a built-in | ||||
| # serialization output. | ||||
| WireT = TypeVar('WireT') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: make something similar to this inside `._codec` such that | ||||
| # user can just pass a type table of some sort? | ||||
| # -[ ] we would need to decode all msgs to `pretty_struct.Struct` | ||||
| #     and then call `.to_dict()` on them? | ||||
| # -[x] we're going to need to re-impl all the stuff changed in the | ||||
| #    runtime port such that it can handle dicts or `Msg`s? | ||||
| # | ||||
| # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: | ||||
| #     ''' | ||||
| #     Deliver a `enc_hook()`/`dec_hook()` pair which does | ||||
| #     manual convertion from our above native `Msg` set | ||||
| #     to `dict` equivalent (wire msgs) in order to keep legacy compat | ||||
| #     with the original runtime implementation. | ||||
| # | ||||
| #     Note: this is is/was primarly used while moving the core | ||||
| #     runtime over to using native `Msg`-struct types wherein we | ||||
| #     start with the send side emitting without loading | ||||
| #     a typed-decoder and then later flipping the switch over to | ||||
| #     load to the native struct types once all runtime usage has | ||||
| #     been adjusted appropriately. | ||||
| # | ||||
| #     ''' | ||||
| #     return ( | ||||
| #         # enc_to_dict, | ||||
| #         dec_from_dict, | ||||
| #     ) | ||||
| # TODO: some kinda (decorator) API for built-in subtypes | ||||
| # that builds this implicitly by inspecting the `mro()`? | ||||
| class TypeCodec(Protocol): | ||||
|     ''' | ||||
|     A per-custom-type wire-transport serialization translator | ||||
|     description type. | ||||
| 
 | ||||
|     ''' | ||||
|     src_type: Type | ||||
|     wire_type: WireT | ||||
| 
 | ||||
|     def encode(obj: Type) -> WireT: | ||||
|         ... | ||||
| 
 | ||||
|     def decode( | ||||
|         obj_type: Type[WireT], | ||||
|         obj: WireT, | ||||
|     ) -> Type: | ||||
|         ... | ||||
| 
 | ||||
| 
 | ||||
| class MsgpackTypeCodec(TypeCodec): | ||||
|     ... | ||||
| 
 | ||||
| 
 | ||||
| def mk_codec_hooks( | ||||
|     type_codecs: list[TypeCodec], | ||||
| 
 | ||||
| ) -> tuple[Callable, Callable]: | ||||
|     ''' | ||||
|     Deliver an `enc_hook()`/`dec_hook()` pair which handles | ||||
|     manual conversion for an input `Type` set such that whenever | ||||
|     the `TypeCodec.filter()` predicate matches, `TypeCodec.decode()` | ||||
|     is called on the input native object by the `dec_hook()`, and | ||||
|     whenever `isinstance(obj, TypeCodec.src_type)` matches, the | ||||
|     `enc_hook(obj=obj)` return value is taken from a | ||||
|     `TypeCodec.encode(obj)` callback. | ||||
| 
 | ||||
|     ''' | ||||
|     ... | ||||
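| # NOTE: a hand-written sketch of the hook pair `mk_codec_hooks()` | ||||
| # aims to generate, here for a single custom type (`complex`, the | ||||
| # example used in the `msgspec` extending docs): | ||||
| import msgspec | ||||
|  | ||||
| def enc_hook(obj: object) -> object: | ||||
|     if isinstance(obj, complex): | ||||
|         return (obj.real, obj.imag)  # wire repr: 2-tuple of floats | ||||
|     raise NotImplementedError(type(obj)) | ||||
|  | ||||
| def dec_hook(obj_type: type, obj: object) -> object: | ||||
|     if obj_type is complex: | ||||
|         return complex(obj[0], obj[1]) | ||||
|     raise NotImplementedError(obj_type) | ||||
|  | ||||
| enc = msgspec.msgpack.Encoder(enc_hook=enc_hook) | ||||
| dec = msgspec.msgpack.Decoder(type=complex, dec_hook=dec_hook) | ||||
| assert dec.decode(enc.encode(2 + 3j)) == 2 + 3j | ||||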
|  |  | |||
|  | @ -580,12 +580,15 @@ async def drain_to_final_msg( | |||
|         # 2. WE DID NOT REQUEST that cancel and thus | ||||
|         #    SHOULD RAISE HERE! | ||||
|         except trio.Cancelled as taskc: | ||||
|             # from tractor.devx._debug import pause | ||||
|             # await pause(shield=True) | ||||
| 
 | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|             # cancellation. | ||||
|             ctx.maybe_raise( | ||||
|                 hide_tb=hide_tb, | ||||
|                 # TODO: when to use this? | ||||
|                 # from_src_exc=taskc, | ||||
|             ) | ||||
|  |  | |||
|  | @ -34,6 +34,9 @@ from pprint import ( | |||
|     saferepr, | ||||
| ) | ||||
| 
 | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| log = get_logger() | ||||
| # TODO: auto-gen type sig for input func both for | ||||
| # type-msgs and logging of RPC tasks? | ||||
| # taken and modified from: | ||||
|  | @ -143,7 +146,13 @@ def pformat( | |||
| 
 | ||||
|         else:  # the `pprint` recursion-safe format: | ||||
|             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr | ||||
|             try: | ||||
|                 val_str: str = saferepr(v) | ||||
|             except Exception: | ||||
|                 log.exception( | ||||
|                     f'Failed to `saferepr({type(struct)})` !?\n' | ||||
|                 ) | ||||
|                 return _Struct.__repr__(struct) | ||||
| 
 | ||||
|         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! | ||||
|         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') | ||||
|  | @ -194,12 +203,20 @@ class Struct( | |||
|         return sin_props | ||||
| 
 | ||||
|     pformat = pformat | ||||
|     # __repr__ = pformat | ||||
|     # __str__ = __repr__ = pformat | ||||
|     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering | ||||
|     # inside a known tty? | ||||
|     # def __repr__(self) -> str: | ||||
|     #     ... | ||||
|     __repr__ = pformat | ||||
|     def __repr__(self) -> str: | ||||
|         try: | ||||
|             return pformat(self) | ||||
|         except Exception: | ||||
|             log.exception( | ||||
|                 f'Failed to `pformat({type(self)})` !?\n' | ||||
|             ) | ||||
|             return _Struct.__repr__(self) | ||||
| 
 | ||||
|     def copy( | ||||
|         self, | ||||
|  |  | |||
|  | @ -18,11 +18,13 @@ | |||
| Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| import asyncio | ||||
| from asyncio.exceptions import CancelledError | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from dataclasses import dataclass | ||||
| import inspect | ||||
| import traceback | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|  | @ -30,20 +32,21 @@ from typing import ( | |||
|     Awaitable, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from outcome import Error | ||||
| 
 | ||||
| from tractor.log import get_logger | ||||
| import tractor | ||||
| from tractor._state import ( | ||||
|     current_actor, | ||||
|     debug_mode, | ||||
| ) | ||||
| from tractor.log import get_logger | ||||
| from tractor.devx import _debug | ||||
| from tractor._exceptions import AsyncioCancelled | ||||
| from tractor.trionics._broadcast import ( | ||||
|     broadcast_receiver, | ||||
|     BroadcastReceiver, | ||||
| ) | ||||
| import trio | ||||
| from outcome import ( | ||||
|     Error, | ||||
|     Outcome, | ||||
| ) | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -161,7 +164,7 @@ def _run_asyncio_task( | |||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
|     if not current_actor().is_infected_aio(): | ||||
|     if not tractor.current_actor().is_infected_aio(): | ||||
|         raise RuntimeError( | ||||
|             "`infect_asyncio` mode is not enabled!?" | ||||
|         ) | ||||
|  | @ -172,7 +175,6 @@ def _run_asyncio_task( | |||
|     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore | ||||
| 
 | ||||
|     args = tuple(inspect.getfullargspec(func).args) | ||||
| 
 | ||||
|     if getattr(func, '_tractor_steam_function', None): | ||||
|         # the assumption is that the target async routine accepts the | ||||
|         # send channel then it intends to yield more then one return | ||||
|  | @ -346,13 +348,22 @@ def _run_asyncio_task( | |||
|             # on a checkpoint. | ||||
|             cancel_scope.cancel() | ||||
| 
 | ||||
|             # raise any ``asyncio`` side error. | ||||
|             # raise any `asyncio` side error. | ||||
|             raise aio_err | ||||
| 
 | ||||
|     task.add_done_callback(cancel_trio) | ||||
|     return chan | ||||
| 
 | ||||
| 
 | ||||
| class AsyncioCancelled(CancelledError): | ||||
|     ''' | ||||
|     An `asyncio`-cancelled translation (non-base) error, | ||||
|     for use with the ``to_asyncio`` module, | ||||
|     to be raised in the ``trio``-side task. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def translate_aio_errors( | ||||
| 
 | ||||
|  | @ -516,7 +527,6 @@ async def open_channel_from( | |||
| 
 | ||||
| 
 | ||||
| def run_as_asyncio_guest( | ||||
| 
 | ||||
|     trio_main: Callable, | ||||
| 
 | ||||
| ) -> None: | ||||
|  | @ -548,6 +558,11 @@ def run_as_asyncio_guest( | |||
| 
 | ||||
|         loop = asyncio.get_running_loop() | ||||
|         trio_done_fut = asyncio.Future() | ||||
|         startup_msg: str = ( | ||||
|             'Starting `asyncio` guest-loop-run\n' | ||||
|             '-> got running loop\n' | ||||
|             '-> built a `trio`-done future\n' | ||||
|         ) | ||||
| 
 | ||||
|         if debug_mode(): | ||||
|             # XXX make it obvi we know this isn't supported yet! | ||||
|  | @ -562,34 +577,120 @@ def run_as_asyncio_guest( | |||
|         def trio_done_callback(main_outcome): | ||||
| 
 | ||||
|             if isinstance(main_outcome, Error): | ||||
|                 error = main_outcome.error | ||||
|                 error: BaseException = main_outcome.error | ||||
| 
 | ||||
|                 # show a dedicated `asyncio`-side tb from the error | ||||
|                 tb_str: str = ''.join(traceback.format_exception(error)) | ||||
|                 log.exception( | ||||
|                     'Guest-run errored!?\n\n' | ||||
|                     f'{main_outcome}\n' | ||||
|                     f'{error}\n\n' | ||||
|                     f'{tb_str}\n' | ||||
|                 ) | ||||
|                 trio_done_fut.set_exception(error) | ||||
| 
 | ||||
|                 # TODO: explicit asyncio tb? | ||||
|                 # traceback.print_exception(error) | ||||
| 
 | ||||
|                 # XXX: do we need this? | ||||
|                 # actor.cancel_soon() | ||||
| 
 | ||||
|                 # raise inline | ||||
|                 main_outcome.unwrap() | ||||
| 
 | ||||
|             else: | ||||
|                 trio_done_fut.set_result(main_outcome) | ||||
|                 log.runtime(f"trio_main finished: {main_outcome!r}") | ||||
|                 log.runtime(f'trio_main finished: {main_outcome!r}') | ||||
| 
 | ||||
|         startup_msg += ( | ||||
|             f'-> created {trio_done_callback!r}\n' | ||||
|             f'-> scheduling `trio_main`: {trio_main!r}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # start the infection: run trio on the asyncio loop in "guest mode" | ||||
|         log.runtime( | ||||
|             'Infecting `asyncio`-process with a `trio` guest-run of\n\n' | ||||
|             f'{trio_main!r}\n\n' | ||||
| 
 | ||||
|             f'{trio_done_callback}\n' | ||||
|             f'{startup_msg}\n\n' | ||||
|             + | ||||
|             'Infecting `asyncio`-process with a `trio` guest-run!\n' | ||||
|         ) | ||||
| 
 | ||||
|         trio.lowlevel.start_guest_run( | ||||
|             trio_main, | ||||
|             run_sync_soon_threadsafe=loop.call_soon_threadsafe, | ||||
|             done_callback=trio_done_callback, | ||||
|         ) | ||||
|         # NOTE `.unwrap()` will raise on error | ||||
|         return (await trio_done_fut).unwrap() | ||||
|         try: | ||||
|             # TODO: better SIGINT handling since shielding seems to | ||||
|             # make NO DIFFERENCE XD | ||||
|             # -[ ] maybe this is due to 3.11's recent SIGINT handling | ||||
|             #  changes and we can better work with/around it? | ||||
|             # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption | ||||
|             out: Outcome = await asyncio.shield(trio_done_fut) | ||||
|             # NOTE `Error.unwrap()` will raise | ||||
|             return out.unwrap() | ||||
| 
 | ||||
|         except asyncio.CancelledError: | ||||
|             actor: tractor.Actor = tractor.current_actor() | ||||
|             log.exception( | ||||
|                 '`asyncio`-side main task was cancelled!\n' | ||||
|                 'Cancelling actor-runtime..\n' | ||||
|                 f'c)>\n' | ||||
|                 f'  |_{actor}.cancel_soon()\n' | ||||
| 
 | ||||
|             ) | ||||
| 
 | ||||
|             # XXX NOTE XXX the next LOC is super important!!! | ||||
|             #  => without it, we can get a guest-run abandonment case | ||||
|             #  where asyncio will not trigger `trio` in a final event | ||||
|             #  loop cycle! | ||||
|             # | ||||
|             #  our test, | ||||
|             #  `test_infected_asyncio.test_sigint_closes_lifetime_stack()` | ||||
|             #  demonstrates how, when we raise a SIGINT-signal in an infected | ||||
|             #  child, we get a variable race-condition outcome where | ||||
|             #  either of the following can indeterminately happen, | ||||
|             # | ||||
|             # - "silent-abandon": `asyncio` abandons the `trio` | ||||
|             #   guest-run task silently and no `trio`-guest-run or | ||||
|             #   `tractor`-actor-runtime teardown happens whatsoever.. | ||||
|             #   this is the WORST (race) case outcome. | ||||
|             # | ||||
|             # - OR, "loud-abandon": the guest run gets abandoned "loudly" with | ||||
|             #   `trio` reporting a console traceback and further tbs of all | ||||
|             #   the failed shutdown routines also show on console.. | ||||
|             # | ||||
|             # our test can thus fail in (and has been parametrized for) | ||||
|             # the 2 cases: | ||||
|             # | ||||
|             # - when the parent raises a KBI just after | ||||
|             #   signalling the child, | ||||
|             #  |_silent-abandon => the `Actor.lifetime_stack` will | ||||
|             #    never be closed thus leaking a resource! | ||||
|             #   -> FAIL! | ||||
|             #  |_loud-abandon => despite the abandonment at least the | ||||
|             #    stack will be closed out.. | ||||
|             #   -> PASS | ||||
|             # | ||||
|             # - when the parent instead simply waits on `ctx.wait_for_result()` | ||||
|             #   (i.e. DOES not raise a KBI itself), | ||||
|             #  |_silent-abandon => test will just hang and thus the ctx | ||||
|             #    and actor will never be closed/cancelled/shutdown | ||||
|             #    resulting in leaking a (file) resource since the | ||||
|             #    `trio`/`tractor` runtime never relays a ctxc back to | ||||
|             #    the parent; the test's timeout will trigger.. | ||||
|             #   -> FAIL! | ||||
|             #  |_loud-abandon => this case seems to never happen?? | ||||
|             # | ||||
|             # XXX FIRST PART XXX, SO, this is a fix to the | ||||
|             # "silent-abandon" case, NOT the `trio`-guest-run | ||||
|             # abandonment issue in general, for which the NEXT LOC | ||||
|             # is apparently a working fix! | ||||
|             actor.cancel_soon() | ||||
| 
 | ||||
|             # XXX NOTE XXX PUMP the asyncio event loop to allow the | ||||
|             # `trio`-guest-run to complete and teardown !! | ||||
|             # | ||||
|             # XXX WITHOUT THIS the guest-run gets race-conditionally | ||||
|             # abandoned by `asyncio`!! | ||||
|             # XD XD XD | ||||
|             await asyncio.shield( | ||||
|                 asyncio.sleep(.1)  # NOPE! it can't be 0 either XD | ||||
|             ) | ||||
|             raise | ||||
| 
 | ||||
|     # might as well if it's installed. | ||||
|     try: | ||||
|  | @ -599,4 +700,6 @@ def run_as_asyncio_guest( | |||
|     except ImportError: | ||||
|         pass | ||||
| 
 | ||||
|     return asyncio.run(aio_main(trio_main)) | ||||
|     return asyncio.run( | ||||
|         aio_main(trio_main), | ||||
|     ) | ||||
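| # NOTE: a stripped-down sketch (not this module's impl) of the | ||||
| # guest-mode wiring used above: `trio` rides the already-running | ||||
| # `asyncio` loop and reports its final `Outcome` through a future. | ||||
| import asyncio | ||||
| import trio | ||||
|  | ||||
| async def demo_trio_main() -> str: | ||||
|     await trio.sleep(0) | ||||
|     return 'trio-done' | ||||
|  | ||||
| async def demo_aio_main() -> str: | ||||
|     loop = asyncio.get_running_loop() | ||||
|     done: asyncio.Future = asyncio.Future() | ||||
|     trio.lowlevel.start_guest_run( | ||||
|         demo_trio_main, | ||||
|         run_sync_soon_threadsafe=loop.call_soon_threadsafe, | ||||
|         done_callback=done.set_result,  # receives an `Outcome` | ||||
|     ) | ||||
|     return (await done).unwrap()  # raises if `demo_trio_main` errored | ||||
|  | ||||
| print(asyncio.run(demo_aio_main()))  # -> trio-done | ||||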
|  |  | |||
|  | @ -156,11 +156,12 @@ class BroadcastState(Struct): | |||
| 
 | ||||
| class BroadcastReceiver(ReceiveChannel): | ||||
|     ''' | ||||
|     A memory receive channel broadcaster which is non-lossy for the | ||||
|     fastest consumer. | ||||
|     A memory receive channel broadcaster which is non-lossy for | ||||
|     the fastest consumer. | ||||
| 
 | ||||
|     Additional consumer tasks can receive all produced values by registering | ||||
|     with ``.subscribe()`` and receiving from the new instance it delivers. | ||||
|     Additional consumer tasks can receive all produced values by | ||||
|     registering with ``.subscribe()`` and receiving from the new | ||||
|     instance it delivers. | ||||
| 
 | ||||
|     ''' | ||||
|     def __init__( | ||||
|  |  | |||
|  | @ -18,8 +18,12 @@ | |||
| Async context manager primitives with hard ``trio``-aware semantics | ||||
| 
 | ||||
| ''' | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| import inspect | ||||
| from types import ModuleType | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncContextManager, | ||||
|  | @ -30,13 +34,16 @@ from typing import ( | |||
|     Optional, | ||||
|     Sequence, | ||||
|     TypeVar, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from tractor._state import current_actor | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from tractor import ActorNursery | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -46,8 +53,10 @@ T = TypeVar("T") | |||
| 
 | ||||
| @acm | ||||
| async def maybe_open_nursery( | ||||
|     nursery: trio.Nursery | None = None, | ||||
|     nursery: trio.Nursery|ActorNursery|None = None, | ||||
|     shield: bool = False, | ||||
|     lib: ModuleType = trio, | ||||
| 
 | ||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ||||
|     ''' | ||||
|     Create a new nursery if None provided. | ||||
|  | @ -58,13 +67,12 @@ async def maybe_open_nursery( | |||
|     if nursery is not None: | ||||
|         yield nursery | ||||
|     else: | ||||
|         async with trio.open_nursery() as nursery: | ||||
|         async with lib.open_nursery() as nursery: | ||||
|             nursery.cancel_scope.shield = shield | ||||
|             yield nursery | ||||
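| # NOTE: hedged usage sketch of the new `lib` param; passing | ||||
| # `lib=tractor` should provision an `ActorNursery` when the caller | ||||
| # didn't pass one, while the `trio` default keeps prior behaviour: | ||||
| import tractor | ||||
|  | ||||
| async def main() -> None: | ||||
|     async with maybe_open_nursery(lib=tractor) as an: | ||||
|         ...  # `an` is a fresh `tractor.ActorNursery` here | ||||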
| 
 | ||||
| 
 | ||||
| async def _enter_and_wait( | ||||
| 
 | ||||
|     mngr: AsyncContextManager[T], | ||||
|     unwrapped: dict[int, T], | ||||
|     all_entered: trio.Event, | ||||
|  | @ -91,7 +99,6 @@ async def _enter_and_wait( | |||
| 
 | ||||
| @acm | ||||
| async def gather_contexts( | ||||
| 
 | ||||
|     mngrs: Sequence[AsyncContextManager[T]], | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
|  | @ -102,15 +109,17 @@ async def gather_contexts( | |||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|     Concurrently enter a sequence of async context managers, each in | ||||
|     a separate ``trio`` task and deliver the unwrapped values in the | ||||
|     same order once all managers have entered. On exit all contexts are | ||||
|     subsequently and concurrently exited. | ||||
|     Concurrently enter a sequence of async context managers (acms), | ||||
|     each from a separate `trio` task and deliver the unwrapped | ||||
|     `yield`-ed values in the same order once all managers have entered. | ||||
| 
 | ||||
|     This function is somewhat similar to common usage of | ||||
|     ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in | ||||
|     combo with ``asyncio.gather()`` except the managers are concurrently | ||||
|     entered and exited, and cancellation just works. | ||||
|     On exit, all acms are subsequently and concurrently exited. | ||||
| 
 | ||||
|     This function is somewhat similar to a batch of non-blocking | ||||
|     calls to `contextlib.AsyncExitStack.enter_async_context()` | ||||
|     (inside a loop) *in combo with* an `asyncio.gather()` to get the | ||||
|     `.__aenter__()`-ed values, except the managers are both | ||||
|     concurrently entered and exited and *cancellation just works*(R). | ||||
| 
 | ||||
|     ''' | ||||
|     seed: int = id(mngrs) | ||||
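| # NOTE: hedged usage sketch, presuming the order-preserving | ||||
| # semantics documented above: | ||||
| @acm | ||||
| async def one(i: int): | ||||
|     yield i | ||||
|  | ||||
| async def main() -> None: | ||||
|     async with gather_contexts([one(1), one(2), one(3)]) as vals: | ||||
|         # all 3 acms were entered concurrently, values in order | ||||
|         assert tuple(vals) == (1, 2, 3) | ||||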
|  | @ -210,9 +219,10 @@ async def maybe_open_context( | |||
| 
 | ||||
| ) -> AsyncIterator[tuple[bool, T]]: | ||||
|     ''' | ||||
|     Maybe open a context manager if there is not already a _Cached | ||||
|     version for the provided ``key`` for *this* actor. Return the | ||||
|     _Cached instance on a _Cache hit. | ||||
|     Maybe open an async-context-manager (acm) if there is not already | ||||
|     a `_Cached` version for the provided (input) `key` for *this* actor. | ||||
| 
 | ||||
|     Return the `_Cached` instance on a _Cache hit. | ||||
| 
 | ||||
|     ''' | ||||
|     fid = id(acm_func) | ||||
|  | @ -273,8 +283,13 @@ async def maybe_open_context( | |||
|     else: | ||||
|         _Cache.users += 1 | ||||
|         log.runtime( | ||||
|             f'Reusing resource for `_Cache` user {_Cache.users}\n\n' | ||||
|             f'{ctx_key!r} -> {yielded!r}\n' | ||||
|             f'Re-using cached resource for user {_Cache.users}\n\n' | ||||
|             f'{ctx_key!r} -> {type(yielded)}\n' | ||||
| 
 | ||||
|             # TODO: make this work with values but without | ||||
|             # `msgspec.Struct` causing frickin crashes on field-type | ||||
|             # lookups.. | ||||
|             # f'{ctx_key!r} -> {yielded!r}\n' | ||||
|         ) | ||||
|         lock.release() | ||||
|         yield True, yielded | ||||
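| # NOTE: hedged usage sketch; `connect()` is made up and the exact | ||||
| # kwarg names are assumptions, but the `(cache_hit, value)` pair | ||||
| # matches the `yield`s above: | ||||
| @acm | ||||
| async def connect(url: str): | ||||
|     yield f'conn-to-{url}' | ||||
|  | ||||
| async def task() -> None: | ||||
|     async with maybe_open_context( | ||||
|         acm_func=connect, | ||||
|         kwargs={'url': 'db'}, | ||||
|     ) as (cache_hit, conn): | ||||
|         # first entrant allocates; concurrent entrants get | ||||
|         # `cache_hit=True` and the already-`yield`-ed `conn` | ||||
|         ... | ||||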
|  |  | |||