Compare commits
	
		
			1 Commits 
		
	
	
		
			284fa0340e
			...
			5ed30dec40
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 5ed30dec40 | 
|  | @ -1,7 +1,6 @@ | ||||||
| """ | """ | ||||||
| ``tractor`` testing!! | ``tractor`` testing!! | ||||||
| """ | """ | ||||||
| from functools import partial |  | ||||||
| import sys | import sys | ||||||
| import subprocess | import subprocess | ||||||
| import os | import os | ||||||
|  | @ -9,9 +8,6 @@ import random | ||||||
| import signal | import signal | ||||||
| import platform | import platform | ||||||
| import time | import time | ||||||
| from typing import ( |  | ||||||
|     AsyncContextManager, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| import tractor | import tractor | ||||||
|  | @ -154,18 +150,6 @@ def pytest_generate_tests(metafunc): | ||||||
|         metafunc.parametrize("start_method", [spawn_backend], scope='module') |         metafunc.parametrize("start_method", [spawn_backend], scope='module') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: a way to let test scripts (like from `examples/`) |  | ||||||
| # guarantee they won't registry addr collide! |  | ||||||
| @pytest.fixture |  | ||||||
| def open_test_runtime( |  | ||||||
|     reg_addr: tuple, |  | ||||||
| ) -> AsyncContextManager: |  | ||||||
|     return partial( |  | ||||||
|         tractor.open_nursery, |  | ||||||
|         registry_addrs=[reg_addr], |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def sig_prog(proc, sig): | def sig_prog(proc, sig): | ||||||
|     "Kill the actor-process with ``sig``." |     "Kill the actor-process with ``sig``." | ||||||
|     proc.send_signal(sig) |     proc.send_signal(sig) | ||||||
|  |  | ||||||
|  | @ -41,7 +41,7 @@ from tractor.msg import ( | ||||||
| from tractor.msg.types import ( | from tractor.msg.types import ( | ||||||
|     _payload_msgs, |     _payload_msgs, | ||||||
|     log, |     log, | ||||||
|     PayloadMsg, |     Msg, | ||||||
|     Started, |     Started, | ||||||
|     mk_msg_spec, |     mk_msg_spec, | ||||||
| ) | ) | ||||||
|  | @ -61,7 +61,7 @@ def mk_custom_codec( | ||||||
|     uid: tuple[str, str] = tractor.current_actor().uid |     uid: tuple[str, str] = tractor.current_actor().uid | ||||||
| 
 | 
 | ||||||
|     # XXX NOTE XXX: despite defining `NamespacePath` as a type |     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||||
|     # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair |     # field on our `Msg.pld`, we still need a enc/dec_hook() pair | ||||||
|     # to cast to/from that type on the wire. See the docs: |     # to cast to/from that type on the wire. See the docs: | ||||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types |     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||||
| 
 | 
 | ||||||
|  | @ -321,12 +321,12 @@ def dec_type_union( | ||||||
|     import importlib |     import importlib | ||||||
|     types: list[Type] = [] |     types: list[Type] = [] | ||||||
|     for type_name in type_names: |     for type_name in type_names: | ||||||
|         for mod in [ |         for ns in [ | ||||||
|             typing, |             typing, | ||||||
|             importlib.import_module(__name__), |             importlib.import_module(__name__), | ||||||
|         ]: |         ]: | ||||||
|             if type_ref := getattr( |             if type_ref := getattr( | ||||||
|                 mod, |                 ns, | ||||||
|                 type_name, |                 type_name, | ||||||
|                 False, |                 False, | ||||||
|             ): |             ): | ||||||
|  | @ -744,7 +744,7 @@ def chk_pld_type( | ||||||
|     # 'Error',  .pld: ErrorData |     # 'Error',  .pld: ErrorData | ||||||
| 
 | 
 | ||||||
|     codec: MsgCodec = mk_codec( |     codec: MsgCodec = mk_codec( | ||||||
|         # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified |         # NOTE: this ONLY accepts `Msg.pld` fields of a specified | ||||||
|         # type union. |         # type union. | ||||||
|         ipc_pld_spec=payload_spec, |         ipc_pld_spec=payload_spec, | ||||||
|     ) |     ) | ||||||
|  | @ -752,7 +752,7 @@ def chk_pld_type( | ||||||
|     # make a one-off dec to compare with our `MsgCodec` instance |     # make a one-off dec to compare with our `MsgCodec` instance | ||||||
|     # which does the below `mk_msg_spec()` call internally |     # which does the below `mk_msg_spec()` call internally | ||||||
|     ipc_msg_spec: Union[Type[Struct]] |     ipc_msg_spec: Union[Type[Struct]] | ||||||
|     msg_types: list[PayloadMsg[payload_spec]] |     msg_types: list[Msg[payload_spec]] | ||||||
|     ( |     ( | ||||||
|         ipc_msg_spec, |         ipc_msg_spec, | ||||||
|         msg_types, |         msg_types, | ||||||
|  | @ -761,7 +761,7 @@ def chk_pld_type( | ||||||
|     ) |     ) | ||||||
|     _enc = msgpack.Encoder() |     _enc = msgpack.Encoder() | ||||||
|     _dec = msgpack.Decoder( |     _dec = msgpack.Decoder( | ||||||
|         type=ipc_msg_spec or Any,  # like `PayloadMsg[Any]` |         type=ipc_msg_spec or Any,  # like `Msg[Any]` | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     assert ( |     assert ( | ||||||
|  | @ -806,7 +806,7 @@ def chk_pld_type( | ||||||
|             'cid': '666', |             'cid': '666', | ||||||
|             'pld': pld, |             'pld': pld, | ||||||
|         } |         } | ||||||
|         enc_msg: PayloadMsg = typedef(**kwargs) |         enc_msg: Msg = typedef(**kwargs) | ||||||
| 
 | 
 | ||||||
|         _wire_bytes: bytes = _enc.encode(enc_msg) |         _wire_bytes: bytes = _enc.encode(enc_msg) | ||||||
|         wire_bytes: bytes = codec.enc.encode(enc_msg) |         wire_bytes: bytes = codec.enc.encode(enc_msg) | ||||||
|  | @ -883,16 +883,25 @@ def test_limit_msgspec(): | ||||||
|             debug_mode=True |             debug_mode=True | ||||||
|         ): |         ): | ||||||
| 
 | 
 | ||||||
|             # ensure we can round-trip a boxing `PayloadMsg` |             # ensure we can round-trip a boxing `Msg` | ||||||
|             assert chk_pld_type( |             assert chk_pld_type( | ||||||
|                 payload_spec=Any, |                 # Msg, | ||||||
|                 pld=None, |                 Any, | ||||||
|  |                 None, | ||||||
|                 expect_roundtrip=True, |                 expect_roundtrip=True, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  |             # TODO: don't need this any more right since | ||||||
|  |             # `msgspec>=0.15` has the nice generics stuff yah?? | ||||||
|  |             # | ||||||
|  |             # manually override the type annot of the payload | ||||||
|  |             # field and ensure it propagates to all msg-subtypes. | ||||||
|  |             # Msg.__annotations__['pld'] = Any | ||||||
|  | 
 | ||||||
|             # verify that a mis-typed payload value won't decode |             # verify that a mis-typed payload value won't decode | ||||||
|             assert not chk_pld_type( |             assert not chk_pld_type( | ||||||
|                 payload_spec=int, |                 # Msg, | ||||||
|  |                 int, | ||||||
|                 pld='doggy', |                 pld='doggy', | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | @ -904,16 +913,18 @@ def test_limit_msgspec(): | ||||||
|                 value: Any |                 value: Any | ||||||
| 
 | 
 | ||||||
|             assert not chk_pld_type( |             assert not chk_pld_type( | ||||||
|                 payload_spec=CustomPayload, |                 # Msg, | ||||||
|  |                 CustomPayload, | ||||||
|                 pld='doggy', |                 pld='doggy', | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             assert chk_pld_type( |             assert chk_pld_type( | ||||||
|                 payload_spec=CustomPayload, |                 # Msg, | ||||||
|  |                 CustomPayload, | ||||||
|                 pld=CustomPayload(name='doggy', value='urmom') |                 pld=CustomPayload(name='doggy', value='urmom') | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # yah, we can `.pause_from_sync()` now! |             # uhh bc we can `.pause_from_sync()` now! :surfer: | ||||||
|             # breakpoint() |             # breakpoint() | ||||||
| 
 | 
 | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
|  |  | ||||||
|  | @ -1336,23 +1336,6 @@ def test_shield_pause( | ||||||
|     child.expect(pexpect.EOF) |     child.expect(pexpect.EOF) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: better error for "non-ideal" usage from the root actor. |  | ||||||
| # -[ ] if called from an async scope emit a message that suggests |  | ||||||
| #    using `await tractor.pause()` instead since it's less overhead |  | ||||||
| #    (in terms of `greenback` and/or extra threads) and if it's from |  | ||||||
| #    a sync scope suggest that usage must first call |  | ||||||
| #    `ensure_portal()` in the (eventual parent) async calling scope? |  | ||||||
| def test_sync_pause_from_bg_task_in_root_actor_(): |  | ||||||
|     ''' |  | ||||||
|     When used from the root actor, normally we can only implicitly |  | ||||||
|     support `.pause_from_sync()` from the main-parent-task (that |  | ||||||
|     opens the runtime via `open_root_actor()`) since `greenback` |  | ||||||
|     requires a `.ensure_portal()` call per `trio.Task` where it is |  | ||||||
|     used. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     ... |  | ||||||
| 
 |  | ||||||
| # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | ||||||
| def test_correct_frames_below_hidden(): | def test_correct_frames_below_hidden(): | ||||||
|     ''' |     ''' | ||||||
|  |  | ||||||
|  | @ -19,7 +19,7 @@ from tractor._testing import ( | ||||||
| @pytest.fixture | @pytest.fixture | ||||||
| def run_example_in_subproc( | def run_example_in_subproc( | ||||||
|     loglevel: str, |     loglevel: str, | ||||||
|     testdir: pytest.Testdir, |     testdir, | ||||||
|     reg_addr: tuple[str, int], |     reg_addr: tuple[str, int], | ||||||
| ): | ): | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -121,19 +121,10 @@ class Unresolved: | ||||||
| @dataclass | @dataclass | ||||||
| class Context: | class Context: | ||||||
|     ''' |     ''' | ||||||
|     An inter-actor, SC transitive, `trio.Task` (pair) |     An inter-actor, SC transitive, `Task` communication context. | ||||||
|     communication context. |  | ||||||
| 
 | 
 | ||||||
|     (We've also considered other names and ideas: |     NB: This class should **never be instatiated directly**, it is allocated | ||||||
|      - "communicating tasks scope": cts |     by the runtime in 2 ways: | ||||||
|      - "distributed task scope": dts |  | ||||||
|      - "communicating tasks context": ctc |  | ||||||
| 
 |  | ||||||
|      **Got a better idea for naming? Make an issue dawg!** |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
|     NB: This class should **never be instatiated directly**, it is |  | ||||||
|     allocated by the runtime in 2 ways: |  | ||||||
|      - by entering `Portal.open_context()` which is the primary |      - by entering `Portal.open_context()` which is the primary | ||||||
|        public API for any "parent" task or, |        public API for any "parent" task or, | ||||||
|      - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg |      - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg | ||||||
|  | @ -219,16 +210,6 @@ class Context: | ||||||
|     # more the the `Context` is needed? |     # more the the `Context` is needed? | ||||||
|     _portal: Portal | None = None |     _portal: Portal | None = None | ||||||
| 
 | 
 | ||||||
|     @property |  | ||||||
|     def portal(self) -> Portal|None: |  | ||||||
|         ''' |  | ||||||
|         Return any wrapping memory-`Portal` if this is |  | ||||||
|         a 'parent'-side task which called `Portal.open_context()`, |  | ||||||
|         otherwise `None`. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         return self._portal |  | ||||||
| 
 |  | ||||||
|     # NOTE: each side of the context has its own cancel scope |     # NOTE: each side of the context has its own cancel scope | ||||||
|     # which is exactly the primitive that allows for |     # which is exactly the primitive that allows for | ||||||
|     # cross-actor-task-supervision and thus SC. |     # cross-actor-task-supervision and thus SC. | ||||||
|  | @ -318,8 +299,6 @@ class Context: | ||||||
|     # boxed exception. NOW, it's used for spawning overrun queuing |     # boxed exception. NOW, it's used for spawning overrun queuing | ||||||
|     # tasks when `.allow_overruns ==  True` !!! |     # tasks when `.allow_overruns ==  True` !!! | ||||||
|     _scope_nursery: trio.Nursery|None = None |     _scope_nursery: trio.Nursery|None = None | ||||||
|     # ^-TODO-^ change name? |  | ||||||
|     # -> `._scope_tn` "scope task nursery" |  | ||||||
| 
 | 
 | ||||||
|     # streaming overrun state tracking |     # streaming overrun state tracking | ||||||
|     _in_overrun: bool = False |     _in_overrun: bool = False | ||||||
|  | @ -429,23 +408,10 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         return self._cancel_called |         return self._cancel_called | ||||||
| 
 | 
 | ||||||
|     @cancel_called.setter |  | ||||||
|     def cancel_called(self, val: bool) -> None: |  | ||||||
|         ''' |  | ||||||
|         Set the self-cancelled request `bool` value. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         # to debug who frickin sets it.. |  | ||||||
|         # if val: |  | ||||||
|         #     from .devx import pause_from_sync |  | ||||||
|         #     pause_from_sync() |  | ||||||
| 
 |  | ||||||
|         self._cancel_called = val |  | ||||||
| 
 |  | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|         ''' |         ''' | ||||||
|         `Actor.uid: tuple[str, str]` of the (remote) |         ``Actor.uid: tuple[str, str]`` of the (remote) | ||||||
|         actor-process who's task was cancelled thus causing this |         actor-process who's task was cancelled thus causing this | ||||||
|         (side of the) context to also be cancelled. |         (side of the) context to also be cancelled. | ||||||
| 
 | 
 | ||||||
|  | @ -549,7 +515,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             # the local scope was never cancelled |             # the local scope was never cancelled | ||||||
|             # and instead likely we received a remote side |             # and instead likely we received a remote side | ||||||
|             # # cancellation that was raised inside `.wait_for_result()` |             # # cancellation that was raised inside `.result()` | ||||||
|             # or ( |             # or ( | ||||||
|             #     (se := self._local_error) |             #     (se := self._local_error) | ||||||
|             #     and se is re |             #     and se is re | ||||||
|  | @ -619,8 +585,6 @@ class Context: | ||||||
|         self, |         self, | ||||||
|         error: BaseException, |         error: BaseException, | ||||||
| 
 | 
 | ||||||
|         set_cancel_called: bool = False, |  | ||||||
| 
 |  | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         (Maybe) cancel this local scope due to a received remote |         (Maybe) cancel this local scope due to a received remote | ||||||
|  | @ -639,7 +603,7 @@ class Context: | ||||||
|         - `Portal.open_context()` |         - `Portal.open_context()` | ||||||
|         - `Portal.result()` |         - `Portal.result()` | ||||||
|         - `Context.open_stream()` |         - `Context.open_stream()` | ||||||
|         - `Context.wait_for_result()` |         - `Context.result()` | ||||||
| 
 | 
 | ||||||
|         when called/closed by actor local task(s). |         when called/closed by actor local task(s). | ||||||
| 
 | 
 | ||||||
|  | @ -765,7 +729,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         # Cancel the local `._scope`, catch that |         # Cancel the local `._scope`, catch that | ||||||
|         # `._scope.cancelled_caught` and re-raise any remote error |         # `._scope.cancelled_caught` and re-raise any remote error | ||||||
|         # once exiting (or manually calling `.wait_for_result()`) the |         # once exiting (or manually calling `.result()`) the | ||||||
|         # `.open_context()`  block. |         # `.open_context()`  block. | ||||||
|         cs: trio.CancelScope = self._scope |         cs: trio.CancelScope = self._scope | ||||||
|         if ( |         if ( | ||||||
|  | @ -800,9 +764,8 @@ class Context: | ||||||
|                 # `trio.Cancelled` subtype here ;) |                 # `trio.Cancelled` subtype here ;) | ||||||
|                 # https://github.com/goodboy/tractor/issues/368 |                 # https://github.com/goodboy/tractor/issues/368 | ||||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' |                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||||
|                 # from .devx import pause_from_sync |  | ||||||
|                 # pause_from_sync() |  | ||||||
|                 self._scope.cancel() |                 self._scope.cancel() | ||||||
|  | 
 | ||||||
|         else: |         else: | ||||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' |             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||||
|             # from .devx import mk_pdb |             # from .devx import mk_pdb | ||||||
|  | @ -926,7 +889,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         side: str = self.side |         side: str = self.side | ||||||
|         self.cancel_called: bool = True |         self._cancel_called: bool = True | ||||||
| 
 | 
 | ||||||
|         header: str = ( |         header: str = ( | ||||||
|             f'Cancelling ctx with peer from {side.upper()} side\n\n' |             f'Cancelling ctx with peer from {side.upper()} side\n\n' | ||||||
|  | @ -949,7 +912,7 @@ class Context: | ||||||
|         # `._scope.cancel()` since we expect the eventual |         # `._scope.cancel()` since we expect the eventual | ||||||
|         # `ContextCancelled` from the other side to trigger this |         # `ContextCancelled` from the other side to trigger this | ||||||
|         # when the runtime finally receives it during teardown |         # when the runtime finally receives it during teardown | ||||||
|         # (normally in `.wait_for_result()` called from |         # (normally in `.result()` called from | ||||||
|         # `Portal.open_context().__aexit__()`) |         # `Portal.open_context().__aexit__()`) | ||||||
|         if side == 'parent': |         if side == 'parent': | ||||||
|             if not self._portal: |             if not self._portal: | ||||||
|  | @ -1062,10 +1025,10 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|         peer_uid: tuple = self.chan.uid |         our_uid: tuple = self.chan.uid | ||||||
| 
 | 
 | ||||||
|         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption |         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption | ||||||
|         # for "graceful cancellation" case(s): |         # for "graceful cancellation" case: | ||||||
|         # |         # | ||||||
|         # Whenever a "side" of a context (a `Task` running in |         # Whenever a "side" of a context (a `Task` running in | ||||||
|         # an actor) **is** the side which requested ctx |         # an actor) **is** the side which requested ctx | ||||||
|  | @ -1082,11 +1045,9 @@ class Context: | ||||||
|         # set to the `Actor.uid` of THIS task (i.e. the |         # set to the `Actor.uid` of THIS task (i.e. the | ||||||
|         # cancellation requesting task's actor is the actor |         # cancellation requesting task's actor is the actor | ||||||
|         # checking whether it should absorb the ctxc). |         # checking whether it should absorb the ctxc). | ||||||
|         self_ctxc: bool = self._is_self_cancelled(remote_error) |  | ||||||
|         if ( |         if ( | ||||||
|             self_ctxc |  | ||||||
|             and |  | ||||||
|             not raise_ctxc_from_self_call |             not raise_ctxc_from_self_call | ||||||
|  |             and self._is_self_cancelled(remote_error) | ||||||
| 
 | 
 | ||||||
|             # TODO: ?potentially it is useful to emit certain |             # TODO: ?potentially it is useful to emit certain | ||||||
|             # warning/cancel logs for the cases where the |             # warning/cancel logs for the cases where the | ||||||
|  | @ -1116,8 +1077,8 @@ class Context: | ||||||
|             and isinstance(remote_error, RemoteActorError) |             and isinstance(remote_error, RemoteActorError) | ||||||
|             and remote_error.boxed_type is StreamOverrun |             and remote_error.boxed_type is StreamOverrun | ||||||
| 
 | 
 | ||||||
|             # and tuple(remote_error.msgdata['sender']) == peer_uid |             # and tuple(remote_error.msgdata['sender']) == our_uid | ||||||
|             and tuple(remote_error.sender) == peer_uid |             and tuple(remote_error.sender) == our_uid | ||||||
|         ): |         ): | ||||||
|             # NOTE: we set the local scope error to any "self |             # NOTE: we set the local scope error to any "self | ||||||
|             # cancellation" error-response thus "absorbing" |             # cancellation" error-response thus "absorbing" | ||||||
|  | @ -1179,9 +1140,9 @@ class Context: | ||||||
|         of the remote cancellation. |         of the remote cancellation. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = False |         __tracebackhide__ = hide_tb | ||||||
|         assert self._portal, ( |         assert self._portal, ( | ||||||
|             '`Context.wait_for_result()` can not be called from callee side!' |             "Context.result() can not be called from callee side!" | ||||||
|         ) |         ) | ||||||
|         if self._final_result_is_set(): |         if self._final_result_is_set(): | ||||||
|             return self._result |             return self._result | ||||||
|  | @ -1208,8 +1169,7 @@ class Context: | ||||||
|                 drained_msgs, |                 drained_msgs, | ||||||
|             ) = await msgops.drain_to_final_msg( |             ) = await msgops.drain_to_final_msg( | ||||||
|                 ctx=self, |                 ctx=self, | ||||||
|                 # hide_tb=hide_tb, |                 hide_tb=hide_tb, | ||||||
|                 hide_tb=False, |  | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             drained_status: str = ( |             drained_status: str = ( | ||||||
|  | @ -1225,8 +1185,6 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             log.cancel(drained_status) |             log.cancel(drained_status) | ||||||
| 
 | 
 | ||||||
|         # __tracebackhide__: bool = hide_tb |  | ||||||
| 
 |  | ||||||
|         self.maybe_raise( |         self.maybe_raise( | ||||||
|             # NOTE: obvi we don't care if we |             # NOTE: obvi we don't care if we | ||||||
|             # overran the far end if we're already |             # overran the far end if we're already | ||||||
|  | @ -1239,8 +1197,7 @@ class Context: | ||||||
|                 # raising something we know might happen |                 # raising something we know might happen | ||||||
|                 # during cancellation ;) |                 # during cancellation ;) | ||||||
|                 (not self._cancel_called) |                 (not self._cancel_called) | ||||||
|             ), |             ) | ||||||
|             hide_tb=hide_tb, |  | ||||||
|         ) |         ) | ||||||
|         # TODO: eventually make `.outcome: Outcome` and thus return |         # TODO: eventually make `.outcome: Outcome` and thus return | ||||||
|         # `self.outcome.unwrap()` here! |         # `self.outcome.unwrap()` here! | ||||||
|  | @ -1626,7 +1583,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         - NEVER `return` early before delivering the msg! |         - NEVER `return` early before delivering the msg! | ||||||
|           bc if the error is a ctxc and there is a task waiting on |           bc if the error is a ctxc and there is a task waiting on | ||||||
|           `.wait_for_result()` we need the msg to be |           `.result()` we need the msg to be | ||||||
|           `send_chan.send_nowait()`-ed over the `._rx_chan` so |           `send_chan.send_nowait()`-ed over the `._rx_chan` so | ||||||
|           that the error is relayed to that waiter task and thus |           that the error is relayed to that waiter task and thus | ||||||
|           raised in user code! |           raised in user code! | ||||||
|  | @ -1871,7 +1828,7 @@ async def open_context_from_portal( | ||||||
|     When the "callee" (side that is "called"/started by a call |     When the "callee" (side that is "called"/started by a call | ||||||
|     to *this* method) returns, the caller side (this) unblocks |     to *this* method) returns, the caller side (this) unblocks | ||||||
|     and any final value delivered from the other end can be |     and any final value delivered from the other end can be | ||||||
|     retrieved using the `Contex.wait_for_result()` api. |     retrieved using the `Contex.result()` api. | ||||||
| 
 | 
 | ||||||
|     The yielded ``Context`` instance further allows for opening |     The yielded ``Context`` instance further allows for opening | ||||||
|     bidirectional streams, explicit cancellation and |     bidirectional streams, explicit cancellation and | ||||||
|  | @ -2008,14 +1965,14 @@ async def open_context_from_portal( | ||||||
|             yield ctx, first |             yield ctx, first | ||||||
| 
 | 
 | ||||||
|             # ??TODO??: do we still want to consider this or is |             # ??TODO??: do we still want to consider this or is | ||||||
|             # the `else:` block handling via a `.wait_for_result()` |             # the `else:` block handling via a `.result()` | ||||||
|             # call below enough?? |             # call below enough?? | ||||||
|             # |             # | ||||||
|             # -[ ] pretty sure `.wait_for_result()` internals do the |             # -[ ] pretty sure `.result()` internals do the | ||||||
|             # same as our ctxc handler below so it ended up |             # same as our ctxc handler below so it ended up | ||||||
|             # being same (repeated?) behaviour, but ideally we |             # being same (repeated?) behaviour, but ideally we | ||||||
|             # wouldn't have that duplication either by somehow |             # wouldn't have that duplication either by somehow | ||||||
|             # factoring the `.wait_for_result()` handler impl in a way |             # factoring the `.result()` handler impl in a way | ||||||
|             # that we can re-use it around the `yield` ^ here |             # that we can re-use it around the `yield` ^ here | ||||||
|             # or vice versa? |             # or vice versa? | ||||||
|             # |             # | ||||||
|  | @ -2153,7 +2110,7 @@ async def open_context_from_portal( | ||||||
|         #    AND a group-exc is only raised if there was > 1 |         #    AND a group-exc is only raised if there was > 1 | ||||||
|         #    tasks started *here* in the "caller" / opener |         #    tasks started *here* in the "caller" / opener | ||||||
|         #    block. If any one of those tasks calls |         #    block. If any one of those tasks calls | ||||||
|         #    `.wait_for_result()` or `MsgStream.receive()` |         #    `.result()` or `MsgStream.receive()` | ||||||
|         #    `._maybe_raise_remote_err()` will be transitively |         #    `._maybe_raise_remote_err()` will be transitively | ||||||
|         #    called and the remote error raised causing all |         #    called and the remote error raised causing all | ||||||
|         #    tasks to be cancelled. |         #    tasks to be cancelled. | ||||||
|  | @ -2223,7 +2180,7 @@ async def open_context_from_portal( | ||||||
|                 f'|_{ctx._task}\n' |                 f'|_{ctx._task}\n' | ||||||
|             ) |             ) | ||||||
|             # XXX NOTE XXX: the below call to |             # XXX NOTE XXX: the below call to | ||||||
|             # `Context.wait_for_result()` will ALWAYS raise |             # `Context.result()` will ALWAYS raise | ||||||
|             # a `ContextCancelled` (via an embedded call to |             # a `ContextCancelled` (via an embedded call to | ||||||
|             # `Context._maybe_raise_remote_err()`) IFF |             # `Context._maybe_raise_remote_err()`) IFF | ||||||
|             # a `Context._remote_error` was set by the runtime |             # a `Context._remote_error` was set by the runtime | ||||||
|  | @ -2233,10 +2190,10 @@ async def open_context_from_portal( | ||||||
|             # ALWAYS SET any time "callee" side fails and causes "caller |             # ALWAYS SET any time "callee" side fails and causes "caller | ||||||
|             # side" cancellation via a `ContextCancelled` here. |             # side" cancellation via a `ContextCancelled` here. | ||||||
|             try: |             try: | ||||||
|                 result_or_err: Exception|Any = await ctx.wait_for_result() |                 result_or_err: Exception|Any = await ctx.result() | ||||||
|             except BaseException as berr: |             except BaseException as berr: | ||||||
|                 # on normal teardown, if we get some error |                 # on normal teardown, if we get some error | ||||||
|                 # raised in `Context.wait_for_result()` we still want to |                 # raised in `Context.result()` we still want to | ||||||
|                 # save that error on the ctx's state to |                 # save that error on the ctx's state to | ||||||
|                 # determine things like `.cancelled_caught` for |                 # determine things like `.cancelled_caught` for | ||||||
|                 # cases where there was remote cancellation but |                 # cases where there was remote cancellation but | ||||||
|  |  | ||||||
|  | @ -56,12 +56,14 @@ async def get_registry( | ||||||
| ]: | ]: | ||||||
|     ''' |     ''' | ||||||
|     Return a portal instance connected to a local or remote |     Return a portal instance connected to a local or remote | ||||||
|     registry-service actor; if a connection already exists re-use it |     arbiter. | ||||||
|     (presumably to call a `.register_actor()` registry runtime RPC |  | ||||||
|     ep). |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     actor: Actor = current_actor() |     actor = current_actor() | ||||||
|  | 
 | ||||||
|  |     if not actor: | ||||||
|  |         raise RuntimeError("No actor instance has been defined yet?") | ||||||
|  | 
 | ||||||
|     if actor.is_registrar: |     if actor.is_registrar: | ||||||
|         # we're already the arbiter |         # we're already the arbiter | ||||||
|         # (likely a re-entrant call from the arbiter actor) |         # (likely a re-entrant call from the arbiter actor) | ||||||
|  | @ -70,8 +72,6 @@ async def get_registry( | ||||||
|             Channel((host, port)) |             Channel((host, port)) | ||||||
|         ) |         ) | ||||||
|     else: |     else: | ||||||
|         # TODO: try to look pre-existing connection from |  | ||||||
|         # `Actor._peers` and use it instead? |  | ||||||
|         async with ( |         async with ( | ||||||
|             _connect_chan(host, port) as chan, |             _connect_chan(host, port) as chan, | ||||||
|             open_portal(chan) as regstr_ptl, |             open_portal(chan) as regstr_ptl, | ||||||
|  |  | ||||||
|  | @ -20,8 +20,7 @@ Sub-process entry points. | ||||||
| """ | """ | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from functools import partial | from functools import partial | ||||||
| import os | # import textwrap | ||||||
| import textwrap |  | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|  | @ -59,7 +58,7 @@ def _mp_main( | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|     The routine called *after fork* which invokes a fresh `trio.run()` |     The routine called *after fork* which invokes a fresh ``trio.run`` | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     actor._forkserver_info = forkserver_info |     actor._forkserver_info = forkserver_info | ||||||
|  | @ -97,35 +96,6 @@ def _mp_main( | ||||||
|         log.info(f"Subactor {actor.uid} terminated") |         log.info(f"Subactor {actor.uid} terminated") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: move this to some kinda `.devx._conc_lang.py` eventually |  | ||||||
| # as we work out our multi-domain state-flow-syntax! |  | ||||||
| def nest_from_op( |  | ||||||
|     input_op: str, |  | ||||||
|     tree_str: str, |  | ||||||
| 
 |  | ||||||
|     back_from_op: int = 1, |  | ||||||
| ) -> str: |  | ||||||
|     ''' |  | ||||||
|     Depth-increment the input (presumably hierarchy/supervision) |  | ||||||
|     input "tree string" below the provided `input_op` execution |  | ||||||
|     operator, so injecting a `"\n|_{input_op}\n"`and indenting the |  | ||||||
|     `tree_str` to nest content aligned with the ops last char. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     return ( |  | ||||||
|         f'{input_op}\n' |  | ||||||
|         + |  | ||||||
|         textwrap.indent( |  | ||||||
|             tree_str, |  | ||||||
|             prefix=( |  | ||||||
|                 len(input_op) |  | ||||||
|                 - |  | ||||||
|                 back_from_op |  | ||||||
|             ) *' ', |  | ||||||
|         ) |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def _trio_main( | def _trio_main( | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     *, |     *, | ||||||
|  | @ -149,6 +119,7 @@ def _trio_main( | ||||||
| 
 | 
 | ||||||
|     if actor.loglevel is not None: |     if actor.loglevel is not None: | ||||||
|         get_console_log(actor.loglevel) |         get_console_log(actor.loglevel) | ||||||
|  |         import os | ||||||
|         actor_info: str = ( |         actor_info: str = ( | ||||||
|             f'|_{actor}\n' |             f'|_{actor}\n' | ||||||
|             f'  uid: {actor.uid}\n' |             f'  uid: {actor.uid}\n' | ||||||
|  | @ -157,29 +128,13 @@ def _trio_main( | ||||||
|             f'  loglevel: {actor.loglevel}\n' |             f'  loglevel: {actor.loglevel}\n' | ||||||
|         ) |         ) | ||||||
|         log.info( |         log.info( | ||||||
|             'Started new `trio` subactor:\n' |             'Started new trio subactor:\n' | ||||||
|             + |             + | ||||||
|             nest_from_op( |             '>\n'  # like a "started/play"-icon from super perspective | ||||||
|                 input_op='(>',  # like a "started/play"-icon from super perspective |             + | ||||||
|                 tree_str=actor_info, |             actor_info, | ||||||
|             ) |  | ||||||
|             # '>(\n'  # like a "started/play"-icon from super perspective |  | ||||||
|             # + |  | ||||||
|             # actor_info, |  | ||||||
|         ) |         ) | ||||||
|     logmeth = log.info | 
 | ||||||
|     message: str = ( |  | ||||||
|     # log.info( |  | ||||||
|         'Subactor terminated\n' |  | ||||||
|         + |  | ||||||
|         nest_from_op( |  | ||||||
|             input_op=')>',  # like a "started/play"-icon from super perspective |  | ||||||
|             tree_str=actor_info, |  | ||||||
|         ) |  | ||||||
|         # 'x\n'  # like a "crossed-out/killed" from super perspective |  | ||||||
|         # + |  | ||||||
|         # actor_info |  | ||||||
|     ) |  | ||||||
|     try: |     try: | ||||||
|         if infect_asyncio: |         if infect_asyncio: | ||||||
|             actor._infected_aio = True |             actor._infected_aio = True | ||||||
|  | @ -188,18 +143,16 @@ def _trio_main( | ||||||
|             trio.run(trio_main) |             trio.run(trio_main) | ||||||
| 
 | 
 | ||||||
|     except KeyboardInterrupt: |     except KeyboardInterrupt: | ||||||
|         logmeth = log.cancel |         log.cancel( | ||||||
|         message: str = ( |             'Actor received KBI\n' | ||||||
|             'Actor received KBI (aka an OS-cancel)\n' |  | ||||||
|             + |             + | ||||||
|             nest_from_op( |             actor_info | ||||||
|                 input_op='c)>',  # like a "started/play"-icon from super perspective |  | ||||||
|                 tree_str=actor_info, |  | ||||||
|             ) |  | ||||||
|         ) |         ) | ||||||
|     except BaseException: |  | ||||||
|         log.exception('Actor crashed exit?') |  | ||||||
|         raise |  | ||||||
| 
 |  | ||||||
|     finally: |     finally: | ||||||
|         logmeth(message) |         log.info( | ||||||
|  |             'Subactor terminated\n' | ||||||
|  |             + | ||||||
|  |             'x\n'  # like a "crossed-out/killed" from super perspective | ||||||
|  |             + | ||||||
|  |             actor_info | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  | @ -40,7 +40,6 @@ from typing import ( | ||||||
|     TypeVar, |     TypeVar, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| # import pdbp |  | ||||||
| import msgspec | import msgspec | ||||||
| from tricycle import BufferedReceiveStream | from tricycle import BufferedReceiveStream | ||||||
| import trio | import trio | ||||||
|  | @ -291,14 +290,12 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|     # @pdbp.hideframe |  | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         msg: msgtypes.MsgType, |         msg: msgtypes.MsgType, | ||||||
| 
 | 
 | ||||||
|         strict_types: bool = True, |         strict_types: bool = True, | ||||||
|         hide_tb: bool = False, |         # hide_tb: bool = False, | ||||||
| 
 |  | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Send a msgpack encoded py-object-blob-as-msg over TCP. |         Send a msgpack encoded py-object-blob-as-msg over TCP. | ||||||
|  | @ -307,10 +304,7 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|         invalid msg type |         invalid msg type | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         # __tracebackhide__: bool = hide_tb | ||||||
|         # try: |  | ||||||
|         # XXX see `trio._sync.AsyncContextManagerMixin` for details |  | ||||||
|         # on the `.acquire()`/`.release()` sequencing.. |  | ||||||
|         async with self._send_lock: |         async with self._send_lock: | ||||||
| 
 | 
 | ||||||
|             # NOTE: lookup the `trio.Task.context`'s var for |             # NOTE: lookup the `trio.Task.context`'s var for | ||||||
|  | @ -358,14 +352,6 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|             size: bytes = struct.pack("<I", len(bytes_data)) |             size: bytes = struct.pack("<I", len(bytes_data)) | ||||||
|             return await self.stream.send_all(size + bytes_data) |             return await self.stream.send_all(size + bytes_data) | ||||||
| 
 | 
 | ||||||
|         # TODO: does it help ever to dynamically show this |  | ||||||
|         # frame? |  | ||||||
|         # except BaseException as _err: |  | ||||||
|         #     err = _err |  | ||||||
|         #     if not isinstance(err, MsgTypeError): |  | ||||||
|         #         __tracebackhide__: bool = False |  | ||||||
|         #     raise |  | ||||||
| 
 |  | ||||||
|     @property |     @property | ||||||
|     def laddr(self) -> tuple[str, int]: |     def laddr(self) -> tuple[str, int]: | ||||||
|         return self._laddr |         return self._laddr | ||||||
|  | @ -574,40 +560,27 @@ class Channel: | ||||||
|         ) |         ) | ||||||
|         return transport |         return transport | ||||||
| 
 | 
 | ||||||
|     # TODO: something like, |  | ||||||
|     # `pdbp.hideframe_on(errors=[MsgTypeError])` |  | ||||||
|     # @pdbp.hideframe |  | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         payload: Any, |         payload: Any, | ||||||
| 
 | 
 | ||||||
|         hide_tb: bool = False, |         # hide_tb: bool = False, | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Send a coded msg-blob over the transport. |         Send a coded msg-blob over the transport. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         # __tracebackhide__: bool = hide_tb | ||||||
|         try: |         log.transport( | ||||||
|             log.transport( |             '=> send IPC msg:\n\n' | ||||||
|                 '=> send IPC msg:\n\n' |             f'{pformat(payload)}\n' | ||||||
|                 f'{pformat(payload)}\n' |         )  # type: ignore | ||||||
|             ) |         assert self._transport | ||||||
|             # assert self._transport  # but why typing? |         await self._transport.send( | ||||||
|             await self._transport.send( |             payload, | ||||||
|                 payload, |             # hide_tb=hide_tb, | ||||||
|                 hide_tb=hide_tb, |         ) | ||||||
|             ) |  | ||||||
|         except BaseException as _err: |  | ||||||
|             err = _err  # bind for introspection |  | ||||||
|             if not isinstance(_err, MsgTypeError): |  | ||||||
|                 # assert err |  | ||||||
|                 __tracebackhide__: bool = False |  | ||||||
|             else: |  | ||||||
|                 assert err.cid |  | ||||||
| 
 |  | ||||||
|             raise |  | ||||||
| 
 | 
 | ||||||
|     async def recv(self) -> Any: |     async def recv(self) -> Any: | ||||||
|         assert self._transport |         assert self._transport | ||||||
|  |  | ||||||
|  | @ -121,8 +121,7 @@ class Portal: | ||||||
|         ) |         ) | ||||||
|         return self.chan |         return self.chan | ||||||
| 
 | 
 | ||||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses |     # TODO: factor this out into an `ActorNursery` wrapper | ||||||
|     # a single `.open_context()` call underneath. |  | ||||||
|     async def _submit_for_result( |     async def _submit_for_result( | ||||||
|         self, |         self, | ||||||
|         ns: str, |         ns: str, | ||||||
|  | @ -142,22 +141,13 @@ class Portal: | ||||||
|             portal=self, |             portal=self, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # TODO: we should deprecate this API right? since if we remove |  | ||||||
|     # `.run_in_actor()` (and instead move it to a `.highlevel` |  | ||||||
|     # wrapper api (around a single `.open_context()` call) we don't |  | ||||||
|     # really have any notion of a "main" remote task any more? |  | ||||||
|     # |  | ||||||
|     # @api_frame |     # @api_frame | ||||||
|     async def wait_for_result( |     async def result(self) -> Any: | ||||||
|         self, |  | ||||||
|         hide_tb: bool = True, |  | ||||||
|     ) -> Any: |  | ||||||
|         ''' |         ''' | ||||||
|         Return the final result delivered by a `Return`-msg from the |         Return the result(s) from the remote actor's "main" task. | ||||||
|         remote peer actor's "main" task's `return` statement. |  | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         __tracebackhide__ = True | ||||||
|         # Check for non-rpc errors slapped on the |         # Check for non-rpc errors slapped on the | ||||||
|         # channel for which we always raise |         # channel for which we always raise | ||||||
|         exc = self.channel._exc |         exc = self.channel._exc | ||||||
|  | @ -192,23 +182,6 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|         return self._final_result_pld |         return self._final_result_pld | ||||||
| 
 | 
 | ||||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses |  | ||||||
|     # a single `.open_context()` call underneath. |  | ||||||
|     async def result( |  | ||||||
|         self, |  | ||||||
|         *args, |  | ||||||
|         **kwargs, |  | ||||||
|     ) -> Any|Exception: |  | ||||||
|         typname: str = type(self).__name__ |  | ||||||
|         log.warning( |  | ||||||
|             f'`{typname}.result()` is DEPRECATED!\n' |  | ||||||
|             'Use `{typname.wait_for_result()` instead!\n' |  | ||||||
|         ) |  | ||||||
|         return await self.wait_for_result( |  | ||||||
|             *args, |  | ||||||
|             **kwargs, |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     async def _cancel_streams(self): |     async def _cancel_streams(self): | ||||||
|         # terminate all locally running async generator |         # terminate all locally running async generator | ||||||
|         # IPC calls |         # IPC calls | ||||||
|  | @ -267,7 +240,6 @@ class Portal: | ||||||
|             f'{reminfo}' |             f'{reminfo}' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         # XXX the one spot we set it? |  | ||||||
|         self.channel._cancel_called: bool = True |         self.channel._cancel_called: bool = True | ||||||
|         try: |         try: | ||||||
|             # send cancel cmd - might not get response |             # send cancel cmd - might not get response | ||||||
|  | @ -307,8 +279,6 @@ class Portal: | ||||||
|             ) |             ) | ||||||
|             return False |             return False | ||||||
| 
 | 
 | ||||||
|     # TODO: do we still need this for low level `Actor`-runtime |  | ||||||
|     # method calls or can we also remove it? |  | ||||||
|     async def run_from_ns( |     async def run_from_ns( | ||||||
|         self, |         self, | ||||||
|         namespace_path: str, |         namespace_path: str, | ||||||
|  | @ -346,8 +316,6 @@ class Portal: | ||||||
|             expect_msg=Return, |             expect_msg=Return, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses |  | ||||||
|     # a single `.open_context()` call underneath. |  | ||||||
|     async def run( |     async def run( | ||||||
|         self, |         self, | ||||||
|         func: str, |         func: str, | ||||||
|  | @ -402,8 +370,6 @@ class Portal: | ||||||
|             expect_msg=Return, |             expect_msg=Return, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # TODO: factor this out into a `.highlevel` API-wrapper that uses |  | ||||||
|     # a single `.open_context()` call underneath. |  | ||||||
|     @acm |     @acm | ||||||
|     async def open_stream_from( |     async def open_stream_from( | ||||||
|         self, |         self, | ||||||
|  |  | ||||||
|  | @ -21,7 +21,6 @@ Root actor runtime ignition(s). | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from functools import partial | from functools import partial | ||||||
| import importlib | import importlib | ||||||
| import inspect |  | ||||||
| import logging | import logging | ||||||
| import os | import os | ||||||
| import signal | import signal | ||||||
|  | @ -116,16 +115,10 @@ async def open_root_actor( | ||||||
|     if ( |     if ( | ||||||
|         debug_mode |         debug_mode | ||||||
|         and maybe_enable_greenback |         and maybe_enable_greenback | ||||||
|         and ( |         and await _debug.maybe_init_greenback( | ||||||
|             maybe_mod := await _debug.maybe_init_greenback( |             raise_not_found=False, | ||||||
|                 raise_not_found=False, |  | ||||||
|             ) |  | ||||||
|         ) |         ) | ||||||
|     ): |     ): | ||||||
|         logger.info( |  | ||||||
|             f'Found `greenback` installed @ {maybe_mod}\n' |  | ||||||
|             'Enabling `tractor.pause_from_sync()` support!\n' |  | ||||||
|         ) |  | ||||||
|         os.environ['PYTHONBREAKPOINT'] = ( |         os.environ['PYTHONBREAKPOINT'] = ( | ||||||
|             'tractor.devx._debug._sync_pause_from_builtin' |             'tractor.devx._debug._sync_pause_from_builtin' | ||||||
|         ) |         ) | ||||||
|  | @ -271,10 +264,7 @@ async def open_root_actor( | ||||||
| 
 | 
 | ||||||
|         except OSError: |         except OSError: | ||||||
|             # TODO: make this a "discovery" log level? |             # TODO: make this a "discovery" log level? | ||||||
|             logger.info( |             logger.warning(f'No actor registry found @ {addr}') | ||||||
|                 f'No actor registry found @ {addr}\n' |  | ||||||
|                 # 'Registry will be initialized in local actor..' |  | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|     async with trio.open_nursery() as tn: |     async with trio.open_nursery() as tn: | ||||||
|         for addr in registry_addrs: |         for addr in registry_addrs: | ||||||
|  | @ -375,25 +365,23 @@ async def open_root_actor( | ||||||
|             ) |             ) | ||||||
|             try: |             try: | ||||||
|                 yield actor |                 yield actor | ||||||
|  | 
 | ||||||
|             except ( |             except ( | ||||||
|                 Exception, |                 Exception, | ||||||
|                 BaseExceptionGroup, |                 BaseExceptionGroup, | ||||||
|             ) as err: |             ) as err: | ||||||
|                 # XXX NOTE XXX see equiv note inside | 
 | ||||||
|                 # `._runtime.Actor._stream_handler()` where in the |                 import inspect | ||||||
|                 # non-root or root-that-opened-this-mahually case we |  | ||||||
|                 # wait for the local actor-nursery to exit before |  | ||||||
|                 # exiting the transport channel handler. |  | ||||||
|                 entered: bool = await _debug._maybe_enter_pm( |                 entered: bool = await _debug._maybe_enter_pm( | ||||||
|                     err, |                     err, | ||||||
|                     api_frame=inspect.currentframe(), |                     api_frame=inspect.currentframe(), | ||||||
|                 ) |                 ) | ||||||
|  | 
 | ||||||
|                 if ( |                 if ( | ||||||
|                     not entered |                     not entered | ||||||
|                     and |                     and not is_multi_cancelled(err) | ||||||
|                     not is_multi_cancelled(err) |  | ||||||
|                 ): |                 ): | ||||||
|                     logger.exception('Root actor crashed\n') |                     logger.exception('Root actor crashed:\n') | ||||||
| 
 | 
 | ||||||
|                 # ALWAYS re-raise any error bubbled up from the |                 # ALWAYS re-raise any error bubbled up from the | ||||||
|                 # runtime! |                 # runtime! | ||||||
|  |  | ||||||
|  | @ -89,15 +89,6 @@ if TYPE_CHECKING: | ||||||
| log = get_logger('tractor') | log = get_logger('tractor') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: move to a `tractor.lowlevel.rpc` with the below |  | ||||||
| # func-type-cases implemented "on top of" `@context` defs. |  | ||||||
| # -[ ] std async func |  | ||||||
| # -[ ] `Portal.open_stream_from()` with async-gens? |  | ||||||
| #  |_ possibly a duplex form of this with a  |  | ||||||
| #    `sent_from_peer = yield send_to_peer` form, which would require |  | ||||||
| #    syncing the send/recv side with possibly `.receive_nowait()` |  | ||||||
| #    on each `yield`? |  | ||||||
| # -[ ]  |  | ||||||
| async def _invoke_non_context( | async def _invoke_non_context( | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     cancel_scope: CancelScope, |     cancel_scope: CancelScope, | ||||||
|  | @ -117,7 +108,6 @@ async def _invoke_non_context( | ||||||
|     ] = trio.TASK_STATUS_IGNORED, |     ] = trio.TASK_STATUS_IGNORED, | ||||||
| ): | ): | ||||||
|     __tracebackhide__: bool = True |     __tracebackhide__: bool = True | ||||||
|     cs: CancelScope|None = None  # ref when activated |  | ||||||
| 
 | 
 | ||||||
|     # TODO: can we unify this with the `context=True` impl below? |     # TODO: can we unify this with the `context=True` impl below? | ||||||
|     if inspect.isasyncgen(coro): |     if inspect.isasyncgen(coro): | ||||||
|  | @ -170,6 +160,10 @@ async def _invoke_non_context( | ||||||
|                 functype='asyncgen', |                 functype='asyncgen', | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
|  |         # XXX: the async-func may spawn further tasks which push | ||||||
|  |         # back values like an async-generator would but must | ||||||
|  |         # manualy construct the response dict-packet-responses as | ||||||
|  |         # above | ||||||
|         with cancel_scope as cs: |         with cancel_scope as cs: | ||||||
|             ctx._scope = cs |             ctx._scope = cs | ||||||
|             task_status.started(ctx) |             task_status.started(ctx) | ||||||
|  | @ -181,13 +175,15 @@ async def _invoke_non_context( | ||||||
|             await chan.send( |             await chan.send( | ||||||
|                 Stop(cid=cid) |                 Stop(cid=cid) | ||||||
|             ) |             ) | ||||||
| 
 |  | ||||||
|     # simplest function/method request-response pattern |  | ||||||
|     # XXX: in the most minimally used case, just a scheduled internal runtime |  | ||||||
|     # call to `Actor._cancel_task()` from the ctx-peer task since we |  | ||||||
|     # don't (yet) have a dedicated IPC msg. |  | ||||||
|     # ------ - ------ |  | ||||||
|     else: |     else: | ||||||
|  |         # regular async function/method | ||||||
|  |         # XXX: possibly just a scheduled `Actor._cancel_task()` | ||||||
|  |         # from a remote request to cancel some `Context`. | ||||||
|  |         # ------ - ------ | ||||||
|  |         # TODO: ideally we unify this with the above `context=True` | ||||||
|  |         # block such that for any remote invocation ftype, we | ||||||
|  |         # always invoke the far end RPC task scheduling the same | ||||||
|  |         # way: using the linked IPC context machinery. | ||||||
|         failed_resp: bool = False |         failed_resp: bool = False | ||||||
|         try: |         try: | ||||||
|             ack = StartAck( |             ack = StartAck( | ||||||
|  | @ -358,14 +354,8 @@ async def _errors_relayed_via_ipc( | ||||||
|             # channel. |             # channel. | ||||||
|             task_status.started(err) |             task_status.started(err) | ||||||
| 
 | 
 | ||||||
|         # always reraise KBIs so they propagate at the sys-process |         # always reraise KBIs so they propagate at the sys-process level. | ||||||
|         # level. |         if isinstance(err, KeyboardInterrupt): | ||||||
|         # XXX LOL, except when running in asyncio mode XD |  | ||||||
|         # cmon guys, wtf.. |  | ||||||
|         if ( |  | ||||||
|             isinstance(err, KeyboardInterrupt) |  | ||||||
|             # and not actor.is_infected_aio() |  | ||||||
|         ): |  | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|     # RPC task bookeeping. |     # RPC task bookeeping. | ||||||
|  | @ -468,6 +458,7 @@ async def _invoke( | ||||||
|     # tb: TracebackType = None |     # tb: TracebackType = None | ||||||
| 
 | 
 | ||||||
|     cancel_scope = CancelScope() |     cancel_scope = CancelScope() | ||||||
|  |     cs: CancelScope|None = None  # ref when activated | ||||||
|     ctx = actor.get_context( |     ctx = actor.get_context( | ||||||
|         chan=chan, |         chan=chan, | ||||||
|         cid=cid, |         cid=cid, | ||||||
|  | @ -616,8 +607,6 @@ async def _invoke( | ||||||
|         #     `@context` marked RPC function. |         #     `@context` marked RPC function. | ||||||
|         # - `._portal` is never set. |         # - `._portal` is never set. | ||||||
|         try: |         try: | ||||||
|             tn: trio.Nursery |  | ||||||
|             rpc_ctx_cs: CancelScope |  | ||||||
|             async with ( |             async with ( | ||||||
|                 trio.open_nursery() as tn, |                 trio.open_nursery() as tn, | ||||||
|                 msgops.maybe_limit_plds( |                 msgops.maybe_limit_plds( | ||||||
|  | @ -627,7 +616,7 @@ async def _invoke( | ||||||
|                 ), |                 ), | ||||||
|             ): |             ): | ||||||
|                 ctx._scope_nursery = tn |                 ctx._scope_nursery = tn | ||||||
|                 rpc_ctx_cs = ctx._scope = tn.cancel_scope |                 ctx._scope = tn.cancel_scope | ||||||
|                 task_status.started(ctx) |                 task_status.started(ctx) | ||||||
| 
 | 
 | ||||||
|                 # TODO: better `trionics` tooling: |                 # TODO: better `trionics` tooling: | ||||||
|  | @ -653,7 +642,7 @@ async def _invoke( | ||||||
|             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` |             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` | ||||||
|             #   which cancels the scope presuming the input error |             #   which cancels the scope presuming the input error | ||||||
|             #   is not a `.cancel_acked` pleaser. |             #   is not a `.cancel_acked` pleaser. | ||||||
|             if rpc_ctx_cs.cancelled_caught: |             if ctx._scope.cancelled_caught: | ||||||
|                 our_uid: tuple = actor.uid |                 our_uid: tuple = actor.uid | ||||||
| 
 | 
 | ||||||
|                 # first check for and raise any remote error |                 # first check for and raise any remote error | ||||||
|  | @ -663,7 +652,9 @@ async def _invoke( | ||||||
|                 if re := ctx._remote_error: |                 if re := ctx._remote_error: | ||||||
|                     ctx._maybe_raise_remote_err(re) |                     ctx._maybe_raise_remote_err(re) | ||||||
| 
 | 
 | ||||||
|                 if rpc_ctx_cs.cancel_called: |                 cs: CancelScope = ctx._scope | ||||||
|  | 
 | ||||||
|  |                 if cs.cancel_called: | ||||||
|                     canceller: tuple = ctx.canceller |                     canceller: tuple = ctx.canceller | ||||||
|                     explain: str = f'{ctx.side!r}-side task was cancelled by ' |                     explain: str = f'{ctx.side!r}-side task was cancelled by ' | ||||||
| 
 | 
 | ||||||
|  | @ -689,14 +680,8 @@ async def _invoke( | ||||||
|                     elif canceller == ctx.chan.uid: |                     elif canceller == ctx.chan.uid: | ||||||
|                         explain += f'its {ctx.peer_side!r}-side peer' |                         explain += f'its {ctx.peer_side!r}-side peer' | ||||||
| 
 | 
 | ||||||
|                     elif canceller == our_uid: |  | ||||||
|                         explain += 'itself' |  | ||||||
| 
 |  | ||||||
|                     elif canceller: |  | ||||||
|                         explain += 'a remote peer' |  | ||||||
| 
 |  | ||||||
|                     else: |                     else: | ||||||
|                         explain += 'an unknown cause?' |                         explain += 'a remote peer' | ||||||
| 
 | 
 | ||||||
|                     explain += ( |                     explain += ( | ||||||
|                         add_div(message=explain) |                         add_div(message=explain) | ||||||
|  | @ -1253,7 +1238,7 @@ async def process_messages( | ||||||
|                 'Exiting IPC msg loop with final msg\n\n' |                 'Exiting IPC msg loop with final msg\n\n' | ||||||
|                 f'<= peer: {chan.uid}\n' |                 f'<= peer: {chan.uid}\n' | ||||||
|                 f'  |_{chan}\n\n' |                 f'  |_{chan}\n\n' | ||||||
|                 # f'{pretty_struct.pformat(msg)}' |                 f'{pretty_struct.pformat(msg)}' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         log.runtime(message) |         log.runtime(message) | ||||||
|  |  | ||||||
|  | @ -1046,10 +1046,6 @@ class Actor: | ||||||
|                 # TODO: another `Struct` for rtvs.. |                 # TODO: another `Struct` for rtvs.. | ||||||
|                 rvs: dict[str, Any] = spawnspec._runtime_vars |                 rvs: dict[str, Any] = spawnspec._runtime_vars | ||||||
|                 if rvs['_debug_mode']: |                 if rvs['_debug_mode']: | ||||||
|                     from .devx import ( |  | ||||||
|                         enable_stack_on_sig, |  | ||||||
|                         maybe_init_greenback, |  | ||||||
|                     ) |  | ||||||
|                     try: |                     try: | ||||||
|                         # TODO: maybe return some status msgs upward |                         # TODO: maybe return some status msgs upward | ||||||
|                         # to that we can emit them in `con_status` |                         # to that we can emit them in `con_status` | ||||||
|  | @ -1057,27 +1053,13 @@ class Actor: | ||||||
|                         log.devx( |                         log.devx( | ||||||
|                             'Enabling `stackscope` traces on SIGUSR1' |                             'Enabling `stackscope` traces on SIGUSR1' | ||||||
|                         ) |                         ) | ||||||
|  |                         from .devx import enable_stack_on_sig | ||||||
|                         enable_stack_on_sig() |                         enable_stack_on_sig() | ||||||
| 
 |  | ||||||
|                     except ImportError: |                     except ImportError: | ||||||
|                         log.warning( |                         log.warning( | ||||||
|                             '`stackscope` not installed for use in debug mode!' |                             '`stackscope` not installed for use in debug mode!' | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|                     if rvs.get('use_greenback', False): |  | ||||||
|                         maybe_mod: ModuleType|None = await maybe_init_greenback() |  | ||||||
|                         if maybe_mod: |  | ||||||
|                             log.devx( |  | ||||||
|                                 'Activated `greenback` ' |  | ||||||
|                                 'for `tractor.pause_from_sync()` support!' |  | ||||||
|                             ) |  | ||||||
|                         else: |  | ||||||
|                             rvs['use_greenback'] = False |  | ||||||
|                             log.warning( |  | ||||||
|                                 '`greenback` not installed for use in debug mode!\n' |  | ||||||
|                                 '`tractor.pause_from_sync()` not available!' |  | ||||||
|                             ) |  | ||||||
| 
 |  | ||||||
|                 rvs['_is_root'] = False |                 rvs['_is_root'] = False | ||||||
|                 _state._runtime_vars.update(rvs) |                 _state._runtime_vars.update(rvs) | ||||||
| 
 | 
 | ||||||
|  | @ -1735,8 +1717,8 @@ async def async_main( | ||||||
| 
 | 
 | ||||||
|                 # Register with the arbiter if we're told its addr |                 # Register with the arbiter if we're told its addr | ||||||
|                 log.runtime( |                 log.runtime( | ||||||
|                     f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' |                     f'Registering `{actor.name}` ->\n' | ||||||
|                     # ^-TODO-^ we should instead show the maddr here^^ |                     f'{pformat(accept_addrs)}' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 # TODO: ideally we don't fan out to all registrars |                 # TODO: ideally we don't fan out to all registrars | ||||||
|  | @ -1794,15 +1776,9 @@ async def async_main( | ||||||
| 
 | 
 | ||||||
|         # Blocks here as expected until the root nursery is |         # Blocks here as expected until the root nursery is | ||||||
|         # killed (i.e. this actor is cancelled or signalled by the parent) |         # killed (i.e. this actor is cancelled or signalled by the parent) | ||||||
|     except Exception as internal_err: |     except Exception as err: | ||||||
|         # ls: ExitStack = actor.lifetime_stack |         log.runtime("Closing all actor lifetime contexts") | ||||||
|         # log.cancel( |         actor.lifetime_stack.close() | ||||||
|         #     'Closing all actor-lifetime exec scopes\n\n' |  | ||||||
|         #     f'|_{ls}\n' |  | ||||||
|         # ) |  | ||||||
|         # # _debug.pause_from_sync() |  | ||||||
|         # # await _debug.pause(shield=True) |  | ||||||
|         # ls.close() |  | ||||||
| 
 | 
 | ||||||
|         if not is_registered: |         if not is_registered: | ||||||
|             # TODO: I guess we could try to connect back |             # TODO: I guess we could try to connect back | ||||||
|  | @ -1810,8 +1786,7 @@ async def async_main( | ||||||
|             # once we have that all working with std streams locking? |             # once we have that all working with std streams locking? | ||||||
|             log.exception( |             log.exception( | ||||||
|                 f"Actor errored and failed to register with arbiter " |                 f"Actor errored and failed to register with arbiter " | ||||||
|                 f"@ {actor.reg_addrs[0]}?" |                 f"@ {actor.reg_addrs[0]}?") | ||||||
|             ) |  | ||||||
|             log.error( |             log.error( | ||||||
|                 "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" |                 "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n" | ||||||
|                 "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" |                 "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n" | ||||||
|  | @ -1824,44 +1799,25 @@ async def async_main( | ||||||
|         if actor._parent_chan: |         if actor._parent_chan: | ||||||
|             await try_ship_error_to_remote( |             await try_ship_error_to_remote( | ||||||
|                 actor._parent_chan, |                 actor._parent_chan, | ||||||
|                 internal_err, |                 err, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         # always! |         # always! | ||||||
|         match internal_err: |         match err: | ||||||
|             case ContextCancelled(): |             case ContextCancelled(): | ||||||
|                 log.cancel( |                 log.cancel( | ||||||
|                     f'Actor: {actor.uid} was task-context-cancelled with,\n' |                     f'Actor: {actor.uid} was task-context-cancelled with,\n' | ||||||
|                     f'str(internal_err)' |                     f'str(err)' | ||||||
|                 ) |                 ) | ||||||
|             case _: |             case _: | ||||||
|                 log.exception("Actor errored:") |                 log.exception("Actor errored:") | ||||||
|         raise |         raise | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         teardown_msg: str = ( |         log.runtime( | ||||||
|             'Runtime nursery complete' |             'Runtime nursery complete' | ||||||
|  |             '-> Closing all actor lifetime contexts..' | ||||||
|         ) |         ) | ||||||
| 
 |  | ||||||
|         ls: ExitStack = actor.lifetime_stack |  | ||||||
|         cbs: list[Callable] = [ |  | ||||||
|             repr(tup[1].__wrapped__) |  | ||||||
|             for tup in ls._exit_callbacks |  | ||||||
|         ] |  | ||||||
|         if cbs: |  | ||||||
|             cbs_str: str = '\n'.join(cbs) |  | ||||||
|             teardown_msg += ( |  | ||||||
|                 '-> Closing all actor-lifetime callbacks\n\n' |  | ||||||
|                 f'|_{cbs_str}\n' |  | ||||||
|             ) |  | ||||||
|             # XXX NOTE XXX this will cause an error which |  | ||||||
|             # prevents any `infected_aio` actor from continuing |  | ||||||
|             # and any callbacks in the `ls` here WILL NOT be |  | ||||||
|             # called!! |  | ||||||
|             # await _debug.pause(shield=True) |  | ||||||
| 
 |  | ||||||
|         ls.close() |  | ||||||
| 
 |  | ||||||
|         # tear down all lifetime contexts if not in guest mode |         # tear down all lifetime contexts if not in guest mode | ||||||
|         # XXX: should this just be in the entrypoint? |         # XXX: should this just be in the entrypoint? | ||||||
|         actor.lifetime_stack.close() |         actor.lifetime_stack.close() | ||||||
|  | @ -1900,28 +1856,23 @@ async def async_main( | ||||||
|                     failed = True |                     failed = True | ||||||
| 
 | 
 | ||||||
|                 if failed: |                 if failed: | ||||||
|                     teardown_msg += ( |                     log.warning( | ||||||
|                         f'-> Failed to unregister {actor.name} from ' |                         f'Failed to unregister {actor.name} from ' | ||||||
|                         f'registar @ {addr}\n' |                         f'registar @ {addr}' | ||||||
|                     ) |                     ) | ||||||
|                     # log.warning( |  | ||||||
| 
 | 
 | ||||||
|         # Ensure all peers (actors connected to us as clients) are finished |         # Ensure all peers (actors connected to us as clients) are finished | ||||||
|         if not actor._no_more_peers.is_set(): |         if not actor._no_more_peers.is_set(): | ||||||
|             if any( |             if any( | ||||||
|                 chan.connected() for chan in chain(*actor._peers.values()) |                 chan.connected() for chan in chain(*actor._peers.values()) | ||||||
|             ): |             ): | ||||||
|                 teardown_msg += ( |                 log.runtime( | ||||||
|                     f'-> Waiting for remaining peers {actor._peers} to clear..\n' |                     f"Waiting for remaining peers {actor._peers} to clear") | ||||||
|                 ) |  | ||||||
|                 log.runtime(teardown_msg) |  | ||||||
|                 with CancelScope(shield=True): |                 with CancelScope(shield=True): | ||||||
|                     await actor._no_more_peers.wait() |                     await actor._no_more_peers.wait() | ||||||
|  |         log.runtime("All peer channels are complete") | ||||||
| 
 | 
 | ||||||
|         teardown_msg += ('-> All peer channels are complete\n') |     log.runtime("Runtime completed") | ||||||
| 
 |  | ||||||
|     teardown_msg += ('Actor runtime completed') |  | ||||||
|     log.info(teardown_msg) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: rename to `Registry` and move to `._discovery`! | # TODO: rename to `Registry` and move to `._discovery`! | ||||||
|  |  | ||||||
|  | @ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = { | ||||||
|     '_root_mailbox': (None, None), |     '_root_mailbox': (None, None), | ||||||
|     '_registry_addrs': [], |     '_registry_addrs': [], | ||||||
| 
 | 
 | ||||||
|     # for `tractor.pause_from_sync()` & `breakpoint()` support |     # for `breakpoint()` support | ||||||
|     'use_greenback': False, |     'use_greenback': False, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|     @property |     @property | ||||||
|     def ctx(self) -> Context: |     def ctx(self) -> Context: | ||||||
|         ''' |         ''' | ||||||
|         A read-only ref to this stream's inter-actor-task `Context`. |         This stream's IPC `Context` ref. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._ctx |         return self._ctx | ||||||
|  |  | ||||||
|  | @ -80,7 +80,6 @@ class ActorNursery: | ||||||
|     ''' |     ''' | ||||||
|     def __init__( |     def __init__( | ||||||
|         self, |         self, | ||||||
|         # TODO: maybe def these as fields of a struct looking type? |  | ||||||
|         actor: Actor, |         actor: Actor, | ||||||
|         ria_nursery: trio.Nursery, |         ria_nursery: trio.Nursery, | ||||||
|         da_nursery: trio.Nursery, |         da_nursery: trio.Nursery, | ||||||
|  | @ -89,10 +88,8 @@ class ActorNursery: | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         # self.supervisor = supervisor  # TODO |         # self.supervisor = supervisor  # TODO | ||||||
|         self._actor: Actor = actor |         self._actor: Actor = actor | ||||||
| 
 |         self._ria_nursery = ria_nursery | ||||||
|         # TODO: rename to `._tn` for our conventional "task-nursery" |  | ||||||
|         self._da_nursery = da_nursery |         self._da_nursery = da_nursery | ||||||
| 
 |  | ||||||
|         self._children: dict[ |         self._children: dict[ | ||||||
|             tuple[str, str], |             tuple[str, str], | ||||||
|             tuple[ |             tuple[ | ||||||
|  | @ -101,13 +98,15 @@ class ActorNursery: | ||||||
|                 Portal | None, |                 Portal | None, | ||||||
|             ] |             ] | ||||||
|         ] = {} |         ] = {} | ||||||
| 
 |         # portals spawned with ``run_in_actor()`` are | ||||||
|  |         # cancelled when their "main" result arrives | ||||||
|  |         self._cancel_after_result_on_exit: set = set() | ||||||
|         self.cancelled: bool = False |         self.cancelled: bool = False | ||||||
|         self._join_procs = trio.Event() |         self._join_procs = trio.Event() | ||||||
|         self._at_least_one_child_in_debug: bool = False |         self._at_least_one_child_in_debug: bool = False | ||||||
|         self.errors = errors |         self.errors = errors | ||||||
|         self._scope_error: BaseException|None = None |  | ||||||
|         self.exited = trio.Event() |         self.exited = trio.Event() | ||||||
|  |         self._scope_error: BaseException|None = None | ||||||
| 
 | 
 | ||||||
|         # NOTE: when no explicit call is made to |         # NOTE: when no explicit call is made to | ||||||
|         # `.open_root_actor()` by application code, |         # `.open_root_actor()` by application code, | ||||||
|  | @ -117,13 +116,6 @@ class ActorNursery: | ||||||
|         # and syncing purposes to any actor opened nurseries. |         # and syncing purposes to any actor opened nurseries. | ||||||
|         self._implicit_runtime_started: bool = False |         self._implicit_runtime_started: bool = False | ||||||
| 
 | 
 | ||||||
|         # TODO: remove the `.run_in_actor()` API and thus this 2ndary |  | ||||||
|         # nursery when that API get's moved outside this primitive! |  | ||||||
|         self._ria_nursery = ria_nursery |  | ||||||
|         # portals spawned with ``run_in_actor()`` are |  | ||||||
|         # cancelled when their "main" result arrives |  | ||||||
|         self._cancel_after_result_on_exit: set = set() |  | ||||||
| 
 |  | ||||||
|     async def start_actor( |     async def start_actor( | ||||||
|         self, |         self, | ||||||
|         name: str, |         name: str, | ||||||
|  | @ -134,14 +126,10 @@ class ActorNursery: | ||||||
|         rpc_module_paths: list[str]|None = None, |         rpc_module_paths: list[str]|None = None, | ||||||
|         enable_modules: list[str]|None = None, |         enable_modules: list[str]|None = None, | ||||||
|         loglevel: str|None = None,  # set log level per subactor |         loglevel: str|None = None,  # set log level per subactor | ||||||
|  |         nursery: trio.Nursery|None = None, | ||||||
|         debug_mode: bool|None = None, |         debug_mode: bool|None = None, | ||||||
|         infect_asyncio: bool = False, |         infect_asyncio: bool = False, | ||||||
| 
 | 
 | ||||||
|         # TODO: ideally we can rm this once we no longer have |  | ||||||
|         # a `._ria_nursery` since the dependent APIs have been |  | ||||||
|         # removed! |  | ||||||
|         nursery: trio.Nursery|None = None, |  | ||||||
| 
 |  | ||||||
|     ) -> Portal: |     ) -> Portal: | ||||||
|         ''' |         ''' | ||||||
|         Start a (daemon) actor: an process that has no designated |         Start a (daemon) actor: an process that has no designated | ||||||
|  | @ -212,7 +200,6 @@ class ActorNursery: | ||||||
|     #  |_ dynamic @context decoration on child side |     #  |_ dynamic @context decoration on child side | ||||||
|     #  |_ implicit `Portal.open_context() as (ctx, first):` |     #  |_ implicit `Portal.open_context() as (ctx, first):` | ||||||
|     #    and `return first` on parent side. |     #    and `return first` on parent side. | ||||||
|     #  |_ mention how it's similar to `trio-parallel` API? |  | ||||||
|     # -[ ] use @api_frame on the wrapper |     # -[ ] use @api_frame on the wrapper | ||||||
|     async def run_in_actor( |     async def run_in_actor( | ||||||
|         self, |         self, | ||||||
|  | @ -282,14 +269,11 @@ class ActorNursery: | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Cancel this actor-nursery by instructing each subactor's |         Cancel this nursery by instructing each subactor to cancel | ||||||
|         runtime to cancel and wait for all underlying sub-processes |         itself and wait for all subactors to terminate. | ||||||
|         to terminate. |  | ||||||
| 
 | 
 | ||||||
|         If `hard_kill` is set then kill the processes directly using |         If ``hard_killl`` is set to ``True`` then kill the processes | ||||||
|         the spawning-backend's API/OS-machinery without any attempt |         directly without any far end graceful ``trio`` cancellation. | ||||||
|         at (graceful) `trio`-style cancellation using our |  | ||||||
|         `Actor.cancel()`. |  | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __runtimeframe__: int = 1  # noqa |         __runtimeframe__: int = 1  # noqa | ||||||
|  | @ -645,12 +629,8 @@ async def open_nursery( | ||||||
|             f'|_{an}\n' |             f'|_{an}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         # shutdown runtime if it was started | ||||||
|         if implicit_runtime: |         if implicit_runtime: | ||||||
|             # shutdown runtime if it was started and report noisly |  | ||||||
|             # that we're did so. |  | ||||||
|             msg += '=> Shutting down actor runtime <=\n' |             msg += '=> Shutting down actor runtime <=\n' | ||||||
|             log.info(msg) |  | ||||||
| 
 | 
 | ||||||
|         else: |         log.info(msg) | ||||||
|             # keep noise low during std operation. |  | ||||||
|             log.runtime(msg) |  | ||||||
|  |  | ||||||
|  | @ -29,7 +29,6 @@ from ._debug import ( | ||||||
|     shield_sigint_handler as shield_sigint_handler, |     shield_sigint_handler as shield_sigint_handler, | ||||||
|     open_crash_handler as open_crash_handler, |     open_crash_handler as open_crash_handler, | ||||||
|     maybe_open_crash_handler as maybe_open_crash_handler, |     maybe_open_crash_handler as maybe_open_crash_handler, | ||||||
|     maybe_init_greenback as maybe_init_greenback, |  | ||||||
|     post_mortem as post_mortem, |     post_mortem as post_mortem, | ||||||
|     mk_pdb as mk_pdb, |     mk_pdb as mk_pdb, | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | @ -69,7 +69,6 @@ from trio import ( | ||||||
| import tractor | import tractor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
| from tractor._context import Context | from tractor._context import Context | ||||||
| from tractor import _state |  | ||||||
| from tractor._state import ( | from tractor._state import ( | ||||||
|     current_actor, |     current_actor, | ||||||
|     is_root_process, |     is_root_process, | ||||||
|  | @ -88,6 +87,9 @@ if TYPE_CHECKING: | ||||||
|     from tractor._runtime import ( |     from tractor._runtime import ( | ||||||
|         Actor, |         Actor, | ||||||
|     ) |     ) | ||||||
|  |     from tractor.msg import ( | ||||||
|  |         _codec, | ||||||
|  |     ) | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -1597,13 +1599,11 @@ async def _pause( | ||||||
|     try: |     try: | ||||||
|         task: Task = current_task() |         task: Task = current_task() | ||||||
|     except RuntimeError as rte: |     except RuntimeError as rte: | ||||||
|         __tracebackhide__: bool = False |  | ||||||
|         log.exception('Failed to get current task?') |         log.exception('Failed to get current task?') | ||||||
|         if actor.is_infected_aio(): |         if actor.is_infected_aio(): | ||||||
|             # mk_pdb().set_trace() |  | ||||||
|             raise RuntimeError( |             raise RuntimeError( | ||||||
|                 '`tractor.pause[_from_sync]()` not yet supported ' |                 '`tractor.pause[_from_sync]()` not yet supported ' | ||||||
|                 'directly (infected) `asyncio` tasks!' |                 'for infected `asyncio` mode!' | ||||||
|             ) from rte |             ) from rte | ||||||
| 
 | 
 | ||||||
|         raise |         raise | ||||||
|  | @ -2163,22 +2163,22 @@ def maybe_import_greenback( | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def maybe_init_greenback(**kwargs) -> None|ModuleType: | async def maybe_init_greenback( | ||||||
|     try: |     **kwargs, | ||||||
|         if mod := maybe_import_greenback(**kwargs): | ) -> None|ModuleType: | ||||||
|             await mod.ensure_portal() | 
 | ||||||
|             log.devx( |     if mod := maybe_import_greenback(**kwargs): | ||||||
|                 '`greenback` portal opened!\n' |         await mod.ensure_portal() | ||||||
|                 'Sync debug support activated!\n' |         log.devx( | ||||||
|             ) |             '`greenback` portal opened!\n' | ||||||
|             return mod |             'Sync debug support activated!\n' | ||||||
|     except BaseException: |         ) | ||||||
|         log.exception('Failed to init `greenback`..') |         return mod | ||||||
|         raise |  | ||||||
| 
 | 
 | ||||||
|     return None |     return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| async def _pause_from_bg_root_thread( | async def _pause_from_bg_root_thread( | ||||||
|     behalf_of_thread: Thread, |     behalf_of_thread: Thread, | ||||||
|     repl: PdbREPL, |     repl: PdbREPL, | ||||||
|  | @ -2399,37 +2399,18 @@ def pause_from_sync( | ||||||
|         else:  # we are presumably the `trio.run()` + main thread |         else:  # we are presumably the `trio.run()` + main thread | ||||||
|             # raises on not-found by default |             # raises on not-found by default | ||||||
|             greenback: ModuleType = maybe_import_greenback() |             greenback: ModuleType = maybe_import_greenback() | ||||||
| 
 |  | ||||||
|             # TODO: how to ensure this is either dynamically (if |  | ||||||
|             # needed) called here (in some bg tn??) or that the |  | ||||||
|             # subactor always already called it? |  | ||||||
|             # greenback: ModuleType = await maybe_init_greenback() |  | ||||||
| 
 |  | ||||||
|             message += f'-> imported {greenback}\n' |             message += f'-> imported {greenback}\n' | ||||||
|             repl_owner: Task = current_task() |             repl_owner: Task = current_task() | ||||||
|             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' |             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' | ||||||
|             try: |             out = greenback.await_( | ||||||
|                 out = greenback.await_( |                 _pause( | ||||||
|                     _pause( |                     debug_func=None, | ||||||
|                         debug_func=None, |                     repl=repl, | ||||||
|                         repl=repl, |                     hide_tb=hide_tb, | ||||||
|                         hide_tb=hide_tb, |                     called_from_sync=True, | ||||||
|                         called_from_sync=True, |                     **_pause_kwargs, | ||||||
|                         **_pause_kwargs, |  | ||||||
|                     ) |  | ||||||
|                 ) |                 ) | ||||||
|             except RuntimeError as rte: |             ) | ||||||
|                 if not _state._runtime_vars.get( |  | ||||||
|                         'use_greenback', |  | ||||||
|                         False, |  | ||||||
|                 ): |  | ||||||
|                     raise RuntimeError( |  | ||||||
|                         '`greenback` was never initialized in this actor!?\n\n' |  | ||||||
|                         f'{_state._runtime_vars}\n' |  | ||||||
|                     ) from rte |  | ||||||
| 
 |  | ||||||
|                 raise |  | ||||||
| 
 |  | ||||||
|             if out: |             if out: | ||||||
|                 bg_task, repl = out |                 bg_task, repl = out | ||||||
|                 assert repl is repl |                 assert repl is repl | ||||||
|  | @ -2820,10 +2801,10 @@ def open_crash_handler( | ||||||
|       `trio.run()`. |       `trio.run()`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     err: BaseException |  | ||||||
|     try: |     try: | ||||||
|         yield |         yield | ||||||
|     except tuple(catch) as err: |     except tuple(catch) as err: | ||||||
|  | 
 | ||||||
|         if type(err) not in ignore: |         if type(err) not in ignore: | ||||||
|             pdbp.xpm() |             pdbp.xpm() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -234,7 +234,7 @@ def find_caller_info( | ||||||
| _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} | _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: -[x] move all this into new `.devx._frame_stack`! | # TODO: -[x] move all this into new `.devx._code`! | ||||||
| # -[ ] consider rename to _callstack? | # -[ ] consider rename to _callstack? | ||||||
| # -[ ] prolly create a `@runtime_api` dec? | # -[ ] prolly create a `@runtime_api` dec? | ||||||
| #   |_ @api_frame seems better? | #   |_ @api_frame seems better? | ||||||
|  | @ -286,18 +286,3 @@ def api_frame( | ||||||
|     wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache |     wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache | ||||||
|     wrapped.__api_func__: bool = True |     wrapped.__api_func__: bool = True | ||||||
|     return wrapper(wrapped) |     return wrapper(wrapped) | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: something like this instead of the adhoc frame-unhiding |  | ||||||
| # blocks all over the runtime!! XD |  | ||||||
| # -[ ] ideally we can expect a certain error (set) and if something |  | ||||||
| #     else is raised then all frames below the wrapped one will be |  | ||||||
| #     un-hidden via `__tracebackhide__: bool = False`. |  | ||||||
| # |_ might need to dynamically mutate the code objs like |  | ||||||
| #    `pdbp.hideframe()` does? |  | ||||||
| # -[ ] use this as a `@acm` decorator as introed in 3.10? |  | ||||||
| # @acm |  | ||||||
| # async def unhide_frame_when_not( |  | ||||||
| #     error_set: set[BaseException], |  | ||||||
| # ) -> TracebackType: |  | ||||||
| #     ... |  | ||||||
|  |  | ||||||
|  | @ -1,134 +0,0 @@ | ||||||
| # tractor: structured concurrent "actors". |  | ||||||
| # Copyright 2024-eternity Tyler Goodlet. |  | ||||||
| 
 |  | ||||||
| # This program is free software: you can redistribute it and/or modify |  | ||||||
| # it under the terms of the GNU Affero General Public License as published by |  | ||||||
| # the Free Software Foundation, either version 3 of the License, or |  | ||||||
| # (at your option) any later version. |  | ||||||
| 
 |  | ||||||
| # This program is distributed in the hope that it will be useful, |  | ||||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |  | ||||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the |  | ||||||
| # GNU Affero General Public License for more details. |  | ||||||
| 
 |  | ||||||
| # You should have received a copy of the GNU Affero General Public License |  | ||||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| Daemon subactor as service(s) management and supervision primitives |  | ||||||
| and API. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| from __future__ import annotations |  | ||||||
| from contextlib import ( |  | ||||||
|     # asynccontextmanager as acm, |  | ||||||
|     contextmanager as cm, |  | ||||||
| ) |  | ||||||
| from collections import defaultdict |  | ||||||
| from typing import ( |  | ||||||
|     Callable, |  | ||||||
|     Any, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| import trio |  | ||||||
| from trio import TaskStatus |  | ||||||
| from tractor import ( |  | ||||||
|     ActorNursery, |  | ||||||
|     current_actor, |  | ||||||
|     ContextCancelled, |  | ||||||
|     Context, |  | ||||||
|     Portal, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| from ._util import ( |  | ||||||
|     log,  # sub-sys logger |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: implement a `@singleton` deco-API for wrapping the below |  | ||||||
| # factory's impl for general actor-singleton use? |  | ||||||
| # |  | ||||||
| # -[ ] go through the options peeps on SO did? |  | ||||||
| #  * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python |  | ||||||
| #  * including @mikenerone's answer |  | ||||||
| #   |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 |  | ||||||
| # |  | ||||||
| # -[ ] put it in `tractor.lowlevel._globals` ? |  | ||||||
| #  * fits with our oustanding actor-local/global feat req? |  | ||||||
| #   |_ https://github.com/goodboy/tractor/issues/55 |  | ||||||
| #  * how can it relate to the `Actor.lifetime_stack` that was |  | ||||||
| #    silently patched in? |  | ||||||
| #   |_ we could implicitly call both of these in the same |  | ||||||
| #     spot in the runtime using the lifetime stack? |  | ||||||
| #    - `open_singleton_cm().__exit__()` |  | ||||||
| #    -`del_singleton()` |  | ||||||
| #   |_ gives SC fixtue semantics to sync code oriented around |  | ||||||
| #     sub-process lifetime? |  | ||||||
| #  * what about with `trio.RunVar`? |  | ||||||
| #   |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar |  | ||||||
| #    - which we'll need for no-GIL cpython (right?) presuming |  | ||||||
| #      multiple `trio.run()` calls in process? |  | ||||||
| # |  | ||||||
| # |  | ||||||
| # @singleton |  | ||||||
| # async def open_service_mngr( |  | ||||||
| #     **init_kwargs, |  | ||||||
| # ) -> ServiceMngr: |  | ||||||
| #     ''' |  | ||||||
| #     Note this function body is invoke IFF no existing singleton instance already |  | ||||||
| #     exists in this proc's memory. |  | ||||||
| 
 |  | ||||||
| #     ''' |  | ||||||
| #     # setup |  | ||||||
| #     yield ServiceMngr(**init_kwargs) |  | ||||||
| #     # teardown |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # a deletion API for explicit instance de-allocation? |  | ||||||
| # @open_service_mngr.deleter |  | ||||||
| # def del_service_mngr() -> None: |  | ||||||
| #     mngr = open_service_mngr._singleton[0] |  | ||||||
| #     open_service_mngr._singleton[0] = None |  | ||||||
| #     del mngr |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: singleton factory API instead of a class API |  | ||||||
| @cm |  | ||||||
| def open_service_mngr( |  | ||||||
|     *, |  | ||||||
|     _singleton: list[ServiceMngr|None] = [None], |  | ||||||
|     # NOTE; since default values for keyword-args are effectively |  | ||||||
|     # module-vars/globals as per the note from, |  | ||||||
|     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values |  | ||||||
|     # |  | ||||||
|     # > "The default value is evaluated only once. This makes |  | ||||||
|     #   a difference when the default is a mutable object such as |  | ||||||
|     #   a list, dictionary, or instances of most classes" |  | ||||||
|     # |  | ||||||
|     **init_kwargs, |  | ||||||
| 
 |  | ||||||
| ) -> ServiceMngr: |  | ||||||
|     ''' |  | ||||||
|     Open a multi-subactor-as-service-daemon tree supervisor. |  | ||||||
| 
 |  | ||||||
|     The delivered `ServiceMngr` is a singleton instance for each |  | ||||||
|     actor-process and is allocated on first open and never |  | ||||||
|     de-allocated unless explicitly deleted by al call to |  | ||||||
|     `del_service_mngr()`. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     mngr: ServiceMngr|None |  | ||||||
|     if (mngr := _singleton[0]) is None: |  | ||||||
|         log.info('Allocating a new service mngr!') |  | ||||||
|         mngr = _singleton[0] = ServiceMngr(**init_kwargs) |  | ||||||
|     else: |  | ||||||
|         log.info( |  | ||||||
|             'Using extant service mngr!\n\n' |  | ||||||
|             f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     with mngr: |  | ||||||
|         yield mngr |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
|  | @ -54,12 +54,11 @@ LOG_FORMAT = ( | ||||||
| DATE_FORMAT = '%b %d %H:%M:%S' | DATE_FORMAT = '%b %d %H:%M:%S' | ||||||
| 
 | 
 | ||||||
| # FYI, ERROR is 40 | # FYI, ERROR is 40 | ||||||
| # TODO: use a `bidict` to avoid the :155 check? |  | ||||||
| CUSTOM_LEVELS: dict[str, int] = { | CUSTOM_LEVELS: dict[str, int] = { | ||||||
|     'TRANSPORT': 5, |     'TRANSPORT': 5, | ||||||
|     'RUNTIME': 15, |     'RUNTIME': 15, | ||||||
|     'DEVX': 17, |     'DEVX': 17, | ||||||
|     'CANCEL': 22, |     'CANCEL': 18, | ||||||
|     'PDB': 500, |     'PDB': 500, | ||||||
| } | } | ||||||
| STD_PALETTE = { | STD_PALETTE = { | ||||||
|  | @ -148,8 +147,6 @@ class StackLevelAdapter(LoggerAdapter): | ||||||
|         Delegate a log call to the underlying logger, after adding |         Delegate a log call to the underlying logger, after adding | ||||||
|         contextual information from this adapter instance. |         contextual information from this adapter instance. | ||||||
| 
 | 
 | ||||||
|         NOTE: all custom level methods (above) delegate to this! |  | ||||||
| 
 |  | ||||||
|         ''' |         ''' | ||||||
|         if self.isEnabledFor(level): |         if self.isEnabledFor(level): | ||||||
|             stacklevel: int = 3 |             stacklevel: int = 3 | ||||||
|  |  | ||||||
|  | @ -41,10 +41,8 @@ import textwrap | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|     Protocol, |  | ||||||
|     Type, |     Type, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|     TypeVar, |  | ||||||
|     Union, |     Union, | ||||||
| ) | ) | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
|  | @ -183,11 +181,7 @@ def mk_dec( | ||||||
|     dec_hook: Callable|None = None, |     dec_hook: Callable|None = None, | ||||||
| 
 | 
 | ||||||
| ) -> MsgDec: | ) -> MsgDec: | ||||||
|     ''' |  | ||||||
|     Create an IPC msg decoder, normally used as the |  | ||||||
|     `PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`. |  | ||||||
| 
 | 
 | ||||||
|     ''' |  | ||||||
|     return MsgDec( |     return MsgDec( | ||||||
|         _dec=msgpack.Decoder( |         _dec=msgpack.Decoder( | ||||||
|             type=spec,  # like `MsgType[Any]` |             type=spec,  # like `MsgType[Any]` | ||||||
|  | @ -233,13 +227,6 @@ def pformat_msgspec( | ||||||
|     join_char: str = '\n', |     join_char: str = '\n', | ||||||
| 
 | 
 | ||||||
| ) -> str: | ) -> str: | ||||||
|     ''' |  | ||||||
|     Pretty `str` format the `msgspec.msgpack.Decoder.type` attributed |  | ||||||
|     for display in log messages as a nice (maybe multiline) |  | ||||||
|     presentation of all the supported `Struct`s availed for typed |  | ||||||
|     decoding. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     dec: msgpack.Decoder = getattr(codec, 'dec', codec) |     dec: msgpack.Decoder = getattr(codec, 'dec', codec) | ||||||
|     return join_char.join( |     return join_char.join( | ||||||
|         mk_msgspec_table( |         mk_msgspec_table( | ||||||
|  | @ -643,57 +630,31 @@ def limit_msg_spec( | ||||||
| #         # import pdbp; pdbp.set_trace() | #         # import pdbp; pdbp.set_trace() | ||||||
| #         assert ext_codec.pld_spec == extended_spec | #         assert ext_codec.pld_spec == extended_spec | ||||||
| #         yield ext_codec | #         yield ext_codec | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: make something similar to this inside `._codec` such that | ||||||
|  | # user can just pass a type table of some sort? | ||||||
|  | # -[ ] we would need to decode all msgs to `pretty_struct.Struct` | ||||||
|  | #     and then call `.to_dict()` on them? | ||||||
|  | # -[x] we're going to need to re-impl all the stuff changed in the | ||||||
|  | #    runtime port such that it can handle dicts or `Msg`s? | ||||||
| # | # | ||||||
| # ^-TODO-^ is it impossible to make something like this orr!? | # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: | ||||||
| 
 | #     ''' | ||||||
| # TODO: make an auto-custom hook generator from a set of input custom | #     Deliver a `enc_hook()`/`dec_hook()` pair which does | ||||||
| # types? | #     manual convertion from our above native `Msg` set | ||||||
| # -[ ] below is a proto design using a `TypeCodec` idea? | #     to `dict` equivalent (wire msgs) in order to keep legacy compat | ||||||
|  | #     with the original runtime implementation. | ||||||
| # | # | ||||||
| # type var for the expected interchange-lib's | #     Note: this is is/was primarly used while moving the core | ||||||
| # IPC-transport type when not available as a built-in | #     runtime over to using native `Msg`-struct types wherein we | ||||||
| # serialization output. | #     start with the send side emitting without loading | ||||||
| WireT = TypeVar('WireT') | #     a typed-decoder and then later flipping the switch over to | ||||||
| 
 | #     load to the native struct types once all runtime usage has | ||||||
| 
 | #     been adjusted appropriately. | ||||||
| # TODO: some kinda (decorator) API for built-in subtypes | # | ||||||
| # that builds this implicitly by inspecting the `mro()`? | #     ''' | ||||||
| class TypeCodec(Protocol): | #     return ( | ||||||
|     ''' | #         # enc_to_dict, | ||||||
|     A per-custom-type wire-transport serialization translator | #         dec_from_dict, | ||||||
|     description type. | #     ) | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     src_type: Type |  | ||||||
|     wire_type: WireT |  | ||||||
| 
 |  | ||||||
|     def encode(obj: Type) -> WireT: |  | ||||||
|         ... |  | ||||||
| 
 |  | ||||||
|     def decode( |  | ||||||
|         obj_type: Type[WireT], |  | ||||||
|         obj: WireT, |  | ||||||
|     ) -> Type: |  | ||||||
|         ... |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class MsgpackTypeCodec(TypeCodec): |  | ||||||
|     ... |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def mk_codec_hooks( |  | ||||||
|     type_codecs: list[TypeCodec], |  | ||||||
| 
 |  | ||||||
| ) -> tuple[Callable, Callable]: |  | ||||||
|     ''' |  | ||||||
|     Deliver a `enc_hook()`/`dec_hook()` pair which handle |  | ||||||
|     manual convertion from an input `Type` set such that whenever |  | ||||||
|     the `TypeCodec.filter()` predicate matches the |  | ||||||
|     `TypeCodec.decode()` is called on the input native object by |  | ||||||
|     the `dec_hook()` and whenever the |  | ||||||
|     `isiinstance(obj, TypeCodec.type)` matches against an |  | ||||||
|     `enc_hook(obj=obj)` the return value is taken from a |  | ||||||
|     `TypeCodec.encode(obj)` callback. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     ... |  | ||||||
|  |  | ||||||
|  | @ -580,15 +580,12 @@ async def drain_to_final_msg( | ||||||
|         # 2. WE DID NOT REQUEST that cancel and thus |         # 2. WE DID NOT REQUEST that cancel and thus | ||||||
|         #    SHOULD RAISE HERE! |         #    SHOULD RAISE HERE! | ||||||
|         except trio.Cancelled as taskc: |         except trio.Cancelled as taskc: | ||||||
|             # from tractor.devx._debug import pause |  | ||||||
|             # await pause(shield=True) |  | ||||||
| 
 | 
 | ||||||
|             # CASE 2: mask the local cancelled-error(s) |             # CASE 2: mask the local cancelled-error(s) | ||||||
|             # only when we are sure the remote error is |             # only when we are sure the remote error is | ||||||
|             # the source cause of this local task's |             # the source cause of this local task's | ||||||
|             # cancellation. |             # cancellation. | ||||||
|             ctx.maybe_raise( |             ctx.maybe_raise( | ||||||
|                 hide_tb=hide_tb, |  | ||||||
|                 # TODO: when use this/ |                 # TODO: when use this/ | ||||||
|                 # from_src_exc=taskc, |                 # from_src_exc=taskc, | ||||||
|             ) |             ) | ||||||
|  |  | ||||||
|  | @ -34,9 +34,6 @@ from pprint import ( | ||||||
|     saferepr, |     saferepr, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from tractor.log import get_logger |  | ||||||
| 
 |  | ||||||
| log = get_logger() |  | ||||||
| # TODO: auto-gen type sig for input func both for | # TODO: auto-gen type sig for input func both for | ||||||
| # type-msgs and logging of RPC tasks? | # type-msgs and logging of RPC tasks? | ||||||
| # taken and modified from: | # taken and modified from: | ||||||
|  | @ -146,13 +143,7 @@ def pformat( | ||||||
| 
 | 
 | ||||||
|         else:  # the `pprint` recursion-safe format: |         else:  # the `pprint` recursion-safe format: | ||||||
|             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr |             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr | ||||||
|             try: |             val_str: str = saferepr(v) | ||||||
|                 val_str: str = saferepr(v) |  | ||||||
|             except Exception: |  | ||||||
|                 log.exception( |  | ||||||
|                     'Failed to `saferepr({type(struct)})` !?\n' |  | ||||||
|                 ) |  | ||||||
|             return _Struct.__repr__(struct) |  | ||||||
| 
 | 
 | ||||||
|         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! |         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! | ||||||
|         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') |         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') | ||||||
|  | @ -203,20 +194,12 @@ class Struct( | ||||||
|         return sin_props |         return sin_props | ||||||
| 
 | 
 | ||||||
|     pformat = pformat |     pformat = pformat | ||||||
|     # __repr__ = pformat |  | ||||||
|     # __str__ = __repr__ = pformat |     # __str__ = __repr__ = pformat | ||||||
|     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering |     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering | ||||||
|     # inside a known tty? |     # inside a known tty? | ||||||
|     # def __repr__(self) -> str: |     # def __repr__(self) -> str: | ||||||
|     #     ... |     #     ... | ||||||
|     def __repr__(self) -> str: |     __repr__ = pformat | ||||||
|         try: |  | ||||||
|             return pformat(self) |  | ||||||
|         except Exception: |  | ||||||
|             log.exception( |  | ||||||
|                 f'Failed to `pformat({type(self)})` !?\n' |  | ||||||
|             ) |  | ||||||
|             return _Struct.__repr__(self) |  | ||||||
| 
 | 
 | ||||||
|     def copy( |     def copy( | ||||||
|         self, |         self, | ||||||
|  |  | ||||||
|  | @ -156,12 +156,11 @@ class BroadcastState(Struct): | ||||||
| 
 | 
 | ||||||
| class BroadcastReceiver(ReceiveChannel): | class BroadcastReceiver(ReceiveChannel): | ||||||
|     ''' |     ''' | ||||||
|     A memory receive channel broadcaster which is non-lossy for |     A memory receive channel broadcaster which is non-lossy for the | ||||||
|     the fastest consumer. |     fastest consumer. | ||||||
| 
 | 
 | ||||||
|     Additional consumer tasks can receive all produced values by |     Additional consumer tasks can receive all produced values by registering | ||||||
|     registering with ``.subscribe()`` and receiving from the new |     with ``.subscribe()`` and receiving from the new instance it delivers. | ||||||
|     instance it delivers. |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     def __init__( |     def __init__( | ||||||
|  |  | ||||||
|  | @ -18,12 +18,8 @@ | ||||||
| Async context manager primitives with hard ``trio``-aware semantics | Async context manager primitives with hard ``trio``-aware semantics | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from __future__ import annotations | from contextlib import asynccontextmanager as acm | ||||||
| from contextlib import ( |  | ||||||
|     asynccontextmanager as acm, |  | ||||||
| ) |  | ||||||
| import inspect | import inspect | ||||||
| from types import ModuleType |  | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     AsyncContextManager, |     AsyncContextManager, | ||||||
|  | @ -34,16 +30,13 @@ from typing import ( | ||||||
|     Optional, |     Optional, | ||||||
|     Sequence, |     Sequence, | ||||||
|     TypeVar, |     TypeVar, | ||||||
|     TYPE_CHECKING, |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
|  | 
 | ||||||
| from tractor._state import current_actor | from tractor._state import current_actor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: |  | ||||||
|     from tractor import ActorNursery |  | ||||||
| 
 |  | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -53,10 +46,8 @@ T = TypeVar("T") | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def maybe_open_nursery( | async def maybe_open_nursery( | ||||||
|     nursery: trio.Nursery|ActorNursery|None = None, |     nursery: trio.Nursery | None = None, | ||||||
|     shield: bool = False, |     shield: bool = False, | ||||||
|     lib: ModuleType = trio, |  | ||||||
| 
 |  | ||||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ) -> AsyncGenerator[trio.Nursery, Any]: | ||||||
|     ''' |     ''' | ||||||
|     Create a new nursery if None provided. |     Create a new nursery if None provided. | ||||||
|  | @ -67,12 +58,13 @@ async def maybe_open_nursery( | ||||||
|     if nursery is not None: |     if nursery is not None: | ||||||
|         yield nursery |         yield nursery | ||||||
|     else: |     else: | ||||||
|         async with lib.open_nursery() as nursery: |         async with trio.open_nursery() as nursery: | ||||||
|             nursery.cancel_scope.shield = shield |             nursery.cancel_scope.shield = shield | ||||||
|             yield nursery |             yield nursery | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def _enter_and_wait( | async def _enter_and_wait( | ||||||
|  | 
 | ||||||
|     mngr: AsyncContextManager[T], |     mngr: AsyncContextManager[T], | ||||||
|     unwrapped: dict[int, T], |     unwrapped: dict[int, T], | ||||||
|     all_entered: trio.Event, |     all_entered: trio.Event, | ||||||
|  | @ -99,6 +91,7 @@ async def _enter_and_wait( | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def gather_contexts( | async def gather_contexts( | ||||||
|  | 
 | ||||||
|     mngrs: Sequence[AsyncContextManager[T]], |     mngrs: Sequence[AsyncContextManager[T]], | ||||||
| 
 | 
 | ||||||
| ) -> AsyncGenerator[ | ) -> AsyncGenerator[ | ||||||
|  | @ -109,17 +102,15 @@ async def gather_contexts( | ||||||
|     None, |     None, | ||||||
| ]: | ]: | ||||||
|     ''' |     ''' | ||||||
|     Concurrently enter a sequence of async context managers (acms), |     Concurrently enter a sequence of async context managers, each in | ||||||
|     each from a separate `trio` task and deliver the unwrapped |     a separate ``trio`` task and deliver the unwrapped values in the | ||||||
|     `yield`-ed values in the same order once all managers have entered. |     same order once all managers have entered. On exit all contexts are | ||||||
|  |     subsequently and concurrently exited. | ||||||
| 
 | 
 | ||||||
|     On exit, all acms are subsequently and concurrently exited. |     This function is somewhat similar to common usage of | ||||||
| 
 |     ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in | ||||||
|     This function is somewhat similar to a batch of non-blocking |     combo with ``asyncio.gather()`` except the managers are concurrently | ||||||
|     calls to `contextlib.AsyncExitStack.enter_async_context()` |     entered and exited, and cancellation just works. | ||||||
|     (inside a loop) *in combo with* a `asyncio.gather()` to get the |  | ||||||
|     `.__aenter__()`-ed values, except the managers are both |  | ||||||
|     concurrently entered and exited and *cancellation just works*(R). |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     seed: int = id(mngrs) |     seed: int = id(mngrs) | ||||||
|  | @ -219,10 +210,9 @@ async def maybe_open_context( | ||||||
| 
 | 
 | ||||||
| ) -> AsyncIterator[tuple[bool, T]]: | ) -> AsyncIterator[tuple[bool, T]]: | ||||||
|     ''' |     ''' | ||||||
|     Maybe open an async-context-manager (acm) if there is not already |     Maybe open a context manager if there is not already a _Cached | ||||||
|     a `_Cached` version for the provided (input) `key` for *this* actor. |     version for the provided ``key`` for *this* actor. Return the | ||||||
| 
 |     _Cached instance on a _Cache hit. | ||||||
|     Return the `_Cached` instance on a _Cache hit. |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     fid = id(acm_func) |     fid = id(acm_func) | ||||||
|  | @ -283,13 +273,8 @@ async def maybe_open_context( | ||||||
|     else: |     else: | ||||||
|         _Cache.users += 1 |         _Cache.users += 1 | ||||||
|         log.runtime( |         log.runtime( | ||||||
|             f'Re-using cached resource for user {_Cache.users}\n\n' |             f'Reusing resource for `_Cache` user {_Cache.users}\n\n' | ||||||
|             f'{ctx_key!r} -> {type(yielded)}\n' |             f'{ctx_key!r} -> {yielded!r}\n' | ||||||
| 
 |  | ||||||
|             # TODO: make this work with values but without |  | ||||||
|             # `msgspec.Struct` causing frickin crashes on field-type |  | ||||||
|             # lookups.. |  | ||||||
|             # f'{ctx_key!r} -> {yielded!r}\n' |  | ||||||
|         ) |         ) | ||||||
|         lock.release() |         lock.release() | ||||||
|         yield True, yielded |         yield True, yielded | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue