Compare commits
	
		
			7 Commits 
		
	
	
		
			4fbd469c33
			...
			af3745684c
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | af3745684c | |
|  | 3907cba68e | |
|  | e3d59964af | |
|  | ba83bab776 | |
|  | 18d440c207 | |
|  | edac717613 | |
|  | 7e93b81a83 | 
|  | @ -11,9 +11,6 @@ from typing import ( | |||
|     Type, | ||||
|     Union, | ||||
| ) | ||||
| from contextvars import ( | ||||
|     Context, | ||||
| ) | ||||
| 
 | ||||
| from msgspec import ( | ||||
|     structs, | ||||
|  | @ -27,6 +24,7 @@ import tractor | |||
| from tractor import ( | ||||
|     _state, | ||||
|     MsgTypeError, | ||||
|     Context, | ||||
| ) | ||||
| from tractor.msg import ( | ||||
|     _codec, | ||||
|  | @ -41,7 +39,7 @@ from tractor.msg import ( | |||
| from tractor.msg.types import ( | ||||
|     _payload_msgs, | ||||
|     log, | ||||
|     Msg, | ||||
|     PayloadMsg, | ||||
|     Started, | ||||
|     mk_msg_spec, | ||||
| ) | ||||
|  | @ -61,7 +59,7 @@ def mk_custom_codec( | |||
|     uid: tuple[str, str] = tractor.current_actor().uid | ||||
| 
 | ||||
|     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||
|     # field on our `Msg.pld`, we still need a enc/dec_hook() pair | ||||
|     # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair | ||||
|     # to cast to/from that type on the wire. See the docs: | ||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
| 
 | ||||
|  | @ -321,12 +319,12 @@ def dec_type_union( | |||
|     import importlib | ||||
|     types: list[Type] = [] | ||||
|     for type_name in type_names: | ||||
|         for ns in [ | ||||
|         for mod in [ | ||||
|             typing, | ||||
|             importlib.import_module(__name__), | ||||
|         ]: | ||||
|             if type_ref := getattr( | ||||
|                 ns, | ||||
|                 mod, | ||||
|                 type_name, | ||||
|                 False, | ||||
|             ): | ||||
|  | @ -744,7 +742,7 @@ def chk_pld_type( | |||
|     # 'Error',  .pld: ErrorData | ||||
| 
 | ||||
|     codec: MsgCodec = mk_codec( | ||||
|         # NOTE: this ONLY accepts `Msg.pld` fields of a specified | ||||
|         # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified | ||||
|         # type union. | ||||
|         ipc_pld_spec=payload_spec, | ||||
|     ) | ||||
|  | @ -752,7 +750,7 @@ def chk_pld_type( | |||
|     # make a one-off dec to compare with our `MsgCodec` instance | ||||
|     # which does the below `mk_msg_spec()` call internally | ||||
|     ipc_msg_spec: Union[Type[Struct]] | ||||
|     msg_types: list[Msg[payload_spec]] | ||||
|     msg_types: list[PayloadMsg[payload_spec]] | ||||
|     ( | ||||
|         ipc_msg_spec, | ||||
|         msg_types, | ||||
|  | @ -761,7 +759,7 @@ def chk_pld_type( | |||
|     ) | ||||
|     _enc = msgpack.Encoder() | ||||
|     _dec = msgpack.Decoder( | ||||
|         type=ipc_msg_spec or Any,  # like `Msg[Any]` | ||||
|         type=ipc_msg_spec or Any,  # like `PayloadMsg[Any]` | ||||
|     ) | ||||
| 
 | ||||
|     assert ( | ||||
|  | @ -806,7 +804,7 @@ def chk_pld_type( | |||
|             'cid': '666', | ||||
|             'pld': pld, | ||||
|         } | ||||
|         enc_msg: Msg = typedef(**kwargs) | ||||
|         enc_msg: PayloadMsg = typedef(**kwargs) | ||||
| 
 | ||||
|         _wire_bytes: bytes = _enc.encode(enc_msg) | ||||
|         wire_bytes: bytes = codec.enc.encode(enc_msg) | ||||
|  | @ -883,25 +881,16 @@ def test_limit_msgspec(): | |||
|             debug_mode=True | ||||
|         ): | ||||
| 
 | ||||
|             # ensure we can round-trip a boxing `Msg` | ||||
|             # ensure we can round-trip a boxing `PayloadMsg` | ||||
|             assert chk_pld_type( | ||||
|                 # Msg, | ||||
|                 Any, | ||||
|                 None, | ||||
|                 payload_spec=Any, | ||||
|                 pld=None, | ||||
|                 expect_roundtrip=True, | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: don't need this any more right since | ||||
|             # `msgspec>=0.15` has the nice generics stuff yah?? | ||||
|             # | ||||
|             # manually override the type annot of the payload | ||||
|             # field and ensure it propagates to all msg-subtypes. | ||||
|             # Msg.__annotations__['pld'] = Any | ||||
| 
 | ||||
|             # verify that a mis-typed payload value won't decode | ||||
|             assert not chk_pld_type( | ||||
|                 # Msg, | ||||
|                 int, | ||||
|                 payload_spec=int, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|  | @ -913,18 +902,16 @@ def test_limit_msgspec(): | |||
|                 value: Any | ||||
| 
 | ||||
|             assert not chk_pld_type( | ||||
|                 # Msg, | ||||
|                 CustomPayload, | ||||
|                 payload_spec=CustomPayload, | ||||
|                 pld='doggy', | ||||
|             ) | ||||
| 
 | ||||
|             assert chk_pld_type( | ||||
|                 # Msg, | ||||
|                 CustomPayload, | ||||
|                 payload_spec=CustomPayload, | ||||
|                 pld=CustomPayload(name='doggy', value='urmom') | ||||
|             ) | ||||
| 
 | ||||
|             # uhh bc we can `.pause_from_sync()` now! :surfer: | ||||
|             # yah, we can `.pause_from_sync()` now! | ||||
|             # breakpoint() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  |  | |||
|  | @ -1336,6 +1336,23 @@ def test_shield_pause( | |||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: better error for "non-ideal" usage from the root actor. | ||||
| # -[ ] if called from an async scope emit a message that suggests | ||||
| #    using `await tractor.pause()` instead since it's less overhead | ||||
| #    (in terms of `greenback` and/or extra threads) and if it's from | ||||
| #    a sync scope suggest that usage must first call | ||||
| #    `ensure_portal()` in the (eventual parent) async calling scope? | ||||
| def test_sync_pause_from_bg_task_in_root_actor_(): | ||||
|     ''' | ||||
|     When used from the root actor, normally we can only implicitly | ||||
|     support `.pause_from_sync()` from the main-parent-task (that | ||||
|     opens the runtime via `open_root_actor()`) since `greenback` | ||||
|     requires a `.ensure_portal()` call per `trio.Task` where it is | ||||
|     used. | ||||
| 
 | ||||
|     ''' | ||||
|     ... | ||||
| 
 | ||||
| # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | ||||
| def test_correct_frames_below_hidden(): | ||||
|     ''' | ||||
|  |  | |||
|  | @ -19,7 +19,7 @@ from tractor._testing import ( | |||
| @pytest.fixture | ||||
| def run_example_in_subproc( | ||||
|     loglevel: str, | ||||
|     testdir, | ||||
|     testdir: pytest.Testdir, | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
| 
 | ||||
|  |  | |||
|  | @ -49,6 +49,7 @@ from ._exceptions import ( | |||
|     ModuleNotExposed as ModuleNotExposed, | ||||
|     MsgTypeError as MsgTypeError, | ||||
|     RemoteActorError as RemoteActorError, | ||||
|     TransportClosed as TransportClosed, | ||||
| ) | ||||
| from .devx import ( | ||||
|     breakpoint as breakpoint, | ||||
|  |  | |||
|  | @ -906,8 +906,59 @@ class StreamOverrun( | |||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| class TransportClosed(trio.ClosedResourceError): | ||||
|     "Underlying channel transport was closed prior to use" | ||||
| class TransportClosed(trio.BrokenResourceError): | ||||
|     ''' | ||||
|     IPC transport (protocol) connection was closed or broke and | ||||
|     indicates that the wrapping communication `Channel` can no longer | ||||
|     be used to send/receive msgs from the remote peer. | ||||
| 
 | ||||
|     ''' | ||||
|     def __init__( | ||||
|         self, | ||||
|         message: str, | ||||
|         loglevel: str = 'transport', | ||||
|         cause: BaseException|None = None, | ||||
|         raise_on_report: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         self.message: str = message | ||||
|         self._loglevel = loglevel | ||||
|         super().__init__(message) | ||||
| 
 | ||||
|         if cause is not None: | ||||
|             self.__cause__ = cause | ||||
| 
 | ||||
|         # flag to toggle whether the msg loop should raise | ||||
|         # the exc in its `TransportClosed` handler block. | ||||
|         self._raise_on_report = raise_on_report | ||||
| 
 | ||||
|     def report_n_maybe_raise( | ||||
|         self, | ||||
|         message: str|None = None, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Using the init-specified log level emit a logging report | ||||
|         for this error. | ||||
| 
 | ||||
|         ''' | ||||
|         message: str = message or self.message | ||||
|         # when a cause is set, slap it onto the log emission. | ||||
|         if cause := self.__cause__: | ||||
|             cause_tb_str: str = ''.join( | ||||
|                 traceback.format_tb(cause.__traceback__) | ||||
|             ) | ||||
|             message += ( | ||||
|                 f'{cause_tb_str}\n'  # tb | ||||
|                 f'    {cause}\n'  # exc repr | ||||
|             ) | ||||
| 
 | ||||
|         getattr(log, self._loglevel)(message) | ||||
| 
 | ||||
|         # some errors we want to blow up from | ||||
|         # inside the RPC msg loop | ||||
|         if self._raise_on_report: | ||||
|             raise self from cause | ||||
| 
 | ||||
| 
 | ||||
| class NoResult(RuntimeError): | ||||
|  |  | |||
							
								
								
									
										284
									
								
								tractor/_ipc.py
								
								
								
								
							
							
						
						
									
										284
									
								
								tractor/_ipc.py
								
								
								
								
							|  | @ -54,7 +54,7 @@ from tractor._exceptions import ( | |||
| ) | ||||
| from tractor.msg import ( | ||||
|     _ctxvar_MsgCodec, | ||||
|     _codec, | ||||
|     # _codec,  XXX see `self._codec` sanity/debug checks | ||||
|     MsgCodec, | ||||
|     types as msgtypes, | ||||
|     pretty_struct, | ||||
|  | @ -65,8 +65,18 @@ log = get_logger(__name__) | |||
| _is_windows = platform.system() == 'Windows' | ||||
| 
 | ||||
| 
 | ||||
| def get_stream_addrs(stream: trio.SocketStream) -> tuple: | ||||
|     # should both be IP sockets | ||||
| def get_stream_addrs( | ||||
|     stream: trio.SocketStream | ||||
| ) -> tuple[ | ||||
|     tuple[str, int],  # local | ||||
|     tuple[str, int],  # remote | ||||
| ]: | ||||
|     ''' | ||||
|     Return the `trio` streaming transport prot's socket-addrs for | ||||
|     both the local and remote sides as a pair. | ||||
| 
 | ||||
|     ''' | ||||
|     # rn, should both be IP sockets | ||||
|     lsockname = stream.socket.getsockname() | ||||
|     rsockname = stream.socket.getpeername() | ||||
|     return ( | ||||
|  | @ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple: | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: this should be our `Union[*msgtypes.__spec__]` now right? | ||||
| MsgType = TypeVar("MsgType") | ||||
| 
 | ||||
| # TODO: consider using a generic def and indexing with our eventual | ||||
| # msg definition/types? | ||||
| # - https://docs.python.org/3/library/typing.html#typing.Protocol | ||||
| # - https://jcristharif.com/msgspec/usage.html#structs | ||||
| # from tractor.msg.types import MsgType | ||||
| # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? | ||||
| # => BLEH, except can't bc prots must inherit typevar or param-spec | ||||
| #   vars.. | ||||
| MsgType = TypeVar('MsgType') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: break up this mod into a subpkg so we can start adding new | ||||
| # backends and move this type stuff into a dedicated file.. Bo | ||||
| # | ||||
| @runtime_checkable | ||||
| class MsgTransport(Protocol[MsgType]): | ||||
| # | ||||
| # ^-TODO-^ consider using a generic def and indexing with our | ||||
| # eventual msg definition/types? | ||||
| # - https://docs.python.org/3/library/typing.html#typing.Protocol | ||||
| 
 | ||||
|     stream: trio.SocketStream | ||||
|     drained: list[MsgType] | ||||
|  | @ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]): | |||
|         ... | ||||
| 
 | ||||
| 
 | ||||
| # TODO: not sure why we have to inherit here, but it seems to be an | ||||
| # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; | ||||
| # probably should make a `mypy` issue? | ||||
| # TODO: typing oddity.. not sure why we have to inherit here, but it | ||||
| # seems to be an issue with `get_msg_transport()` returning | ||||
| # a `Type[Protocol]`; probably should make a `mypy` issue? | ||||
| class MsgpackTCPStream(MsgTransport): | ||||
|     ''' | ||||
|     A ``trio.SocketStream`` delivering ``msgpack`` formatted data | ||||
|  | @ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport): | |||
|         # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
|         # | ||||
|         # TODO: define this as a `Codec` struct which can be | ||||
|         # overriden dynamically by the application/runtime. | ||||
|         # overriden dynamically by the application/runtime? | ||||
|         codec: tuple[ | ||||
|             Callable[[Any], Any]|None,  # coder | ||||
|             Callable[[type, Any], Any]|None,  # decoder | ||||
|  | @ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport): | |||
|         self._laddr, self._raddr = get_stream_addrs(stream) | ||||
| 
 | ||||
|         # create read loop instance | ||||
|         self._agen = self._iter_packets() | ||||
|         self._aiter_pkts = self._iter_packets() | ||||
|         self._send_lock = trio.StrictFIFOLock() | ||||
| 
 | ||||
|         # public i guess? | ||||
|  | @ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport): | |||
|         # allow for custom IPC msg interchange format | ||||
|         # dynamic override Bo | ||||
|         self._task = trio.lowlevel.current_task() | ||||
|         self._codec: MsgCodec = ( | ||||
|             codec | ||||
|             or | ||||
|             _codec._ctxvar_MsgCodec.get() | ||||
|         ) | ||||
|         # TODO: mask out before release? | ||||
|         # log.runtime( | ||||
|         #     f'New {self} created with codec\n' | ||||
|         #     f'codec: {self._codec}\n' | ||||
| 
 | ||||
|         # XXX for ctxvar debug only! | ||||
|         # self._codec: MsgCodec = ( | ||||
|         #     codec | ||||
|         #     or | ||||
|         #     _codec._ctxvar_MsgCodec.get() | ||||
|         # ) | ||||
| 
 | ||||
|     async def _iter_packets(self) -> AsyncGenerator[dict, None]: | ||||
|  | @ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport): | |||
|         Yield `bytes`-blob decoded packets from the underlying TCP | ||||
|         stream using the current task's `MsgCodec`. | ||||
| 
 | ||||
|         This is a streaming routine implemented as an async generator | ||||
|         func (which was the original design, but could be changed?) | ||||
|         and is allocated by a `.__call__()` inside `.__init__()` where | ||||
|         it is assigned to the `._aiter_pkts` attr. | ||||
| 
 | ||||
|         ''' | ||||
|         decodes_failed: int = 0 | ||||
| 
 | ||||
|  | @ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport): | |||
|                 # seem to be getting racy failures here on | ||||
|                 # arbiter/registry name subs.. | ||||
|                 trio.BrokenResourceError, | ||||
|             ): | ||||
|                 raise TransportClosed( | ||||
|                     f'transport {self} was already closed prior ro read' | ||||
|                 ) | ||||
| 
 | ||||
|             ) as trans_err: | ||||
| 
 | ||||
|                 loglevel = 'transport' | ||||
|                 match trans_err: | ||||
|                     # case ( | ||||
|                     #     ConnectionResetError() | ||||
|                     # ): | ||||
|                     #     loglevel = 'transport' | ||||
| 
 | ||||
|                     # peer actor (graceful??) TCP EOF but `tricycle` | ||||
|                     # seems to raise a 0-bytes-read? | ||||
|                     case ValueError() if ( | ||||
|                         'unclean EOF' in trans_err.args[0] | ||||
|                     ): | ||||
|                         pass | ||||
| 
 | ||||
|                     # peer actor (task) prolly shutdown quickly due | ||||
|                     # to cancellation | ||||
|                     case trio.BrokenResourceError() if ( | ||||
|                         'Connection reset by peer' in trans_err.args[0] | ||||
|                     ): | ||||
|                         pass | ||||
| 
 | ||||
|                     # unless the disconnect condition falls under "a | ||||
|                     # normal operation breakage" we usualy console warn | ||||
|                     # about it. | ||||
|                     case _: | ||||
|                         loglevel: str = 'warning' | ||||
| 
 | ||||
| 
 | ||||
|                 raise TransportClosed( | ||||
|                     message=( | ||||
|                         f'IPC transport already closed by peer\n' | ||||
|                         f'x)> {type(trans_err)}\n' | ||||
|                         f' |_{self}\n' | ||||
|                     ), | ||||
|                     loglevel=loglevel, | ||||
|                 ) from trans_err | ||||
| 
 | ||||
|             # XXX definitely can happen if transport is closed | ||||
|             # manually by another `trio.lowlevel.Task` in the | ||||
|             # same actor; we use this in some simulated fault | ||||
|             # testing for ex, but generally should never happen | ||||
|             # under normal operation! | ||||
|             # | ||||
|             # NOTE: as such we always re-raise this error from the | ||||
|             #       RPC msg loop! | ||||
|             except trio.ClosedResourceError as closure_err: | ||||
|                 raise TransportClosed( | ||||
|                     message=( | ||||
|                         f'IPC transport already manually closed locally?\n' | ||||
|                         f'x)> {type(closure_err)} \n' | ||||
|                         f' |_{self}\n' | ||||
|                     ), | ||||
|                     loglevel='error', | ||||
|                     raise_on_report=( | ||||
|                         closure_err.args[0] == 'another task closed this fd' | ||||
|                         or | ||||
|                         closure_err.args[0] in ['another task closed this fd'] | ||||
|                     ), | ||||
|                 ) from closure_err | ||||
| 
 | ||||
|             # graceful TCP EOF disconnect | ||||
|             if header == b'': | ||||
|                 raise TransportClosed( | ||||
|                     f'transport {self} was already closed prior ro read' | ||||
|                     message=( | ||||
|                         f'IPC transport already gracefully closed\n' | ||||
|                         f')>\n' | ||||
|                         f'|_{self}\n' | ||||
|                     ), | ||||
|                     loglevel='transport', | ||||
|                     # cause=???  # handy or no? | ||||
|                 ) | ||||
| 
 | ||||
|             size: int | ||||
|             size, = struct.unpack("<I", header) | ||||
| 
 | ||||
|             log.transport(f'received header {size}')  # type: ignore | ||||
|  | @ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport): | |||
|                 # the current `MsgCodec`. | ||||
|                 codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
| 
 | ||||
|                 # TODO: mask out before release? | ||||
|                 if self._codec.pld_spec != codec.pld_spec: | ||||
|                     # assert ( | ||||
|                     #     task := trio.lowlevel.current_task() | ||||
|                     # ) is not self._task | ||||
|                     # self._task = task | ||||
|                     self._codec = codec | ||||
|                     log.runtime( | ||||
|                         f'Using new codec in {self}.recv()\n' | ||||
|                         f'codec: {self._codec}\n\n' | ||||
|                         f'msg_bytes: {msg_bytes}\n' | ||||
|                     ) | ||||
|                 # XXX for ctxvar debug only! | ||||
|                 # if self._codec.pld_spec != codec.pld_spec: | ||||
|                 #     assert ( | ||||
|                 #         task := trio.lowlevel.current_task() | ||||
|                 #     ) is not self._task | ||||
|                 #     self._task = task | ||||
|                 #     self._codec = codec | ||||
|                 #     log.runtime( | ||||
|                 #         f'Using new codec in {self}.recv()\n' | ||||
|                 #         f'codec: {self._codec}\n\n' | ||||
|                 #         f'msg_bytes: {msg_bytes}\n' | ||||
|                 #     ) | ||||
|                 yield codec.decode(msg_bytes) | ||||
| 
 | ||||
|                 # TODO: remove, was only for orig draft impl | ||||
|                 # testing. | ||||
|                 # | ||||
|                 # curr_codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
|                 # obj = curr_codec.decode(msg_bytes) | ||||
|                 # if ( | ||||
|                 #     curr_codec is not | ||||
|                 #     _codec._def_msgspec_codec | ||||
|                 # ): | ||||
|                 #     print(f'OBJ: {obj}\n') | ||||
|                 # | ||||
|                 # yield obj | ||||
| 
 | ||||
|             # XXX NOTE: since the below error derives from | ||||
|             # `DecodeError` we need to catch is specially | ||||
|             # and always raise such that spec violations | ||||
|  | @ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport): | |||
|         msg: msgtypes.MsgType, | ||||
| 
 | ||||
|         strict_types: bool = True, | ||||
|         # hide_tb: bool = False, | ||||
|         hide_tb: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Send a msgpack encoded py-object-blob-as-msg over TCP. | ||||
|  | @ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport): | |||
|         invalid msg type | ||||
| 
 | ||||
|         ''' | ||||
|         # __tracebackhide__: bool = hide_tb | ||||
|         __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|         # XXX see `trio._sync.AsyncContextManagerMixin` for details | ||||
|         # on the `.acquire()`/`.release()` sequencing.. | ||||
|         async with self._send_lock: | ||||
| 
 | ||||
|             # NOTE: lookup the `trio.Task.context`'s var for | ||||
|             # the current `MsgCodec`. | ||||
|             codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
| 
 | ||||
|             # TODO: mask out before release? | ||||
|             if self._codec.pld_spec != codec.pld_spec: | ||||
|                 self._codec = codec | ||||
|                 log.runtime( | ||||
|                     f'Using new codec in {self}.send()\n' | ||||
|                     f'codec: {self._codec}\n\n' | ||||
|                     f'msg: {msg}\n' | ||||
|                 ) | ||||
|             # XXX for ctxvar debug only! | ||||
|             # if self._codec.pld_spec != codec.pld_spec: | ||||
|             #     self._codec = codec | ||||
|             #     log.runtime( | ||||
|             #         f'Using new codec in {self}.send()\n' | ||||
|             #         f'codec: {self._codec}\n\n' | ||||
|             #         f'msg: {msg}\n' | ||||
|             #     ) | ||||
| 
 | ||||
|             if type(msg) not in msgtypes.__msg_types__: | ||||
|                 if strict_types: | ||||
|  | @ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport): | |||
|             size: bytes = struct.pack("<I", len(bytes_data)) | ||||
|             return await self.stream.send_all(size + bytes_data) | ||||
| 
 | ||||
|         # ?TODO? does it help ever to dynamically show this | ||||
|         # frame? | ||||
|         # try: | ||||
|         #     <the-above_code> | ||||
|         # except BaseException as _err: | ||||
|         #     err = _err | ||||
|         #     if not isinstance(err, MsgTypeError): | ||||
|         #         __tracebackhide__: bool = False | ||||
|         #     raise | ||||
| 
 | ||||
|     @property | ||||
|     def laddr(self) -> tuple[str, int]: | ||||
|         return self._laddr | ||||
|  | @ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport): | |||
|         return self._raddr | ||||
| 
 | ||||
|     async def recv(self) -> Any: | ||||
|         return await self._agen.asend(None) | ||||
|         return await self._aiter_pkts.asend(None) | ||||
| 
 | ||||
|     async def drain(self) -> AsyncIterator[dict]: | ||||
|         ''' | ||||
|  | @ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport): | |||
|                 yield msg | ||||
| 
 | ||||
|     def __aiter__(self): | ||||
|         return self._agen | ||||
|         return self._aiter_pkts | ||||
| 
 | ||||
|     def connected(self) -> bool: | ||||
|         return self.stream.socket.fileno() != -1 | ||||
|  | @ -433,7 +517,7 @@ class Channel: | |||
|         # set after handshake - always uid of far end | ||||
|         self.uid: tuple[str, str]|None = None | ||||
| 
 | ||||
|         self._agen = self._aiter_recv() | ||||
|         self._aiter_msgs = self._iter_msgs() | ||||
|         self._exc: Exception|None = None  # set if far end actor errors | ||||
|         self._closed: bool = False | ||||
| 
 | ||||
|  | @ -497,8 +581,6 @@ class Channel: | |||
|         ) | ||||
|         return self._transport | ||||
| 
 | ||||
|     # TODO: something simliar at the IPC-`Context` | ||||
|     # level so as to support  | ||||
|     @cm | ||||
|     def apply_codec( | ||||
|         self, | ||||
|  | @ -517,6 +599,7 @@ class Channel: | |||
|         finally: | ||||
|             self._transport.codec = orig | ||||
| 
 | ||||
|     # TODO: do a .src/.dst: str for maddrs? | ||||
|     def __repr__(self) -> str: | ||||
|         if not self._transport: | ||||
|             return '<Channel with inactive transport?>' | ||||
|  | @ -560,27 +643,43 @@ class Channel: | |||
|         ) | ||||
|         return transport | ||||
| 
 | ||||
|     # TODO: something like, | ||||
|     # `pdbp.hideframe_on(errors=[MsgTypeError])` | ||||
|     # instead of the `try/except` hack we have rn.. | ||||
|     # seems like a pretty useful thing to have in general | ||||
|     # along with being able to filter certain stack frame(s / sets) | ||||
|     # possibly based on the current log-level? | ||||
|     async def send( | ||||
|         self, | ||||
|         payload: Any, | ||||
| 
 | ||||
|         # hide_tb: bool = False, | ||||
|         hide_tb: bool = False, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Send a coded msg-blob over the transport. | ||||
| 
 | ||||
|         ''' | ||||
|         # __tracebackhide__: bool = hide_tb | ||||
|         log.transport( | ||||
|             '=> send IPC msg:\n\n' | ||||
|             f'{pformat(payload)}\n' | ||||
|         )  # type: ignore | ||||
|         assert self._transport | ||||
|         await self._transport.send( | ||||
|             payload, | ||||
|             # hide_tb=hide_tb, | ||||
|         ) | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         try: | ||||
|             log.transport( | ||||
|                 '=> send IPC msg:\n\n' | ||||
|                 f'{pformat(payload)}\n' | ||||
|             ) | ||||
|             # assert self._transport  # but why typing? | ||||
|             await self._transport.send( | ||||
|                 payload, | ||||
|                 hide_tb=hide_tb, | ||||
|             ) | ||||
|         except BaseException as _err: | ||||
|             err = _err  # bind for introspection | ||||
|             if not isinstance(_err, MsgTypeError): | ||||
|                 # assert err | ||||
|                 __tracebackhide__: bool = False | ||||
|             else: | ||||
|                 assert err.cid | ||||
| 
 | ||||
|             raise | ||||
| 
 | ||||
|     async def recv(self) -> Any: | ||||
|         assert self._transport | ||||
|  | @ -617,8 +716,11 @@ class Channel: | |||
|         await self.aclose(*args) | ||||
| 
 | ||||
|     def __aiter__(self): | ||||
|         return self._agen | ||||
|         return self._aiter_msgs | ||||
| 
 | ||||
|     # ?TODO? run any reconnection sequence? | ||||
|     # -[ ] prolly should be impl-ed as deco-API? | ||||
|     # | ||||
|     # async def _reconnect(self) -> None: | ||||
|     #     """Handle connection failures by polling until a reconnect can be | ||||
|     #     established. | ||||
|  | @ -636,7 +738,6 @@ class Channel: | |||
|     #             else: | ||||
|     #                 log.transport("Stream connection re-established!") | ||||
| 
 | ||||
|     #                 # TODO: run any reconnection sequence | ||||
|     #                 # on_recon = self._recon_seq | ||||
|     #                 # if on_recon: | ||||
|     #                 #     await on_recon(self) | ||||
|  | @ -650,11 +751,17 @@ class Channel: | |||
|     #                     " for re-establishment") | ||||
|     #             await trio.sleep(1) | ||||
| 
 | ||||
|     async def _aiter_recv( | ||||
|     async def _iter_msgs( | ||||
|         self | ||||
|     ) -> AsyncGenerator[Any, None]: | ||||
|         ''' | ||||
|         Async iterate items from underlying stream. | ||||
|         Yield `MsgType` IPC msgs decoded and deliverd from | ||||
|         an underlying `MsgTransport` protocol. | ||||
| 
 | ||||
|         This is a streaming routine alo implemented as an async-gen | ||||
|         func (same a `MsgTransport._iter_pkts()`) gets allocated by | ||||
|         a `.__call__()` inside `.__init__()` where it is assigned to | ||||
|         the `._aiter_msgs` attr. | ||||
| 
 | ||||
|         ''' | ||||
|         assert self._transport | ||||
|  | @ -680,15 +787,6 @@ class Channel: | |||
|                         case _: | ||||
|                             yield msg | ||||
| 
 | ||||
|                     # TODO: if we were gonna do this it should be | ||||
|                     # done up at the `MsgStream` layer! | ||||
|                     # | ||||
|                     # sent = yield item | ||||
|                     # if sent is not None: | ||||
|                     #     # optimization, passing None through all the | ||||
|                     #     # time is pointless | ||||
|                     #     await self._transport.send(sent) | ||||
| 
 | ||||
|             except trio.BrokenResourceError: | ||||
| 
 | ||||
|                 # if not self._autorecon: | ||||
|  |  | |||
|  | @ -68,7 +68,7 @@ from .msg import ( | |||
|     MsgCodec, | ||||
|     PayloadT, | ||||
|     NamespacePath, | ||||
|     pretty_struct, | ||||
|     # pretty_struct, | ||||
|     _ops as msgops, | ||||
| ) | ||||
| from tractor.msg.types import ( | ||||
|  | @ -89,6 +89,16 @@ if TYPE_CHECKING: | |||
| log = get_logger('tractor') | ||||
| 
 | ||||
| 
 | ||||
| # ?TODO? move to a `tractor.lowlevel._rpc` with the below | ||||
| # func-type-cases implemented "on top of" `@context` defs: | ||||
| # -[ ] std async func helper decorated with `@rpc_func`? | ||||
| # -[ ] `Portal.open_stream_from()` with async-gens? | ||||
| #  |_ possibly a duplex form of this with a | ||||
| #    `sent_from_peer = yield send_to_peer` form, which would require | ||||
| #    syncing the send/recv side with possibly `.receive_nowait()` | ||||
| #    on each `yield`? | ||||
| # -[ ] some kinda `@rpc_acm` maybe that does a fixture style with | ||||
| #     user only defining a single-`yield` generator-func? | ||||
| async def _invoke_non_context( | ||||
|     actor: Actor, | ||||
|     cancel_scope: CancelScope, | ||||
|  | @ -108,8 +118,9 @@ async def _invoke_non_context( | |||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
| ): | ||||
|     __tracebackhide__: bool = True | ||||
|     cs: CancelScope|None = None  # ref when activated | ||||
| 
 | ||||
|     # TODO: can we unify this with the `context=True` impl below? | ||||
|     # ?TODO? can we unify this with the `context=True` impl below? | ||||
|     if inspect.isasyncgen(coro): | ||||
|         await chan.send( | ||||
|             StartAck( | ||||
|  | @ -160,10 +171,6 @@ async def _invoke_non_context( | |||
|                 functype='asyncgen', | ||||
|             ) | ||||
|         ) | ||||
|         # XXX: the async-func may spawn further tasks which push | ||||
|         # back values like an async-generator would but must | ||||
|         # manualy construct the response dict-packet-responses as | ||||
|         # above | ||||
|         with cancel_scope as cs: | ||||
|             ctx._scope = cs | ||||
|             task_status.started(ctx) | ||||
|  | @ -175,15 +182,13 @@ async def _invoke_non_context( | |||
|             await chan.send( | ||||
|                 Stop(cid=cid) | ||||
|             ) | ||||
| 
 | ||||
|     # simplest function/method request-response pattern | ||||
|     # XXX: in the most minimally used case, just a scheduled internal runtime | ||||
|     # call to `Actor._cancel_task()` from the ctx-peer task since we | ||||
|     # don't (yet) have a dedicated IPC msg. | ||||
|     # ------ - ------ | ||||
|     else: | ||||
|         # regular async function/method | ||||
|         # XXX: possibly just a scheduled `Actor._cancel_task()` | ||||
|         # from a remote request to cancel some `Context`. | ||||
|         # ------ - ------ | ||||
|         # TODO: ideally we unify this with the above `context=True` | ||||
|         # block such that for any remote invocation ftype, we | ||||
|         # always invoke the far end RPC task scheduling the same | ||||
|         # way: using the linked IPC context machinery. | ||||
|         failed_resp: bool = False | ||||
|         try: | ||||
|             ack = StartAck( | ||||
|  | @ -354,8 +359,15 @@ async def _errors_relayed_via_ipc( | |||
|             # channel. | ||||
|             task_status.started(err) | ||||
| 
 | ||||
|         # always reraise KBIs so they propagate at the sys-process level. | ||||
|         if isinstance(err, KeyboardInterrupt): | ||||
|         # always propagate KBIs at the sys-process level. | ||||
|         if ( | ||||
|             isinstance(err, KeyboardInterrupt) | ||||
| 
 | ||||
|             # ?TODO? except when running in asyncio mode? | ||||
|             # |_ wut if you want to open a `@context` FROM an | ||||
|             # infected_aio task? | ||||
|             # and not actor.is_infected_aio() | ||||
|         ): | ||||
|             raise | ||||
| 
 | ||||
|     # RPC task bookeeping. | ||||
|  | @ -458,7 +470,6 @@ async def _invoke( | |||
|     # tb: TracebackType = None | ||||
| 
 | ||||
|     cancel_scope = CancelScope() | ||||
|     cs: CancelScope|None = None  # ref when activated | ||||
|     ctx = actor.get_context( | ||||
|         chan=chan, | ||||
|         cid=cid, | ||||
|  | @ -607,6 +618,8 @@ async def _invoke( | |||
|         #     `@context` marked RPC function. | ||||
|         # - `._portal` is never set. | ||||
|         try: | ||||
|             tn: trio.Nursery | ||||
|             rpc_ctx_cs: CancelScope | ||||
|             async with ( | ||||
|                 trio.open_nursery() as tn, | ||||
|                 msgops.maybe_limit_plds( | ||||
|  | @ -616,7 +629,7 @@ async def _invoke( | |||
|                 ), | ||||
|             ): | ||||
|                 ctx._scope_nursery = tn | ||||
|                 ctx._scope = tn.cancel_scope | ||||
|                 rpc_ctx_cs = ctx._scope = tn.cancel_scope | ||||
|                 task_status.started(ctx) | ||||
| 
 | ||||
|                 # TODO: better `trionics` tooling: | ||||
|  | @ -642,7 +655,7 @@ async def _invoke( | |||
|             #   itself calls `ctx._maybe_cancel_and_set_remote_error()` | ||||
|             #   which cancels the scope presuming the input error | ||||
|             #   is not a `.cancel_acked` pleaser. | ||||
|             if ctx._scope.cancelled_caught: | ||||
|             if rpc_ctx_cs.cancelled_caught: | ||||
|                 our_uid: tuple = actor.uid | ||||
| 
 | ||||
|                 # first check for and raise any remote error | ||||
|  | @ -652,9 +665,7 @@ async def _invoke( | |||
|                 if re := ctx._remote_error: | ||||
|                     ctx._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|                 cs: CancelScope = ctx._scope | ||||
| 
 | ||||
|                 if cs.cancel_called: | ||||
|                 if rpc_ctx_cs.cancel_called: | ||||
|                     canceller: tuple = ctx.canceller | ||||
|                     explain: str = f'{ctx.side!r}-side task was cancelled by ' | ||||
| 
 | ||||
|  | @ -680,9 +691,15 @@ async def _invoke( | |||
|                     elif canceller == ctx.chan.uid: | ||||
|                         explain += f'its {ctx.peer_side!r}-side peer' | ||||
| 
 | ||||
|                     else: | ||||
|                     elif canceller == our_uid: | ||||
|                         explain += 'itself' | ||||
| 
 | ||||
|                     elif canceller: | ||||
|                         explain += 'a remote peer' | ||||
| 
 | ||||
|                     else: | ||||
|                         explain += 'an unknown cause?' | ||||
| 
 | ||||
|                     explain += ( | ||||
|                         add_div(message=explain) | ||||
|                         + | ||||
|  | @ -911,7 +928,10 @@ async def process_messages( | |||
|                     f'IPC msg from peer\n' | ||||
|                     f'<= {chan.uid}\n\n' | ||||
| 
 | ||||
|                     # TODO: avoid fmting depending on loglevel for perf? | ||||
|                     # TODO: use of the pprinting of structs is | ||||
|                     # FRAGILE and should prolly not be | ||||
|                     # | ||||
|                     # avoid fmting depending on loglevel for perf? | ||||
|                     # -[ ] specifically `pretty_struct.pformat()` sub-call..? | ||||
|                     #   - how to only log-level-aware actually call this? | ||||
|                     # -[ ] use `.msg.pretty_struct` here now instead! | ||||
|  | @ -1177,7 +1197,7 @@ async def process_messages( | |||
|                 parent_chan=chan, | ||||
|             ) | ||||
| 
 | ||||
|     except TransportClosed: | ||||
|     except TransportClosed as tc: | ||||
|         # channels "breaking" (for TCP streams by EOF or 104 | ||||
|         # connection-reset) is ok since we don't have a teardown | ||||
|         # handshake for them (yet) and instead we simply bail out of | ||||
|  | @ -1185,12 +1205,20 @@ async def process_messages( | |||
|         # up.. | ||||
|         # | ||||
|         # TODO: maybe add a teardown handshake? and, | ||||
|         # -[ ] don't show this msg if it's an ephemeral discovery ep call? | ||||
|         # -[x] don't show this msg if it's an ephemeral discovery ep call? | ||||
|         #  |_ see the below `.report_n_maybe_raise()` impl as well as | ||||
|         #     tc-exc input details in `MsgpackTCPStream._iter_pkts()` | ||||
|         #     for different read-failure cases. | ||||
|         # -[ ] figure out how this will break with other transports? | ||||
|         log.runtime( | ||||
|             f'IPC channel closed abruptly\n' | ||||
|             f'<=x peer: {chan.uid}\n' | ||||
|             f'   |_{chan.raddr}\n' | ||||
|         tc.report_n_maybe_raise( | ||||
|             message=( | ||||
|                 f'peer IPC channel closed abruptly?\n\n' | ||||
|                 f'<=x {chan}\n' | ||||
|                 f'  |_{chan.raddr}\n\n' | ||||
|             ) | ||||
|             + | ||||
|             tc.message | ||||
| 
 | ||||
|         ) | ||||
| 
 | ||||
|         # transport **WAS** disconnected | ||||
|  | @ -1238,7 +1266,7 @@ async def process_messages( | |||
|                 'Exiting IPC msg loop with final msg\n\n' | ||||
|                 f'<= peer: {chan.uid}\n' | ||||
|                 f'  |_{chan}\n\n' | ||||
|                 f'{pretty_struct.pformat(msg)}' | ||||
|                 # f'{pretty_struct.pformat(msg)}' | ||||
|             ) | ||||
| 
 | ||||
|         log.runtime(message) | ||||
|  |  | |||
|  | @ -54,11 +54,12 @@ LOG_FORMAT = ( | |||
| DATE_FORMAT = '%b %d %H:%M:%S' | ||||
| 
 | ||||
| # FYI, ERROR is 40 | ||||
| # TODO: use a `bidict` to avoid the :155 check? | ||||
| CUSTOM_LEVELS: dict[str, int] = { | ||||
|     'TRANSPORT': 5, | ||||
|     'RUNTIME': 15, | ||||
|     'DEVX': 17, | ||||
|     'CANCEL': 18, | ||||
|     'CANCEL': 22, | ||||
|     'PDB': 500, | ||||
| } | ||||
| STD_PALETTE = { | ||||
|  | @ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter): | |||
|         Delegate a log call to the underlying logger, after adding | ||||
|         contextual information from this adapter instance. | ||||
| 
 | ||||
|         NOTE: all custom level methods (above) delegate to this! | ||||
| 
 | ||||
|         ''' | ||||
|         if self.isEnabledFor(level): | ||||
|             stacklevel: int = 3 | ||||
|  |  | |||
|  | @ -34,6 +34,9 @@ from pprint import ( | |||
|     saferepr, | ||||
| ) | ||||
| 
 | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| log = get_logger() | ||||
| # TODO: auto-gen type sig for input func both for | ||||
| # type-msgs and logging of RPC tasks? | ||||
| # taken and modified from: | ||||
|  | @ -143,7 +146,13 @@ def pformat( | |||
| 
 | ||||
|         else:  # the `pprint` recursion-safe format: | ||||
|             # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr | ||||
|             val_str: str = saferepr(v) | ||||
|             try: | ||||
|                 val_str: str = saferepr(v) | ||||
|             except Exception: | ||||
|                 log.exception( | ||||
|                     'Failed to `saferepr({type(struct)})` !?\n' | ||||
|                 ) | ||||
|             return _Struct.__repr__(struct) | ||||
| 
 | ||||
|         # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! | ||||
|         obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') | ||||
|  | @ -194,12 +203,20 @@ class Struct( | |||
|         return sin_props | ||||
| 
 | ||||
|     pformat = pformat | ||||
|     # __repr__ = pformat | ||||
|     # __str__ = __repr__ = pformat | ||||
|     # TODO: use a pprint.PrettyPrinter instance around ONLY rendering | ||||
|     # inside a known tty? | ||||
|     # def __repr__(self) -> str: | ||||
|     #     ... | ||||
|     __repr__ = pformat | ||||
|     def __repr__(self) -> str: | ||||
|         try: | ||||
|             return pformat(self) | ||||
|         except Exception: | ||||
|             log.exception( | ||||
|                 f'Failed to `pformat({type(self)})` !?\n' | ||||
|             ) | ||||
|             return _Struct.__repr__(self) | ||||
| 
 | ||||
|     def copy( | ||||
|         self, | ||||
|  |  | |||
|  | @ -156,11 +156,12 @@ class BroadcastState(Struct): | |||
| 
 | ||||
| class BroadcastReceiver(ReceiveChannel): | ||||
|     ''' | ||||
|     A memory receive channel broadcaster which is non-lossy for the | ||||
|     fastest consumer. | ||||
|     A memory receive channel broadcaster which is non-lossy for | ||||
|     the fastest consumer. | ||||
| 
 | ||||
|     Additional consumer tasks can receive all produced values by registering | ||||
|     with ``.subscribe()`` and receiving from the new instance it delivers. | ||||
|     Additional consumer tasks can receive all produced values by | ||||
|     registering with ``.subscribe()`` and receiving from the new | ||||
|     instance it delivers. | ||||
| 
 | ||||
|     ''' | ||||
|     def __init__( | ||||
|  |  | |||
|  | @ -18,8 +18,12 @@ | |||
| Async context manager primitives with hard ``trio``-aware semantics | ||||
| 
 | ||||
| ''' | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| import inspect | ||||
| from types import ModuleType | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncContextManager, | ||||
|  | @ -30,13 +34,16 @@ from typing import ( | |||
|     Optional, | ||||
|     Sequence, | ||||
|     TypeVar, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from tractor._state import current_actor | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from tractor import ActorNursery | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -46,8 +53,10 @@ T = TypeVar("T") | |||
| 
 | ||||
| @acm | ||||
| async def maybe_open_nursery( | ||||
|     nursery: trio.Nursery | None = None, | ||||
|     nursery: trio.Nursery|ActorNursery|None = None, | ||||
|     shield: bool = False, | ||||
|     lib: ModuleType = trio, | ||||
| 
 | ||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ||||
|     ''' | ||||
|     Create a new nursery if None provided. | ||||
|  | @ -58,13 +67,12 @@ async def maybe_open_nursery( | |||
|     if nursery is not None: | ||||
|         yield nursery | ||||
|     else: | ||||
|         async with trio.open_nursery() as nursery: | ||||
|         async with lib.open_nursery() as nursery: | ||||
|             nursery.cancel_scope.shield = shield | ||||
|             yield nursery | ||||
| 
 | ||||
| 
 | ||||
| async def _enter_and_wait( | ||||
| 
 | ||||
|     mngr: AsyncContextManager[T], | ||||
|     unwrapped: dict[int, T], | ||||
|     all_entered: trio.Event, | ||||
|  | @ -91,7 +99,6 @@ async def _enter_and_wait( | |||
| 
 | ||||
| @acm | ||||
| async def gather_contexts( | ||||
| 
 | ||||
|     mngrs: Sequence[AsyncContextManager[T]], | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
|  | @ -102,15 +109,17 @@ async def gather_contexts( | |||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|     Concurrently enter a sequence of async context managers, each in | ||||
|     a separate ``trio`` task and deliver the unwrapped values in the | ||||
|     same order once all managers have entered. On exit all contexts are | ||||
|     subsequently and concurrently exited. | ||||
|     Concurrently enter a sequence of async context managers (acms), | ||||
|     each from a separate `trio` task and deliver the unwrapped | ||||
|     `yield`-ed values in the same order once all managers have entered. | ||||
| 
 | ||||
|     This function is somewhat similar to common usage of | ||||
|     ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in | ||||
|     combo with ``asyncio.gather()`` except the managers are concurrently | ||||
|     entered and exited, and cancellation just works. | ||||
|     On exit, all acms are subsequently and concurrently exited. | ||||
| 
 | ||||
|     This function is somewhat similar to a batch of non-blocking | ||||
|     calls to `contextlib.AsyncExitStack.enter_async_context()` | ||||
|     (inside a loop) *in combo with* a `asyncio.gather()` to get the | ||||
|     `.__aenter__()`-ed values, except the managers are both | ||||
|     concurrently entered and exited and *cancellation just works*(R). | ||||
| 
 | ||||
|     ''' | ||||
|     seed: int = id(mngrs) | ||||
|  | @ -210,9 +219,10 @@ async def maybe_open_context( | |||
| 
 | ||||
| ) -> AsyncIterator[tuple[bool, T]]: | ||||
|     ''' | ||||
|     Maybe open a context manager if there is not already a _Cached | ||||
|     version for the provided ``key`` for *this* actor. Return the | ||||
|     _Cached instance on a _Cache hit. | ||||
|     Maybe open an async-context-manager (acm) if there is not already | ||||
|     a `_Cached` version for the provided (input) `key` for *this* actor. | ||||
| 
 | ||||
|     Return the `_Cached` instance on a _Cache hit. | ||||
| 
 | ||||
|     ''' | ||||
|     fid = id(acm_func) | ||||
|  | @ -273,8 +283,13 @@ async def maybe_open_context( | |||
|     else: | ||||
|         _Cache.users += 1 | ||||
|         log.runtime( | ||||
|             f'Reusing resource for `_Cache` user {_Cache.users}\n\n' | ||||
|             f'{ctx_key!r} -> {yielded!r}\n' | ||||
|             f'Re-using cached resource for user {_Cache.users}\n\n' | ||||
|             f'{ctx_key!r} -> {type(yielded)}\n' | ||||
| 
 | ||||
|             # TODO: make this work with values but without | ||||
|             # `msgspec.Struct` causing frickin crashes on field-type | ||||
|             # lookups.. | ||||
|             # f'{ctx_key!r} -> {yielded!r}\n' | ||||
|         ) | ||||
|         lock.release() | ||||
|         yield True, yielded | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue