Compare commits
	
		
			4 Commits 
		
	
	
		
			3ba46362a9
			...
			67f673bf36
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 67f673bf36 | |
|  | 3a105e2830 | |
|  | a315f01acc | |
|  | 25ffdedc06 | 
|  | @ -7,7 +7,6 @@ B~) | ||||||
| ''' | ''' | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     _GenericAlias, |  | ||||||
|     Type, |     Type, | ||||||
|     Union, |     Union, | ||||||
| ) | ) | ||||||
|  | @ -26,20 +25,23 @@ from msgspec import ( | ||||||
| import pytest | import pytest | ||||||
| import tractor | import tractor | ||||||
| from tractor.msg import ( | from tractor.msg import ( | ||||||
|     _def_msgspec_codec, |     _codec, | ||||||
|     _ctxvar_MsgCodec, |     _ctxvar_MsgCodec, | ||||||
| 
 | 
 | ||||||
|     NamespacePath, |     NamespacePath, | ||||||
|     MsgCodec, |     MsgCodec, | ||||||
|     mk_codec, |     mk_codec, | ||||||
|     apply_codec, |     apply_codec, | ||||||
|     current_msgspec_codec, |     current_codec, | ||||||
| ) | ) | ||||||
| from tractor.msg import types | from tractor.msg import ( | ||||||
|  |     types, | ||||||
|  | ) | ||||||
|  | from tractor import _state | ||||||
| from tractor.msg.types import ( | from tractor.msg.types import ( | ||||||
|     # PayloadT, |     # PayloadT, | ||||||
|     Msg, |     Msg, | ||||||
|     # Started, |     Started, | ||||||
|     mk_msg_spec, |     mk_msg_spec, | ||||||
| ) | ) | ||||||
| import trio | import trio | ||||||
|  | @ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec(): | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: wrap these into `._codec` such that user can just pass |  | ||||||
| # a type table of some sort? |  | ||||||
| def enc_hook(obj: Any) -> Any: |  | ||||||
|     if isinstance(obj, NamespacePath): |  | ||||||
|         return str(obj) |  | ||||||
|     else: |  | ||||||
|         raise NotImplementedError( |  | ||||||
|             f'Objects of type {type(obj)} are not supported' |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def dec_hook(type: Type, obj: Any) -> Any: |  | ||||||
|     print(f'type is: {type}') |  | ||||||
|     if type is NamespacePath: |  | ||||||
|         return NamespacePath(obj) |  | ||||||
|     else: |  | ||||||
|         raise NotImplementedError( |  | ||||||
|             f'Objects of type {type(obj)} are not supported' |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def ex_func(*args): | def ex_func(*args): | ||||||
|     print(f'ex_func({args})') |     print(f'ex_func({args})') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def mk_custom_codec( | def mk_custom_codec( | ||||||
|     ipc_msg_spec: Type[Any] = Any, |     pld_spec: Union[Type]|Any, | ||||||
|  | 
 | ||||||
| ) -> MsgCodec: | ) -> MsgCodec: | ||||||
|     # apply custom hooks and set a `Decoder` which only |     ''' | ||||||
|     # loads `NamespacePath` types. |     Create custom `msgpack` enc/dec-hooks and set a `Decoder` | ||||||
|     nsp_codec: MsgCodec = mk_codec( |     which only loads `NamespacePath` types. | ||||||
|         ipc_msg_spec=ipc_msg_spec, | 
 | ||||||
|         enc_hook=enc_hook, |     ''' | ||||||
|         dec_hook=dec_hook, |     uid: tuple[str, str] = tractor.current_actor().uid | ||||||
|  | 
 | ||||||
|  |     # XXX NOTE XXX: despite defining `NamespacePath` as a type | ||||||
|  |     # field on our `Msg.pld`, we still need a enc/dec_hook() pair | ||||||
|  |     # to cast to/from that type on the wire. See the docs: | ||||||
|  |     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||||
|  | 
 | ||||||
|  |     def enc_nsp(obj: Any) -> Any: | ||||||
|  |         match obj: | ||||||
|  |             case NamespacePath(): | ||||||
|  |                 print( | ||||||
|  |                     f'{uid}: `NamespacePath`-Only ENCODE?\n' | ||||||
|  |                     f'type: {type(obj)}\n' | ||||||
|  |                     f'obj: {obj}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|     # TODO: validate `MsgCodec` interface/semantics? |                 return str(obj) | ||||||
|     # -[ ] simple field tests to ensure caching + reset is workin? | 
 | ||||||
|     # -[ ] custom / changing `.decoder()` calls? |         logmsg: str = ( | ||||||
|  |             f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported' | ||||||
|  |             f'type: {type(obj)}\n' | ||||||
|  |             f'obj: {obj}\n' | ||||||
|  |         ) | ||||||
|  |         print(logmsg) | ||||||
|  |         raise NotImplementedError(logmsg) | ||||||
|  | 
 | ||||||
|  |     def dec_nsp( | ||||||
|  |         type: Type, | ||||||
|  |         obj: Any, | ||||||
|  | 
 | ||||||
|  |     ) -> Any: | ||||||
|  |         print( | ||||||
|  |             f'{uid}: CUSTOM DECODE\n' | ||||||
|  |             f'input type: {type}\n' | ||||||
|  |             f'obj: {obj}\n' | ||||||
|  |             f'type(obj): `{type(obj).__class__}`\n' | ||||||
|  |         ) | ||||||
|  |         nsp = None | ||||||
|  | 
 | ||||||
|  |         # This never seems to hit? | ||||||
|  |         if isinstance(obj, Msg): | ||||||
|  |             print(f'Msg type: {obj}') | ||||||
|  | 
 | ||||||
|  |         if ( | ||||||
|  |             type is NamespacePath | ||||||
|  |             and isinstance(obj, str) | ||||||
|  |             and ':' in obj | ||||||
|  |         ): | ||||||
|  |             nsp = NamespacePath(obj) | ||||||
|  | 
 | ||||||
|  |         if nsp: | ||||||
|  |             print(f'Returning NSP instance: {nsp}') | ||||||
|  |             return nsp | ||||||
|  | 
 | ||||||
|  |         logmsg: str = ( | ||||||
|  |             f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported' | ||||||
|  |             f'input type: {type(obj)}\n' | ||||||
|  |             f'obj: {obj}\n' | ||||||
|  |             f'type(obj): `{type(obj).__class__}`\n' | ||||||
|  |         ) | ||||||
|  |         print(logmsg) | ||||||
|  |         raise NotImplementedError(logmsg) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |     nsp_codec: MsgCodec = mk_codec( | ||||||
|  |         ipc_pld_spec=pld_spec, | ||||||
|  | 
 | ||||||
|  |         # NOTE XXX: the encode hook MUST be used no matter what since | ||||||
|  |         # our `NamespacePath` is not any of a `Any` native type nor | ||||||
|  |         # a `msgspec.Struct` subtype - so `msgspec` has no way to know | ||||||
|  |         # how to encode it unless we provide the custom hook. | ||||||
|         # |         # | ||||||
|     # dec = nsp_codec.decoder( |         # AGAIN that is, regardless of whether we spec an | ||||||
|     #     types=NamespacePath, |         # `Any`-decoded-pld the enc has no knowledge (by default) | ||||||
|     # ) |         # how to enc `NamespacePath` (nsp), so we add a custom | ||||||
|     # assert nsp_codec.dec is dec |         # hook to do that ALWAYS. | ||||||
|  |         enc_hook=enc_nsp, | ||||||
|  | 
 | ||||||
|  |         # XXX NOTE: pretty sure this is mutex with the `type=` to | ||||||
|  |         # `Decoder`? so it won't work in tandem with the | ||||||
|  |         # `ipc_pld_spec` passed above? | ||||||
|  |         dec_hook=dec_nsp, | ||||||
|  |     ) | ||||||
|     return nsp_codec |     return nsp_codec | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def send_back_nsp( | async def send_back_nsp( | ||||||
|     ctx: tractor.Context, |     ctx: Context, | ||||||
|  |     expect_debug: bool, | ||||||
|  |     use_any_spec: bool, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|  | @ -117,28 +173,65 @@ async def send_back_nsp( | ||||||
|     and ensure we can round trip a func ref with our parent. |     and ensure we can round trip a func ref with our parent. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     task: trio.Task = trio.lowlevel.current_task() |     # debug mode sanity check | ||||||
|     task_ctx: Context = task.context |     assert expect_debug == _state.debug_mode() | ||||||
|     assert _ctxvar_MsgCodec not in task_ctx |  | ||||||
| 
 | 
 | ||||||
|     nsp_codec: MsgCodec = mk_custom_codec() |     # task: trio.Task = trio.lowlevel.current_task() | ||||||
|  | 
 | ||||||
|  |     # TreeVar | ||||||
|  |     # curr_codec = _ctxvar_MsgCodec.get_in(task) | ||||||
|  | 
 | ||||||
|  |     # ContextVar | ||||||
|  |     # task_ctx: Context = task.context | ||||||
|  |     # assert _ctxvar_MsgCodec not in task_ctx | ||||||
|  | 
 | ||||||
|  |     curr_codec = _ctxvar_MsgCodec.get() | ||||||
|  |     assert curr_codec is _codec._def_tractor_codec | ||||||
|  | 
 | ||||||
|  |     if use_any_spec: | ||||||
|  |         pld_spec = Any | ||||||
|  |     else: | ||||||
|  |         # NOTE: don't need the |None here since | ||||||
|  |         # the parent side will never send `None` like | ||||||
|  |         # we do here in the implicit return at the end of this | ||||||
|  |         # `@context` body. | ||||||
|  |         pld_spec = NamespacePath  # |None | ||||||
|  | 
 | ||||||
|  |     nsp_codec: MsgCodec = mk_custom_codec( | ||||||
|  |         pld_spec=pld_spec, | ||||||
|  |     ) | ||||||
|     with apply_codec(nsp_codec) as codec: |     with apply_codec(nsp_codec) as codec: | ||||||
|         chk_codec_applied( |         chk_codec_applied( | ||||||
|             custom_codec=nsp_codec, |             custom_codec=nsp_codec, | ||||||
|             enter_value=codec, |             enter_value=codec, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         # ensure roundtripping works locally | ||||||
|         nsp = NamespacePath.from_ref(ex_func) |         nsp = NamespacePath.from_ref(ex_func) | ||||||
|         await ctx.started(nsp) |         wire_bytes: bytes = nsp_codec.encode( | ||||||
|  |             Started( | ||||||
|  |                 cid=ctx.cid, | ||||||
|  |                 pld=nsp | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  |         msg: Started = nsp_codec.decode(wire_bytes) | ||||||
|  |         pld = msg.pld | ||||||
|  |         assert pld == nsp | ||||||
| 
 | 
 | ||||||
|  |         await ctx.started(nsp) | ||||||
|         async with ctx.open_stream() as ipc: |         async with ctx.open_stream() as ipc: | ||||||
|             async for msg in ipc: |             async for msg in ipc: | ||||||
| 
 | 
 | ||||||
|  |                 if use_any_spec: | ||||||
|                     assert msg == f'{__name__}:ex_func' |                     assert msg == f'{__name__}:ex_func' | ||||||
| 
 | 
 | ||||||
|                     # TODO: as per below |                     # TODO: as per below | ||||||
|                     # assert isinstance(msg, NamespacePath) |                     # assert isinstance(msg, NamespacePath) | ||||||
|                     assert isinstance(msg, str) |                     assert isinstance(msg, str) | ||||||
|  |                 else: | ||||||
|  |                     assert isinstance(msg, NamespacePath) | ||||||
|  | 
 | ||||||
|  |                 await ipc.send(msg) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def chk_codec_applied( | def chk_codec_applied( | ||||||
|  | @ -146,11 +239,20 @@ def chk_codec_applied( | ||||||
|     enter_value: MsgCodec, |     enter_value: MsgCodec, | ||||||
| ) -> MsgCodec: | ) -> MsgCodec: | ||||||
| 
 | 
 | ||||||
|     task: trio.Task = trio.lowlevel.current_task() |     # task: trio.Task = trio.lowlevel.current_task() | ||||||
|     task_ctx: Context = task.context |  | ||||||
| 
 | 
 | ||||||
|     assert _ctxvar_MsgCodec in task_ctx |     # TreeVar | ||||||
|     curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] |     # curr_codec = _ctxvar_MsgCodec.get_in(task) | ||||||
|  | 
 | ||||||
|  |     # ContextVar | ||||||
|  |     # task_ctx: Context = task.context | ||||||
|  |     # assert _ctxvar_MsgCodec in task_ctx | ||||||
|  |     # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec] | ||||||
|  | 
 | ||||||
|  |     # RunVar | ||||||
|  |     curr_codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||||
|  |     last_read_codec = _ctxvar_MsgCodec.get() | ||||||
|  |     assert curr_codec is last_read_codec | ||||||
| 
 | 
 | ||||||
|     assert ( |     assert ( | ||||||
|         # returned from `mk_codec()` |         # returned from `mk_codec()` | ||||||
|  | @ -163,14 +265,31 @@ def chk_codec_applied( | ||||||
|         curr_codec is |         curr_codec is | ||||||
| 
 | 
 | ||||||
|         # public API for all of the above |         # public API for all of the above | ||||||
|         current_msgspec_codec() |         current_codec() | ||||||
| 
 | 
 | ||||||
|         # the default `msgspec` settings |         # the default `msgspec` settings | ||||||
|         is not _def_msgspec_codec |         is not _codec._def_msgspec_codec | ||||||
|  |         is not _codec._def_tractor_codec | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_codec_hooks_mod(): | @pytest.mark.parametrize( | ||||||
|  |     'ipc_pld_spec', | ||||||
|  |     [ | ||||||
|  |         # _codec._def_msgspec_codec, | ||||||
|  |         Any, | ||||||
|  |         # _codec._def_tractor_codec, | ||||||
|  |         NamespacePath|None, | ||||||
|  |     ], | ||||||
|  |     ids=[ | ||||||
|  |         'any_type', | ||||||
|  |         'nsp_type', | ||||||
|  |     ] | ||||||
|  | ) | ||||||
|  | def test_codec_hooks_mod( | ||||||
|  |     debug_mode: bool, | ||||||
|  |     ipc_pld_spec: Union[Type]|Any, | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Audit the `.msg.MsgCodec` override apis details given our impl |     Audit the `.msg.MsgCodec` override apis details given our impl | ||||||
|     uses `contextvars` to accomplish per `trio` task codec |     uses `contextvars` to accomplish per `trio` task codec | ||||||
|  | @ -178,11 +297,21 @@ def test_codec_hooks_mod(): | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|         task: trio.Task = trio.lowlevel.current_task() |  | ||||||
|         task_ctx: Context = task.context |  | ||||||
|         assert _ctxvar_MsgCodec not in task_ctx |  | ||||||
| 
 | 
 | ||||||
|         async with tractor.open_nursery() as an: |         # task: trio.Task = trio.lowlevel.current_task() | ||||||
|  | 
 | ||||||
|  |         # ContextVar | ||||||
|  |         # task_ctx: Context = task.context | ||||||
|  |         # assert _ctxvar_MsgCodec not in task_ctx | ||||||
|  | 
 | ||||||
|  |         # TreeVar | ||||||
|  |         # def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task) | ||||||
|  |         def_codec = _ctxvar_MsgCodec.get() | ||||||
|  |         assert def_codec is _codec._def_tractor_codec | ||||||
|  | 
 | ||||||
|  |         async with tractor.open_nursery( | ||||||
|  |             debug_mode=debug_mode, | ||||||
|  |         ) as an: | ||||||
|             p: tractor.Portal = await an.start_actor( |             p: tractor.Portal = await an.start_actor( | ||||||
|                 'sub', |                 'sub', | ||||||
|                 enable_modules=[__name__], |                 enable_modules=[__name__], | ||||||
|  | @ -192,7 +321,9 @@ def test_codec_hooks_mod(): | ||||||
|             # - codec not modified -> decode nsp as `str` |             # - codec not modified -> decode nsp as `str` | ||||||
|             # - codec modified with hooks -> decode nsp as |             # - codec modified with hooks -> decode nsp as | ||||||
|             #   `NamespacePath` |             #   `NamespacePath` | ||||||
|             nsp_codec: MsgCodec = mk_custom_codec() |             nsp_codec: MsgCodec = mk_custom_codec( | ||||||
|  |                 pld_spec=ipc_pld_spec, | ||||||
|  |             ) | ||||||
|             with apply_codec(nsp_codec) as codec: |             with apply_codec(nsp_codec) as codec: | ||||||
|                 chk_codec_applied( |                 chk_codec_applied( | ||||||
|                     custom_codec=nsp_codec, |                     custom_codec=nsp_codec, | ||||||
|  | @ -202,9 +333,22 @@ def test_codec_hooks_mod(): | ||||||
|                 async with ( |                 async with ( | ||||||
|                     p.open_context( |                     p.open_context( | ||||||
|                         send_back_nsp, |                         send_back_nsp, | ||||||
|  |                         # TODO: send the original nsp here and | ||||||
|  |                         # test with `limit_msg_spec()` above? | ||||||
|  |                         expect_debug=debug_mode, | ||||||
|  |                         use_any_spec=(ipc_pld_spec==Any), | ||||||
|  | 
 | ||||||
|                     ) as (ctx, first), |                     ) as (ctx, first), | ||||||
|                     ctx.open_stream() as ipc, |                     ctx.open_stream() as ipc, | ||||||
|                 ): |                 ): | ||||||
|  |                     if ipc_pld_spec is NamespacePath: | ||||||
|  |                         assert isinstance(first, NamespacePath) | ||||||
|  | 
 | ||||||
|  |                     print( | ||||||
|  |                         'root: ENTERING CONTEXT BLOCK\n' | ||||||
|  |                         f'type(first): {type(first)}\n' | ||||||
|  |                         f'first: {first}\n' | ||||||
|  |                     ) | ||||||
|                     # ensure codec is still applied across |                     # ensure codec is still applied across | ||||||
|                     # `tractor.Context` + its embedded nursery. |                     # `tractor.Context` + its embedded nursery. | ||||||
|                     chk_codec_applied( |                     chk_codec_applied( | ||||||
|  | @ -212,23 +356,46 @@ def test_codec_hooks_mod(): | ||||||
|                         enter_value=codec, |                         enter_value=codec, | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|                     assert first == f'{__name__}:ex_func' |                     first_nsp = NamespacePath(first) | ||||||
|  | 
 | ||||||
|  |                     # ensure roundtripping works | ||||||
|  |                     wire_bytes: bytes = nsp_codec.encode( | ||||||
|  |                         Started( | ||||||
|  |                             cid=ctx.cid, | ||||||
|  |                             pld=first_nsp | ||||||
|  |                         ) | ||||||
|  |                     ) | ||||||
|  |                     msg: Started = nsp_codec.decode(wire_bytes) | ||||||
|  |                     pld = msg.pld | ||||||
|  |                     assert  pld == first_nsp | ||||||
|  | 
 | ||||||
|  |                     # try a manual decode of the started msg+pld | ||||||
|  | 
 | ||||||
|                     # TODO: actually get the decoder loading |                     # TODO: actually get the decoder loading | ||||||
|                     # to native once we spec our SCIPP msgspec |                     # to native once we spec our SCIPP msgspec | ||||||
|                     # (structurred-conc-inter-proc-protocol) |                     # (structurred-conc-inter-proc-protocol) | ||||||
|                     # implemented as per, |                     # implemented as per, | ||||||
|                     # https://github.com/goodboy/tractor/issues/36 |                     # https://github.com/goodboy/tractor/issues/36 | ||||||
|                     # |                     # | ||||||
|                     # assert isinstance(first, NamespacePath) |                     if ipc_pld_spec is NamespacePath: | ||||||
|  |                         assert isinstance(first, NamespacePath) | ||||||
|  | 
 | ||||||
|  |                     # `Any`-payload-spec case | ||||||
|  |                     else: | ||||||
|                         assert isinstance(first, str) |                         assert isinstance(first, str) | ||||||
|  |                         assert first == f'{__name__}:ex_func' | ||||||
|  | 
 | ||||||
|                     await ipc.send(first) |                     await ipc.send(first) | ||||||
| 
 | 
 | ||||||
|                     with trio.move_on_after(1): |                     with trio.move_on_after(.6): | ||||||
|                         async for msg in ipc: |                         async for msg in ipc: | ||||||
|  |                             print(msg) | ||||||
| 
 | 
 | ||||||
|                             # TODO: as per above |                             # TODO: as per above | ||||||
|                             # assert isinstance(msg, NamespacePath) |                             # assert isinstance(msg, NamespacePath) | ||||||
|                             assert isinstance(msg, str) |                             assert isinstance(msg, str) | ||||||
|  |                             await ipc.send(msg) | ||||||
|  |                             await trio.sleep(0.1) | ||||||
| 
 | 
 | ||||||
|             await p.cancel_actor() |             await p.cancel_actor() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -845,7 +845,10 @@ async def keep_sending_from_callee( | ||||||
|         ('caller', 1, never_open_stream), |         ('caller', 1, never_open_stream), | ||||||
|         ('callee', 0, keep_sending_from_callee), |         ('callee', 0, keep_sending_from_callee), | ||||||
|     ], |     ], | ||||||
|     ids='overrun_condition={}'.format, |     ids=[ | ||||||
|  |          ('caller_1buf_never_open_stream'), | ||||||
|  |          ('callee_0buf_keep_sending_from_callee'), | ||||||
|  |     ] | ||||||
| ) | ) | ||||||
| def test_one_end_stream_not_opened( | def test_one_end_stream_not_opened( | ||||||
|     overrun_by: tuple[str, int, Callable], |     overrun_by: tuple[str, int, Callable], | ||||||
|  | @ -869,6 +872,7 @@ def test_one_end_stream_not_opened( | ||||||
|                 enable_modules=[__name__], |                 enable_modules=[__name__], | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  |             with trio.fail_after(1): | ||||||
|                 async with portal.open_context( |                 async with portal.open_context( | ||||||
|                     entrypoint, |                     entrypoint, | ||||||
|                 ) as (ctx, sent): |                 ) as (ctx, sent): | ||||||
|  |  | ||||||
|  | @ -190,11 +190,14 @@ class Lock: | ||||||
|         is_trio_main = ( |         is_trio_main = ( | ||||||
|             # TODO: since this is private, @oremanj says |             # TODO: since this is private, @oremanj says | ||||||
|             # we should just copy the impl for now.. |             # we should just copy the impl for now.. | ||||||
|             trio._util.is_main_thread() |             (is_main_thread := trio._util.is_main_thread()) | ||||||
|             and |             and | ||||||
|             (async_lib := sniffio.current_async_library()) == 'trio' |             (async_lib := sniffio.current_async_library()) == 'trio' | ||||||
|         ) |         ) | ||||||
|         if not is_trio_main: |         if ( | ||||||
|  |             not is_trio_main | ||||||
|  |             and is_main_thread | ||||||
|  |         ): | ||||||
|             log.warning( |             log.warning( | ||||||
|                 f'Current async-lib detected by `sniffio`: {async_lib}\n' |                 f'Current async-lib detected by `sniffio`: {async_lib}\n' | ||||||
|             ) |             ) | ||||||
|  |  | ||||||
|  | @ -31,25 +31,24 @@ from ._codec import ( | ||||||
|     apply_codec as apply_codec, |     apply_codec as apply_codec, | ||||||
|     mk_codec as mk_codec, |     mk_codec as mk_codec, | ||||||
|     MsgCodec as MsgCodec, |     MsgCodec as MsgCodec, | ||||||
|     current_msgspec_codec as current_msgspec_codec, |     current_codec as current_codec, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from .types import ( | from .types import ( | ||||||
|     Msg as Msg, |     Msg as Msg, | ||||||
| 
 | 
 | ||||||
|     Start as Start,  # with pld |     Aid as Aid, | ||||||
|     FuncSpec as FuncSpec, |     SpawnSpec as SpawnSpec, | ||||||
| 
 | 
 | ||||||
|     StartAck as StartAck, # with pld |     Start as Start, | ||||||
|     IpcCtxSpec as IpcCtxSpec, |     StartAck as StartAck, | ||||||
| 
 | 
 | ||||||
|     Started as Started, |     Started as Started, | ||||||
|     Yield as Yield, |     Yield as Yield, | ||||||
|     Stop as Stop, |     Stop as Stop, | ||||||
|     Return as Return, |     Return as Return, | ||||||
| 
 | 
 | ||||||
|     Error as Error,  # with pld |     Error as Error, | ||||||
|     ErrorData as ErrorData, |  | ||||||
| 
 | 
 | ||||||
|     # full msg spec set |     # full msg spec set | ||||||
|     __spec__ as __spec__, |     __spec__ as __spec__, | ||||||
|  |  | ||||||
|  | @ -30,13 +30,13 @@ ToDo: backends we prolly should offer: | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from contextvars import ( |  | ||||||
|     ContextVar, |  | ||||||
|     Token, |  | ||||||
| ) |  | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     contextmanager as cm, |     contextmanager as cm, | ||||||
| ) | ) | ||||||
|  | # from contextvars import ( | ||||||
|  | #     ContextVar, | ||||||
|  | #     Token, | ||||||
|  | # ) | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  | @ -47,6 +47,12 @@ from types import ModuleType | ||||||
| 
 | 
 | ||||||
| import msgspec | import msgspec | ||||||
| from msgspec import msgpack | from msgspec import msgpack | ||||||
|  | from trio.lowlevel import ( | ||||||
|  |     RunVar, | ||||||
|  |     RunVarToken, | ||||||
|  | ) | ||||||
|  | # TODO: see notes below from @mikenerone.. | ||||||
|  | # from tricycle import TreeVar | ||||||
| 
 | 
 | ||||||
| from tractor.msg.pretty_struct import Struct | from tractor.msg.pretty_struct import Struct | ||||||
| from tractor.msg.types import ( | from tractor.msg.types import ( | ||||||
|  | @ -72,6 +78,9 @@ class MsgCodec(Struct): | ||||||
|     ''' |     ''' | ||||||
|     A IPC msg interchange format lib's encoder + decoder pair. |     A IPC msg interchange format lib's encoder + decoder pair. | ||||||
| 
 | 
 | ||||||
|  |     Pretty much nothing more then delegation to underlying | ||||||
|  |     `msgspec.<interchange-protocol>.Encoder/Decoder`s for now. | ||||||
|  | 
 | ||||||
|     ''' |     ''' | ||||||
|     _enc: msgpack.Encoder |     _enc: msgpack.Encoder | ||||||
|     _dec: msgpack.Decoder |     _dec: msgpack.Decoder | ||||||
|  | @ -86,11 +95,6 @@ class MsgCodec(Struct): | ||||||
| 
 | 
 | ||||||
|     lib: ModuleType = msgspec |     lib: ModuleType = msgspec | ||||||
| 
 | 
 | ||||||
|     # ad-hoc type extensions |  | ||||||
|     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types |  | ||||||
|     enc_hook: Callable[[Any], Any]|None = None  # coder |  | ||||||
|     dec_hook: Callable[[type, Any], Any]|None = None # decoder |  | ||||||
| 
 |  | ||||||
|     # TODO: a sub-decoder system as well? |     # TODO: a sub-decoder system as well? | ||||||
|     # payload_msg_specs: Union[Type[Struct]] = Any |     # payload_msg_specs: Union[Type[Struct]] = Any | ||||||
|     # see related comments in `.msg.types` |     # see related comments in `.msg.types` | ||||||
|  | @ -304,7 +308,8 @@ def mk_codec( | ||||||
| 
 | 
 | ||||||
|     libname: str = 'msgspec', |     libname: str = 'msgspec', | ||||||
| 
 | 
 | ||||||
|     # proxy as `Struct(**kwargs)` |     # proxy as `Struct(**kwargs)` for ad-hoc type extensions | ||||||
|  |     # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types | ||||||
|     # ------ - ------ |     # ------ - ------ | ||||||
|     dec_hook: Callable|None = None, |     dec_hook: Callable|None = None, | ||||||
|     enc_hook: Callable|None = None, |     enc_hook: Callable|None = None, | ||||||
|  | @ -389,14 +394,52 @@ def mk_codec( | ||||||
| # no custom structs, hooks or other special types. | # no custom structs, hooks or other special types. | ||||||
| _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) | _def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) | ||||||
| 
 | 
 | ||||||
| # NOTE: provides for per-`trio.Task` specificity of the | # The built-in IPC `Msg` spec. | ||||||
|  | # Our composing "shuttle" protocol which allows `tractor`-app code | ||||||
|  | # to use any `msgspec` supported type as the `Msg.pld` payload, | ||||||
|  | # https://jcristharif.com/msgspec/supported-types.html | ||||||
|  | # | ||||||
|  | _def_tractor_codec: MsgCodec = mk_codec( | ||||||
|  |     ipc_pld_spec=Any, | ||||||
|  | ) | ||||||
|  | # TODO: IDEALLY provides for per-`trio.Task` specificity of the | ||||||
| # IPC msging codec used by the transport layer when doing | # IPC msging codec used by the transport layer when doing | ||||||
| # `Channel.send()/.recv()` of wire data. | # `Channel.send()/.recv()` of wire data. | ||||||
| _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( | 
 | ||||||
|  | # ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!? | ||||||
|  | # _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar( | ||||||
|  | 
 | ||||||
|  | # TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery | ||||||
|  | # even though it's supposed to inherit from a parent context ??? | ||||||
|  | # | ||||||
|  | # _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar( | ||||||
|  | # | ||||||
|  | # ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter: | ||||||
|  | # | ||||||
|  | # 22:02:54 <mikenerone> even for regular contextvars, all you have to do is: | ||||||
|  | #    `task: Task = trio.lowlevel.current_task()` | ||||||
|  | #    `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)` | ||||||
|  | # | ||||||
|  | # From a comment in his prop code he couldn't share outright: | ||||||
|  | # 1. For every TreeVar set in the current task (which covers what | ||||||
|  | #    we need from SynchronizerFacade), walk up the tree until the | ||||||
|  | #    root or finding one where the TreeVar is already set, setting | ||||||
|  | #    it in all of the contexts along the way. | ||||||
|  | # 2. For each of those, we also forcibly set the values that are | ||||||
|  | #    pending for child nurseries that have not yet accessed the | ||||||
|  | #    TreeVar. | ||||||
|  | # 3. We similarly set the pending values for the child nurseries | ||||||
|  | #    of the *current* task. | ||||||
|  | # | ||||||
|  | 
 | ||||||
|  | # TODO: STOP USING THIS, since it's basically a global and won't | ||||||
|  | # allow sub-IPC-ctxs to limit the msg-spec however desired.. | ||||||
|  | _ctxvar_MsgCodec: MsgCodec = RunVar( | ||||||
|     'msgspec_codec', |     'msgspec_codec', | ||||||
| 
 | 
 | ||||||
|     # TODO: move this to our new `Msg`-spec! |     # TODO: move this to our new `Msg`-spec! | ||||||
|     default=_def_msgspec_codec, |     # default=_def_msgspec_codec, | ||||||
|  |     default=_def_tractor_codec, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -410,15 +453,36 @@ def apply_codec( | ||||||
|     runtime context such that all IPC msgs are processed |     runtime context such that all IPC msgs are processed | ||||||
|     with it for that task. |     with it for that task. | ||||||
| 
 | 
 | ||||||
|  |     Uses a `tricycle.TreeVar` to ensure the scope of the codec | ||||||
|  |     matches the `@cm` block and DOES NOT change to the original | ||||||
|  |     (default) value in new tasks (as it does for `ContextVar`). | ||||||
|  | 
 | ||||||
|  |     See the docs: | ||||||
|  |     - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables | ||||||
|  |     - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py | ||||||
|  | 
 | ||||||
|     ''' |     ''' | ||||||
|     token: Token = _ctxvar_MsgCodec.set(codec) |     orig: MsgCodec = _ctxvar_MsgCodec.get() | ||||||
|  |     assert orig is not codec | ||||||
|  |     token: RunVarToken = _ctxvar_MsgCodec.set(codec) | ||||||
|  | 
 | ||||||
|  |     # TODO: for TreeVar approach, see docs for @cm `.being()` API: | ||||||
|  |     # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables | ||||||
|  |     # try: | ||||||
|  |     #     with _ctxvar_MsgCodec.being(codec): | ||||||
|  |     #         new = _ctxvar_MsgCodec.get() | ||||||
|  |     #         assert new is codec | ||||||
|  |     #         yield codec | ||||||
|  | 
 | ||||||
|     try: |     try: | ||||||
|         yield _ctxvar_MsgCodec.get() |         yield _ctxvar_MsgCodec.get() | ||||||
|     finally: |     finally: | ||||||
|         _ctxvar_MsgCodec.reset(token) |         _ctxvar_MsgCodec.reset(token) | ||||||
| 
 | 
 | ||||||
|  |     assert _ctxvar_MsgCodec.get() is orig | ||||||
| 
 | 
 | ||||||
| def current_msgspec_codec() -> MsgCodec: | 
 | ||||||
|  | def current_codec() -> MsgCodec: | ||||||
|     ''' |     ''' | ||||||
|     Return the current `trio.Task.context`'s value |     Return the current `trio.Task.context`'s value | ||||||
|     for `msgspec_codec` used by `Channel.send/.recv()` |     for `msgspec_codec` used by `Channel.send/.recv()` | ||||||
|  | @ -449,5 +513,6 @@ def limit_msg_spec( | ||||||
|         payload_types=payload_types, |         payload_types=payload_types, | ||||||
|         **codec_kwargs, |         **codec_kwargs, | ||||||
|     ) |     ) | ||||||
|     with apply_codec(msgspec_codec): |     with apply_codec(msgspec_codec) as applied_codec: | ||||||
|  |         assert applied_codec is msgspec_codec | ||||||
|         yield msgspec_codec |         yield msgspec_codec | ||||||
|  |  | ||||||
|  | @ -80,6 +80,28 @@ class DiffDump(UserList): | ||||||
|         return repstr |         return repstr | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def iter_fields(struct: Struct) -> Iterator[ | ||||||
|  |     tuple[ | ||||||
|  |         structs.FieldIinfo, | ||||||
|  |         str, | ||||||
|  |         Any, | ||||||
|  |     ] | ||||||
|  | ]: | ||||||
|  |     ''' | ||||||
|  |     Iterate over all non-@property fields of this struct. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     fi: structs.FieldInfo | ||||||
|  |     for fi in structs.fields(struct): | ||||||
|  |         key: str = fi.name | ||||||
|  |         val: Any = getattr(struct, key) | ||||||
|  |         yield ( | ||||||
|  |             fi, | ||||||
|  |             key, | ||||||
|  |             val, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class Struct( | class Struct( | ||||||
|     _Struct, |     _Struct, | ||||||
| 
 | 
 | ||||||
|  | @ -91,23 +113,6 @@ class Struct( | ||||||
|     A "human friendlier" (aka repl buddy) struct subtype. |     A "human friendlier" (aka repl buddy) struct subtype. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     def _sin_props(self) -> Iterator[ |  | ||||||
|         tuple[ |  | ||||||
|             structs.FieldIinfo, |  | ||||||
|             str, |  | ||||||
|             Any, |  | ||||||
|         ] |  | ||||||
|     ]: |  | ||||||
|         ''' |  | ||||||
|         Iterate over all non-@property fields of this struct. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         fi: structs.FieldInfo |  | ||||||
|         for fi in structs.fields(self): |  | ||||||
|             key: str = fi.name |  | ||||||
|             val: Any = getattr(self, key) |  | ||||||
|             yield fi, key, val |  | ||||||
| 
 |  | ||||||
|     def to_dict( |     def to_dict( | ||||||
|         self, |         self, | ||||||
|         include_non_members: bool = True, |         include_non_members: bool = True, | ||||||
|  | @ -130,7 +135,7 @@ class Struct( | ||||||
|         # added as type-defined `@property` methods! |         # added as type-defined `@property` methods! | ||||||
|         sin_props: dict = {} |         sin_props: dict = {} | ||||||
|         fi: structs.FieldInfo |         fi: structs.FieldInfo | ||||||
|         for fi, k, v in self._sin_props(): |         for fi, k, v in iter_fields(self): | ||||||
|             sin_props[k] = asdict[k] |             sin_props[k] = asdict[k] | ||||||
| 
 | 
 | ||||||
|         return sin_props |         return sin_props | ||||||
|  | @ -159,7 +164,7 @@ class Struct( | ||||||
|         fi: structs.FieldInfo |         fi: structs.FieldInfo | ||||||
|         k: str |         k: str | ||||||
|         v: Any |         v: Any | ||||||
|         for fi, k, v in self._sin_props(): |         for fi, k, v in iter_fields(self): | ||||||
| 
 | 
 | ||||||
|             # TODO: how can we prefer `Literal['option1',  'option2, |             # TODO: how can we prefer `Literal['option1',  'option2, | ||||||
|             # ..]` over .__name__ == `Literal` but still get only the |             # ..]` over .__name__ == `Literal` but still get only the | ||||||
|  |  | ||||||
|  | @ -26,6 +26,7 @@ from __future__ import annotations | ||||||
| import types | import types | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|  |     Callable, | ||||||
|     Generic, |     Generic, | ||||||
|     Literal, |     Literal, | ||||||
|     Type, |     Type, | ||||||
|  | @ -37,8 +38,12 @@ from msgspec import ( | ||||||
|     defstruct, |     defstruct, | ||||||
|     # field, |     # field, | ||||||
|     Struct, |     Struct, | ||||||
|     UNSET, |     # UNSET, | ||||||
|     UnsetType, |     # UnsetType, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | from tractor.msg import ( | ||||||
|  |     pretty_struct, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| # type variable for the boxed payload field `.pld` | # type variable for the boxed payload field `.pld` | ||||||
|  | @ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT') | ||||||
| class Msg( | class Msg( | ||||||
|     Struct, |     Struct, | ||||||
|     Generic[PayloadT], |     Generic[PayloadT], | ||||||
|  | 
 | ||||||
|  |     # https://jcristharif.com/msgspec/structs.html#tagged-unions | ||||||
|     tag=True, |     tag=True, | ||||||
|     tag_field='msg_type', |     tag_field='msg_type', | ||||||
| 
 | 
 | ||||||
|     # eq=True, |     # https://jcristharif.com/msgspec/structs.html#field-ordering | ||||||
|  |     # kw_only=True, | ||||||
|  | 
 | ||||||
|  |     # https://jcristharif.com/msgspec/structs.html#equality-and-order | ||||||
|     # order=True, |     # order=True, | ||||||
|  | 
 | ||||||
|  |     # https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays | ||||||
|  |     # as_array=True, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     The "god" boxing msg type. |     The "god" boxing msg type. | ||||||
|  | @ -90,6 +103,53 @@ class Msg( | ||||||
|     pld: PayloadT |     pld: PayloadT | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class Aid( | ||||||
|  |     Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Actor-identity msg. | ||||||
|  | 
 | ||||||
|  |     Initial contact exchange enabling an actor "mailbox handshake" | ||||||
|  |     delivering the peer identity (and maybe eventually contact) | ||||||
|  |     info. | ||||||
|  | 
 | ||||||
|  |     Used by discovery protocol to register actors as well as | ||||||
|  |     conduct the initial comms (capability) filtering. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     name: str | ||||||
|  |     uuid: str | ||||||
|  |     # TODO: use built-in support for UUIDs? | ||||||
|  |     # -[ ] `uuid.UUID` which has multi-protocol support | ||||||
|  |     #  https://jcristharif.com/msgspec/supported-types.html#uuid | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class SpawnSpec( | ||||||
|  |     pretty_struct.Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Initial runtime spec handed down from a spawning parent to its | ||||||
|  |     child subactor immediately following first contact via an | ||||||
|  |     `Aid` msg. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     _parent_main_data: dict | ||||||
|  |     _runtime_vars: dict[str, Any] | ||||||
|  | 
 | ||||||
|  |     # module import capability | ||||||
|  |     enable_modules: dict[str, str] | ||||||
|  | 
 | ||||||
|  |     # TODO: not just sockaddr pairs? | ||||||
|  |     # -[ ] abstract into a `TransportAddr` type? | ||||||
|  |     reg_addrs: list[tuple[str, int]] | ||||||
|  |     bind_addrs: list[tuple[str, int]] | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: caps based RPC support in the payload? | # TODO: caps based RPC support in the payload? | ||||||
| # | # | ||||||
| # -[ ] integration with our ``enable_modules: list[str]`` caps sys. | # -[ ] integration with our ``enable_modules: list[str]`` caps sys. | ||||||
|  | @ -105,18 +165,31 @@ class Msg( | ||||||
| # | # | ||||||
| # -[ ] can we combine .ns + .func into a native `NamespacePath` field? | # -[ ] can we combine .ns + .func into a native `NamespacePath` field? | ||||||
| # | # | ||||||
| # -[ ]better name, like `Call/TaskInput`? | # -[ ] better name, like `Call/TaskInput`? | ||||||
| # | # | ||||||
| class FuncSpec(Struct): | # -[ ] XXX a debugger lock msg transaction with payloads like, | ||||||
|     ns: str | #   child -> `.pld: DebugLock` -> root | ||||||
|     func: str | #   child <- `.pld: DebugLocked` <- root | ||||||
| 
 | #   child -> `.pld: DebugRelease` -> root | ||||||
|     kwargs: dict | # | ||||||
|     uid: str  # (calling) actor-id | #   WHY => when a pld spec is provided it might not allow for | ||||||
|  | #   debug mode msgs as they currently are (using plain old `pld. | ||||||
|  | #   str` payloads) so we only when debug_mode=True we need to | ||||||
|  | #   union in this debugger payload set? | ||||||
|  | # | ||||||
|  | #   mk_msg_spec( | ||||||
|  | #       MyPldSpec, | ||||||
|  | #       debug_mode=True, | ||||||
|  | #   ) -> ( | ||||||
|  | #       Union[MyPldSpec] | ||||||
|  | #      | Union[DebugLock, DebugLocked, DebugRelease] | ||||||
|  | #   ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Start( | class Start( | ||||||
|     Msg, |     Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     Initial request to remotely schedule an RPC `trio.Task` via |     Initial request to remotely schedule an RPC `trio.Task` via | ||||||
|  | @ -134,14 +207,26 @@ class Start( | ||||||
|     - `Context.open_context()` |     - `Context.open_context()` | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     pld: FuncSpec |     cid: str | ||||||
|  | 
 | ||||||
|  |     ns: str | ||||||
|  |     func: str | ||||||
|  | 
 | ||||||
|  |     kwargs: dict | ||||||
|  |     uid: tuple[str, str]  # (calling) actor-id | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class IpcCtxSpec(Struct): | class StartAck( | ||||||
|  |     Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     An inter-actor-`trio.Task`-comms `Context` spec. |     Init response to a `Cmd` request indicating the far | ||||||
|  |     end's RPC spec, namely its callable "type". | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  |     cid: str | ||||||
|     # TODO: maybe better names for all these? |     # TODO: maybe better names for all these? | ||||||
|     # -[ ] obvi ^ would need sync with `._rpc` |     # -[ ] obvi ^ would need sync with `._rpc` | ||||||
|     functype: Literal[ |     functype: Literal[ | ||||||
|  | @ -160,18 +245,6 @@ class IpcCtxSpec(Struct): | ||||||
|     # msgspec: MsgSpec |     # msgspec: MsgSpec | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class StartAck( |  | ||||||
|     Msg, |  | ||||||
|     Generic[PayloadT], |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     Init response to a `Cmd` request indicating the far |  | ||||||
|     end's RPC callable "type". |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     pld: IpcCtxSpec |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class Started( | class Started( | ||||||
|     Msg, |     Msg, | ||||||
|     Generic[PayloadT], |     Generic[PayloadT], | ||||||
|  | @ -202,13 +275,19 @@ class Yield( | ||||||
|     pld: PayloadT |     pld: PayloadT | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Stop(Msg): | class Stop( | ||||||
|  |     Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Stream termination signal much like an IPC version  |     Stream termination signal much like an IPC version  | ||||||
|     of `StopAsyncIteration`. |     of `StopAsyncIteration`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     pld: UnsetType = UNSET |     cid: str | ||||||
|  |     # TODO: do we want to support a payload on stop? | ||||||
|  |     # pld: UnsetType = UNSET | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Return( | class Return( | ||||||
|  | @ -223,32 +302,33 @@ class Return( | ||||||
|     pld: PayloadT |     pld: PayloadT | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class ErrorData(Struct): | class Error( | ||||||
|  |     Struct, | ||||||
|  |     tag=True, | ||||||
|  |     tag_field='msg_type', | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Remote actor error meta-data as needed originally by |     A pkt that wraps `RemoteActorError`s for relay and raising. | ||||||
|  | 
 | ||||||
|  |     Fields are 1-to-1 meta-data as needed originally by | ||||||
|     `RemoteActorError.msgdata: dict`. |     `RemoteActorError.msgdata: dict`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     src_uid: str |     src_uid: tuple[str, str] | ||||||
|     src_type_str: str |     src_type_str: str | ||||||
|     boxed_type_str: str |     boxed_type_str: str | ||||||
| 
 |     relay_path: list[tuple[str, str]] | ||||||
|     relay_path: list[str] |  | ||||||
|     tb_str: str |     tb_str: str | ||||||
| 
 | 
 | ||||||
|  |     cid: str|None = None | ||||||
|  | 
 | ||||||
|  |     # TODO: use UNSET or don't include them via | ||||||
|  |     # | ||||||
|     # `ContextCancelled` |     # `ContextCancelled` | ||||||
|     canceller: str|None = None |     canceller: tuple[str, str]|None = None | ||||||
| 
 | 
 | ||||||
|     # `StreamOverrun` |     # `StreamOverrun` | ||||||
|     sender: str|None = None |     sender: tuple[str, str]|None = None | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class Error(Msg): |  | ||||||
|     ''' |  | ||||||
|     A pkt that wraps `RemoteActorError`s for relay. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     pld: ErrorData |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: should be make a msg version of `ContextCancelled?` | # TODO: should be make a msg version of `ContextCancelled?` | ||||||
|  | @ -265,6 +345,12 @@ class Error(Msg): | ||||||
| # approx order of the IPC txn-state spaces. | # approx order of the IPC txn-state spaces. | ||||||
| __spec__: list[Msg] = [ | __spec__: list[Msg] = [ | ||||||
| 
 | 
 | ||||||
|  |     # identity handshake | ||||||
|  |     Aid, | ||||||
|  | 
 | ||||||
|  |     # spawn specification from parent | ||||||
|  |     SpawnSpec, | ||||||
|  | 
 | ||||||
|     # inter-actor RPC initiation |     # inter-actor RPC initiation | ||||||
|     Start, |     Start, | ||||||
|     StartAck, |     StartAck, | ||||||
|  | @ -280,6 +366,8 @@ __spec__: list[Msg] = [ | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| _runtime_spec_msgs: list[Msg] = [ | _runtime_spec_msgs: list[Msg] = [ | ||||||
|  |     Aid, | ||||||
|  |     SpawnSpec, | ||||||
|     Start, |     Start, | ||||||
|     StartAck, |     StartAck, | ||||||
|     Stop, |     Stop, | ||||||
|  | @ -443,3 +531,99 @@ def mk_msg_spec( | ||||||
|         pld_spec | runtime_spec, |         pld_spec | runtime_spec, | ||||||
|         msgtypes_table[spec_build_method] + ipc_msg_types, |         msgtypes_table[spec_build_method] + ipc_msg_types, | ||||||
|     ) |     ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: make something similar to this inside `._codec` such that | ||||||
|  | # user can just pass a type table of some sort? | ||||||
|  | # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: | ||||||
|  | #     ''' | ||||||
|  | #     Deliver a `enc_hook()`/`dec_hook()` pair which does | ||||||
|  | #     manual convertion from our above native `Msg` set | ||||||
|  | #     to `dict` equivalent (wire msgs) in order to keep legacy compat | ||||||
|  | #     with the original runtime implementation. | ||||||
|  | 
 | ||||||
|  | #     Note: this is is/was primarly used while moving the core | ||||||
|  | #     runtime over to using native `Msg`-struct types wherein we | ||||||
|  | #     start with the send side emitting without loading | ||||||
|  | #     a typed-decoder and then later flipping the switch over to | ||||||
|  | #     load to the native struct types once all runtime usage has | ||||||
|  | #     been adjusted appropriately. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     def enc_to_dict(msg: Any) -> Any: | ||||||
|  | #         ''' | ||||||
|  | #         Encode `Msg`-structs to `dict` msgs instead | ||||||
|  | #         of using `msgspec.msgpack.Decoder.type`-ed | ||||||
|  | #         features. | ||||||
|  | 
 | ||||||
|  | #         ''' | ||||||
|  | #         match msg: | ||||||
|  | #             case Start(): | ||||||
|  | #                 dctmsg: dict = pretty_struct.Struct.to_dict( | ||||||
|  | #                     msg | ||||||
|  | #                 )['pld'] | ||||||
|  | 
 | ||||||
|  | #             case Error(): | ||||||
|  | #                 dctmsg: dict = pretty_struct.Struct.to_dict( | ||||||
|  | #                     msg | ||||||
|  | #                 )['pld'] | ||||||
|  | #                 return {'error': dctmsg} | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | #     def dec_from_dict( | ||||||
|  | #         type: Type, | ||||||
|  | #         obj: Any, | ||||||
|  | #     ) -> Any: | ||||||
|  | #         ''' | ||||||
|  | #         Decode to `Msg`-structs from `dict` msgs instead | ||||||
|  | #         of using `msgspec.msgpack.Decoder.type`-ed | ||||||
|  | #         features. | ||||||
|  | 
 | ||||||
|  | #         ''' | ||||||
|  | #         cid: str = obj.get('cid') | ||||||
|  | #         match obj: | ||||||
|  | #             case {'cmd': pld}: | ||||||
|  | #                 return Start( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                     pld=pld, | ||||||
|  | #                 ) | ||||||
|  | #             case {'functype': pld}: | ||||||
|  | #                 return StartAck( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                     functype=pld, | ||||||
|  | #                     # pld=IpcCtxSpec( | ||||||
|  | #                     #     functype=pld, | ||||||
|  | #                     # ), | ||||||
|  | #                 ) | ||||||
|  | #             case {'started': pld}: | ||||||
|  | #                 return Started( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                     pld=pld, | ||||||
|  | #                 ) | ||||||
|  | #             case {'yield': pld}: | ||||||
|  | #                 return Yield( | ||||||
|  | #                     cid=obj['cid'], | ||||||
|  | #                     pld=pld, | ||||||
|  | #                 ) | ||||||
|  | #             case {'stop': pld}: | ||||||
|  | #                 return Stop( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                 ) | ||||||
|  | #             case {'return': pld}: | ||||||
|  | #                 return Return( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                     pld=pld, | ||||||
|  | #                 ) | ||||||
|  | 
 | ||||||
|  | #             case {'error': pld}: | ||||||
|  | #                 return Error( | ||||||
|  | #                     cid=cid, | ||||||
|  | #                     pld=ErrorData( | ||||||
|  | #                         **pld | ||||||
|  | #                     ), | ||||||
|  | #                 ) | ||||||
|  | 
 | ||||||
|  | #     return ( | ||||||
|  | #         # enc_to_dict, | ||||||
|  | #         dec_from_dict, | ||||||
|  | #     ) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue