Compare commits
	
		
			4 Commits 
		
	
	
		
			3ba46362a9
			...
			67f673bf36
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 67f673bf36 | |
|  | 3a105e2830 | |
|  | a315f01acc | |
|  | 25ffdedc06 | 
|  | @ -7,7 +7,6 @@ B~) | |||
| ''' | ||||
| from typing import ( | ||||
|     Any, | ||||
|     _GenericAlias, | ||||
|     Type, | ||||
|     Union, | ||||
| ) | ||||
|  | @ -26,20 +25,23 @@ from msgspec import ( | |||
| import pytest | ||||
| import tractor | ||||
| from tractor.msg import ( | ||||
|     _def_msgspec_codec, | ||||
|     _codec, | ||||
|     _ctxvar_MsgCodec, | ||||
| 
 | ||||
|     NamespacePath, | ||||
|     MsgCodec, | ||||
|     mk_codec, | ||||
|     apply_codec, | ||||
|     current_msgspec_codec, | ||||
|     current_codec, | ||||
| ) | ||||
| from tractor.msg import types | ||||
| from tractor.msg import ( | ||||
|     types, | ||||
| ) | ||||
| from tractor import _state | ||||
| from tractor.msg.types import ( | ||||
|     # PayloadT, | ||||
|     Msg, | ||||
|     # Started, | ||||
|     Started, | ||||
|     mk_msg_spec, | ||||
| ) | ||||
| import trio | ||||
|  | @ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec(): | |||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: wrap these into `._codec` such that user can just pass | ||||
| # a type table of some sort? | ||||
| def enc_hook(obj: Any) -> Any: | ||||
|     if isinstance(obj, NamespacePath): | ||||
|         return str(obj) | ||||
|     else: | ||||
|         raise NotImplementedError( | ||||
|             f'Objects of type {type(obj)} are not supported' | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def dec_hook(type: Type, obj: Any) -> Any: | ||||
|     print(f'type is: {type}') | ||||
|     if type is NamespacePath: | ||||
|         return NamespacePath(obj) | ||||
|     else: | ||||
|         raise NotImplementedError( | ||||
|             f'Objects of type {type(obj)} are not supported' | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def ex_func(*args): | ||||
|     print(f'ex_func({args})') | ||||
| 
 | ||||
| 
 | ||||
| def mk_custom_codec( | ||||
|     ipc_msg_spec: Type[Any] = Any, | ||||
| ) -> MsgCodec: | ||||
|     # apply custom hooks and set a `Decoder` which only | ||||
|     # loads `NamespacePath` types. | ||||
|     nsp_codec: MsgCodec = mk_codec( | ||||
|         ipc_msg_spec=ipc_msg_spec, | ||||
|         enc_hook=enc_hook, | ||||
|         dec_hook=dec_hook, | ||||
|     ) | ||||
|     pld_spec: Union[Type]|Any, | ||||
| 
 | ||||
|     # TODO: validate `MsgCodec` interface/semantics? | ||||
|     # -[ ] simple field tests to ensure caching + reset is workin? | ||||
|     # -[ ] custom / changing `.decoder()` calls? | ||||
|     # | ||||
|     # dec = nsp_codec.decoder( | ||||
|     #     types=NamespacePath, | ||||
|     # ) | ||||
|     # assert nsp_codec.dec is dec | ||||
| ) -> MsgCodec: | ||||
|     ''' | ||||
|     Create custom `msgpack` enc/dec-hooks and set a `Decoder` | ||||
|     which only loads `NamespacePath` types. | ||||
| 
 | ||||
|     ''' | ||||
|     uid: tuple[str, str] = tractor.current_actor().uid | ||||
| 
 | ||||
|     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||
|     # field on our `Msg.pld`, we still need a enc/dec_hook() pair | ||||
|     # to cast to/from that type on the wire. See the docs: | ||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
| 
 | ||||
|     def enc_nsp(obj: Any) -> Any: | ||||
|         match obj: | ||||
|             case NamespacePath(): | ||||
|                 print( | ||||
|                     f'{uid}: `NamespacePath`-Only ENCODE?\n' | ||||
|                     f'type: {type(obj)}\n' | ||||
|                     f'obj: {obj}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 return str(obj) | ||||
| 
 | ||||
|         logmsg: str = ( | ||||
|             f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported' | ||||
|             f'type: {type(obj)}\n' | ||||
|             f'obj: {obj}\n' | ||||
|         ) | ||||
|         print(logmsg) | ||||
|         raise NotImplementedError(logmsg) | ||||
| 
 | ||||
|     def dec_nsp( | ||||
|         type: Type, | ||||
|         obj: Any, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         print( | ||||
|             f'{uid}: CUSTOM DECODE\n' | ||||
|             f'input type: {type}\n' | ||||
|             f'obj: {obj}\n' | ||||
|             f'type(obj): `{type(obj).__class__}`\n' | ||||
|         ) | ||||
|         nsp = None | ||||
| 
 | ||||
|         # This never seems to hit? | ||||
|         if isinstance(obj, Msg): | ||||
|             print(f'Msg type: {obj}') | ||||
| 
 | ||||
|         if ( | ||||
|             type is NamespacePath | ||||
|             and isinstance(obj, str) | ||||
|             and ':' in obj | ||||
|         ): | ||||
|             nsp = NamespacePath(obj) | ||||
| 
 | ||||
|         if nsp: | ||||
|             print(f'Returning NSP instance: {nsp}') | ||||
|             return nsp | ||||
| 
 | ||||
|         logmsg: str = ( | ||||
|             f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported' | ||||
|             f'input type: {type(obj)}\n' | ||||
|             f'obj: {obj}\n' | ||||
|             f'type(obj): `{type(obj).__class__}`\n' | ||||
|         ) | ||||
|         print(logmsg) | ||||
|         raise NotImplementedError(logmsg) | ||||
| 
 | ||||
| 
 | ||||
|     nsp_codec: MsgCodec = mk_codec( | ||||
|         ipc_pld_spec=pld_spec, | ||||
| 
 | ||||
|         # NOTE XXX: the encode hook MUST be used no matter what since | ||||
|         # our `NamespacePath` is not any of a `Any` native type nor | ||||
|         # a `msgspec.Struct` subtype - so `msgspec` has no way to know | ||||
|         # how to encode it unless we provide the custom hook. | ||||
|         # | ||||
|         # AGAIN that is, regardless of whether we spec an | ||||
|         # `Any`-decoded-pld the enc has no knowledge (by default) | ||||
|         # how to enc `NamespacePath` (nsp), so we add a custom | ||||
|         # hook to do that ALWAYS. | ||||
|         enc_hook=enc_nsp, | ||||
| 
 | ||||
|         # XXX NOTE: pretty sure this is mutex with the `type=` to | ||||
|         # `Decoder`? so it won't work in tandem with the | ||||
|         # `ipc_pld_spec` passed above? | ||||
|         dec_hook=dec_nsp, | ||||
|     ) | ||||
|     return nsp_codec | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def send_back_nsp( | ||||
|     ctx: tractor.Context, | ||||
|     ctx: Context, | ||||
|     expect_debug: bool, | ||||
|     use_any_spec: bool, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|  | @ -117,28 +173,65 @@ async def send_back_nsp( | |||
|     and ensure we can round trip a func ref with our parent. | ||||
| 
 | ||||
|     ''' | ||||
|     task: trio.Task = trio.lowlevel.current_task() | ||||
|     task_ctx: Context = task.context | ||||
|     assert _ctxvar_MsgCodec not in task_ctx | ||||
|     # debug mode sanity check | ||||
|     assert expect_debug == _state.debug_mode() | ||||
| 
 | ||||
|     nsp_codec: MsgCodec = mk_custom_codec() | ||||
|     # task: trio.Task = trio.lowlevel.current_task() | ||||
| 
 | ||||
|     # TreeVar | ||||
|     # curr_codec = _ctxvar_MsgCodec.get_in(task) | ||||
| 
 | ||||
|     # ContextVar | ||||
|     # task_ctx: Context = task.context | ||||
|     # assert _ctxvar_MsgCodec not in task_ctx | ||||
| 
 | ||||
|     curr_codec = _ctxvar_MsgCodec.get() | ||||
|     assert curr_codec is _codec._def_tractor_codec | ||||
| 
 | ||||
|     if use_any_spec: | ||||
|         pld_spec = Any | ||||
|     else: | ||||
|         # NOTE: don't need the |None here since | ||||
|         # the parent side will never send `None` like | ||||
|         # we do here in the implicit return at the end of this | ||||
|         # `@context` body. | ||||
|         pld_spec = NamespacePath  # |None | ||||
| 
 | ||||
|     nsp_codec: MsgCodec = mk_custom_codec( | ||||
|         pld_spec=pld_spec, | ||||
|     ) | ||||
|     with apply_codec(nsp_codec) as codec: | ||||
|         chk_codec_applied( | ||||
|             custom_codec=nsp_codec, | ||||
|             enter_value=codec, | ||||
|         ) | ||||
| 
 | ||||
|         # ensure roundtripping works locally | ||||
|         nsp = NamespacePath.from_ref(ex_func) | ||||
|         await ctx.started(nsp) | ||||
|         wire_bytes: bytes = nsp_codec.encode( | ||||
|             Started( | ||||
|                 cid=ctx.cid, | ||||
|                 pld=nsp | ||||
|             ) | ||||
|         ) | ||||
|         msg: Started = nsp_codec.decode(wire_bytes) | ||||
|         pld = msg.pld | ||||
|         assert pld == nsp | ||||
| 
 | ||||
|         await ctx.started(nsp) | ||||
|         async with ctx.open_stream() as ipc: | ||||
|             async for msg in ipc: | ||||
| 
 | ||||
|                 assert msg == f'{__name__}:ex_func' | ||||
|                 if use_any_spec: | ||||
|                     assert msg == f'{__name__}:ex_func' | ||||
| 
 | ||||
|                 # TODO: as per below | ||||
|                 # assert isinstance(msg, NamespacePath) | ||||
|                 assert isinstance(msg, str) | ||||
|                     # TODO: as per below | ||||
|                     # assert isinstance(msg, NamespacePath) | ||||
|                     assert isinstance(msg, str) | ||||
|                 else: | ||||
|                     assert isinstance(msg, NamespacePath) | ||||
| 
 | ||||
|                 await ipc.send(msg) | ||||
| 
 | ||||
| 
 | ||||
| def chk_codec_applied( | ||||
|  | @ -146,11 +239,20 @@ def chk_codec_applied( | |||
|     enter_value: MsgCodec, | ||||
| ) -> MsgCodec: | ||||
| 
 | ||||
|     task: trio.Task = trio.lowlevel.current_task() | ||||
|     task_ctx: Context = task.context | ||||
|     # task: trio.Task = trio.lowlevel.current_task() | ||||
| 
 | ||||
|     assert _ctxvar_MsgCodec in task_ctx | ||||
|     curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] | ||||
|     # TreeVar | ||||
|     # curr_codec = _ctxvar_MsgCodec.get_in(task) | ||||
| 
 | ||||
|     # ContextVar | ||||
|     # task_ctx: Context = task.context | ||||
|     # assert _ctxvar_MsgCodec in task_ctx | ||||
|     # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] | ||||
| 
 | ||||
|     # RunVar | ||||
|     curr_codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
|     last_read_codec = _ctxvar_MsgCodec.get() | ||||
|     assert curr_codec is last_read_codec | ||||
| 
 | ||||
|     assert ( | ||||
|         # returned from `mk_codec()` | ||||
|  | @ -163,14 +265,31 @@ def chk_codec_applied( | |||
|         curr_codec is | ||||
| 
 | ||||
|         # public API for all of the above | ||||
|         current_msgspec_codec() | ||||
|         current_codec() | ||||
| 
 | ||||
|         # the default `msgspec` settings | ||||
|         is not _def_msgspec_codec | ||||
|         is not _codec._def_msgspec_codec | ||||
|         is not _codec._def_tractor_codec | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def test_codec_hooks_mod(): | ||||
| @pytest.mark.parametrize( | ||||
|     'ipc_pld_spec', | ||||
|     [ | ||||
|         # _codec._def_msgspec_codec, | ||||
|         Any, | ||||
|         # _codec._def_tractor_codec, | ||||
|         NamespacePath|None, | ||||
|     ], | ||||
|     ids=[ | ||||
|         'any_type', | ||||
|         'nsp_type', | ||||
|     ] | ||||
| ) | ||||
| def test_codec_hooks_mod( | ||||
|     debug_mode: bool, | ||||
|     ipc_pld_spec: Union[Type]|Any, | ||||
| ): | ||||
|     ''' | ||||
|     Audit the `.msg.MsgCodec` override apis details given our impl | ||||
|     uses `contextvars` to accomplish per `trio` task codec | ||||
|  | @ -178,11 +297,21 @@ def test_codec_hooks_mod(): | |||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         task: trio.Task = trio.lowlevel.current_task() | ||||
|         task_ctx: Context = task.context | ||||
|         assert _ctxvar_MsgCodec not in task_ctx | ||||
| 
 | ||||
|         async with tractor.open_nursery() as an: | ||||
|         # task: trio.Task = trio.lowlevel.current_task() | ||||
| 
 | ||||
|         # ContextVar | ||||
|         # task_ctx: Context = task.context | ||||
|         # assert _ctxvar_MsgCodec not in task_ctx | ||||
| 
 | ||||
|         # TreeVar | ||||
|         # def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task) | ||||
|         def_codec = _ctxvar_MsgCodec.get() | ||||
|         assert def_codec is _codec._def_tractor_codec | ||||
| 
 | ||||
|         async with tractor.open_nursery( | ||||
|             debug_mode=debug_mode, | ||||
|         ) as an: | ||||
|             p: tractor.Portal = await an.start_actor( | ||||
|                 'sub', | ||||
|                 enable_modules=[__name__], | ||||
|  | @ -192,7 +321,9 @@ def test_codec_hooks_mod(): | |||
|             # - codec not modified -> decode nsp as `str` | ||||
|             # - codec modified with hooks -> decode nsp as | ||||
|             #   `NamespacePath` | ||||
|             nsp_codec: MsgCodec = mk_custom_codec() | ||||
|             nsp_codec: MsgCodec = mk_custom_codec( | ||||
|                 pld_spec=ipc_pld_spec, | ||||
|             ) | ||||
|             with apply_codec(nsp_codec) as codec: | ||||
|                 chk_codec_applied( | ||||
|                     custom_codec=nsp_codec, | ||||
|  | @ -202,9 +333,22 @@ def test_codec_hooks_mod(): | |||
|                 async with ( | ||||
|                     p.open_context( | ||||
|                         send_back_nsp, | ||||
|                         # TODO: send the original nsp here and | ||||
|                         # test with `limit_msg_spec()` above? | ||||
|                         expect_debug=debug_mode, | ||||
|                         use_any_spec=(ipc_pld_spec==Any), | ||||
| 
 | ||||
|                     ) as (ctx, first), | ||||
|                     ctx.open_stream() as ipc, | ||||
|                 ): | ||||
|                     if ipc_pld_spec is NamespacePath: | ||||
|                         assert isinstance(first, NamespacePath) | ||||
| 
 | ||||
|                     print( | ||||
|                         'root: ENTERING CONTEXT BLOCK\n' | ||||
|                         f'type(first): {type(first)}\n' | ||||
|                         f'first: {first}\n' | ||||
|                     ) | ||||
|                     # ensure codec is still applied across | ||||
|                     # `tractor.Context` + its embedded nursery. | ||||
|                     chk_codec_applied( | ||||
|  | @ -212,23 +356,46 @@ def test_codec_hooks_mod(): | |||
|                         enter_value=codec, | ||||
|                     ) | ||||
| 
 | ||||
|                     assert first == f'{__name__}:ex_func' | ||||
|                     first_nsp = NamespacePath(first) | ||||
| 
 | ||||
|                     # ensure roundtripping works | ||||
|                     wire_bytes: bytes = nsp_codec.encode( | ||||
|                         Started( | ||||
|                             cid=ctx.cid, | ||||
|                             pld=first_nsp | ||||
|                         ) | ||||
|                     ) | ||||
|                     msg: Started = nsp_codec.decode(wire_bytes) | ||||
|                     pld = msg.pld | ||||
|                     assert  pld == first_nsp | ||||
| 
 | ||||
|                     # try a manual decode of the started msg+pld | ||||
| 
 | ||||
|                     # TODO: actually get the decoder loading | ||||
|                     # to native once we spec our SCIPP msgspec | ||||
|                     # (structurred-conc-inter-proc-protocol) | ||||
|                     # implemented as per, | ||||
|                     # https://github.com/goodboy/tractor/issues/36 | ||||
|                     # | ||||
|                     # assert isinstance(first, NamespacePath) | ||||
|                     assert isinstance(first, str) | ||||
|                     if ipc_pld_spec is NamespacePath: | ||||
|                         assert isinstance(first, NamespacePath) | ||||
| 
 | ||||
|                     # `Any`-payload-spec case | ||||
|                     else: | ||||
|                         assert isinstance(first, str) | ||||
|                         assert first == f'{__name__}:ex_func' | ||||
| 
 | ||||
|                     await ipc.send(first) | ||||
| 
 | ||||
|                     with trio.move_on_after(1): | ||||
|                     with trio.move_on_after(.6): | ||||
|                         async for msg in ipc: | ||||
|                             print(msg) | ||||
| 
 | ||||
|                             # TODO: as per above | ||||
|                             # assert isinstance(msg, NamespacePath) | ||||
|                             assert isinstance(msg, str) | ||||
|                             await ipc.send(msg) | ||||
|                             await trio.sleep(0.1) | ||||
| 
 | ||||
|             await p.cancel_actor() | ||||
| 
 | ||||
|  |  | |||
|  | @ -845,7 +845,10 @@ async def keep_sending_from_callee( | |||
|         ('caller', 1, never_open_stream), | ||||
|         ('callee', 0, keep_sending_from_callee), | ||||
|     ], | ||||
|     ids='overrun_condition={}'.format, | ||||
|     ids=[ | ||||
|          ('caller_1buf_never_open_stream'), | ||||
|          ('callee_0buf_keep_sending_from_callee'), | ||||
|     ] | ||||
| ) | ||||
| def test_one_end_stream_not_opened( | ||||
|     overrun_by: tuple[str, int, Callable], | ||||
|  | @ -869,29 +872,30 @@ def test_one_end_stream_not_opened( | |||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 entrypoint, | ||||
|             ) as (ctx, sent): | ||||
|                 assert sent is None | ||||
|             with trio.fail_after(1): | ||||
|                 async with portal.open_context( | ||||
|                     entrypoint, | ||||
|                 ) as (ctx, sent): | ||||
|                     assert sent is None | ||||
| 
 | ||||
|                 if 'caller' in overrunner: | ||||
|                     if 'caller' in overrunner: | ||||
| 
 | ||||
|                     async with ctx.open_stream() as stream: | ||||
|                         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                         # itersend +1 msg more then the buffer size | ||||
|                         # to cause the most basic overrun. | ||||
|                         for i in range(buf_size): | ||||
|                             print(f'sending {i}') | ||||
|                             await stream.send(i) | ||||
|                             # itersend +1 msg more then the buffer size | ||||
|                             # to cause the most basic overrun. | ||||
|                             for i in range(buf_size): | ||||
|                                 print(f'sending {i}') | ||||
|                                 await stream.send(i) | ||||
| 
 | ||||
|                         else: | ||||
|                             # expect overrun error to be relayed back | ||||
|                             # and this sleep interrupted | ||||
|                             await trio.sleep_forever() | ||||
|                             else: | ||||
|                                 # expect overrun error to be relayed back | ||||
|                                 # and this sleep interrupted | ||||
|                                 await trio.sleep_forever() | ||||
| 
 | ||||
|                 else: | ||||
|                     # callee overruns caller case so we do nothing here | ||||
|                     await trio.sleep_forever() | ||||
|                     else: | ||||
|                         # callee overruns caller case so we do nothing here | ||||
|                         await trio.sleep_forever() | ||||
| 
 | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
|  |  | |||
|  | @ -190,11 +190,14 @@ class Lock: | |||
|         is_trio_main = ( | ||||
|             # TODO: since this is private, @oremanj says | ||||
|             # we should just copy the impl for now.. | ||||
|             trio._util.is_main_thread() | ||||
|             (is_main_thread := trio._util.is_main_thread()) | ||||
|             and | ||||
|             (async_lib := sniffio.current_async_library()) == 'trio' | ||||
|         ) | ||||
|         if not is_trio_main: | ||||
|         if ( | ||||
|             not is_trio_main | ||||
|             and is_main_thread | ||||
|         ): | ||||
|             log.warning( | ||||
|                 f'Current async-lib detected by `sniffio`: {async_lib}\n' | ||||
|             ) | ||||
|  |  | |||
|  | @ -31,25 +31,24 @@ from ._codec import ( | |||
|     apply_codec as apply_codec, | ||||
|     mk_codec as mk_codec, | ||||
|     MsgCodec as MsgCodec, | ||||
|     current_msgspec_codec as current_msgspec_codec, | ||||
|     current_codec as current_codec, | ||||
| ) | ||||
| 
 | ||||
| from .types import ( | ||||
|     Msg as Msg, | ||||
| 
 | ||||
|     Start as Start,  # with pld | ||||
|     FuncSpec as FuncSpec, | ||||
|     Aid as Aid, | ||||
|     SpawnSpec as SpawnSpec, | ||||
| 
 | ||||
|     StartAck as StartAck, # with pld | ||||
|     IpcCtxSpec as IpcCtxSpec, | ||||
|     Start as Start, | ||||
|     StartAck as StartAck, | ||||
| 
 | ||||
|     Started as Started, | ||||
|     Yield as Yield, | ||||
|     Stop as Stop, | ||||
|     Return as Return, | ||||
| 
 | ||||
|     Error as Error,  # with pld | ||||
|     ErrorData as ErrorData, | ||||
|     Error as Error, | ||||
| 
 | ||||
|     # full msg spec set | ||||
|     __spec__ as __spec__, | ||||
|  |  | |||
|  | @ -30,13 +30,13 @@ ToDo: backends we prolly should offer: | |||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextvars import ( | ||||
|     ContextVar, | ||||
|     Token, | ||||
| ) | ||||
| from contextlib import ( | ||||
|     contextmanager as cm, | ||||
| ) | ||||
| # from contextvars import ( | ||||
| #     ContextVar, | ||||
| #     Token, | ||||
| # ) | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|  | @ -47,6 +47,12 @@ from types import ModuleType | |||
| 
 | ||||
| import msgspec | ||||
| from msgspec import msgpack | ||||
| from trio.lowlevel import ( | ||||
|     RunVar, | ||||
|     RunVarToken, | ||||
| ) | ||||
| # TODO: see notes below from @mikenerone.. | ||||
| # from tricycle import TreeVar | ||||
| 
 | ||||
| from tractor.msg.pretty_struct import Struct | ||||
| from tractor.msg.types import ( | ||||
|  | @ -72,6 +78,9 @@ class MsgCodec(Struct): | |||
|     ''' | ||||
|     A IPC msg interchange format lib's encoder + decoder pair. | ||||
| 
 | ||||
|     Pretty much nothing more then delegation to underlying | ||||
|     `msgspec.<interchange-protocol>.Encoder/Decoder`s for now. | ||||
| 
 | ||||
|     ''' | ||||
|     _enc: msgpack.Encoder | ||||
|     _dec: msgpack.Decoder | ||||
|  | @ -86,11 +95,6 @@ class MsgCodec(Struct): | |||
| 
 | ||||
|     lib: ModuleType = msgspec | ||||
| 
 | ||||
|     # ad-hoc type extensions | ||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
|     enc_hook: Callable[[Any], Any]|None = None  # coder | ||||
|     dec_hook: Callable[[type, Any], Any]|None = None # decoder | ||||
| 
 | ||||
|     # TODO: a sub-decoder system as well? | ||||
|     # payload_msg_specs: Union[Type[Struct]] = Any | ||||
|     # see related comments in `.msg.types` | ||||
|  | @ -304,7 +308,8 @@ def mk_codec( | |||
| 
 | ||||
|     libname: str = 'msgspec', | ||||
| 
 | ||||
|     # proxy as `Struct(**kwargs)` | ||||
|     # proxy as `Struct(**kwargs)` for ad-hoc type extensions | ||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||
|     # ------ - ------ | ||||
|     dec_hook: Callable|None = None, | ||||
|     enc_hook: Callable|None = None, | ||||
|  | @ -389,14 +394,52 @@ def mk_codec( | |||
| # no custom structs, hooks or other special types. | ||||
| _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) | ||||
| 
 | ||||
| # NOTE: provides for per-`trio.Task` specificity of the | ||||
| # The built-in IPC `Msg` spec. | ||||
| # Our composing "shuttle" protocol which allows `tractor`-app code | ||||
| # to use any `msgspec` supported type as the `Msg.pld` payload, | ||||
| # https://jcristharif.com/msgspec/supported-types.html | ||||
| # | ||||
| _def_tractor_codec: MsgCodec = mk_codec( | ||||
|     ipc_pld_spec=Any, | ||||
| ) | ||||
| # TODO: IDEALLY provides for per-`trio.Task` specificity of the | ||||
| # IPC msging codec used by the transport layer when doing | ||||
| # `Channel.send()/.recv()` of wire data. | ||||
| _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( | ||||
| 
 | ||||
| # ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? | ||||
| # _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( | ||||
| 
 | ||||
| # TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery | ||||
| # even though it's supposed to inherit from a parent context ??? | ||||
| # | ||||
| # _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar( | ||||
| # | ||||
| # ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter: | ||||
| # | ||||
| # 22:02:54 <mikenerone> even for regular contextvars, all you have to do is: | ||||
| #    `task: Task = trio.lowlevel.current_task()` | ||||
| #    `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)` | ||||
| # | ||||
| # From a comment in his prop code he couldn't share outright: | ||||
| # 1. For every TreeVar set in the current task (which covers what | ||||
| #    we need from SynchronizerFacade), walk up the tree until the | ||||
| #    root or finding one where the TreeVar is already set, setting | ||||
| #    it in all of the contexts along the way. | ||||
| # 2. For each of those, we also forcibly set the values that are | ||||
| #    pending for child nurseries that have not yet accessed the | ||||
| #    TreeVar. | ||||
| # 3. We similarly set the pending values for the child nurseries | ||||
| #    of the *current* task. | ||||
| # | ||||
| 
 | ||||
| # TODO: STOP USING THIS, since it's basically a global and won't | ||||
| # allow sub-IPC-ctxs to limit the msg-spec however desired.. | ||||
| _ctxvar_MsgCodec: MsgCodec = RunVar( | ||||
|     'msgspec_codec', | ||||
| 
 | ||||
|     # TODO: move this to our new `Msg`-spec! | ||||
|     default=_def_msgspec_codec, | ||||
|     # default=_def_msgspec_codec, | ||||
|     default=_def_tractor_codec, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -410,15 +453,36 @@ def apply_codec( | |||
|     runtime context such that all IPC msgs are processed | ||||
|     with it for that task. | ||||
| 
 | ||||
|     Uses a `tricycle.TreeVar` to ensure the scope of the codec | ||||
|     matches the `@cm` block and DOES NOT change to the original | ||||
|     (default) value in new tasks (as it does for `ContextVar`). | ||||
| 
 | ||||
|     See the docs: | ||||
|     - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables | ||||
|     - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py | ||||
| 
 | ||||
|     ''' | ||||
|     token: Token = _ctxvar_MsgCodec.set(codec) | ||||
|     orig: MsgCodec = _ctxvar_MsgCodec.get() | ||||
|     assert orig is not codec | ||||
|     token: RunVarToken = _ctxvar_MsgCodec.set(codec) | ||||
| 
 | ||||
|     # TODO: for TreeVar approach, see docs for @cm `.being()` API: | ||||
|     # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables | ||||
|     # try: | ||||
|     #     with _ctxvar_MsgCodec.being(codec): | ||||
|     #         new = _ctxvar_MsgCodec.get() | ||||
|     #         assert new is codec | ||||
|     #         yield codec | ||||
| 
 | ||||
|     try: | ||||
|         yield _ctxvar_MsgCodec.get() | ||||
|     finally: | ||||
|         _ctxvar_MsgCodec.reset(token) | ||||
| 
 | ||||
|     assert _ctxvar_MsgCodec.get() is orig | ||||
| 
 | ||||
| def current_msgspec_codec() -> MsgCodec: | ||||
| 
 | ||||
| def current_codec() -> MsgCodec: | ||||
|     ''' | ||||
|     Return the current `trio.Task.context`'s value | ||||
|     for `msgspec_codec` used by `Channel.send/.recv()` | ||||
|  | @ -449,5 +513,6 @@ def limit_msg_spec( | |||
|         payload_types=payload_types, | ||||
|         **codec_kwargs, | ||||
|     ) | ||||
|     with apply_codec(msgspec_codec): | ||||
|     with apply_codec(msgspec_codec) as applied_codec: | ||||
|         assert applied_codec is msgspec_codec | ||||
|         yield msgspec_codec | ||||
|  |  | |||
|  | @ -80,6 +80,28 @@ class DiffDump(UserList): | |||
|         return repstr | ||||
| 
 | ||||
| 
 | ||||
| def iter_fields(struct: Struct) -> Iterator[ | ||||
|     tuple[ | ||||
|         structs.FieldIinfo, | ||||
|         str, | ||||
|         Any, | ||||
|     ] | ||||
| ]: | ||||
|     ''' | ||||
|     Iterate over all non-@property fields of this struct. | ||||
| 
 | ||||
|     ''' | ||||
|     fi: structs.FieldInfo | ||||
|     for fi in structs.fields(struct): | ||||
|         key: str = fi.name | ||||
|         val: Any = getattr(struct, key) | ||||
|         yield ( | ||||
|             fi, | ||||
|             key, | ||||
|             val, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| class Struct( | ||||
|     _Struct, | ||||
| 
 | ||||
|  | @ -91,23 +113,6 @@ class Struct( | |||
|     A "human friendlier" (aka repl buddy) struct subtype. | ||||
| 
 | ||||
|     ''' | ||||
|     def _sin_props(self) -> Iterator[ | ||||
|         tuple[ | ||||
|             structs.FieldIinfo, | ||||
|             str, | ||||
|             Any, | ||||
|         ] | ||||
|     ]: | ||||
|         ''' | ||||
|         Iterate over all non-@property fields of this struct. | ||||
| 
 | ||||
|         ''' | ||||
|         fi: structs.FieldInfo | ||||
|         for fi in structs.fields(self): | ||||
|             key: str = fi.name | ||||
|             val: Any = getattr(self, key) | ||||
|             yield fi, key, val | ||||
| 
 | ||||
|     def to_dict( | ||||
|         self, | ||||
|         include_non_members: bool = True, | ||||
|  | @ -130,7 +135,7 @@ class Struct( | |||
|         # added as type-defined `@property` methods! | ||||
|         sin_props: dict = {} | ||||
|         fi: structs.FieldInfo | ||||
|         for fi, k, v in self._sin_props(): | ||||
|         for fi, k, v in iter_fields(self): | ||||
|             sin_props[k] = asdict[k] | ||||
| 
 | ||||
|         return sin_props | ||||
|  | @ -159,7 +164,7 @@ class Struct( | |||
|         fi: structs.FieldInfo | ||||
|         k: str | ||||
|         v: Any | ||||
|         for fi, k, v in self._sin_props(): | ||||
|         for fi, k, v in iter_fields(self): | ||||
| 
 | ||||
|             # TODO: how can we prefer `Literal['option1',  'option2, | ||||
|             # ..]` over .__name__ == `Literal` but still get only the | ||||
|  |  | |||
|  | @ -26,6 +26,7 @@ from __future__ import annotations | |||
| import types | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|     Generic, | ||||
|     Literal, | ||||
|     Type, | ||||
|  | @ -37,8 +38,12 @@ from msgspec import ( | |||
|     defstruct, | ||||
|     # field, | ||||
|     Struct, | ||||
|     UNSET, | ||||
|     UnsetType, | ||||
|     # UNSET, | ||||
|     # UnsetType, | ||||
| ) | ||||
| 
 | ||||
| from tractor.msg import ( | ||||
|     pretty_struct, | ||||
| ) | ||||
| 
 | ||||
| # type variable for the boxed payload field `.pld` | ||||
|  | @ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT') | |||
| class Msg( | ||||
|     Struct, | ||||
|     Generic[PayloadT], | ||||
| 
 | ||||
|     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| 
 | ||||
|     # eq=True, | ||||
|     # https://jcristharif.com/msgspec/structs.html#field-ordering | ||||
|     # kw_only=True, | ||||
| 
 | ||||
|     # https://jcristharif.com/msgspec/structs.html#equality-and-order | ||||
|     # order=True, | ||||
| 
 | ||||
|     # https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays | ||||
|     # as_array=True, | ||||
| ): | ||||
|     ''' | ||||
|     The "god" boxing msg type. | ||||
|  | @ -90,6 +103,53 @@ class Msg( | |||
|     pld: PayloadT | ||||
| 
 | ||||
| 
 | ||||
| class Aid( | ||||
|     Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     Actor-identity msg. | ||||
| 
 | ||||
|     Initial contact exchange enabling an actor "mailbox handshake" | ||||
|     delivering the peer identity (and maybe eventually contact) | ||||
|     info. | ||||
| 
 | ||||
|     Used by discovery protocol to register actors as well as | ||||
|     conduct the initial comms (capability) filtering. | ||||
| 
 | ||||
|     ''' | ||||
|     name: str | ||||
|     uuid: str | ||||
|     # TODO: use built-in support for UUIDs? | ||||
|     # -[ ] `uuid.UUID` which has multi-protocol support | ||||
|     #  https://jcristharif.com/msgspec/supported-types.html#uuid | ||||
| 
 | ||||
| 
 | ||||
| class SpawnSpec( | ||||
|     pretty_struct.Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     Initial runtime spec handed down from a spawning parent to its | ||||
|     child subactor immediately following first contact via an | ||||
|     `Aid` msg. | ||||
| 
 | ||||
|     ''' | ||||
|     _parent_main_data: dict | ||||
|     _runtime_vars: dict[str, Any] | ||||
| 
 | ||||
|     # module import capability | ||||
|     enable_modules: dict[str, str] | ||||
| 
 | ||||
|     # TODO: not just sockaddr pairs? | ||||
|     # -[ ] abstract into a `TransportAddr` type? | ||||
|     reg_addrs: list[tuple[str, int]] | ||||
|     bind_addrs: list[tuple[str, int]] | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TODO: caps based RPC support in the payload? | ||||
| # | ||||
| # -[ ] integration with our ``enable_modules: list[str]`` caps sys. | ||||
|  | @ -105,18 +165,31 @@ class Msg( | |||
| # | ||||
| # -[ ] can we combine .ns + .func into a native `NamespacePath` field? | ||||
| # | ||||
| # -[ ]better name, like `Call/TaskInput`? | ||||
| # -[ ] better name, like `Call/TaskInput`? | ||||
| # | ||||
| class FuncSpec(Struct): | ||||
|     ns: str | ||||
|     func: str | ||||
| 
 | ||||
|     kwargs: dict | ||||
|     uid: str  # (calling) actor-id | ||||
| # -[ ] XXX a debugger lock msg transaction with payloads like, | ||||
| #   child -> `.pld: DebugLock` -> root | ||||
| #   child <- `.pld: DebugLocked` <- root | ||||
| #   child -> `.pld: DebugRelease` -> root | ||||
| # | ||||
| #   WHY => when a pld spec is provided it might not allow for | ||||
| #   debug mode msgs as they currently are (using plain old `pld. | ||||
| #   str` payloads) so we only when debug_mode=True we need to | ||||
| #   union in this debugger payload set? | ||||
| # | ||||
| #   mk_msg_spec( | ||||
| #       MyPldSpec, | ||||
| #       debug_mode=True, | ||||
| #   ) -> ( | ||||
| #       Union[MyPldSpec] | ||||
| #      | Union[DebugLock, DebugLocked, DebugRelease] | ||||
| #   ) | ||||
| 
 | ||||
| 
 | ||||
| class Start( | ||||
|     Msg, | ||||
|     Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     Initial request to remotely schedule an RPC `trio.Task` via | ||||
|  | @ -134,14 +207,26 @@ class Start( | |||
|     - `Context.open_context()` | ||||
| 
 | ||||
|     ''' | ||||
|     pld: FuncSpec | ||||
|     cid: str | ||||
| 
 | ||||
|     ns: str | ||||
|     func: str | ||||
| 
 | ||||
|     kwargs: dict | ||||
|     uid: tuple[str, str]  # (calling) actor-id | ||||
| 
 | ||||
| 
 | ||||
| class IpcCtxSpec(Struct): | ||||
| class StartAck( | ||||
|     Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     An inter-actor-`trio.Task`-comms `Context` spec. | ||||
|     Init response to a `Cmd` request indicating the far | ||||
|     end's RPC spec, namely its callable "type". | ||||
| 
 | ||||
|     ''' | ||||
|     cid: str | ||||
|     # TODO: maybe better names for all these? | ||||
|     # -[ ] obvi ^ would need sync with `._rpc` | ||||
|     functype: Literal[ | ||||
|  | @ -160,18 +245,6 @@ class IpcCtxSpec(Struct): | |||
|     # msgspec: MsgSpec | ||||
| 
 | ||||
| 
 | ||||
| class StartAck( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
| ): | ||||
|     ''' | ||||
|     Init response to a `Cmd` request indicating the far | ||||
|     end's RPC callable "type". | ||||
| 
 | ||||
|     ''' | ||||
|     pld: IpcCtxSpec | ||||
| 
 | ||||
| 
 | ||||
| class Started( | ||||
|     Msg, | ||||
|     Generic[PayloadT], | ||||
|  | @ -202,13 +275,19 @@ class Yield( | |||
|     pld: PayloadT | ||||
| 
 | ||||
| 
 | ||||
| class Stop(Msg): | ||||
| class Stop( | ||||
|     Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     Stream termination signal much like an IPC version  | ||||
|     of `StopAsyncIteration`. | ||||
| 
 | ||||
|     ''' | ||||
|     pld: UnsetType = UNSET | ||||
|     cid: str | ||||
|     # TODO: do we want to support a payload on stop? | ||||
|     # pld: UnsetType = UNSET | ||||
| 
 | ||||
| 
 | ||||
| class Return( | ||||
|  | @ -223,32 +302,33 @@ class Return( | |||
|     pld: PayloadT | ||||
| 
 | ||||
| 
 | ||||
| class ErrorData(Struct): | ||||
| class Error( | ||||
|     Struct, | ||||
|     tag=True, | ||||
|     tag_field='msg_type', | ||||
| ): | ||||
|     ''' | ||||
|     Remote actor error meta-data as needed originally by | ||||
|     A pkt that wraps `RemoteActorError`s for relay and raising. | ||||
| 
 | ||||
|     Fields are 1-to-1 meta-data as needed originally by | ||||
|     `RemoteActorError.msgdata: dict`. | ||||
| 
 | ||||
|     ''' | ||||
|     src_uid: str | ||||
|     src_uid: tuple[str, str] | ||||
|     src_type_str: str | ||||
|     boxed_type_str: str | ||||
| 
 | ||||
|     relay_path: list[str] | ||||
|     relay_path: list[tuple[str, str]] | ||||
|     tb_str: str | ||||
| 
 | ||||
|     cid: str|None = None | ||||
| 
 | ||||
|     # TODO: use UNSET or don't include them via | ||||
|     # | ||||
|     # `ContextCancelled` | ||||
|     canceller: str|None = None | ||||
|     canceller: tuple[str, str]|None = None | ||||
| 
 | ||||
|     # `StreamOverrun` | ||||
|     sender: str|None = None | ||||
| 
 | ||||
| 
 | ||||
| class Error(Msg): | ||||
|     ''' | ||||
|     A pkt that wraps `RemoteActorError`s for relay. | ||||
| 
 | ||||
|     ''' | ||||
|     pld: ErrorData | ||||
|     sender: tuple[str, str]|None = None | ||||
| 
 | ||||
| 
 | ||||
| # TODO: should be make a msg version of `ContextCancelled?` | ||||
|  | @ -265,6 +345,12 @@ class Error(Msg): | |||
| # approx order of the IPC txn-state spaces. | ||||
| __spec__: list[Msg] = [ | ||||
| 
 | ||||
|     # identity handshake | ||||
|     Aid, | ||||
| 
 | ||||
|     # spawn specification from parent | ||||
|     SpawnSpec, | ||||
| 
 | ||||
|     # inter-actor RPC initiation | ||||
|     Start, | ||||
|     StartAck, | ||||
|  | @ -280,6 +366,8 @@ __spec__: list[Msg] = [ | |||
| ] | ||||
| 
 | ||||
| _runtime_spec_msgs: list[Msg] = [ | ||||
|     Aid, | ||||
|     SpawnSpec, | ||||
|     Start, | ||||
|     StartAck, | ||||
|     Stop, | ||||
|  | @ -443,3 +531,99 @@ def mk_msg_spec( | |||
|         pld_spec | runtime_spec, | ||||
|         msgtypes_table[spec_build_method] + ipc_msg_types, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: make something similar to this inside `._codec` such that | ||||
| # user can just pass a type table of some sort? | ||||
| # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: | ||||
| #     ''' | ||||
| #     Deliver a `enc_hook()`/`dec_hook()` pair which does | ||||
| #     manual convertion from our above native `Msg` set | ||||
| #     to `dict` equivalent (wire msgs) in order to keep legacy compat | ||||
| #     with the original runtime implementation. | ||||
| 
 | ||||
| #     Note: this is is/was primarly used while moving the core | ||||
| #     runtime over to using native `Msg`-struct types wherein we | ||||
| #     start with the send side emitting without loading | ||||
| #     a typed-decoder and then later flipping the switch over to | ||||
| #     load to the native struct types once all runtime usage has | ||||
| #     been adjusted appropriately. | ||||
| 
 | ||||
| #     ''' | ||||
| #     def enc_to_dict(msg: Any) -> Any: | ||||
| #         ''' | ||||
| #         Encode `Msg`-structs to `dict` msgs instead | ||||
| #         of using `msgspec.msgpack.Decoder.type`-ed | ||||
| #         features. | ||||
| 
 | ||||
| #         ''' | ||||
| #         match msg: | ||||
| #             case Start(): | ||||
| #                 dctmsg: dict = pretty_struct.Struct.to_dict( | ||||
| #                     msg | ||||
| #                 )['pld'] | ||||
| 
 | ||||
| #             case Error(): | ||||
| #                 dctmsg: dict = pretty_struct.Struct.to_dict( | ||||
| #                     msg | ||||
| #                 )['pld'] | ||||
| #                 return {'error': dctmsg} | ||||
| 
 | ||||
| 
 | ||||
| #     def dec_from_dict( | ||||
| #         type: Type, | ||||
| #         obj: Any, | ||||
| #     ) -> Any: | ||||
| #         ''' | ||||
| #         Decode to `Msg`-structs from `dict` msgs instead | ||||
| #         of using `msgspec.msgpack.Decoder.type`-ed | ||||
| #         features. | ||||
| 
 | ||||
| #         ''' | ||||
| #         cid: str = obj.get('cid') | ||||
| #         match obj: | ||||
| #             case {'cmd': pld}: | ||||
| #                 return Start( | ||||
| #                     cid=cid, | ||||
| #                     pld=pld, | ||||
| #                 ) | ||||
| #             case {'functype': pld}: | ||||
| #                 return StartAck( | ||||
| #                     cid=cid, | ||||
| #                     functype=pld, | ||||
| #                     # pld=IpcCtxSpec( | ||||
| #                     #     functype=pld, | ||||
| #                     # ), | ||||
| #                 ) | ||||
| #             case {'started': pld}: | ||||
| #                 return Started( | ||||
| #                     cid=cid, | ||||
| #                     pld=pld, | ||||
| #                 ) | ||||
| #             case {'yield': pld}: | ||||
| #                 return Yield( | ||||
| #                     cid=obj['cid'], | ||||
| #                     pld=pld, | ||||
| #                 ) | ||||
| #             case {'stop': pld}: | ||||
| #                 return Stop( | ||||
| #                     cid=cid, | ||||
| #                 ) | ||||
| #             case {'return': pld}: | ||||
| #                 return Return( | ||||
| #                     cid=cid, | ||||
| #                     pld=pld, | ||||
| #                 ) | ||||
| 
 | ||||
| #             case {'error': pld}: | ||||
| #                 return Error( | ||||
| #                     cid=cid, | ||||
| #                     pld=ErrorData( | ||||
| #                         **pld | ||||
| #                     ), | ||||
| #                 ) | ||||
| 
 | ||||
| #     return ( | ||||
| #         # enc_to_dict, | ||||
| #         dec_from_dict, | ||||
| #     ) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue