Compare commits
	
		
			1 Commits 
		
	
	
		
			5ed30dec40
			...
			284fa0340e
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 284fa0340e | 
|  | @ -1,6 +1,7 @@ | ||||||
| """ | """ | ||||||
| ``tractor`` testing!! | ``tractor`` testing!! | ||||||
| """ | """ | ||||||
|  | from functools import partial | ||||||
| import sys | import sys | ||||||
| import subprocess | import subprocess | ||||||
| import os | import os | ||||||
|  | @ -8,6 +9,9 @@ import random | ||||||
| import signal | import signal | ||||||
| import platform | import platform | ||||||
| import time | import time | ||||||
|  | from typing import ( | ||||||
|  |     AsyncContextManager, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| import tractor | import tractor | ||||||
|  | @ -150,6 +154,18 @@ def pytest_generate_tests(metafunc): | ||||||
|         metafunc.parametrize("start_method", [spawn_backend], scope='module') |         metafunc.parametrize("start_method", [spawn_backend], scope='module') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: a way to let test scripts (like from `examples/`) | ||||||
|  | # guarantee they won't registry addr collide! | ||||||
|  | @pytest.fixture | ||||||
|  | def open_test_runtime( | ||||||
|  |     reg_addr: tuple, | ||||||
|  | ) -> AsyncContextManager: | ||||||
|  |     return partial( | ||||||
|  |         tractor.open_nursery, | ||||||
|  |         registry_addrs=[reg_addr], | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def sig_prog(proc, sig): | def sig_prog(proc, sig): | ||||||
|     "Kill the actor-process with ``sig``." |     "Kill the actor-process with ``sig``." | ||||||
|     proc.send_signal(sig) |     proc.send_signal(sig) | ||||||
|  |  | ||||||
|  | @ -41,7 +41,7 @@ from tractor.msg import ( | ||||||
| from tractor.msg.types import ( | from tractor.msg.types import ( | ||||||
|     _payload_msgs, |     _payload_msgs, | ||||||
|     log, |     log, | ||||||
|     Msg, |     PayloadMsg, | ||||||
|     Started, |     Started, | ||||||
|     mk_msg_spec, |     mk_msg_spec, | ||||||
| ) | ) | ||||||
|  | @ -61,7 +61,7 @@ def mk_custom_codec( | ||||||
|     uid: tuple[str, str] = tractor.current_actor().uid |     uid: tuple[str, str] = tractor.current_actor().uid | ||||||
| 
 | 
 | ||||||
|     # XXX NOTE XXX: despite defining `NamespacePath` as a type |     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||||
|     # field on our `Msg.pld`, we still need a enc/dec_hook() pair |     # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair | ||||||
|     # to cast to/from that type on the wire. See the docs: |     # to cast to/from that type on the wire. See the docs: | ||||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types |     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||||
| 
 | 
 | ||||||
|  | @ -321,12 +321,12 @@ def dec_type_union( | ||||||
|     import importlib |     import importlib | ||||||
|     types: list[Type] = [] |     types: list[Type] = [] | ||||||
|     for type_name in type_names: |     for type_name in type_names: | ||||||
|         for ns in [ |         for mod in [ | ||||||
|             typing, |             typing, | ||||||
|             importlib.import_module(__name__), |             importlib.import_module(__name__), | ||||||
|         ]: |         ]: | ||||||
|             if type_ref := getattr( |             if type_ref := getattr( | ||||||
|                 ns, |                 mod, | ||||||
|                 type_name, |                 type_name, | ||||||
|                 False, |                 False, | ||||||
|             ): |             ): | ||||||
|  | @ -744,7 +744,7 @@ def chk_pld_type( | ||||||
|     # 'Error',  .pld: ErrorData |     # 'Error',  .pld: ErrorData | ||||||
| 
 | 
 | ||||||
|     codec: MsgCodec = mk_codec( |     codec: MsgCodec = mk_codec( | ||||||
|         # NOTE: this ONLY accepts `Msg.pld` fields of a specified |         # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified | ||||||
|         # type union. |         # type union. | ||||||
|         ipc_pld_spec=payload_spec, |         ipc_pld_spec=payload_spec, | ||||||
|     ) |     ) | ||||||
|  | @ -752,7 +752,7 @@ def chk_pld_type( | ||||||
|     # make a one-off dec to compare with our `MsgCodec` instance |     # make a one-off dec to compare with our `MsgCodec` instance | ||||||
|     # which does the below `mk_msg_spec()` call internally |     # which does the below `mk_msg_spec()` call internally | ||||||
|     ipc_msg_spec: Union[Type[Struct]] |     ipc_msg_spec: Union[Type[Struct]] | ||||||
|     msg_types: list[Msg[payload_spec]] |     msg_types: list[PayloadMsg[payload_spec]] | ||||||
|     ( |     ( | ||||||
|         ipc_msg_spec, |         ipc_msg_spec, | ||||||
|         msg_types, |         msg_types, | ||||||
|  | @ -761,7 +761,7 @@ def chk_pld_type( | ||||||
|     ) |     ) | ||||||
|     _enc = msgpack.Encoder() |     _enc = msgpack.Encoder() | ||||||
|     _dec = msgpack.Decoder( |     _dec = msgpack.Decoder( | ||||||
|         type=ipc_msg_spec or Any,  # like `Msg[Any]` |         type=ipc_msg_spec or Any,  # like `PayloadMsg[Any]` | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     assert ( |     assert ( | ||||||
|  | @ -806,7 +806,7 @@ def chk_pld_type( | ||||||
|             'cid': '666', |             'cid': '666', | ||||||
|             'pld': pld, |             'pld': pld, | ||||||
|         } |         } | ||||||
|         enc_msg: Msg = typedef(**kwargs) |         enc_msg: PayloadMsg = typedef(**kwargs) | ||||||
| 
 | 
 | ||||||
|         _wire_bytes: bytes = _enc.encode(enc_msg) |         _wire_bytes: bytes = _enc.encode(enc_msg) | ||||||
|         wire_bytes: bytes = codec.enc.encode(enc_msg) |         wire_bytes: bytes = codec.enc.encode(enc_msg) | ||||||
|  | @ -883,25 +883,16 @@ def test_limit_msgspec(): | ||||||
|             debug_mode=True |             debug_mode=True | ||||||
|         ): |         ): | ||||||
| 
 | 
 | ||||||
|             # ensure we can round-trip a boxing `Msg` |             # ensure we can round-trip a boxing `PayloadMsg` | ||||||
|             assert chk_pld_type( |             assert chk_pld_type( | ||||||
|                 # Msg, |                 payload_spec=Any, | ||||||
|                 Any, |                 pld=None, | ||||||
|                 None, |  | ||||||
|                 expect_roundtrip=True, |                 expect_roundtrip=True, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # TODO: don't need this any more right since |  | ||||||
|             # `msgspec>=0.15` has the nice generics stuff yah?? |  | ||||||
|             # |  | ||||||
|             # manually override the type annot of the payload |  | ||||||
|             # field and ensure it propagates to all msg-subtypes. |  | ||||||
|             # Msg.__annotations__['pld'] = Any |  | ||||||
| 
 |  | ||||||
|             # verify that a mis-typed payload value won't decode |             # verify that a mis-typed payload value won't decode | ||||||
|             assert not chk_pld_type( |             assert not chk_pld_type( | ||||||
|                 # Msg, |                 payload_spec=int, | ||||||
|                 int, |  | ||||||
|                 pld='doggy', |                 pld='doggy', | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | @ -913,18 +904,16 @@ def test_limit_msgspec(): | ||||||
|                 value: Any |                 value: Any | ||||||
| 
 | 
 | ||||||
|             assert not chk_pld_type( |             assert not chk_pld_type( | ||||||
|                 # Msg, |                 payload_spec=CustomPayload, | ||||||
|                 CustomPayload, |  | ||||||
|                 pld='doggy', |                 pld='doggy', | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             assert chk_pld_type( |             assert chk_pld_type( | ||||||
|                 # Msg, |                 payload_spec=CustomPayload, | ||||||
|                 CustomPayload, |  | ||||||
|                 pld=CustomPayload(name='doggy', value='urmom') |                 pld=CustomPayload(name='doggy', value='urmom') | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # uhh bc we can `.pause_from_sync()` now! :surfer: |             # yah, we can `.pause_from_sync()` now! | ||||||
|             # breakpoint() |             # breakpoint() | ||||||
| 
 | 
 | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
|  |  | ||||||
|  | @ -1336,6 +1336,23 @@ def test_shield_pause( | ||||||
|     child.expect(pexpect.EOF) |     child.expect(pexpect.EOF) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: better error for "non-ideal" usage from the root actor. | ||||||
|  | # -[ ] if called from an async scope emit a message that suggests | ||||||
|  | #    using `await tractor.pause()` instead since it's less overhead | ||||||
|  | #    (in terms of `greenback` and/or extra threads) and if it's from | ||||||
|  | #    a sync scope suggest that usage must first call | ||||||
|  | #    `ensure_portal()` in the (eventual parent) async calling scope? | ||||||
|  | def test_sync_pause_from_bg_task_in_root_actor_(): | ||||||
|  |     ''' | ||||||
|  |     When used from the root actor, normally we can only implicitly | ||||||
|  |     support `.pause_from_sync()` from the main-parent-task (that | ||||||
|  |     opens the runtime via `open_root_actor()`) since `greenback` | ||||||
|  |     requires a `.ensure_portal()` call per `trio.Task` where it is | ||||||
|  |     used. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     ... | ||||||
|  | 
 | ||||||
| # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | ||||||
| def test_correct_frames_below_hidden(): | def test_correct_frames_below_hidden(): | ||||||
|     ''' |     ''' | ||||||
|  |  | ||||||
|  | @ -19,7 +19,7 @@ from tractor._testing import ( | ||||||
| @pytest.fixture | @pytest.fixture | ||||||
| def run_example_in_subproc( | def run_example_in_subproc( | ||||||
|     loglevel: str, |     loglevel: str, | ||||||
|     testdir, |     testdir: pytest.Testdir, | ||||||
|     reg_addr: tuple[str, int], |     reg_addr: tuple[str, int], | ||||||
| ): | ): | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -2,16 +2,25 @@ | ||||||
| The hipster way to force SC onto the stdlib's "async": 'infection mode'. | The hipster way to force SC onto the stdlib's "async": 'infection mode'. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from typing import Optional, Iterable, Union |  | ||||||
| import asyncio | import asyncio | ||||||
| import builtins | import builtins | ||||||
|  | from contextlib import ExitStack | ||||||
| import itertools | import itertools | ||||||
| import importlib | import importlib | ||||||
|  | import os | ||||||
|  | from pathlib import Path | ||||||
|  | import signal | ||||||
|  | from typing import ( | ||||||
|  |     Callable, | ||||||
|  |     Iterable, | ||||||
|  |     Union, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| import trio | import trio | ||||||
| import tractor | import tractor | ||||||
| from tractor import ( | from tractor import ( | ||||||
|  |     current_actor, | ||||||
|     to_asyncio, |     to_asyncio, | ||||||
|     RemoteActorError, |     RemoteActorError, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
|  | @ -25,8 +34,8 @@ async def sleep_and_err( | ||||||
| 
 | 
 | ||||||
|     # just signature placeholders for compat with |     # just signature placeholders for compat with | ||||||
|     # ``to_asyncio.open_channel_from()`` |     # ``to_asyncio.open_channel_from()`` | ||||||
|     to_trio: Optional[trio.MemorySendChannel] = None, |     to_trio: trio.MemorySendChannel|None = None, | ||||||
|     from_trio: Optional[asyncio.Queue] = None, |     from_trio: asyncio.Queue|None = None, | ||||||
| 
 | 
 | ||||||
| ): | ): | ||||||
|     if to_trio: |     if to_trio: | ||||||
|  | @ -36,7 +45,7 @@ async def sleep_and_err( | ||||||
|     assert 0 |     assert 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def sleep_forever(): | async def aio_sleep_forever(): | ||||||
|     await asyncio.sleep(float('inf')) |     await asyncio.sleep(float('inf')) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): | ||||||
| 
 | 
 | ||||||
|     # spawn an ``asyncio`` task to run a func and return result |     # spawn an ``asyncio`` task to run a func and return result | ||||||
|     with trio.move_on_after(.2): |     with trio.move_on_after(.2): | ||||||
|         await tractor.to_asyncio.run_task(sleep_forever) |         await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_trio_cancels_aio_on_actor_side(reg_addr): | def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||||
|  | @ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def asyncio_actor( | async def asyncio_actor( | ||||||
| 
 |  | ||||||
|     target: str, |     target: str, | ||||||
|     expect_err: Exception|None = None |     expect_err: Exception|None = None | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|     assert tractor.current_actor().is_infected_aio() |     assert tractor.current_actor().is_infected_aio() | ||||||
|     target = globals()[target] |     target: Callable = globals()[target] | ||||||
| 
 | 
 | ||||||
|     if '.' in expect_err: |     if '.' in expect_err: | ||||||
|         modpath, _, name = expect_err.rpartition('.') |         modpath, _, name = expect_err.rpartition('.') | ||||||
|  | @ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): | ||||||
|         async with tractor.open_nursery() as n: |         async with tractor.open_nursery() as n: | ||||||
|             portal = await n.run_in_actor( |             portal = await n.run_in_actor( | ||||||
|                 asyncio_actor, |                 asyncio_actor, | ||||||
|                 target='sleep_forever', |                 target='aio_sleep_forever', | ||||||
|                 expect_err='trio.Cancelled', |                 expect_err='trio.Cancelled', | ||||||
|                 infect_asyncio=True, |                 infect_asyncio=True, | ||||||
|             ) |             ) | ||||||
|  | @ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): | ||||||
|             async with tractor.open_nursery() as n: |             async with tractor.open_nursery() as n: | ||||||
|                 await n.run_in_actor( |                 await n.run_in_actor( | ||||||
|                     asyncio_actor, |                     asyncio_actor, | ||||||
|                     target='sleep_forever', |                     target='aio_sleep_forever', | ||||||
|                     expect_err='trio.Cancelled', |                     expect_err='trio.Cancelled', | ||||||
|                     infect_asyncio=True, |                     infect_asyncio=True, | ||||||
|                 ) |                 ) | ||||||
|  | @ -195,7 +203,7 @@ async def trio_ctx( | ||||||
|             # spawn another asyncio task for the cuck of it. |             # spawn another asyncio task for the cuck of it. | ||||||
|             n.start_soon( |             n.start_soon( | ||||||
|                 tractor.to_asyncio.run_task, |                 tractor.to_asyncio.run_task, | ||||||
|                 sleep_forever, |                 aio_sleep_forever, | ||||||
|             ) |             ) | ||||||
|             await trio.sleep_forever() |             await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
|  | @ -285,7 +293,7 @@ async def aio_cancel(): | ||||||
| 
 | 
 | ||||||
|     # cancel and enter sleep |     # cancel and enter sleep | ||||||
|     task.cancel() |     task.cancel() | ||||||
|     await sleep_forever() |     await aio_sleep_forever() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | ||||||
|  | @ -355,7 +363,6 @@ async def push_from_aio_task( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def stream_from_aio( | async def stream_from_aio( | ||||||
| 
 |  | ||||||
|     exit_early: bool = False, |     exit_early: bool = False, | ||||||
|     raise_err: bool = False, |     raise_err: bool = False, | ||||||
|     aio_raise_err: bool = False, |     aio_raise_err: bool = False, | ||||||
|  | @ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( | ||||||
|         trio.run(main) |         trio.run(main) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def manage_file( | ||||||
|  |     ctx: tractor.Context, | ||||||
|  |     tmp_path_str: str, | ||||||
|  |     bg_aio_task: bool = False, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Start an `asyncio` task that just sleeps after registering a context | ||||||
|  |     with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree | ||||||
|  |     and ensure the stack is closed in the infected mode child. | ||||||
|  | 
 | ||||||
|  |     To verify the teardown state just write a tmpfile to the `testdir` | ||||||
|  |     and delete it on actor close. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  |     tmp_path: Path = Path(tmp_path_str) | ||||||
|  |     tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' | ||||||
|  | 
 | ||||||
|  |     # create a the tmp file and tell the parent where it's at | ||||||
|  |     assert not tmp_file.is_file() | ||||||
|  |     tmp_file.touch() | ||||||
|  | 
 | ||||||
|  |     stack: ExitStack = current_actor().lifetime_stack | ||||||
|  |     stack.callback(tmp_file.unlink) | ||||||
|  | 
 | ||||||
|  |     await ctx.started(( | ||||||
|  |         str(tmp_file), | ||||||
|  |         os.getpid(), | ||||||
|  |     )) | ||||||
|  | 
 | ||||||
|  |     # expect to be cancelled from here! | ||||||
|  |     try: | ||||||
|  | 
 | ||||||
|  |         # NOTE: turns out you don't even need to sched an aio task | ||||||
|  |         # since the original issue, even though seemingly was due to | ||||||
|  |         # the guest-run being abandoned + a `._debug.pause()` inside | ||||||
|  |         # `._runtime._async_main()` (which was originally trying to | ||||||
|  |         # debug the `.lifetime_stack` not closing), IS NOT actually | ||||||
|  |         # the core issue? | ||||||
|  |         # | ||||||
|  |         # further notes: | ||||||
|  |         # | ||||||
|  |         # - `trio` only issues the " RuntimeWarning: Trio guest run | ||||||
|  |         #   got abandoned without properly finishing... weird stuff | ||||||
|  |         #   might happen" IFF you DO run a asyncio task here, BUT | ||||||
|  |         # - the original issue of the `.lifetime_stack` not closing | ||||||
|  |         #   will still happen even if you don't run an `asyncio` task | ||||||
|  |         #   here even though the "abandon" messgage won't be shown.. | ||||||
|  |         # | ||||||
|  |         # => ????? honestly i'm lost but it seems to be some issue | ||||||
|  |         #   with `asyncio` and SIGINT.. | ||||||
|  |         # | ||||||
|  |         # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and | ||||||
|  |         # `run_as_asyncio_guest()` is written WITHOUT THE | ||||||
|  |         # `.cancel_soon()` soln, both of these tests will pass ?? | ||||||
|  |         # so maybe it has something to do with `asyncio` loop init | ||||||
|  |         # state?!? | ||||||
|  |         # honestly, this REALLY reminds me why i haven't used | ||||||
|  |         # `asyncio` by choice in years.. XD | ||||||
|  |         # | ||||||
|  |         # await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||||
|  |         if bg_aio_task: | ||||||
|  |             async with trio.open_nursery() as tn: | ||||||
|  |                 tn.start_soon( | ||||||
|  |                     tractor.to_asyncio.run_task, | ||||||
|  |                     aio_sleep_forever, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |         await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |     # signalled manually at the OS level (aka KBI) by the parent actor. | ||||||
|  |     except KeyboardInterrupt: | ||||||
|  |         print('child raised KBI..') | ||||||
|  |         assert tmp_file.exists() | ||||||
|  |         raise | ||||||
|  |     else: | ||||||
|  |         raise RuntimeError('shoulda received a KBI?') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'bg_aio_task', | ||||||
|  |     [ | ||||||
|  |         False, | ||||||
|  | 
 | ||||||
|  |         # NOTE: (and see notes in `manage_file()` above as well) if | ||||||
|  |         # we FOR SURE SPAWN AN AIO TASK in the child it seems the | ||||||
|  |         # "silent-abandon" case (as is described in detail in | ||||||
|  |         # `to_asyncio.run_as_asyncio_guest()`) does not happen and | ||||||
|  |         # `asyncio`'s loop will at least abandon the `trio` side | ||||||
|  |         # loudly? .. prolly the state-spot to start looking for | ||||||
|  |         # a soln that results in NO ABANDONMENT.. XD | ||||||
|  |         True, | ||||||
|  |     ], | ||||||
|  |     ids=[ | ||||||
|  |         'bg_aio_task', | ||||||
|  |         'just_trio_slee', | ||||||
|  |     ], | ||||||
|  | ) | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'wait_for_ctx', | ||||||
|  |     [ | ||||||
|  |         False, | ||||||
|  |         True, | ||||||
|  |     ], | ||||||
|  |     ids=[ | ||||||
|  |         'raise_KBI_in_rent', | ||||||
|  |         'wait_for_ctx', | ||||||
|  |     ], | ||||||
|  | ) | ||||||
|  | def test_sigint_closes_lifetime_stack( | ||||||
|  |     tmp_path: Path, | ||||||
|  |     wait_for_ctx: bool, | ||||||
|  |     bg_aio_task: bool, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Ensure that an infected child can use the `Actor.lifetime_stack` | ||||||
|  |     to make a file on boot and it's automatically cleaned up by the | ||||||
|  |     actor-lifetime-linked exit stack closure. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     async def main(): | ||||||
|  |         try: | ||||||
|  |             async with tractor.open_nursery() as n: | ||||||
|  |                 p = await n.start_actor( | ||||||
|  |                     'file_mngr', | ||||||
|  |                     enable_modules=[__name__], | ||||||
|  |                     infect_asyncio=True, | ||||||
|  |                 ) | ||||||
|  |                 async with p.open_context( | ||||||
|  |                     manage_file, | ||||||
|  |                     tmp_path_str=str(tmp_path), | ||||||
|  |                     bg_aio_task=bg_aio_task, | ||||||
|  |                 ) as (ctx, first): | ||||||
|  | 
 | ||||||
|  |                     path_str, cpid = first | ||||||
|  |                     tmp_file: Path = Path(path_str) | ||||||
|  |                     assert tmp_file.exists() | ||||||
|  | 
 | ||||||
|  |                     # XXX originally to simulate what (hopefully) | ||||||
|  |                     # the below now triggers.. had to manually | ||||||
|  |                     # trigger a SIGINT from a ctl-c in the root. | ||||||
|  |                     # await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |                     # XXX NOTE XXX signal infected-`asyncio` child to | ||||||
|  |                     # OS-cancel with SIGINT; this should trigger the | ||||||
|  |                     # bad `asyncio` cancel behaviour that can cause | ||||||
|  |                     # a guest-run abandon as was seen causing | ||||||
|  |                     # shm-buffer leaks in `piker`'s live quote stream | ||||||
|  |                     # susbys! | ||||||
|  |                     # | ||||||
|  |                     # await trio.sleep(.5) | ||||||
|  |                     await trio.sleep(.2) | ||||||
|  |                     os.kill( | ||||||
|  |                         cpid, | ||||||
|  |                         signal.SIGINT, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                     # XXX CASE 1: without the bug fixed, in | ||||||
|  |                     # the non-KBI-raised-in-parent case, this | ||||||
|  |                     # timeout should trigger! | ||||||
|  |                     if wait_for_ctx: | ||||||
|  |                         print('waiting for ctx outcome in parent..') | ||||||
|  |                         try: | ||||||
|  |                             with trio.fail_after(.7): | ||||||
|  |                                 await ctx.wait_for_result() | ||||||
|  |                         except tractor.ContextCancelled as ctxc: | ||||||
|  |                             assert ctxc.canceller == ctx.chan.uid | ||||||
|  |                             raise | ||||||
|  | 
 | ||||||
|  |                     # XXX CASE 2: this seems to be the source of the | ||||||
|  |                     # original issue which exhibited BEFORE we put | ||||||
|  |                     # a `Actor.cancel_soon()` inside | ||||||
|  |                     # `run_as_asyncio_guest()`.. | ||||||
|  |                     else: | ||||||
|  |                         raise KeyboardInterrupt | ||||||
|  | 
 | ||||||
|  |                 pytest.fail('should have raised some kinda error?!?') | ||||||
|  | 
 | ||||||
|  |         except ( | ||||||
|  |             KeyboardInterrupt, | ||||||
|  |             ContextCancelled, | ||||||
|  |         ): | ||||||
|  |             # XXX CASE 2: without the bug fixed, in the | ||||||
|  |             # KBI-raised-in-parent case, the actor teardown should | ||||||
|  |             # never get run (silently abaondoned by `asyncio`..) and | ||||||
|  |             # thus the file should leak! | ||||||
|  |             assert not tmp_file.exists() | ||||||
|  |             assert ctx.maybe_error | ||||||
|  | 
 | ||||||
|  |     trio.run(main) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: debug_mode tests once we get support for `asyncio`! | # TODO: debug_mode tests once we get support for `asyncio`! | ||||||
| # | # | ||||||
| # -[ ] need tests to wrap both scripts: | # -[ ] need tests to wrap both scripts: | ||||||
|  |  | ||||||
|  | @ -121,10 +121,19 @@ class Unresolved: | ||||||
| @dataclass | @dataclass | ||||||
| class Context: | class Context: | ||||||
|     ''' |     ''' | ||||||
|     An inter-actor, SC transitive, `Task` communication context. |     An inter-actor, SC transitive, `trio.Task` (pair) | ||||||
|  |     communication context. | ||||||
| 
 | 
 | ||||||
|     NB: This class should **never be instatiated directly**, it is allocated |     (We've also considered other names and ideas: | ||||||
|     by the runtime in 2 ways: |      - "communicating tasks scope": cts | ||||||
|  |      - "distributed task scope": dts | ||||||
|  |      - "communicating tasks context": ctc | ||||||
|  | 
 | ||||||
|  |      **Got a better idea for naming? Make an issue dawg!** | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     NB: This class should **never be instatiated directly**, it is | ||||||
|  |     allocated by the runtime in 2 ways: | ||||||
|      - by entering `Portal.open_context()` which is the primary |      - by entering `Portal.open_context()` which is the primary | ||||||
|        public API for any "parent" task or, |        public API for any "parent" task or, | ||||||
|      - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg |      - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg | ||||||
|  | @ -210,6 +219,16 @@ class Context: | ||||||
|     # more the the `Context` is needed? |     # more the the `Context` is needed? | ||||||
|     _portal: Portal | None = None |     _portal: Portal | None = None | ||||||
| 
 | 
 | ||||||
|  |     @property | ||||||
|  |     def portal(self) -> Portal|None: | ||||||
|  |         ''' | ||||||
|  |         Return any wrapping memory-`Portal` if this is | ||||||
|  |         a 'parent'-side task which called `Portal.open_context()`, | ||||||
|  |         otherwise `None`. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         return self._portal | ||||||
|  | 
 | ||||||
|     # NOTE: each side of the context has its own cancel scope |     # NOTE: each side of the context has its own cancel scope | ||||||
|     # which is exactly the primitive that allows for |     # which is exactly the primitive that allows for | ||||||
|     # cross-actor-task-supervision and thus SC. |     # cross-actor-task-supervision and thus SC. | ||||||
|  | @ -299,6 +318,8 @@ class Context: | ||||||
|     # boxed exception. NOW, it's used for spawning overrun queuing |     # boxed exception. NOW, it's used for spawning overrun queuing | ||||||
|     # tasks when `.allow_overruns ==  True` !!! |     # tasks when `.allow_overruns ==  True` !!! | ||||||
|     _scope_nursery: trio.Nursery|None = None |     _scope_nursery: trio.Nursery|None = None | ||||||
|  |     # ^-TODO-^ change name? | ||||||
|  |     # -> `._scope_tn` "scope task nursery" | ||||||
| 
 | 
 | ||||||
|     # streaming overrun state tracking |     # streaming overrun state tracking | ||||||
|     _in_overrun: bool = False |     _in_overrun: bool = False | ||||||
|  | @ -408,10 +429,23 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         return self._cancel_called |         return self._cancel_called | ||||||
| 
 | 
 | ||||||
|  |     @cancel_called.setter | ||||||
|  |     def cancel_called(self, val: bool) -> None: | ||||||
|  |         ''' | ||||||
|  |         Set the self-cancelled request `bool` value. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         # to debug who frickin sets it.. | ||||||
|  |         # if val: | ||||||
|  |         #     from .devx import pause_from_sync | ||||||
|  |         #     pause_from_sync() | ||||||
|  | 
 | ||||||
|  |         self._cancel_called = val | ||||||
|  | 
 | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|         ''' |         ''' | ||||||
|         ``Actor.uid: tuple[str, str]`` of the (remote) |         `Actor.uid: tuple[str, str]` of the (remote) | ||||||
|         actor-process who's task was cancelled thus causing this |         actor-process who's task was cancelled thus causing this | ||||||
|         (side of the) context to also be cancelled. |         (side of the) context to also be cancelled. | ||||||
| 
 | 
 | ||||||
|  | @ -515,7 +549,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             # the local scope was never cancelled |             # the local scope was never cancelled | ||||||
|             # and instead likely we received a remote side |             # and instead likely we received a remote side | ||||||
|             # # cancellation that was raised inside `.result()` |             # # cancellation that was raised inside `.wait_for_result()` | ||||||
|             # or ( |             # or ( | ||||||
|             #     (se := self._local_error) |             #     (se := self._local_error) | ||||||
|             #     and se is re |             #     and se is re | ||||||
|  | @ -585,6 +619,8 @@ class Context: | ||||||
|         self, |         self, | ||||||
|         error: BaseException, |         error: BaseException, | ||||||
| 
 | 
 | ||||||
|  |         set_cancel_called: bool = False, | ||||||
|  | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         (Maybe) cancel this local scope due to a received remote |         (Maybe) cancel this local scope due to a received remote | ||||||
|  | @ -603,7 +639,7 @@ class Context: | ||||||
|         - `Portal.open_context()` |         - `Portal.open_context()` | ||||||
|         - `Portal.result()` |         - `Portal.result()` | ||||||
|         - `Context.open_stream()` |         - `Context.open_stream()` | ||||||
|         - `Context.result()` |         - `Context.wait_for_result()` | ||||||
| 
 | 
 | ||||||
|         when called/closed by actor local task(s). |         when called/closed by actor local task(s). | ||||||
| 
 | 
 | ||||||
|  | @ -729,7 +765,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         # Cancel the local `._scope`, catch that |         # Cancel the local `._scope`, catch that | ||||||
|         # `._scope.cancelled_caught` and re-raise any remote error |         # `._scope.cancelled_caught` and re-raise any remote error | ||||||
|         # once exiting (or manually calling `.result()`) the |         # once exiting (or manually calling `.wait_for_result()`) the | ||||||
|         # `.open_context()`  block. |         # `.open_context()`  block. | ||||||
|         cs: trio.CancelScope = self._scope |         cs: trio.CancelScope = self._scope | ||||||
|         if ( |         if ( | ||||||
|  | @ -764,8 +800,9 @@ class Context: | ||||||
|                 # `trio.Cancelled` subtype here ;) |                 # `trio.Cancelled` subtype here ;) | ||||||
|                 # https://github.com/goodboy/tractor/issues/368 |                 # https://github.com/goodboy/tractor/issues/368 | ||||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' |                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||||
|  |                 # from .devx import pause_from_sync | ||||||
|  |                 # pause_from_sync() | ||||||
|                 self._scope.cancel() |                 self._scope.cancel() | ||||||
| 
 |  | ||||||
|         else: |         else: | ||||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' |             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||||
|             # from .devx import mk_pdb |             # from .devx import mk_pdb | ||||||
|  | @ -889,7 +926,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         side: str = self.side |         side: str = self.side | ||||||
|         self._cancel_called: bool = True |         self.cancel_called: bool = True | ||||||
| 
 | 
 | ||||||
|         header: str = ( |         header: str = ( | ||||||
|             f'Cancelling ctx with peer from {side.upper()} side\n\n' |             f'Cancelling ctx with peer from {side.upper()} side\n\n' | ||||||
|  | @ -912,7 +949,7 @@ class Context: | ||||||
|         # `._scope.cancel()` since we expect the eventual |         # `._scope.cancel()` since we expect the eventual | ||||||
|         # `ContextCancelled` from the other side to trigger this |         # `ContextCancelled` from the other side to trigger this | ||||||
|         # when the runtime finally receives it during teardown |         # when the runtime finally receives it during teardown | ||||||
|         # (normally in `.result()` called from |         # (normally in `.wait_for_result()` called from | ||||||
|         # `Portal.open_context().__aexit__()`) |         # `Portal.open_context().__aexit__()`) | ||||||
|         if side == 'parent': |         if side == 'parent': | ||||||
|             if not self._portal: |             if not self._portal: | ||||||
|  | @ -1025,10 +1062,10 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|         our_uid: tuple = self.chan.uid |         peer_uid: tuple = self.chan.uid | ||||||
| 
 | 
 | ||||||
|         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption |         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption | ||||||
|         # for "graceful cancellation" case: |         # for "graceful cancellation" case(s): | ||||||
|         # |         # | ||||||
|         # Whenever a "side" of a context (a `Task` running in |         # Whenever a "side" of a context (a `Task` running in | ||||||
|         # an actor) **is** the side which requested ctx |         # an actor) **is** the side which requested ctx | ||||||
|  | @ -1045,9 +1082,11 @@ class Context: | ||||||
|         # set to the `Actor.uid` of THIS task (i.e. the |         # set to the `Actor.uid` of THIS task (i.e. the | ||||||
|         # cancellation requesting task's actor is the actor |         # cancellation requesting task's actor is the actor | ||||||
|         # checking whether it should absorb the ctxc). |         # checking whether it should absorb the ctxc). | ||||||
|  |         self_ctxc: bool = self._is_self_cancelled(remote_error) | ||||||
|         if ( |         if ( | ||||||
|  |             self_ctxc | ||||||
|  |             and | ||||||
|             not raise_ctxc_from_self_call |             not raise_ctxc_from_self_call | ||||||
|             and self._is_self_cancelled(remote_error) |  | ||||||
| 
 | 
 | ||||||
|             # TODO: ?potentially it is useful to emit certain |             # TODO: ?potentially it is useful to emit certain | ||||||
|             # warning/cancel logs for the cases where the |             # warning/cancel logs for the cases where the | ||||||
|  | @ -1077,8 +1116,8 @@ class Context: | ||||||
|             and isinstance(remote_error, RemoteActorError) |             and isinstance(remote_error, RemoteActorError) | ||||||
|             and remote_error.boxed_type is StreamOverrun |             and remote_error.boxed_type is StreamOverrun | ||||||
| 
 | 
 | ||||||
|             # and tuple(remote_error.msgdata['sender']) == our_uid |             # and tuple(remote_error.msgdata['sender']) == peer_uid | ||||||
|             and tuple(remote_error.sender) == our_uid |             and tuple(remote_error.sender) == peer_uid | ||||||
|         ): |         ): | ||||||
|             # NOTE: we set the local scope error to any "self |             # NOTE: we set the local scope error to any "self | ||||||
|             # cancellation" error-response thus "absorbing" |             # cancellation" error-response thus "absorbing" | ||||||
|  | @ -1140,9 +1179,9 @@ class Context: | ||||||
|         of the remote cancellation. |         of the remote cancellation. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__ = hide_tb |         __tracebackhide__: bool = False | ||||||
|         assert self._portal, ( |         assert self._portal, ( | ||||||
|             "Context.result() can not be called from callee side!" |             '`Context.wait_for_result()` can not be called from callee side!' | ||||||
|         ) |         ) | ||||||
|         if self._final_result_is_set(): |         if self._final_result_is_set(): | ||||||
|             return self._result |             return self._result | ||||||
|  | @ -1169,7 +1208,8 @@ class Context: | ||||||
|                 drained_msgs, |                 drained_msgs, | ||||||
|             ) = await msgops.drain_to_final_msg( |             ) = await msgops.drain_to_final_msg( | ||||||
|                 ctx=self, |                 ctx=self, | ||||||
|                 hide_tb=hide_tb, |                 # hide_tb=hide_tb, | ||||||
|  |                 hide_tb=False, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             drained_status: str = ( |             drained_status: str = ( | ||||||
|  | @ -1185,6 +1225,8 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             log.cancel(drained_status) |             log.cancel(drained_status) | ||||||
| 
 | 
 | ||||||
|  |         # __tracebackhide__: bool = hide_tb | ||||||
|  | 
 | ||||||
|         self.maybe_raise( |         self.maybe_raise( | ||||||
|             # NOTE: obvi we don't care if we |             # NOTE: obvi we don't care if we | ||||||
|             # overran the far end if we're already |             # overran the far end if we're already | ||||||
|  | @ -1197,7 +1239,8 @@ class Context: | ||||||
|                 # raising something we know might happen |                 # raising something we know might happen | ||||||
|                 # during cancellation ;) |                 # during cancellation ;) | ||||||
|                 (not self._cancel_called) |                 (not self._cancel_called) | ||||||
|             ) |             ), | ||||||
|  |             hide_tb=hide_tb, | ||||||
|         ) |         ) | ||||||
|         # TODO: eventually make `.outcome: Outcome` and thus return |         # TODO: eventually make `.outcome: Outcome` and thus return | ||||||
|         # `self.outcome.unwrap()` here! |         # `self.outcome.unwrap()` here! | ||||||
|  | @ -1583,7 +1626,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         - NEVER `return` early before delivering the msg! |         - NEVER `return` early before delivering the msg! | ||||||
|           bc if the error is a ctxc and there is a task waiting on |           bc if the error is a ctxc and there is a task waiting on | ||||||
|           `.result()` we need the msg to be |           `.wait_for_result()` we need the msg to be | ||||||
|           `send_chan.send_nowait()`-ed over the `._rx_chan` so |           `send_chan.send_nowait()`-ed over the `._rx_chan` so | ||||||
|           that the error is relayed to that waiter task and thus |           that the error is relayed to that waiter task and thus | ||||||
|           raised in user code! |           raised in user code! | ||||||
|  | @ -1828,7 +1871,7 @@ async def open_context_from_portal( | ||||||
|     When the "callee" (side that is "called"/started by a call |     When the "callee" (side that is "called"/started by a call | ||||||
|     to *this* method) returns, the caller side (this) unblocks |     to *this* method) returns, the caller side (this) unblocks | ||||||
|     and any final value delivered from the other end can be |     and any final value delivered from the other end can be | ||||||
|     retrieved using the `Contex.result()` api. |     retrieved using the `Contex.wait_for_result()` api. | ||||||
| 
 | 
 | ||||||
|     The yielded ``Context`` instance further allows for opening |     The yielded ``Context`` instance further allows for opening | ||||||
|     bidirectional streams, explicit cancellation and |     bidirectional streams, explicit cancellation and | ||||||
|  | @ -1965,14 +2008,14 @@ async def open_context_from_portal( | ||||||
|             yield ctx, first |             yield ctx, first | ||||||
| 
 | 
 | ||||||
|             # ??TODO??: do we still want to consider this or is |             # ??TODO??: do we still want to consider this or is | ||||||
|             # the `else:` block handling via a `.result()` |             # the `else:` block handling via a `.wait_for_result()` | ||||||
|             # call below enough?? |             # call below enough?? | ||||||
|             # |             # | ||||||
|             # -[ ] pretty sure `.result()` internals do the |             # -[ ] pretty sure `.wait_for_result()` internals do the | ||||||
|             # same as our ctxc handler below so it ended up |             # same as our ctxc handler below so it ended up | ||||||
|             # being same (repeated?) behaviour, but ideally we |             # being same (repeated?) behaviour, but ideally we | ||||||
|             # wouldn't have that duplication either by somehow |             # wouldn't have that duplication either by somehow | ||||||
|             # factoring the `.result()` handler impl in a way |             # factoring the `.wait_for_result()` handler impl in a way | ||||||
|             # that we can re-use it around the `yield` ^ here |             # that we can re-use it around the `yield` ^ here | ||||||
|             # or vice versa? |             # or vice versa? | ||||||
|             # |             # | ||||||
|  | @ -2110,7 +2153,7 @@ async def open_context_from_portal( | ||||||
|         #    AND a group-exc is only raised if there was > 1 |         #    AND a group-exc is only raised if there was > 1 | ||||||
|         #    tasks started *here* in the "caller" / opener |         #    tasks started *here* in the "caller" / opener | ||||||
|         #    block. If any one of those tasks calls |         #    block. If any one of those tasks calls | ||||||
|         #    `.result()` or `MsgStream.receive()` |         #    `.wait_for_result()` or `MsgStream.receive()` | ||||||
|         #    `._maybe_raise_remote_err()` will be transitively |         #    `._maybe_raise_remote_err()` will be transitively | ||||||
|         #    called and the remote error raised causing all |         #    called and the remote error raised causing all | ||||||
|         #    tasks to be cancelled. |         #    tasks to be cancelled. | ||||||
|  | @ -2180,7 +2223,7 @@ async def open_context_from_portal( | ||||||
|                 f'|_{ctx._task}\n' |                 f'|_{ctx._task}\n' | ||||||
|             ) |             ) | ||||||
|             # XXX NOTE XXX: the below call to |             # XXX NOTE XXX: the below call to | ||||||
|             # `Context.result()` will ALWAYS raise |             # `Context.wait_for_result()` will ALWAYS raise | ||||||
|             # a `ContextCancelled` (via an embedded call to |             # a `ContextCancelled` (via an embedded call to | ||||||
|             # `Context._maybe_raise_remote_err()`) IFF |             # `Context._maybe_raise_remote_err()`) IFF | ||||||
|             # a `Context._remote_error` was set by the runtime |             # a `Context._remote_error` was set by the runtime | ||||||
|  | @ -2190,10 +2233,10 @@ async def open_context_from_portal( | ||||||
|             # ALWAYS SET any time "callee" side fails and causes "caller |             # ALWAYS SET any time "callee" side fails and causes "caller | ||||||
|             # side" cancellation via a `ContextCancelled` here. |             # side" cancellation via a `ContextCancelled` here. | ||||||
|             try: |             try: | ||||||
|                 result_or_err: Exception|Any = await ctx.result() |                 result_or_err: Exception|Any = await ctx.wait_for_result() | ||||||
|             except BaseException as berr: |             except BaseException as berr: | ||||||
|                 # on normal teardown, if we get some error |                 # on normal teardown, if we get some error | ||||||
|                 # raised in `Context.result()` we still want to |                 # raised in `Context.wait_for_result()` we still want to | ||||||
|                 # save that error on the ctx's state to |                 # save that error on the ctx's state to | ||||||
|                 # determine things like `.cancelled_caught` for |                 # determine things like `.cancelled_caught` for | ||||||
|                 # cases where there was remote cancellation but |                 # cases where there was remote cancellation but | ||||||
|  |  | ||||||
|  | @ -56,14 +56,12 @@ async def get_registry( | ||||||
| ]: | ]: | ||||||
|     ''' |     ''' | ||||||
|     Return a portal instance connected to a local or remote |     Return a portal instance connected to a local or remote | ||||||
|     arbiter. |     registry-service actor; if a connection already exists re-use it | ||||||
|  |     (presumably to call a `.register_actor()` registry runtime RPC | ||||||
|  |     ep). | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     actor = current_actor() |     actor: Actor = current_actor() | ||||||
| 
 |  | ||||||
|     if not actor: |  | ||||||
|         raise RuntimeError("No actor instance has been defined yet?") |  | ||||||
| 
 |  | ||||||
|     if actor.is_registrar: |     if actor.is_registrar: | ||||||
|         # we're already the arbiter |         # we're already the arbiter | ||||||
|         # (likely a re-entrant call from the arbiter actor) |         # (likely a re-entrant call from the arbiter actor) | ||||||
|  | @ -72,6 +70,8 @@ async def get_registry( | ||||||
|             Channel((host, port)) |             Channel((host, port)) | ||||||
|         ) |         ) | ||||||
|     else: |     else: | ||||||
|  |         # TODO: try to look pre-existing connection from | ||||||
|  |         # `Actor._peers` and use it instead? | ||||||
|         async with ( |         async with ( | ||||||
|             _connect_chan(host, port) as chan, |             _connect_chan(host, port) as chan, | ||||||
|             open_portal(chan) as regstr_ptl, |             open_portal(chan) as regstr_ptl, | ||||||
|  |  | ||||||
|  | @ -20,7 +20,8 @@ Sub-process entry points. | ||||||
| """ | """ | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from functools import partial | from functools import partial | ||||||
| # import textwrap | import os | ||||||
|  | import textwrap | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|  | @ -58,7 +59,7 @@ def _mp_main( | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|     The routine called *after fork* which invokes a fresh ``trio.run`` |     The routine called *after fork* which invokes a fresh `trio.run()` | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     actor._forkserver_info = forkserver_info |     actor._forkserver_info = forkserver_info | ||||||
|  | @ -96,6 +97,35 @@ def _mp_main( | ||||||
|         log.info(f"Subactor {actor.uid} terminated") |         log.info(f"Subactor {actor.uid} terminated") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: move this to some kinda `.devx._conc_lang.py` eventually | ||||||
|  | # as we work out our multi-domain state-flow-syntax! | ||||||
|  | def nest_from_op( | ||||||
|  |     input_op: str, | ||||||
|  |     tree_str: str, | ||||||
|  | 
 | ||||||
|  |     back_from_op: int = 1, | ||||||
|  | ) -> str: | ||||||
|  |     ''' | ||||||
|  |     Depth-increment the input (presumably hierarchy/supervision) | ||||||
|  |     input "tree string" below the provided `input_op` execution | ||||||
|  |     operator, so injecting a `"\n|_{input_op}\n"`and indenting the | ||||||
|  |     `tree_str` to nest content aligned with the ops last char. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     return ( | ||||||
|  |         f'{input_op}\n' | ||||||
|  |         + | ||||||
|  |         textwrap.indent( | ||||||
|  |             tree_str, | ||||||
|  |             prefix=( | ||||||
|  |                 len(input_op) | ||||||
|  |                 - | ||||||
|  |                 back_from_op | ||||||
|  |             ) *' ', | ||||||
|  |         ) | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def _trio_main( | def _trio_main( | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     *, |     *, | ||||||
|  | @ -119,7 +149,6 @@ def _trio_main( | ||||||
| 
 | 
 | ||||||
|     if actor.loglevel is not None: |     if actor.loglevel is not None: | ||||||
|         get_console_log(actor.loglevel) |         get_console_log(actor.loglevel) | ||||||
|         import os |  | ||||||
|         actor_info: str = ( |         actor_info: str = ( | ||||||
|             f'|_{actor}\n' |             f'|_{actor}\n' | ||||||
|             f'  uid: {actor.uid}\n' |             f'  uid: {actor.uid}\n' | ||||||
|  | @ -128,13 +157,29 @@ def _trio_main( | ||||||
|             f'  loglevel: {actor.loglevel}\n' |             f'  loglevel: {actor.loglevel}\n' | ||||||
|         ) |         ) | ||||||
|         log.info( |         log.info( | ||||||
|             'Started new trio subactor:\n' |             'Started new `trio` subactor:\n' | ||||||
|             + |             + | ||||||
|             '>\n'  # like a "started/play"-icon from super perspective |             nest_from_op( | ||||||
|             + |                 input_op='(>',  # like a "started/play"-icon from super perspective | ||||||
|             actor_info, |                 tree_str=actor_info, | ||||||
|  |             ) | ||||||
|  |             # '>(\n'  # like a "started/play"-icon from super perspective | ||||||
|  |             # + | ||||||
|  |             # actor_info, | ||||||
|  |         ) | ||||||
|  |     logmeth = log.info | ||||||
|  |     message: str = ( | ||||||
|  |     # log.info( | ||||||
|  |         'Subactor terminated\n' | ||||||
|  |         + | ||||||
|  |         nest_from_op( | ||||||
|  |             input_op=')>',  # like a "started/play"-icon from super perspective | ||||||
|  |             tree_str=actor_info, | ||||||
|  |         ) | ||||||
|  |         # 'x\n'  # like a "crossed-out/killed" from super perspective | ||||||
|  |         # + | ||||||
|  |         # actor_info | ||||||
|     ) |     ) | ||||||
| 
 |  | ||||||
|     try: |     try: | ||||||
|         if infect_asyncio: |         if infect_asyncio: | ||||||
|             actor._infected_aio = True |             actor._infected_aio = True | ||||||
|  | @ -143,16 +188,18 @@ def _trio_main( | ||||||
|             trio.run(trio_main) |             trio.run(trio_main) | ||||||
| 
 | 
 | ||||||
|     except KeyboardInterrupt: |     except KeyboardInterrupt: | ||||||
|         log.cancel( |         logmeth = log.cancel | ||||||
|             'Actor received KBI\n' |         message: str = ( | ||||||
|  |             'Actor received KBI (aka an OS-cancel)\n' | ||||||
|             + |             + | ||||||
|             actor_info |             nest_from_op( | ||||||
|  |                 input_op='c)>',  # like a "started/play"-icon from super perspective | ||||||
|  |                 tree_str=actor_info, | ||||||
|             ) |             ) | ||||||
|  |         ) | ||||||
|  |     except BaseException: | ||||||
|  |         log.exception('Actor crashed exit?') | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|     finally: |     finally: | ||||||
|         log.info( |         logmeth(message) | ||||||
|             'Subactor terminated\n' |  | ||||||
|             + |  | ||||||
|             'x\n'  # like a "crossed-out/killed" from super perspective |  | ||||||
|             + |  | ||||||
|             actor_info |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|  | @ -922,15 +922,6 @@ class NoRuntime(RuntimeError): | ||||||
|     "The root actor has not been initialized yet" |     "The root actor has not been initialized yet" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| class AsyncioCancelled(Exception): |  | ||||||
|     ''' |  | ||||||
|     Asyncio cancelled translation (non-base) error |  | ||||||
|     for use with the ``to_asyncio`` module |  | ||||||
|     to be raised in the ``trio`` side task |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
| 
 |  | ||||||
| class MessagingError(Exception): | class MessagingError(Exception): | ||||||
|     ''' |     ''' | ||||||
|     IPC related msg (typing), transaction (ordering) or dialog |     IPC related msg (typing), transaction (ordering) or dialog | ||||||
|  | @ -1324,7 +1315,9 @@ def _mk_recv_mte( | ||||||
|         any_pld: Any = msgpack.decode(msg.pld) |         any_pld: Any = msgpack.decode(msg.pld) | ||||||
|         message: str = ( |         message: str = ( | ||||||
|             f'invalid `{msg_type.__qualname__}` msg payload\n\n' |             f'invalid `{msg_type.__qualname__}` msg payload\n\n' | ||||||
|             f'value: `{any_pld!r}` does not match type-spec: ' |             f'{any_pld!r}\n\n' | ||||||
|  |             f'has type {type(any_pld)!r}\n\n' | ||||||
|  |             f'and does not match type-spec ' | ||||||
|             f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' |             f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' | ||||||
|         ) |         ) | ||||||
|         bad_msg = msg |         bad_msg = msg | ||||||
|  |  | ||||||
|  | @ -40,6 +40,7 @@ from typing import ( | ||||||
|     TypeVar, |     TypeVar, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | # import pdbp | ||||||
| import msgspec | import msgspec | ||||||
| from tricycle import BufferedReceiveStream | from tricycle import BufferedReceiveStream | ||||||
| import trio | import trio | ||||||
|  | @ -290,12 +291,14 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|  |     # @pdbp.hideframe | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         msg: msgtypes.MsgType, |         msg: msgtypes.MsgType, | ||||||
| 
 | 
 | ||||||
|         strict_types: bool = True, |         strict_types: bool = True, | ||||||
|         # hide_tb: bool = False, |         hide_tb: bool = False, | ||||||
|  | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Send a msgpack encoded py-object-blob-as-msg over TCP. |         Send a msgpack encoded py-object-blob-as-msg over TCP. | ||||||
|  | @ -304,7 +307,10 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|         invalid msg type |         invalid msg type | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         # __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|  |         # try: | ||||||
|  |         # XXX see `trio._sync.AsyncContextManagerMixin` for details | ||||||
|  |         # on the `.acquire()`/`.release()` sequencing.. | ||||||
|         async with self._send_lock: |         async with self._send_lock: | ||||||
| 
 | 
 | ||||||
|             # NOTE: lookup the `trio.Task.context`'s var for |             # NOTE: lookup the `trio.Task.context`'s var for | ||||||
|  | @ -352,6 +358,14 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|             size: bytes = struct.pack("<I", len(bytes_data)) |             size: bytes = struct.pack("<I", len(bytes_data)) | ||||||
|             return await self.stream.send_all(size + bytes_data) |             return await self.stream.send_all(size + bytes_data) | ||||||
| 
 | 
 | ||||||
|  |         # TODO: does it help ever to dynamically show this | ||||||
|  |         # frame? | ||||||
|  |         # except BaseException as _err: | ||||||
|  |         #     err = _err | ||||||
|  |         #     if not isinstance(err, MsgTypeError): | ||||||
|  |         #         __tracebackhide__: bool = False | ||||||
|  |         #     raise | ||||||
|  | 
 | ||||||
|     @property |     @property | ||||||
|     def laddr(self) -> tuple[str, int]: |     def laddr(self) -> tuple[str, int]: | ||||||
|         return self._laddr |         return self._laddr | ||||||
|  | @ -560,27 +574,40 @@ class Channel: | ||||||
|         ) |         ) | ||||||
|         return transport |         return transport | ||||||
| 
 | 
 | ||||||
|  |     # TODO: something like, | ||||||
|  |     # `pdbp.hideframe_on(errors=[MsgTypeError])` | ||||||
|  |     # @pdbp.hideframe | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         payload: Any, |         payload: Any, | ||||||
| 
 | 
 | ||||||
|         # hide_tb: bool = False, |         hide_tb: bool = False, | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Send a coded msg-blob over the transport. |         Send a coded msg-blob over the transport. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         # __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|  |         try: | ||||||
|             log.transport( |             log.transport( | ||||||
|                 '=> send IPC msg:\n\n' |                 '=> send IPC msg:\n\n' | ||||||
|                 f'{pformat(payload)}\n' |                 f'{pformat(payload)}\n' | ||||||
|         )  # type: ignore |             ) | ||||||
|         assert self._transport |             # assert self._transport  # but why typing? | ||||||
|             await self._transport.send( |             await self._transport.send( | ||||||
|                 payload, |                 payload, | ||||||
|             # hide_tb=hide_tb, |                 hide_tb=hide_tb, | ||||||
|             ) |             ) | ||||||
|  |         except BaseException as _err: | ||||||
|  |             err = _err  # bind for introspection | ||||||
|  |             if not isinstance(_err, MsgTypeError): | ||||||
|  |                 # assert err | ||||||
|  |                 __tracebackhide__: bool = False | ||||||
|  |             else: | ||||||
|  |                 assert err.cid | ||||||
|  | 
 | ||||||
|  |             raise | ||||||
| 
 | 
 | ||||||
|     async def recv(self) -> Any: |     async def recv(self) -> Any: | ||||||
|         assert self._transport |         assert self._transport | ||||||
|  |  | ||||||
|  | @ -121,7 +121,8 @@ class Portal: | ||||||
|         ) |         ) | ||||||
|         return self.chan |         return self.chan | ||||||
| 
 | 
 | ||||||
|     # TODO: factor this out into an `ActorNursery` wrapper |     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||||
|  |     # a single `.open_context()` call underneath. | ||||||
|     async def _submit_for_result( |     async def _submit_for_result( | ||||||
|         self, |         self, | ||||||
|         ns: str, |         ns: str, | ||||||
|  | @ -141,13 +142,22 @@ class Portal: | ||||||
|             portal=self, |             portal=self, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |     # TODO: we should deprecate this API right? since if we remove | ||||||
|  |     # `.run_in_actor()` (and instead move it to a `.highlevel` | ||||||
|  |     # wrapper api (around a single `.open_context()` call) we don't | ||||||
|  |     # really have any notion of a "main" remote task any more? | ||||||
|  |     # | ||||||
|     # @api_frame |     # @api_frame | ||||||
|     async def result(self) -> Any: |     async def wait_for_result( | ||||||
|  |         self, | ||||||
|  |         hide_tb: bool = True, | ||||||
|  |     ) -> Any: | ||||||
|         ''' |         ''' | ||||||
|         Return the result(s) from the remote actor's "main" task. |         Return the final result delivered by a `Return`-msg from the | ||||||
|  |         remote peer actor's "main" task's `return` statement. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__ = True |         __tracebackhide__: bool = hide_tb | ||||||
|         # Check for non-rpc errors slapped on the |         # Check for non-rpc errors slapped on the | ||||||
|         # channel for which we always raise |         # channel for which we always raise | ||||||
|         exc = self.channel._exc |         exc = self.channel._exc | ||||||
|  | @ -182,6 +192,23 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|         return self._final_result_pld |         return self._final_result_pld | ||||||
| 
 | 
 | ||||||
|  |     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||||
|  |     # a single `.open_context()` call underneath. | ||||||
|  |     async def result( | ||||||
|  |         self, | ||||||
|  |         *args, | ||||||
|  |         **kwargs, | ||||||
|  |     ) -> Any|Exception: | ||||||
|  |         typname: str = type(self).__name__ | ||||||
|  |         log.warning( | ||||||
|  |             f'`{typname}.result()` is DEPRECATED!\n' | ||||||
|  |             'Use `{typname.wait_for_result()` instead!\n' | ||||||
|  |         ) | ||||||
|  |         return await self.wait_for_result( | ||||||
|  |             *args, | ||||||
|  |             **kwargs, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|     async def _cancel_streams(self): |     async def _cancel_streams(self): | ||||||
|         # terminate all locally running async generator |         # terminate all locally running async generator | ||||||
|         # IPC calls |         # IPC calls | ||||||
|  | @ -240,6 +267,7 @@ class Portal: | ||||||
|             f'{reminfo}' |             f'{reminfo}' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         # XXX the one spot we set it? | ||||||
|         self.channel._cancel_called: bool = True |         self.channel._cancel_called: bool = True | ||||||
|         try: |         try: | ||||||
|             # send cancel cmd - might not get response |             # send cancel cmd - might not get response | ||||||
|  | @ -279,6 +307,8 @@ class Portal: | ||||||
|             ) |             ) | ||||||
|             return False |             return False | ||||||
| 
 | 
 | ||||||
|  |     # TODO: do we still need this for low level `Actor`-runtime | ||||||
|  |     # method calls or can we also remove it? | ||||||
|     async def run_from_ns( |     async def run_from_ns( | ||||||
|         self, |         self, | ||||||
|         namespace_path: str, |         namespace_path: str, | ||||||
|  | @ -316,6 +346,8 @@ class Portal: | ||||||
|             expect_msg=Return, |             expect_msg=Return, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||||
|  |     # a single `.open_context()` call underneath. | ||||||
|     async def run( |     async def run( | ||||||
|         self, |         self, | ||||||
|         func: str, |         func: str, | ||||||
|  | @ -370,6 +402,8 @@ class Portal: | ||||||
|             expect_msg=Return, |             expect_msg=Return, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |     # TODO: factor this out into a `.highlevel` API-wrapper that uses | ||||||
|  |     # a single `.open_context()` call underneath. | ||||||
|     @acm |     @acm | ||||||
|     async def open_stream_from( |     async def open_stream_from( | ||||||
|         self, |         self, | ||||||
|  |  | ||||||
|  | @ -21,6 +21,7 @@ Root actor runtime ignition(s). | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from functools import partial | from functools import partial | ||||||
| import importlib | import importlib | ||||||
|  | import inspect | ||||||
| import logging | import logging | ||||||
| import os | import os | ||||||
| import signal | import signal | ||||||
|  | @ -115,10 +116,16 @@ async def open_root_actor( | ||||||
|     if ( |     if ( | ||||||
|         debug_mode |         debug_mode | ||||||
|         and maybe_enable_greenback |         and maybe_enable_greenback | ||||||
|         and await _debug.maybe_init_greenback( |         and ( | ||||||
|  |             maybe_mod := await _debug.maybe_init_greenback( | ||||||
|                 raise_not_found=False, |                 raise_not_found=False, | ||||||
|             ) |             ) | ||||||
|  |         ) | ||||||
|     ): |     ): | ||||||
|  |         logger.info( | ||||||
|  |             f'Found `greenback` installed @ {maybe_mod}\n' | ||||||
|  |             'Enabling `tractor.pause_from_sync()` support!\n' | ||||||
|  |         ) | ||||||
|         os.environ['PYTHONBREAKPOINT'] = ( |         os.environ['PYTHONBREAKPOINT'] = ( | ||||||
|             'tractor.devx._debug._sync_pause_from_builtin' |             'tractor.devx._debug._sync_pause_from_builtin' | ||||||
|         ) |         ) | ||||||
|  | @ -264,7 +271,10 @@ async def open_root_actor( | ||||||
| 
 | 
 | ||||||
|         except OSError: |         except OSError: | ||||||
|             # TODO: make this a "discovery" log level? |             # TODO: make this a "discovery" log level? | ||||||
|             logger.warning(f'No actor registry found @ {addr}') |             logger.info( | ||||||
|  |                 f'No actor registry found @ {addr}\n' | ||||||
|  |                 # 'Registry will be initialized in local actor..' | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|     async with trio.open_nursery() as tn: |     async with trio.open_nursery() as tn: | ||||||
|         for addr in registry_addrs: |         for addr in registry_addrs: | ||||||
|  | @ -365,23 +375,25 @@ async def open_root_actor( | ||||||
|             ) |             ) | ||||||
|             try: |             try: | ||||||
|                 yield actor |                 yield actor | ||||||
| 
 |  | ||||||
|             except ( |             except ( | ||||||
|                 Exception, |                 Exception, | ||||||
|                 BaseExceptionGroup, |                 BaseExceptionGroup, | ||||||
|             ) as err: |             ) as err: | ||||||
| 
 |                 # XXX NOTE XXX see equiv note inside | ||||||
|                 import inspect |                 # `._runtime.Actor._stream_handler()` where in the | ||||||
|  |                 # non-root or root-that-opened-this-mahually case we | ||||||
|  |                 # wait for the local actor-nursery to exit before | ||||||
|  |                 # exiting the transport channel handler. | ||||||
|                 entered: bool = await _debug._maybe_enter_pm( |                 entered: bool = await _debug._maybe_enter_pm( | ||||||
|                     err, |                     err, | ||||||
|                     api_frame=inspect.currentframe(), |                     api_frame=inspect.currentframe(), | ||||||
|                 ) |                 ) | ||||||
| 
 |  | ||||||
|                 if ( |                 if ( | ||||||
|                     not entered |                     not entered | ||||||
|                     and not is_multi_cancelled(err) |                     and | ||||||
|  |                     not is_multi_cancelled(err) | ||||||
|                 ): |                 ): | ||||||
|                     logger.exception('Root actor crashed:\n') |                     logger.exception('Root actor crashed\n') | ||||||
| 
 | 
 | ||||||
|                 # ALWAYS re-raise any error bubbled up from the |                 # ALWAYS re-raise any error bubbled up from the | ||||||
|                 # runtime! |                 # runtime! | ||||||
|  |  | ||||||
|  | @ -89,6 +89,15 @@ if TYPE_CHECKING: | ||||||
| log = get_logger('tractor') | log = get_logger('tractor') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: move to a `tractor.lowlevel.rpc` with the below | ||||||
|  | # func-type-cases implemented "on top of" `@context` defs. | ||||||
|  | # -[ ] std async func | ||||||
|  | # -[ ] `Portal.open_stream_from()` with async-gens? | ||||||
|  | #  |_ possibly a duplex form of this with a  | ||||||
|  | #    `sent_from_peer = yield send_to_peer` form, which would require | ||||||
|  | #    syncing the send/recv side with possibly `.receive_nowait()` | ||||||
|  | #    on each `yield`? | ||||||
|  | # -[ ]  | ||||||
| async def _invoke_non_context( | async def _invoke_non_context( | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     cancel_scope: CancelScope, |     cancel_scope: CancelScope, | ||||||
|  | @ -108,6 +117,7 @@ async def _invoke_non_context( | ||||||
|     ] = trio.TASK_STATUS_IGNORED, |     ] = trio.TASK_STATUS_IGNORED, | ||||||
| ): | ): | ||||||
|     __tracebackhide__: bool = True |     __tracebackhide__: bool = True | ||||||
|  |     cs: CancelScope|None = None  # ref when activated | ||||||
| 
 | 
 | ||||||
|     # TODO: can we unify this with the `context=True` impl below? |     # TODO: can we unify this with the `context=True` impl below? | ||||||
|     if inspect.isasyncgen(coro): |     if inspect.isasyncgen(coro): | ||||||
|  | @ -160,10 +170,6 @@ async def _invoke_non_context( | ||||||
|                 functype='asyncgen', |                 functype='asyncgen', | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
|         # XXX: the async-func may spawn further tasks which push |  | ||||||
|         # back values like an async-generator would but must |  | ||||||
|         # manualy construct the response dict-packet-responses as |  | ||||||
|         # above |  | ||||||
|         with cancel_scope as cs: |         with cancel_scope as cs: | ||||||
|             ctx._scope = cs |             ctx._scope = cs | ||||||
|             task_status.started(ctx) |             task_status.started(ctx) | ||||||
|  | @ -175,15 +181,13 @@ async def _invoke_non_context( | ||||||
|             await chan.send( |             await chan.send( | ||||||
|                 Stop(cid=cid) |                 Stop(cid=cid) | ||||||
|             ) |             ) | ||||||
|     else: | 
 | ||||||
|         # regular async function/method |     # simplest function/method request-response pattern | ||||||
|         # XXX: possibly just a scheduled `Actor._cancel_task()` |     # XXX: in the most minimally used case, just a scheduled internal runtime | ||||||
|         # from a remote request to cancel some `Context`. |     # call to `Actor._cancel_task()` from the ctx-peer task since we | ||||||
|  |     # don't (yet) have a dedicated IPC msg. | ||||||
|     # ------ - ------ |     # ------ - ------ | ||||||
|         # TODO: ideally we unify this with the above `context=True` |     else: | ||||||
|         # block such that for any remote invocation ftype, we |  | ||||||
|         # always invoke the far end RPC task scheduling the same |  | ||||||
|         # way: using the linked IPC context machinery. |  | ||||||
|         failed_resp: bool = False |         failed_resp: bool = False | ||||||
|         try: |         try: | ||||||
|             ack = StartAck( |             ack = StartAck( | ||||||
|  | @ -354,8 +358,14 @@ async def _errors_relayed_via_ipc( | ||||||
|             # channel. |             # channel. | ||||||
|             task_status.started(err) |             task_status.started(err) | ||||||
| 
 | 
 | ||||||
|         # always reraise KBIs so they propagate at the sys-process level. |         # always reraise KBIs so they propagate at the sys-process | ||||||
|         if isinstance(err, KeyboardInterrupt): |         # level. | ||||||
|  |         # XXX LOL, except when running in asyncio mode XD | ||||||
|  |         # cmon guys, wtf.. | ||||||
|  |         if ( | ||||||
|  |             isinstance(err, KeyboardInterrupt) | ||||||
|  |             # and not actor.is_infected_aio() | ||||||
|  |         ): | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|     # RPC task bookeeping. |     # RPC task bookeeping. | ||||||
|  | @ -458,7 +468,6 @@ async def _invoke( | ||||||
|     # tb: TracebackType = None |     # tb: TracebackType = None | ||||||
| 
 | 
 | ||||||
|     cancel_scope = CancelScope() |     cancel_scope = CancelScope() | ||||||
|     cs: CancelScope|None = None  # ref when activated |  | ||||||
|     ctx = actor.get_context( |     ctx = actor.get_context( | ||||||
|         chan=chan, |         chan=chan, | ||||||
|         cid=cid, |         cid=cid, | ||||||
|  | @ -607,6 +616,8 @@ async def _invoke( | ||||||
|         #     `@context` marked RPC function. |         #     `@context` marked RPC function. | ||||||
|         # - `._portal` is never set. |         # - `._portal` is never set. | ||||||
|         try: |         try: | ||||||
|  |             tn: trio.Nursery | ||||||
|  |             rpc_ctx_cs: CancelScope | ||||||
|             async with ( |             async with ( | ||||||
|                 trio.open_nursery() as tn, |                 trio.open_nursery() as tn, | ||||||
|                 msgops.maybe_limit_plds( |                 msgops.maybe_limit_plds( | ||||||
|  | @ -616,7 +627,7 @@ async def _invoke( | ||||||
|                 ), |                 ), | ||||||
|             ): |             ): | ||||||
|                 ctx._scope_nursery = tn |                 ctx._scope_nursery = tn | ||||||
|                 ctx._scope = tn.cancel_scope |                 rpc_ctx_cs = ctx._scope = tn.cancel_scope | ||||||
|                 task_status.started(ctx) |                 task_status.started(ctx) | ||||||
| 
 | 
 | ||||||
|                 # TODO: better `trionics` tooling: |                 # TODO: better `trionics` tooling: | ||||||
|  | @ -642,7 +653,7 @@ async def _invoke( | ||||||
|             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` |             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` | ||||||
|             #   which cancels the scope presuming the input error |             #   which cancels the scope presuming the input error | ||||||
|             #   is not a `.cancel_acked` pleaser. |             #   is not a `.cancel_acked` pleaser. | ||||||
|             if ctx._scope.cancelled_caught: |             if rpc_ctx_cs.cancelled_caught: | ||||||
|                 our_uid: tuple = actor.uid |                 our_uid: tuple = actor.uid | ||||||
| 
 | 
 | ||||||
|                 # first check for and raise any remote error |                 # first check for and raise any remote error | ||||||
|  | @ -652,9 +663,7 @@ async def _invoke( | ||||||
|                 if re := ctx._remote_error: |                 if re := ctx._remote_error: | ||||||
|                     ctx._maybe_raise_remote_err(re) |                     ctx._maybe_raise_remote_err(re) | ||||||
| 
 | 
 | ||||||
|                 cs: CancelScope = ctx._scope |                 if rpc_ctx_cs.cancel_called: | ||||||
| 
 |  | ||||||
|                 if cs.cancel_called: |  | ||||||
|                     canceller: tuple = ctx.canceller |                     canceller: tuple = ctx.canceller | ||||||
|                     explain: str = f'{ctx.side!r}-side task was cancelled by ' |                     explain: str = f'{ctx.side!r}-side task was cancelled by ' | ||||||
| 
 | 
 | ||||||
|  | @ -680,9 +689,15 @@ async def _invoke( | ||||||
|                     elif canceller == ctx.chan.uid: |                     elif canceller == ctx.chan.uid: | ||||||
|                         explain += f'its {ctx.peer_side!r}-side peer' |                         explain += f'its {ctx.peer_side!r}-side peer' | ||||||
| 
 | 
 | ||||||
|                     else: |                     elif canceller == our_uid: | ||||||
|  |                         explain += 'itself' | ||||||
|  | 
 | ||||||
|  |                     elif canceller: | ||||||
|                         explain += 'a remote peer' |                         explain += 'a remote peer' | ||||||
| 
 | 
 | ||||||
|  |                     else: | ||||||
|  |                         explain += 'an unknown cause?' | ||||||
|  | 
 | ||||||
|                     explain += ( |                     explain += ( | ||||||
|                         add_div(message=explain) |                         add_div(message=explain) | ||||||
|                         + |                         + | ||||||
|  | @ -1238,7 +1253,7 @@ async def process_messages( | ||||||
|                 'Exiting IPC msg loop with final msg\n\n' |                 'Exiting IPC msg loop with final msg\n\n' | ||||||
|                 f'<= peer: {chan.uid}\n' |                 f'<= peer: {chan.uid}\n' | ||||||
|                 f'  |_{chan}\n\n' |                 f'  |_{chan}\n\n' | ||||||
|                 f'{pretty_struct.pformat(msg)}' |                 # f'{pretty_struct.pformat(msg)}' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         log.runtime(message) |         log.runtime(message) | ||||||
|  |  | ||||||
|  | @ -1046,6 +1046,10 @@ class Actor: | ||||||
|                 # TODO: another `Struct` for rtvs.. |                 # TODO: another `Struct` for rtvs.. | ||||||
|                 rvs: dict[str, Any] = spawnspec._runtime_vars |                 rvs: dict[str, Any] = spawnspec._runtime_vars | ||||||
|                 if rvs['_debug_mode']: |                 if rvs['_debug_mode']: | ||||||
|  |                     from .devx import ( | ||||||
|  |                         enable_stack_on_sig, | ||||||
|  |                         maybe_init_greenback, | ||||||
|  |                     ) | ||||||
|                     try: |                     try: | ||||||
|                         # TODO: maybe return some status msgs upward |                         # TODO: maybe return some status msgs upward | ||||||
|                         # to that we can emit them in `con_status` |                         # to that we can emit them in `con_status` | ||||||
|  | @ -1053,13 +1057,27 @@ class Actor: | ||||||
|                         log.devx( |                         log.devx( | ||||||
|                             'Enabling `stackscope` traces on SIGUSR1' |                             'Enabling `stackscope` traces on SIGUSR1' | ||||||
|                         ) |                         ) | ||||||
|                         from .devx import enable_stack_on_sig |  | ||||||
|                         enable_stack_on_sig() |                         enable_stack_on_sig() | ||||||
|  | 
 | ||||||
|                     except ImportError: |                     except ImportError: | ||||||
|                         log.warning( |                         log.warning( | ||||||
|                             '`stackscope` not installed for use in debug mode!' |                             '`stackscope` not installed for use in debug mode!' | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|  |                     if rvs.get('use_greenback', False): | ||||||
|  |                         maybe_mod: ModuleType|None = await maybe_init_greenback() | ||||||
|  |                         if maybe_mod: | ||||||
|  |                             log.devx( | ||||||
|  |                                 'Activated `greenback` ' | ||||||
|  |                                 'for `tractor.pause_from_sync()` support!' | ||||||
|  |                             ) | ||||||
|  |                         else: | ||||||
|  |                             rvs['use_greenback'] = False | ||||||
|  |                             log.warning( | ||||||
|  |                                 '`greenback` not installed for use in debug mode!\n' | ||||||
|  |                                 '`tractor.pause_from_sync()` not available!' | ||||||
|  |                             ) | ||||||
|  | 
 | ||||||
|                 rvs['_is_root'] = False |                 rvs['_is_root'] = False | ||||||
|                 _state._runtime_vars.update(rvs) |                 _state._runtime_vars.update(rvs) | ||||||
| 
 | 
 | ||||||
|  | @ -1717,8 +1735,8 @@ async def async_main( | ||||||
| 
 | 
 | ||||||
|                 # Register with the arbiter if we're told its addr |                 # Register with the arbiter if we're told its addr | ||||||
|                 log.runtime( |                 log.runtime( | ||||||
|                     f'Registering `{actor.name}` ->\n' |                     f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' | ||||||
|                     f'{pformat(accept_addrs)}' |                     # ^-TODO-^ we should instead show the maddr here^^ | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 # TODO: ideally we don't fan out to all registrars |                 # TODO: ideally we don't fan out to all registrars | ||||||
|  | @ -1776,9 +1794,15 @@ async def async_main( | ||||||
| 
 | 
 | ||||||
|         # Blocks here as expected until the root nursery is |         # Blocks here as expected until the root nursery is | ||||||
|         # killed (i.e. this actor is cancelled or signalled by the parent) |         # killed (i.e. this actor is cancelled or signalled by the parent) | ||||||
|     except Exception as err: |     except Exception as internal_err: | ||||||
|         log.runtime("Closing all actor lifetime contexts") |         # ls: ExitStack = actor.lifetime_stack | ||||||
|         actor.lifetime_stack.close() |         # log.cancel( | ||||||
|  |         #     'Closing all actor-lifetime exec scopes\n\n' | ||||||
|  |         #     f'|_{ls}\n' | ||||||
|  |         # ) | ||||||
|  |         # # _debug.pause_from_sync() | ||||||
|  |         # # await _debug.pause(shield=True) | ||||||
|  |         # ls.close() | ||||||
| 
 | 
 | ||||||
|         if not is_registered: |         if not is_registered: | ||||||
|             # TODO: I guess we could try to connect back |             # TODO: I guess we could try to connect back | ||||||
|  | @ -1786,7 +1810,8 @@ async def async_main( | ||||||
|             # once we have that all working with std streams locking? |             # once we have that all working with std streams locking? | ||||||
|             log.exception( |             log.exception( | ||||||
|                 f"Actor errored and failed to register with arbiter " |                 f"Actor errored and failed to register with arbiter " | ||||||
|                 f"@ {actor.reg_addrs[0]}?") |                 f"@ {actor.reg_addrs[0]}?" | ||||||
|  |             ) | ||||||
|             log.error( |             log.error( | ||||||
|                 "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" |                 "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" | ||||||
|                 "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" |                 "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" | ||||||
|  | @ -1799,25 +1824,44 @@ async def async_main( | ||||||
|         if actor._parent_chan: |         if actor._parent_chan: | ||||||
|             await try_ship_error_to_remote( |             await try_ship_error_to_remote( | ||||||
|                 actor._parent_chan, |                 actor._parent_chan, | ||||||
|                 err, |                 internal_err, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         # always! |         # always! | ||||||
|         match err: |         match internal_err: | ||||||
|             case ContextCancelled(): |             case ContextCancelled(): | ||||||
|                 log.cancel( |                 log.cancel( | ||||||
|                     f'Actor: {actor.uid} was task-context-cancelled with,\n' |                     f'Actor: {actor.uid} was task-context-cancelled with,\n' | ||||||
|                     f'str(err)' |                     f'str(internal_err)' | ||||||
|                 ) |                 ) | ||||||
|             case _: |             case _: | ||||||
|                 log.exception("Actor errored:") |                 log.exception("Actor errored:") | ||||||
|         raise |         raise | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         log.runtime( |         teardown_msg: str = ( | ||||||
|             'Runtime nursery complete' |             'Runtime nursery complete' | ||||||
|             '-> Closing all actor lifetime contexts..' |  | ||||||
|         ) |         ) | ||||||
|  | 
 | ||||||
|  |         ls: ExitStack = actor.lifetime_stack | ||||||
|  |         cbs: list[Callable] = [ | ||||||
|  |             repr(tup[1].__wrapped__) | ||||||
|  |             for tup in ls._exit_callbacks | ||||||
|  |         ] | ||||||
|  |         if cbs: | ||||||
|  |             cbs_str: str = '\n'.join(cbs) | ||||||
|  |             teardown_msg += ( | ||||||
|  |                 '-> Closing all actor-lifetime callbacks\n\n' | ||||||
|  |                 f'|_{cbs_str}\n' | ||||||
|  |             ) | ||||||
|  |             # XXX NOTE XXX this will cause an error which | ||||||
|  |             # prevents any `infected_aio` actor from continuing | ||||||
|  |             # and any callbacks in the `ls` here WILL NOT be | ||||||
|  |             # called!! | ||||||
|  |             # await _debug.pause(shield=True) | ||||||
|  | 
 | ||||||
|  |         ls.close() | ||||||
|  | 
 | ||||||
|         # tear down all lifetime contexts if not in guest mode |         # tear down all lifetime contexts if not in guest mode | ||||||
|         # XXX: should this just be in the entrypoint? |         # XXX: should this just be in the entrypoint? | ||||||
|         actor.lifetime_stack.close() |         actor.lifetime_stack.close() | ||||||
|  | @ -1856,23 +1900,28 @@ async def async_main( | ||||||
|                     failed = True |                     failed = True | ||||||
| 
 | 
 | ||||||
|                 if failed: |                 if failed: | ||||||
|                     log.warning( |                     teardown_msg += ( | ||||||
|                         f'Failed to unregister {actor.name} from ' |                         f'-> Failed to unregister {actor.name} from ' | ||||||
|                         f'registar @ {addr}' |                         f'registar @ {addr}\n' | ||||||
|                     ) |                     ) | ||||||
|  |                     # log.warning( | ||||||
| 
 | 
 | ||||||
|         # Ensure all peers (actors connected to us as clients) are finished |         # Ensure all peers (actors connected to us as clients) are finished | ||||||
|         if not actor._no_more_peers.is_set(): |         if not actor._no_more_peers.is_set(): | ||||||
|             if any( |             if any( | ||||||
|                 chan.connected() for chan in chain(*actor._peers.values()) |                 chan.connected() for chan in chain(*actor._peers.values()) | ||||||
|             ): |             ): | ||||||
|                 log.runtime( |                 teardown_msg += ( | ||||||
|                     f"Waiting for remaining peers {actor._peers} to clear") |                     f'-> Waiting for remaining peers {actor._peers} to clear..\n' | ||||||
|  |                 ) | ||||||
|  |                 log.runtime(teardown_msg) | ||||||
|                 with CancelScope(shield=True): |                 with CancelScope(shield=True): | ||||||
|                     await actor._no_more_peers.wait() |                     await actor._no_more_peers.wait() | ||||||
|         log.runtime("All peer channels are complete") |  | ||||||
| 
 | 
 | ||||||
|     log.runtime("Runtime completed") |         teardown_msg += ('-> All peer channels are complete\n') | ||||||
|  | 
 | ||||||
|  |     teardown_msg += ('Actor runtime completed') | ||||||
|  |     log.info(teardown_msg) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: rename to `Registry` and move to `._discovery`! | # TODO: rename to `Registry` and move to `._discovery`! | ||||||
|  |  | ||||||
|  | @ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = { | ||||||
|     '_root_mailbox': (None, None), |     '_root_mailbox': (None, None), | ||||||
|     '_registry_addrs': [], |     '_registry_addrs': [], | ||||||
| 
 | 
 | ||||||
|     # for `breakpoint()` support |     # for `tractor.pause_from_sync()` & `breakpoint()` support | ||||||
|     'use_greenback': False, |     'use_greenback': False, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|     @property |     @property | ||||||
|     def ctx(self) -> Context: |     def ctx(self) -> Context: | ||||||
|         ''' |         ''' | ||||||
|         This stream's IPC `Context` ref. |         A read-only ref to this stream's inter-actor-task `Context`. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._ctx |         return self._ctx | ||||||
|  |  | ||||||
|  | @ -80,6 +80,7 @@ class ActorNursery: | ||||||
|     ''' |     ''' | ||||||
|     def __init__( |     def __init__( | ||||||
|         self, |         self, | ||||||
|  |         # TODO: maybe def these as fields of a struct looking type? | ||||||
|         actor: Actor, |         actor: Actor, | ||||||
|         ria_nursery: trio.Nursery, |         ria_nursery: trio.Nursery, | ||||||
|         da_nursery: trio.Nursery, |         da_nursery: trio.Nursery, | ||||||
|  | @ -88,8 +89,10 @@ class ActorNursery: | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         # self.supervisor = supervisor  # TODO |         # self.supervisor = supervisor  # TODO | ||||||
|         self._actor: Actor = actor |         self._actor: Actor = actor | ||||||
|         self._ria_nursery = ria_nursery | 
 | ||||||
|  |         # TODO: rename to `._tn` for our conventional "task-nursery" | ||||||
|         self._da_nursery = da_nursery |         self._da_nursery = da_nursery | ||||||
|  | 
 | ||||||
|         self._children: dict[ |         self._children: dict[ | ||||||
|             tuple[str, str], |             tuple[str, str], | ||||||
|             tuple[ |             tuple[ | ||||||
|  | @ -98,15 +101,13 @@ class ActorNursery: | ||||||
|                 Portal | None, |                 Portal | None, | ||||||
|             ] |             ] | ||||||
|         ] = {} |         ] = {} | ||||||
|         # portals spawned with ``run_in_actor()`` are | 
 | ||||||
|         # cancelled when their "main" result arrives |  | ||||||
|         self._cancel_after_result_on_exit: set = set() |  | ||||||
|         self.cancelled: bool = False |         self.cancelled: bool = False | ||||||
|         self._join_procs = trio.Event() |         self._join_procs = trio.Event() | ||||||
|         self._at_least_one_child_in_debug: bool = False |         self._at_least_one_child_in_debug: bool = False | ||||||
|         self.errors = errors |         self.errors = errors | ||||||
|         self.exited = trio.Event() |  | ||||||
|         self._scope_error: BaseException|None = None |         self._scope_error: BaseException|None = None | ||||||
|  |         self.exited = trio.Event() | ||||||
| 
 | 
 | ||||||
|         # NOTE: when no explicit call is made to |         # NOTE: when no explicit call is made to | ||||||
|         # `.open_root_actor()` by application code, |         # `.open_root_actor()` by application code, | ||||||
|  | @ -116,6 +117,13 @@ class ActorNursery: | ||||||
|         # and syncing purposes to any actor opened nurseries. |         # and syncing purposes to any actor opened nurseries. | ||||||
|         self._implicit_runtime_started: bool = False |         self._implicit_runtime_started: bool = False | ||||||
| 
 | 
 | ||||||
|  |         # TODO: remove the `.run_in_actor()` API and thus this 2ndary | ||||||
|  |         # nursery when that API get's moved outside this primitive! | ||||||
|  |         self._ria_nursery = ria_nursery | ||||||
|  |         # portals spawned with ``run_in_actor()`` are | ||||||
|  |         # cancelled when their "main" result arrives | ||||||
|  |         self._cancel_after_result_on_exit: set = set() | ||||||
|  | 
 | ||||||
|     async def start_actor( |     async def start_actor( | ||||||
|         self, |         self, | ||||||
|         name: str, |         name: str, | ||||||
|  | @ -126,10 +134,14 @@ class ActorNursery: | ||||||
|         rpc_module_paths: list[str]|None = None, |         rpc_module_paths: list[str]|None = None, | ||||||
|         enable_modules: list[str]|None = None, |         enable_modules: list[str]|None = None, | ||||||
|         loglevel: str|None = None,  # set log level per subactor |         loglevel: str|None = None,  # set log level per subactor | ||||||
|         nursery: trio.Nursery|None = None, |  | ||||||
|         debug_mode: bool|None = None, |         debug_mode: bool|None = None, | ||||||
|         infect_asyncio: bool = False, |         infect_asyncio: bool = False, | ||||||
| 
 | 
 | ||||||
|  |         # TODO: ideally we can rm this once we no longer have | ||||||
|  |         # a `._ria_nursery` since the dependent APIs have been | ||||||
|  |         # removed! | ||||||
|  |         nursery: trio.Nursery|None = None, | ||||||
|  | 
 | ||||||
|     ) -> Portal: |     ) -> Portal: | ||||||
|         ''' |         ''' | ||||||
|         Start a (daemon) actor: an process that has no designated |         Start a (daemon) actor: an process that has no designated | ||||||
|  | @ -200,6 +212,7 @@ class ActorNursery: | ||||||
|     #  |_ dynamic @context decoration on child side |     #  |_ dynamic @context decoration on child side | ||||||
|     #  |_ implicit `Portal.open_context() as (ctx, first):` |     #  |_ implicit `Portal.open_context() as (ctx, first):` | ||||||
|     #    and `return first` on parent side. |     #    and `return first` on parent side. | ||||||
|  |     #  |_ mention how it's similar to `trio-parallel` API? | ||||||
|     # -[ ] use @api_frame on the wrapper |     # -[ ] use @api_frame on the wrapper | ||||||
|     async def run_in_actor( |     async def run_in_actor( | ||||||
|         self, |         self, | ||||||
|  | @ -269,11 +282,14 @@ class ActorNursery: | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Cancel this nursery by instructing each subactor to cancel |         Cancel this actor-nursery by instructing each subactor's | ||||||
|         itself and wait for all subactors to terminate. |         runtime to cancel and wait for all underlying sub-processes | ||||||
|  |         to terminate. | ||||||
| 
 | 
 | ||||||
|         If ``hard_killl`` is set to ``True`` then kill the processes |         If `hard_kill` is set then kill the processes directly using | ||||||
|         directly without any far end graceful ``trio`` cancellation. |         the spawning-backend's API/OS-machinery without any attempt | ||||||
|  |         at (graceful) `trio`-style cancellation using our | ||||||
|  |         `Actor.cancel()`. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __runtimeframe__: int = 1  # noqa |         __runtimeframe__: int = 1  # noqa | ||||||
|  | @ -629,8 +645,12 @@ async def open_nursery( | ||||||
|             f'|_{an}\n' |             f'|_{an}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         # shutdown runtime if it was started |  | ||||||
|         if implicit_runtime: |         if implicit_runtime: | ||||||
|  |             # shutdown runtime if it was started and report noisly | ||||||
|  |             # that we're did so. | ||||||
|             msg += '=> Shutting down actor runtime <=\n' |             msg += '=> Shutting down actor runtime <=\n' | ||||||
| 
 |  | ||||||
|             log.info(msg) |             log.info(msg) | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             # keep noise low during std operation. | ||||||
|  |             log.runtime(msg) | ||||||
|  |  | ||||||
|  | @ -29,6 +29,7 @@ from ._debug import ( | ||||||
|     shield_sigint_handler as shield_sigint_handler, |     shield_sigint_handler as shield_sigint_handler, | ||||||
|     open_crash_handler as open_crash_handler, |     open_crash_handler as open_crash_handler, | ||||||
|     maybe_open_crash_handler as maybe_open_crash_handler, |     maybe_open_crash_handler as maybe_open_crash_handler, | ||||||
|  |     maybe_init_greenback as maybe_init_greenback, | ||||||
|     post_mortem as post_mortem, |     post_mortem as post_mortem, | ||||||
|     mk_pdb as mk_pdb, |     mk_pdb as mk_pdb, | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | @ -69,6 +69,7 @@ from trio import ( | ||||||
| import tractor | import tractor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
| from tractor._context import Context | from tractor._context import Context | ||||||
|  | from tractor import _state | ||||||
| from tractor._state import ( | from tractor._state import ( | ||||||
|     current_actor, |     current_actor, | ||||||
|     is_root_process, |     is_root_process, | ||||||
|  | @ -87,9 +88,6 @@ if TYPE_CHECKING: | ||||||
|     from tractor._runtime import ( |     from tractor._runtime import ( | ||||||
|         Actor, |         Actor, | ||||||
|     ) |     ) | ||||||
|     from tractor.msg import ( |  | ||||||
|         _codec, |  | ||||||
|     ) |  | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -1599,11 +1597,13 @@ async def _pause( | ||||||
|     try: |     try: | ||||||
|         task: Task = current_task() |         task: Task = current_task() | ||||||
|     except RuntimeError as rte: |     except RuntimeError as rte: | ||||||
|  |         __tracebackhide__: bool = False | ||||||
|         log.exception('Failed to get current task?') |         log.exception('Failed to get current task?') | ||||||
|         if actor.is_infected_aio(): |         if actor.is_infected_aio(): | ||||||
|  |             # mk_pdb().set_trace() | ||||||
|             raise RuntimeError( |             raise RuntimeError( | ||||||
|                 '`tractor.pause[_from_sync]()` not yet supported ' |                 '`tractor.pause[_from_sync]()` not yet supported ' | ||||||
|                 'for infected `asyncio` mode!' |                 'directly (infected) `asyncio` tasks!' | ||||||
|             ) from rte |             ) from rte | ||||||
| 
 | 
 | ||||||
|         raise |         raise | ||||||
|  | @ -2163,10 +2163,8 @@ def maybe_import_greenback( | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def maybe_init_greenback( | async def maybe_init_greenback(**kwargs) -> None|ModuleType: | ||||||
|     **kwargs, |     try: | ||||||
| ) -> None|ModuleType: |  | ||||||
| 
 |  | ||||||
|         if mod := maybe_import_greenback(**kwargs): |         if mod := maybe_import_greenback(**kwargs): | ||||||
|             await mod.ensure_portal() |             await mod.ensure_portal() | ||||||
|             log.devx( |             log.devx( | ||||||
|  | @ -2174,11 +2172,13 @@ async def maybe_init_greenback( | ||||||
|                 'Sync debug support activated!\n' |                 'Sync debug support activated!\n' | ||||||
|             ) |             ) | ||||||
|             return mod |             return mod | ||||||
|  |     except BaseException: | ||||||
|  |         log.exception('Failed to init `greenback`..') | ||||||
|  |         raise | ||||||
| 
 | 
 | ||||||
|     return None |     return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| async def _pause_from_bg_root_thread( | async def _pause_from_bg_root_thread( | ||||||
|     behalf_of_thread: Thread, |     behalf_of_thread: Thread, | ||||||
|     repl: PdbREPL, |     repl: PdbREPL, | ||||||
|  | @ -2399,9 +2399,16 @@ def pause_from_sync( | ||||||
|         else:  # we are presumably the `trio.run()` + main thread |         else:  # we are presumably the `trio.run()` + main thread | ||||||
|             # raises on not-found by default |             # raises on not-found by default | ||||||
|             greenback: ModuleType = maybe_import_greenback() |             greenback: ModuleType = maybe_import_greenback() | ||||||
|  | 
 | ||||||
|  |             # TODO: how to ensure this is either dynamically (if | ||||||
|  |             # needed) called here (in some bg tn??) or that the | ||||||
|  |             # subactor always already called it? | ||||||
|  |             # greenback: ModuleType = await maybe_init_greenback() | ||||||
|  | 
 | ||||||
|             message += f'-> imported {greenback}\n' |             message += f'-> imported {greenback}\n' | ||||||
|             repl_owner: Task = current_task() |             repl_owner: Task = current_task() | ||||||
|             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' |             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' | ||||||
|  |             try: | ||||||
|                 out = greenback.await_( |                 out = greenback.await_( | ||||||
|                     _pause( |                     _pause( | ||||||
|                         debug_func=None, |                         debug_func=None, | ||||||
|  | @ -2411,6 +2418,18 @@ def pause_from_sync( | ||||||
|                         **_pause_kwargs, |                         **_pause_kwargs, | ||||||
|                     ) |                     ) | ||||||
|                 ) |                 ) | ||||||
|  |             except RuntimeError as rte: | ||||||
|  |                 if not _state._runtime_vars.get( | ||||||
|  |                         'use_greenback', | ||||||
|  |                         False, | ||||||
|  |                 ): | ||||||
|  |                     raise RuntimeError( | ||||||
|  |                         '`greenback` was never initialized in this actor!?\n\n' | ||||||
|  |                         f'{_state._runtime_vars}\n' | ||||||
|  |                     ) from rte | ||||||
|  | 
 | ||||||
|  |                 raise | ||||||
|  | 
 | ||||||
|             if out: |             if out: | ||||||
|                 bg_task, repl = out |                 bg_task, repl = out | ||||||
|                 assert repl is repl |                 assert repl is repl | ||||||
|  | @ -2801,10 +2820,10 @@ def open_crash_handler( | ||||||
|       `trio.run()`. |       `trio.run()`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  |     err: BaseException | ||||||
|     try: |     try: | ||||||
|         yield |         yield | ||||||
|     except tuple(catch) as err: |     except tuple(catch) as err: | ||||||
| 
 |  | ||||||
|         if type(err) not in ignore: |         if type(err) not in ignore: | ||||||
|             pdbp.xpm() |             pdbp.xpm() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -234,7 +234,7 @@ def find_caller_info( | ||||||
| _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} | _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: -[x] move all this into new `.devx._code`! | # TODO: -[x] move all this into new `.devx._frame_stack`! | ||||||
| # -[ ] consider rename to _callstack? | # -[ ] consider rename to _callstack? | ||||||
| # -[ ] prolly create a `@runtime_api` dec? | # -[ ] prolly create a `@runtime_api` dec? | ||||||
| #   |_ @api_frame seems better? | #   |_ @api_frame seems better? | ||||||
|  | @ -286,3 +286,18 @@ def api_frame( | ||||||
|     wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache |     wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache | ||||||
|     wrapped.__api_func__: bool = True |     wrapped.__api_func__: bool = True | ||||||
|     return wrapper(wrapped) |     return wrapper(wrapped) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: something like this instead of the adhoc frame-unhiding | ||||||
|  | # blocks all over the runtime!! XD | ||||||
|  | # -[ ] ideally we can expect a certain error (set) and if something | ||||||
|  | #     else is raised then all frames below the wrapped one will be | ||||||
|  | #     un-hidden via `__tracebackhide__: bool = False`. | ||||||
|  | # |_ might need to dynamically mutate the code objs like | ||||||
|  | #    `pdbp.hideframe()` does? | ||||||
|  | # -[ ] use this as a `@acm` decorator as introed in 3.10? | ||||||
|  | # @acm | ||||||
|  | # async def unhide_frame_when_not( | ||||||
|  | #     error_set: set[BaseException], | ||||||
|  | # ) -> TracebackType: | ||||||
|  | #     ... | ||||||
|  |  | ||||||
|  | @ -0,0 +1,134 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2024-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | Daemon subactor as service(s) management and supervision primitives | ||||||
|  | and API. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     # asynccontextmanager as acm, | ||||||
|  |     contextmanager as cm, | ||||||
|  | ) | ||||||
|  | from collections import defaultdict | ||||||
|  | from typing import ( | ||||||
|  |     Callable, | ||||||
|  |     Any, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | import trio | ||||||
|  | from trio import TaskStatus | ||||||
|  | from tractor import ( | ||||||
|  |     ActorNursery, | ||||||
|  |     current_actor, | ||||||
|  |     ContextCancelled, | ||||||
|  |     Context, | ||||||
|  |     Portal, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | from ._util import ( | ||||||
|  |     log,  # sub-sys logger | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: implement a `@singleton` deco-API for wrapping the below | ||||||
|  | # factory's impl for general actor-singleton use? | ||||||
|  | # | ||||||
|  | # -[ ] go through the options peeps on SO did? | ||||||
|  | #  * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python | ||||||
|  | #  * including @mikenerone's answer | ||||||
|  | #   |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 | ||||||
|  | # | ||||||
|  | # -[ ] put it in `tractor.lowlevel._globals` ? | ||||||
|  | #  * fits with our oustanding actor-local/global feat req? | ||||||
|  | #   |_ https://github.com/goodboy/tractor/issues/55 | ||||||
|  | #  * how can it relate to the `Actor.lifetime_stack` that was | ||||||
|  | #    silently patched in? | ||||||
|  | #   |_ we could implicitly call both of these in the same | ||||||
|  | #     spot in the runtime using the lifetime stack? | ||||||
|  | #    - `open_singleton_cm().__exit__()` | ||||||
|  | #    -`del_singleton()` | ||||||
|  | #   |_ gives SC fixtue semantics to sync code oriented around | ||||||
|  | #     sub-process lifetime? | ||||||
|  | #  * what about with `trio.RunVar`? | ||||||
|  | #   |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar | ||||||
|  | #    - which we'll need for no-GIL cpython (right?) presuming | ||||||
|  | #      multiple `trio.run()` calls in process? | ||||||
|  | # | ||||||
|  | # | ||||||
|  | # @singleton | ||||||
|  | # async def open_service_mngr( | ||||||
|  | #     **init_kwargs, | ||||||
|  | # ) -> ServiceMngr: | ||||||
|  | #     ''' | ||||||
|  | #     Note this function body is invoke IFF no existing singleton instance already | ||||||
|  | #     exists in this proc's memory. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     # setup | ||||||
|  | #     yield ServiceMngr(**init_kwargs) | ||||||
|  | #     # teardown | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # a deletion API for explicit instance de-allocation? | ||||||
|  | # @open_service_mngr.deleter | ||||||
|  | # def del_service_mngr() -> None: | ||||||
|  | #     mngr = open_service_mngr._singleton[0] | ||||||
|  | #     open_service_mngr._singleton[0] = None | ||||||
|  | #     del mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: singleton factory API instead of a class API | ||||||
|  | @cm | ||||||
|  | def open_service_mngr( | ||||||
|  |     *, | ||||||
|  |     _singleton: list[ServiceMngr|None] = [None], | ||||||
|  |     # NOTE; since default values for keyword-args are effectively | ||||||
|  |     # module-vars/globals as per the note from, | ||||||
|  |     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values | ||||||
|  |     # | ||||||
|  |     # > "The default value is evaluated only once. This makes | ||||||
|  |     #   a difference when the default is a mutable object such as | ||||||
|  |     #   a list, dictionary, or instances of most classes" | ||||||
|  |     # | ||||||
|  |     **init_kwargs, | ||||||
|  | 
 | ||||||
|  | ) -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Open a multi-subactor-as-service-daemon tree supervisor. | ||||||
|  | 
 | ||||||
|  |     The delivered `ServiceMngr` is a singleton instance for each | ||||||
|  |     actor-process and is allocated on first open and never | ||||||
|  |     de-allocated unless explicitly deleted by al call to | ||||||
|  |     `del_service_mngr()`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     mngr: ServiceMngr|None | ||||||
|  |     if (mngr := _singleton[0]) is None: | ||||||
|  |         log.info('Allocating a new service mngr!') | ||||||
|  |         mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||||
|  |     else: | ||||||
|  |         log.info( | ||||||
|  |             'Using extant service mngr!\n\n' | ||||||
|  |             f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     with mngr: | ||||||
|  |         yield mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @ -54,11 +54,12 @@ LOG_FORMAT = ( | ||||||
| DATE_FORMAT = '%b %d %H:%M:%S' | DATE_FORMAT = '%b %d %H:%M:%S' | ||||||
| 
 | 
 | ||||||
| # FYI, ERROR is 40 | # FYI, ERROR is 40 | ||||||
|  | # TODO: use a `bidict` to avoid the :155 check? | ||||||
| CUSTOM_LEVELS: dict[str, int] = { | CUSTOM_LEVELS: dict[str, int] = { | ||||||
|     'TRANSPORT': 5, |     'TRANSPORT': 5, | ||||||
|     'RUNTIME': 15, |     'RUNTIME': 15, | ||||||
|     'DEVX': 17, |     'DEVX': 17, | ||||||
|     'CANCEL': 18, |     'CANCEL': 22, | ||||||
|     'PDB': 500, |     'PDB': 500, | ||||||
| } | } | ||||||
| STD_PALETTE = { | STD_PALETTE = { | ||||||
|  | @ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter): | ||||||
|         Delegate a log call to the underlying logger, after adding |         Delegate a log call to the underlying logger, after adding | ||||||
|         contextual information from this adapter instance. |         contextual information from this adapter instance. | ||||||
| 
 | 
 | ||||||
|  |         NOTE: all custom level methods (above) delegate to this! | ||||||
|  | 
 | ||||||
|         ''' |         ''' | ||||||
|         if self.isEnabledFor(level): |         if self.isEnabledFor(level): | ||||||
|             stacklevel: int = 3 |             stacklevel: int = 3 | ||||||
|  |  | ||||||
|  | @ -41,8 +41,10 @@ import textwrap | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  |     Protocol, | ||||||
|     Type, |     Type, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|  |     TypeVar, | ||||||
|     Union, |     Union, | ||||||
| ) | ) | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
|  | @ -181,7 +183,11 @@ def mk_dec( | ||||||
|     dec_hook: Callable|None = None, |     dec_hook: Callable|None = None, | ||||||
| 
 | 
 | ||||||
| ) -> MsgDec: | ) -> MsgDec: | ||||||
|  |     ''' | ||||||
|  |     Create an IPC msg decoder, normally used as the | ||||||
|  |     `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. | ||||||
| 
 | 
 | ||||||
|  |     ''' | ||||||
|     return MsgDec( |     return MsgDec( | ||||||
|         _dec=msgpack.Decoder( |         _dec=msgpack.Decoder( | ||||||
|             type=spec,  # like `MsgType[Any]` |             type=spec,  # like `MsgType[Any]` | ||||||
|  | @ -227,6 +233,13 @@ def pformat_msgspec( | ||||||
|     join_char: str = '\n', |     join_char: str = '\n', | ||||||
| 
 | 
 | ||||||
| ) -> str: | ) -> str: | ||||||
|  |     ''' | ||||||
|  |     Pretty `str` format the `msgspec.msgpack.Decoder.type` attributed | ||||||
|  |     for display in log messages as a nice (maybe multiline) | ||||||
|  |     presentation of all the supported `Struct`s availed for typed | ||||||
|  |     decoding. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     dec: msgpack.Decoder = getattr(codec, 'dec', codec) |     dec: msgpack.Decoder = getattr(codec, 'dec', codec) | ||||||
|     return join_char.join( |     return join_char.join( | ||||||
|         mk_msgspec_table( |         mk_msgspec_table( | ||||||
|  | @ -630,31 +643,57 @@ def limit_msg_spec( | ||||||
| #         # import pdbp; pdbp.set_trace() | #         # import pdbp; pdbp.set_trace() | ||||||
| #         assert ext_codec.pld_spec == extended_spec | #         assert ext_codec.pld_spec == extended_spec | ||||||
| #         yield ext_codec | #         yield ext_codec | ||||||
|  | # | ||||||
|  | # ^-TODO-^ is it impossible to make something like this orr!? | ||||||
|  | 
 | ||||||
|  | # TODO: make an auto-custom hook generator from a set of input custom | ||||||
|  | # types? | ||||||
|  | # -[ ] below is a proto design using a `TypeCodec` idea? | ||||||
|  | # | ||||||
|  | # type var for the expected interchange-lib's | ||||||
|  | # IPC-transport type when not available as a built-in | ||||||
|  | # serialization output. | ||||||
|  | WireT = TypeVar('WireT') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: make something similar to this inside `._codec` such that | # TODO: some kinda (decorator) API for built-in subtypes | ||||||
| # user can just pass a type table of some sort? | # that builds this implicitly by inspecting the `mro()`? | ||||||
| # -[ ] we would need to decode all msgs to `pretty_struct.Struct` | class TypeCodec(Protocol): | ||||||
| #     and then call `.to_dict()` on them? |     ''' | ||||||
| # -[x] we're going to need to re-impl all the stuff changed in the |     A per-custom-type wire-transport serialization translator | ||||||
| #    runtime port such that it can handle dicts or `Msg`s? |     description type. | ||||||
| # | 
 | ||||||
| # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: |     ''' | ||||||
| #     ''' |     src_type: Type | ||||||
| #     Deliver a `enc_hook()`/`dec_hook()` pair which does |     wire_type: WireT | ||||||
| #     manual convertion from our above native `Msg` set | 
 | ||||||
| #     to `dict` equivalent (wire msgs) in order to keep legacy compat |     def encode(obj: Type) -> WireT: | ||||||
| #     with the original runtime implementation. |         ... | ||||||
| # | 
 | ||||||
| #     Note: this is is/was primarly used while moving the core |     def decode( | ||||||
| #     runtime over to using native `Msg`-struct types wherein we |         obj_type: Type[WireT], | ||||||
| #     start with the send side emitting without loading |         obj: WireT, | ||||||
| #     a typed-decoder and then later flipping the switch over to |     ) -> Type: | ||||||
| #     load to the native struct types once all runtime usage has |         ... | ||||||
| #     been adjusted appropriately. | 
 | ||||||
| # | 
 | ||||||
| #     ''' | class MsgpackTypeCodec(TypeCodec): | ||||||
| #     return ( |     ... | ||||||
| #         # enc_to_dict, | 
 | ||||||
| #         dec_from_dict, | 
 | ||||||
| #     ) | def mk_codec_hooks( | ||||||
|  |     type_codecs: list[TypeCodec], | ||||||
|  | 
 | ||||||
|  | ) -> tuple[Callable, Callable]: | ||||||
|  |     ''' | ||||||
|  |     Deliver a `enc_hook()`/`dec_hook()` pair which handle | ||||||
|  |     manual convertion from an input `Type` set such that whenever | ||||||
|  |     the `TypeCodec.filter()` predicate matches the | ||||||
|  |     `TypeCodec.decode()` is called on the input native object by | ||||||
|  |     the `dec_hook()` and whenever the | ||||||
|  |     `isiinstance(obj, TypeCodec.type)` matches against an | ||||||
|  |     `enc_hook(obj=obj)` the return value is taken from a | ||||||
|  |     `TypeCodec.encode(obj)` callback. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     ... | ||||||
|  |  | ||||||
|  | @ -580,12 +580,15 @@ async def drain_to_final_msg( | ||||||
|         # 2. WE DID NOT REQUEST that cancel and thus |         # 2. WE DID NOT REQUEST that cancel and thus | ||||||
|         #    SHOULD RAISE HERE! |         #    SHOULD RAISE HERE! | ||||||
|         except trio.Cancelled as taskc: |         except trio.Cancelled as taskc: | ||||||
|  |             # from tractor.devx._debug import pause | ||||||
|  |             # await pause(shield=True) | ||||||
| 
 | 
 | ||||||
|             # CASE 2: mask the local cancelled-error(s) |             # CASE 2: mask the local cancelled-error(s) | ||||||
|             # only when we are sure the remote error is |             # only when we are sure the remote error is | ||||||
|             # the source cause of this local task's |             # the source cause of this local task's | ||||||
|             # cancellation. |             # cancellation. | ||||||
|             ctx.maybe_raise( |             ctx.maybe_raise( | ||||||
|  |                 hide_tb=hide_tb, | ||||||
|                 # TODO: when use this/ |                 # TODO: when use this/ | ||||||
|                 # from_src_exc=taskc, |                 # from_src_exc=taskc, | ||||||
|             ) |             ) | ||||||
|  |  | ||||||
|  | @ -34,6 +34,9 @@ from pprint import ( | ||||||
|     saferepr, |     saferepr, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | from tractor.log import get_logger | ||||||
|  | 
 | ||||||
|  | log = get_logger() | ||||||
| # TODO: auto-gen type sig for input func both for | # TODO: auto-gen type sig for input func both for | ||||||
| # type-msgs and logging of RPC tasks? | # type-msgs and logging of RPC tasks? | ||||||
| # taken and modified from: | # taken and modified from: | ||||||
|  | @ -143,7 +146,13 @@ def pformat( | ||||||
| 
 | 
 | ||||||
|         else:  # the `pprint` recursion-safe format: |         else:  # the `pprint` recursion-safe format: | ||||||
|             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr |             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr | ||||||
|  |             try: | ||||||
|                 val_str: str = saferepr(v) |                 val_str: str = saferepr(v) | ||||||
|  |             except Exception: | ||||||
|  |                 log.exception( | ||||||
|  |                     'Failed to `saferepr({type(struct)})` !?\n' | ||||||
|  |                 ) | ||||||
|  |             return _Struct.__repr__(struct) | ||||||
| 
 | 
 | ||||||
|         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! |         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! | ||||||
|         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') |         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') | ||||||
|  | @ -194,12 +203,20 @@ class Struct( | ||||||
|         return sin_props |         return sin_props | ||||||
| 
 | 
 | ||||||
|     pformat = pformat |     pformat = pformat | ||||||
|  |     # __repr__ = pformat | ||||||
|     # __str__ = __repr__ = pformat |     # __str__ = __repr__ = pformat | ||||||
|     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering |     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering | ||||||
|     # inside a known tty? |     # inside a known tty? | ||||||
|     # def __repr__(self) -> str: |     # def __repr__(self) -> str: | ||||||
|     #     ... |     #     ... | ||||||
|     __repr__ = pformat |     def __repr__(self) -> str: | ||||||
|  |         try: | ||||||
|  |             return pformat(self) | ||||||
|  |         except Exception: | ||||||
|  |             log.exception( | ||||||
|  |                 f'Failed to `pformat({type(self)})` !?\n' | ||||||
|  |             ) | ||||||
|  |             return _Struct.__repr__(self) | ||||||
| 
 | 
 | ||||||
|     def copy( |     def copy( | ||||||
|         self, |         self, | ||||||
|  |  | ||||||
|  | @ -18,11 +18,13 @@ | ||||||
| Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
|  | from __future__ import annotations | ||||||
| import asyncio | import asyncio | ||||||
| from asyncio.exceptions import CancelledError | from asyncio.exceptions import CancelledError | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
| import inspect | import inspect | ||||||
|  | import traceback | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  | @ -30,20 +32,21 @@ from typing import ( | ||||||
|     Awaitable, |     Awaitable, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import tractor | ||||||
| from outcome import Error |  | ||||||
| 
 |  | ||||||
| from tractor.log import get_logger |  | ||||||
| from tractor._state import ( | from tractor._state import ( | ||||||
|     current_actor, |  | ||||||
|     debug_mode, |     debug_mode, | ||||||
| ) | ) | ||||||
|  | from tractor.log import get_logger | ||||||
| from tractor.devx import _debug | from tractor.devx import _debug | ||||||
| from tractor._exceptions import AsyncioCancelled |  | ||||||
| from tractor.trionics._broadcast import ( | from tractor.trionics._broadcast import ( | ||||||
|     broadcast_receiver, |     broadcast_receiver, | ||||||
|     BroadcastReceiver, |     BroadcastReceiver, | ||||||
| ) | ) | ||||||
|  | import trio | ||||||
|  | from outcome import ( | ||||||
|  |     Error, | ||||||
|  |     Outcome, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -161,7 +164,7 @@ def _run_asyncio_task( | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     __tracebackhide__ = True |     __tracebackhide__ = True | ||||||
|     if not current_actor().is_infected_aio(): |     if not tractor.current_actor().is_infected_aio(): | ||||||
|         raise RuntimeError( |         raise RuntimeError( | ||||||
|             "`infect_asyncio` mode is not enabled!?" |             "`infect_asyncio` mode is not enabled!?" | ||||||
|         ) |         ) | ||||||
|  | @ -172,7 +175,6 @@ def _run_asyncio_task( | ||||||
|     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore |     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore | ||||||
| 
 | 
 | ||||||
|     args = tuple(inspect.getfullargspec(func).args) |     args = tuple(inspect.getfullargspec(func).args) | ||||||
| 
 |  | ||||||
|     if getattr(func, '_tractor_steam_function', None): |     if getattr(func, '_tractor_steam_function', None): | ||||||
|         # the assumption is that the target async routine accepts the |         # the assumption is that the target async routine accepts the | ||||||
|         # send channel then it intends to yield more then one return |         # send channel then it intends to yield more then one return | ||||||
|  | @ -346,13 +348,22 @@ def _run_asyncio_task( | ||||||
|             # on a checkpoint. |             # on a checkpoint. | ||||||
|             cancel_scope.cancel() |             cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|             # raise any ``asyncio`` side error. |             # raise any `asyncio` side error. | ||||||
|             raise aio_err |             raise aio_err | ||||||
| 
 | 
 | ||||||
|     task.add_done_callback(cancel_trio) |     task.add_done_callback(cancel_trio) | ||||||
|     return chan |     return chan | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class AsyncioCancelled(CancelledError): | ||||||
|  |     ''' | ||||||
|  |     Asyncio cancelled translation (non-base) error | ||||||
|  |     for use with the ``to_asyncio`` module | ||||||
|  |     to be raised in the ``trio`` side task | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @acm | @acm | ||||||
| async def translate_aio_errors( | async def translate_aio_errors( | ||||||
| 
 | 
 | ||||||
|  | @ -516,7 +527,6 @@ async def open_channel_from( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def run_as_asyncio_guest( | def run_as_asyncio_guest( | ||||||
| 
 |  | ||||||
|     trio_main: Callable, |     trio_main: Callable, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|  | @ -548,6 +558,11 @@ def run_as_asyncio_guest( | ||||||
| 
 | 
 | ||||||
|         loop = asyncio.get_running_loop() |         loop = asyncio.get_running_loop() | ||||||
|         trio_done_fut = asyncio.Future() |         trio_done_fut = asyncio.Future() | ||||||
|  |         startup_msg: str = ( | ||||||
|  |             'Starting `asyncio` guest-loop-run\n' | ||||||
|  |             '-> got running loop\n' | ||||||
|  |             '-> built a `trio`-done future\n' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         if debug_mode(): |         if debug_mode(): | ||||||
|             # XXX make it obvi we know this isn't supported yet! |             # XXX make it obvi we know this isn't supported yet! | ||||||
|  | @ -562,34 +577,120 @@ def run_as_asyncio_guest( | ||||||
|         def trio_done_callback(main_outcome): |         def trio_done_callback(main_outcome): | ||||||
| 
 | 
 | ||||||
|             if isinstance(main_outcome, Error): |             if isinstance(main_outcome, Error): | ||||||
|                 error = main_outcome.error |                 error: BaseException = main_outcome.error | ||||||
|  | 
 | ||||||
|  |                 # show an dedicated `asyncio`-side tb from the error | ||||||
|  |                 tb_str: str = ''.join(traceback.format_exception(error)) | ||||||
|  |                 log.exception( | ||||||
|  |                     'Guest-run errored!?\n\n' | ||||||
|  |                     f'{main_outcome}\n' | ||||||
|  |                     f'{error}\n\n' | ||||||
|  |                     f'{tb_str}\n' | ||||||
|  |                 ) | ||||||
|                 trio_done_fut.set_exception(error) |                 trio_done_fut.set_exception(error) | ||||||
| 
 | 
 | ||||||
|                 # TODO: explicit asyncio tb? |                 # raise inline | ||||||
|                 # traceback.print_exception(error) |  | ||||||
| 
 |  | ||||||
|                 # XXX: do we need this? |  | ||||||
|                 # actor.cancel_soon() |  | ||||||
| 
 |  | ||||||
|                 main_outcome.unwrap() |                 main_outcome.unwrap() | ||||||
|  | 
 | ||||||
|             else: |             else: | ||||||
|                 trio_done_fut.set_result(main_outcome) |                 trio_done_fut.set_result(main_outcome) | ||||||
|                 log.runtime(f"trio_main finished: {main_outcome!r}") |                 log.runtime(f'trio_main finished: {main_outcome!r}') | ||||||
|  | 
 | ||||||
|  |         startup_msg += ( | ||||||
|  |             f'-> created {trio_done_callback!r}\n' | ||||||
|  |             f'-> scheduling `trio_main`: {trio_main!r}\n' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         # start the infection: run trio on the asyncio loop in "guest mode" |         # start the infection: run trio on the asyncio loop in "guest mode" | ||||||
|         log.runtime( |         log.runtime( | ||||||
|             'Infecting `asyncio`-process with a `trio` guest-run of\n\n' |             f'{startup_msg}\n\n' | ||||||
|             f'{trio_main!r}\n\n' |             + | ||||||
| 
 |             'Infecting `asyncio`-process with a `trio` guest-run!\n' | ||||||
|             f'{trio_done_callback}\n' |  | ||||||
|         ) |         ) | ||||||
|  | 
 | ||||||
|         trio.lowlevel.start_guest_run( |         trio.lowlevel.start_guest_run( | ||||||
|             trio_main, |             trio_main, | ||||||
|             run_sync_soon_threadsafe=loop.call_soon_threadsafe, |             run_sync_soon_threadsafe=loop.call_soon_threadsafe, | ||||||
|             done_callback=trio_done_callback, |             done_callback=trio_done_callback, | ||||||
|         ) |         ) | ||||||
|         # NOTE `.unwrap()` will raise on error |         try: | ||||||
|         return (await trio_done_fut).unwrap() |             # TODO: better SIGINT handling since shielding seems to | ||||||
|  |             # make NO DIFFERENCE XD | ||||||
|  |             # -[ ] maybe this is due to 3.11's recent SIGINT handling | ||||||
|  |             #  changes and we can better work with/around it? | ||||||
|  |             # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption | ||||||
|  |             out: Outcome = await asyncio.shield(trio_done_fut) | ||||||
|  |             # NOTE `Error.unwrap()` will raise | ||||||
|  |             return out.unwrap() | ||||||
|  | 
 | ||||||
|  |         except asyncio.CancelledError: | ||||||
|  |             actor: tractor.Actor = tractor.current_actor() | ||||||
|  |             log.exception( | ||||||
|  |                 '`asyncio`-side main task was cancelled!\n' | ||||||
|  |                 'Cancelling actor-runtime..\n' | ||||||
|  |                 f'c)>\n' | ||||||
|  |                 f'  |_{actor}.cancel_soon()\n' | ||||||
|  | 
 | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE XXX the next LOC is super important!!! | ||||||
|  |             #  => without it, we can get a guest-run abandonment case | ||||||
|  |             #  where asyncio will not trigger `trio` in a final event | ||||||
|  |             #  loop cycle! | ||||||
|  |             # | ||||||
|  |             #  our test, | ||||||
|  |             #  `test_infected_asyncio.test_sigint_closes_lifetime_stack()` | ||||||
|  |             #  demonstrates how if when we raise a SIGINT-signal in an infected | ||||||
|  |             #  child we get a variable race condition outcome where | ||||||
|  |             #  either of the following can indeterminately happen, | ||||||
|  |             # | ||||||
|  |             # - "silent-abandon": `asyncio` abandons the `trio` | ||||||
|  |             #   guest-run task silently and no `trio`-guest-run or | ||||||
|  |             #   `tractor`-actor-runtime teardown happens whatsoever.. | ||||||
|  |             #   this is the WORST (race) case outcome. | ||||||
|  |             # | ||||||
|  |             # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with | ||||||
|  |             #   `trio` reporting a console traceback and further tbs of all | ||||||
|  |             #   the failed shutdown routines also show on console.. | ||||||
|  |             # | ||||||
|  |             # our test can thus fail and (has been parametrized for) | ||||||
|  |             # the 2 cases: | ||||||
|  |             # | ||||||
|  |             # - when the parent raises a KBI just after | ||||||
|  |             #   signalling the child, | ||||||
|  |             #  |_silent-abandon => the `Actor.lifetime_stack` will | ||||||
|  |             #    never be closed thus leaking a resource! | ||||||
|  |             #   -> FAIL! | ||||||
|  |             #  |_loud-abandon => despite the abandonment at least the | ||||||
|  |             #    stack will be closed out.. | ||||||
|  |             #   -> PASS | ||||||
|  |             # | ||||||
|  |             # - when the parent instead simply waits on `ctx.wait_for_result()` | ||||||
|  |             #   (i.e. DOES not raise a KBI itself), | ||||||
|  |             #  |_silent-abandon => test will just hang and thus the ctx | ||||||
|  |             #    and actor will never be closed/cancelled/shutdown | ||||||
|  |             #    resulting in leaking a (file) resource since the | ||||||
|  |             #    `trio`/`tractor` runtime never relays a ctxc back to | ||||||
|  |             #    the parent; the test's timeout will trigger.. | ||||||
|  |             #   -> FAIL! | ||||||
|  |             #  |_loud-abandon => this case seems to never happen?? | ||||||
|  |             # | ||||||
|  |             # XXX FIRST PART XXX, SO, this is a fix to the | ||||||
|  |             # "silent-abandon" case, NOT the `trio`-guest-run | ||||||
|  |             # abandonment issue in general, for which the NEXT LOC | ||||||
|  |             # is apparently a working fix! | ||||||
|  |             actor.cancel_soon() | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to | ||||||
|  |             # `trio`-guest-run to complete and teardown !! | ||||||
|  |             # | ||||||
|  |             # XXX WITHOUT THIS the guest-run gets race-conditionally | ||||||
|  |             # abandoned by `asyncio`!! | ||||||
|  |             # XD XD XD | ||||||
|  |             await asyncio.shield( | ||||||
|  |                 asyncio.sleep(.1)  # NOPE! it can't be 0 either XD | ||||||
|  |             ) | ||||||
|  |             raise | ||||||
| 
 | 
 | ||||||
|     # might as well if it's installed. |     # might as well if it's installed. | ||||||
|     try: |     try: | ||||||
|  | @ -599,4 +700,6 @@ def run_as_asyncio_guest( | ||||||
|     except ImportError: |     except ImportError: | ||||||
|         pass |         pass | ||||||
| 
 | 
 | ||||||
|     return asyncio.run(aio_main(trio_main)) |     return asyncio.run( | ||||||
|  |         aio_main(trio_main), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  | @ -156,11 +156,12 @@ class BroadcastState(Struct): | ||||||
| 
 | 
 | ||||||
| class BroadcastReceiver(ReceiveChannel): | class BroadcastReceiver(ReceiveChannel): | ||||||
|     ''' |     ''' | ||||||
|     A memory receive channel broadcaster which is non-lossy for the |     A memory receive channel broadcaster which is non-lossy for | ||||||
|     fastest consumer. |     the fastest consumer. | ||||||
| 
 | 
 | ||||||
|     Additional consumer tasks can receive all produced values by registering |     Additional consumer tasks can receive all produced values by | ||||||
|     with ``.subscribe()`` and receiving from the new instance it delivers. |     registering with ``.subscribe()`` and receiving from the new | ||||||
|  |     instance it delivers. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     def __init__( |     def __init__( | ||||||
|  |  | ||||||
|  | @ -18,8 +18,12 @@ | ||||||
| Async context manager primitives with hard ``trio``-aware semantics | Async context manager primitives with hard ``trio``-aware semantics | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from contextlib import asynccontextmanager as acm | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  | ) | ||||||
| import inspect | import inspect | ||||||
|  | from types import ModuleType | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     AsyncContextManager, |     AsyncContextManager, | ||||||
|  | @ -30,13 +34,16 @@ from typing import ( | ||||||
|     Optional, |     Optional, | ||||||
|     Sequence, |     Sequence, | ||||||
|     TypeVar, |     TypeVar, | ||||||
|  |     TYPE_CHECKING, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
| 
 |  | ||||||
| from tractor._state import current_actor | from tractor._state import current_actor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
| 
 | 
 | ||||||
|  | if TYPE_CHECKING: | ||||||
|  |     from tractor import ActorNursery | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -46,8 +53,10 @@ T = TypeVar("T") | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def maybe_open_nursery( | async def maybe_open_nursery( | ||||||
|     nursery: trio.Nursery | None = None, |     nursery: trio.Nursery|ActorNursery|None = None, | ||||||
|     shield: bool = False, |     shield: bool = False, | ||||||
|  |     lib: ModuleType = trio, | ||||||
|  | 
 | ||||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ) -> AsyncGenerator[trio.Nursery, Any]: | ||||||
|     ''' |     ''' | ||||||
|     Create a new nursery if None provided. |     Create a new nursery if None provided. | ||||||
|  | @ -58,13 +67,12 @@ async def maybe_open_nursery( | ||||||
|     if nursery is not None: |     if nursery is not None: | ||||||
|         yield nursery |         yield nursery | ||||||
|     else: |     else: | ||||||
|         async with trio.open_nursery() as nursery: |         async with lib.open_nursery() as nursery: | ||||||
|             nursery.cancel_scope.shield = shield |             nursery.cancel_scope.shield = shield | ||||||
|             yield nursery |             yield nursery | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def _enter_and_wait( | async def _enter_and_wait( | ||||||
| 
 |  | ||||||
|     mngr: AsyncContextManager[T], |     mngr: AsyncContextManager[T], | ||||||
|     unwrapped: dict[int, T], |     unwrapped: dict[int, T], | ||||||
|     all_entered: trio.Event, |     all_entered: trio.Event, | ||||||
|  | @ -91,7 +99,6 @@ async def _enter_and_wait( | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def gather_contexts( | async def gather_contexts( | ||||||
| 
 |  | ||||||
|     mngrs: Sequence[AsyncContextManager[T]], |     mngrs: Sequence[AsyncContextManager[T]], | ||||||
| 
 | 
 | ||||||
| ) -> AsyncGenerator[ | ) -> AsyncGenerator[ | ||||||
|  | @ -102,15 +109,17 @@ async def gather_contexts( | ||||||
|     None, |     None, | ||||||
| ]: | ]: | ||||||
|     ''' |     ''' | ||||||
|     Concurrently enter a sequence of async context managers, each in |     Concurrently enter a sequence of async context managers (acms), | ||||||
|     a separate ``trio`` task and deliver the unwrapped values in the |     each from a separate `trio` task and deliver the unwrapped | ||||||
|     same order once all managers have entered. On exit all contexts are |     `yield`-ed values in the same order once all managers have entered. | ||||||
|     subsequently and concurrently exited. |  | ||||||
| 
 | 
 | ||||||
|     This function is somewhat similar to common usage of |     On exit, all acms are subsequently and concurrently exited. | ||||||
|     ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in | 
 | ||||||
|     combo with ``asyncio.gather()`` except the managers are concurrently |     This function is somewhat similar to a batch of non-blocking | ||||||
|     entered and exited, and cancellation just works. |     calls to `contextlib.AsyncExitStack.enter_async_context()` | ||||||
|  |     (inside a loop) *in combo with* a `asyncio.gather()` to get the | ||||||
|  |     `.__aenter__()`-ed values, except the managers are both | ||||||
|  |     concurrently entered and exited and *cancellation just works*(R). | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     seed: int = id(mngrs) |     seed: int = id(mngrs) | ||||||
|  | @ -210,9 +219,10 @@ async def maybe_open_context( | ||||||
| 
 | 
 | ||||||
| ) -> AsyncIterator[tuple[bool, T]]: | ) -> AsyncIterator[tuple[bool, T]]: | ||||||
|     ''' |     ''' | ||||||
|     Maybe open a context manager if there is not already a _Cached |     Maybe open an async-context-manager (acm) if there is not already | ||||||
|     version for the provided ``key`` for *this* actor. Return the |     a `_Cached` version for the provided (input) `key` for *this* actor. | ||||||
|     _Cached instance on a _Cache hit. | 
 | ||||||
|  |     Return the `_Cached` instance on a _Cache hit. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     fid = id(acm_func) |     fid = id(acm_func) | ||||||
|  | @ -273,8 +283,13 @@ async def maybe_open_context( | ||||||
|     else: |     else: | ||||||
|         _Cache.users += 1 |         _Cache.users += 1 | ||||||
|         log.runtime( |         log.runtime( | ||||||
|             f'Reusing resource for `_Cache` user {_Cache.users}\n\n' |             f'Re-using cached resource for user {_Cache.users}\n\n' | ||||||
|             f'{ctx_key!r} -> {yielded!r}\n' |             f'{ctx_key!r} -> {type(yielded)}\n' | ||||||
|  | 
 | ||||||
|  |             # TODO: make this work with values but without | ||||||
|  |             # `msgspec.Struct` causing frickin crashes on field-type | ||||||
|  |             # lookups.. | ||||||
|  |             # f'{ctx_key!r} -> {yielded!r}\n' | ||||||
|         ) |         ) | ||||||
|         lock.release() |         lock.release() | ||||||
|         yield True, yielded |         yield True, yielded | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue