Compare commits
	
		
			No commits in common. "cf48fdecfeb76cdf67a3f779d98c086e75659dd6" and "b1fd8b2ec36d12cedc79d39e8da27207b964340f" have entirely different histories. 
		
	
	
		
			cf48fdecfe
			...
			b1fd8b2ec3
		
	
		|  | @ -374,7 +374,7 @@ def enc_type_union( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def send_back_values( | async def send_back_nsp( | ||||||
|     ctx: Context, |     ctx: Context, | ||||||
|     expect_debug: bool, |     expect_debug: bool, | ||||||
|     pld_spec_type_strs: list[str], |     pld_spec_type_strs: list[str], | ||||||
|  | @ -388,8 +388,6 @@ async def send_back_values( | ||||||
|     and ensure we can round trip a func ref with our parent. |     and ensure we can round trip a func ref with our parent. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     uid: tuple = tractor.current_actor().uid |  | ||||||
| 
 |  | ||||||
|     # debug mode sanity check (prolly superfluous but, meh) |     # debug mode sanity check (prolly superfluous but, meh) | ||||||
|     assert expect_debug == _state.debug_mode() |     assert expect_debug == _state.debug_mode() | ||||||
| 
 | 
 | ||||||
|  | @ -416,7 +414,7 @@ async def send_back_values( | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         print( |         print( | ||||||
|             f'{uid}: attempting `Started`-bytes DECODE..\n' |             'CHILD attempting `Started`-bytes DECODE..\n' | ||||||
|         ) |         ) | ||||||
|         try: |         try: | ||||||
|             msg: Started = nsp_codec.decode(started_msg_bytes) |             msg: Started = nsp_codec.decode(started_msg_bytes) | ||||||
|  | @ -438,7 +436,7 @@ async def send_back_values( | ||||||
|                 raise |                 raise | ||||||
|             else: |             else: | ||||||
|                 print( |                 print( | ||||||
|                     f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' |                     'CHILD (correctly) unable to DECODE `Started`-bytes\n' | ||||||
|                     f'{started_msg_bytes}\n' |                     f'{started_msg_bytes}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  | @ -447,7 +445,7 @@ async def send_back_values( | ||||||
|         for send_value, expect_send in iter_send_val_items: |         for send_value, expect_send in iter_send_val_items: | ||||||
|             try: |             try: | ||||||
|                 print( |                 print( | ||||||
|                     f'{uid}: attempting to `.started({send_value})`\n' |                     f'CHILD attempting to `.started({send_value})`\n' | ||||||
|                     f'=> expect_send: {expect_send}\n' |                     f'=> expect_send: {expect_send}\n' | ||||||
|                     f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' |                     f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' | ||||||
|                     f'AND, codec: {codec}\n' |                     f'AND, codec: {codec}\n' | ||||||
|  | @ -462,6 +460,7 @@ async def send_back_values( | ||||||
|                     # await tractor.pause() |                     # await tractor.pause() | ||||||
| 
 | 
 | ||||||
|                     raise RuntimeError( |                     raise RuntimeError( | ||||||
|  |                     # pytest.fail( | ||||||
|                         f'NOT-EXPECTED able to roundtrip value given spec:\n' |                         f'NOT-EXPECTED able to roundtrip value given spec:\n' | ||||||
|                         f'ipc_pld_spec -> {ipc_pld_spec}\n' |                         f'ipc_pld_spec -> {ipc_pld_spec}\n' | ||||||
|                         f'value -> {send_value}: {type(send_value)}\n' |                         f'value -> {send_value}: {type(send_value)}\n' | ||||||
|  | @ -469,76 +468,53 @@ async def send_back_values( | ||||||
| 
 | 
 | ||||||
|                 break  # move on to streaming block.. |                 break  # move on to streaming block.. | ||||||
| 
 | 
 | ||||||
|  |             except NotImplementedError: | ||||||
|  |                 print('FAILED ENCODE!') | ||||||
|  | 
 | ||||||
|             except tractor.MsgTypeError: |             except tractor.MsgTypeError: | ||||||
|                 # await tractor.pause() |                 # await tractor.pause() | ||||||
|                 if expect_send: |                 if expect_send: | ||||||
|                     raise RuntimeError( |                     pytest.fail( | ||||||
|                         f'EXPECTED to `.started()` value given spec:\n' |                         f'EXPECTED to `.started()` value given spec:\n' | ||||||
|                         f'ipc_pld_spec -> {ipc_pld_spec}\n' |                         f'ipc_pld_spec -> {ipc_pld_spec}\n' | ||||||
|                         f'value -> {send_value}: {type(send_value)}\n' |                         f'value -> {send_value}: {type(send_value)}\n' | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|         async with ctx.open_stream() as ipc: |         async with ctx.open_stream() as ipc: | ||||||
|             print( |  | ||||||
|                 f'{uid}: Entering streaming block to send remaining values..' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             for send_value, expect_send in iter_send_val_items: |             for send_value, expect_send in iter_send_val_items: | ||||||
|                 send_type: Type = type(send_value) |                 send_type: Type = type(send_value) | ||||||
|                 print( |                 print( | ||||||
|                     '------ - ------\n' |                     'CHILD report on send value\n' | ||||||
|                     f'{uid}: SENDING NEXT VALUE\n' |  | ||||||
|                     f'ipc_pld_spec: {ipc_pld_spec}\n' |                     f'ipc_pld_spec: {ipc_pld_spec}\n' | ||||||
|                     f'expect_send: {expect_send}\n' |                     f'expect_send: {expect_send}\n' | ||||||
|                     f'val: {send_value}\n' |                     f'val: {send_value}\n' | ||||||
|                     '------ - ------\n' |  | ||||||
|                 ) |                 ) | ||||||
|                 try: |                 try: | ||||||
|                     await ipc.send(send_value) |                     await ipc.send(send_value) | ||||||
|                     print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n') |  | ||||||
|                     sent.append(send_value) |                     sent.append(send_value) | ||||||
| 
 |                     if not expect_send: | ||||||
|                     # NOTE: should only raise above on |                         pytest.fail( | ||||||
|                     # `.started()` or a `Return` |                             f'NOT-EXPECTED able to roundtrip value given spec:\n' | ||||||
|                     # if not expect_send: |                             f'ipc_pld_spec -> {ipc_pld_spec}\n' | ||||||
|                     #     raise RuntimeError( |                             f'value -> {send_value}: {send_type}\n' | ||||||
|                     #         f'NOT-EXPECTED able to roundtrip value given spec:\n' |                         ) | ||||||
|                     #         f'ipc_pld_spec -> {ipc_pld_spec}\n' |  | ||||||
|                     #         f'value -> {send_value}: {send_type}\n' |  | ||||||
|                     #     ) |  | ||||||
| 
 |  | ||||||
|                 except ValidationError: |                 except ValidationError: | ||||||
|                     print(f'{uid} FAILED TO SEND {send_value}!') |  | ||||||
| 
 |  | ||||||
|                     # await tractor.pause() |  | ||||||
|                     if expect_send: |                     if expect_send: | ||||||
|                         raise RuntimeError( |                         pytest.fail( | ||||||
|                             f'EXPECTED to roundtrip value given spec:\n' |                             f'EXPECTED to roundtrip value given spec:\n' | ||||||
|                             f'ipc_pld_spec -> {ipc_pld_spec}\n' |                             f'ipc_pld_spec -> {ipc_pld_spec}\n' | ||||||
|                             f'value -> {send_value}: {send_type}\n' |                             f'value -> {send_value}: {send_type}\n' | ||||||
|                         ) |                         ) | ||||||
|                     # continue |                     continue | ||||||
| 
 | 
 | ||||||
|             else: |         assert ( | ||||||
|                 print( |             len(sent) | ||||||
|                     f'{uid}: finished sending all values\n' |             == | ||||||
|                     'Should be exiting stream block!\n' |             len([val | ||||||
|                 ) |                  for val, expect in | ||||||
| 
 |                  expect_ipc_send.values() | ||||||
|         print(f'{uid}: exited streaming block!') |                  if expect is True]) | ||||||
| 
 |         ) | ||||||
|         # TODO: this won't be true bc in streaming phase we DO NOT |  | ||||||
|         # msgspec check outbound msgs! |  | ||||||
|         # -[ ] once we implement the receiver side `InvalidMsg` |  | ||||||
|         #   then we can expect it here? |  | ||||||
|         # assert ( |  | ||||||
|         #     len(sent) |  | ||||||
|         #     == |  | ||||||
|         #     len([val |  | ||||||
|         #          for val, expect in |  | ||||||
|         #          expect_ipc_send.values() |  | ||||||
|         #          if expect is True]) |  | ||||||
|         # ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def ex_func(*args): | def ex_func(*args): | ||||||
|  | @ -659,7 +635,7 @@ def test_codec_hooks_mod( | ||||||
|                 async with ( |                 async with ( | ||||||
| 
 | 
 | ||||||
|                     p.open_context( |                     p.open_context( | ||||||
|                         send_back_values, |                         send_back_nsp, | ||||||
|                         expect_debug=debug_mode, |                         expect_debug=debug_mode, | ||||||
|                         pld_spec_type_strs=pld_spec_type_strs, |                         pld_spec_type_strs=pld_spec_type_strs, | ||||||
|                         add_hooks=add_codec_hooks, |                         add_hooks=add_codec_hooks, | ||||||
|  | @ -689,13 +665,10 @@ def test_codec_hooks_mod( | ||||||
| 
 | 
 | ||||||
|                     async for next_sent in ipc: |                     async for next_sent in ipc: | ||||||
|                         print( |                         print( | ||||||
|                             'Parent: child sent next value\n' |                             'Child sent next value\n' | ||||||
|                             f'{next_sent}: {type(next_sent)}\n' |                             f'{next_sent}: {type(next_sent)}\n' | ||||||
|                         ) |                         ) | ||||||
|                         if expect_to_send: |                         expect_to_send.remove(next_sent) | ||||||
|                             expect_to_send.remove(next_sent) |  | ||||||
|                         else: |  | ||||||
|                             print('PARENT should terminate stream loop + block!') |  | ||||||
| 
 | 
 | ||||||
|                     # all sent values should have arrived! |                     # all sent values should have arrived! | ||||||
|                     assert not expect_to_send |                     assert not expect_to_send | ||||||
|  |  | ||||||
|  | @ -796,12 +796,10 @@ async def test_callee_cancels_before_started( | ||||||
| 
 | 
 | ||||||
|         # raises a special cancel signal |         # raises a special cancel signal | ||||||
|         except tractor.ContextCancelled as ce: |         except tractor.ContextCancelled as ce: | ||||||
|             _ce = ce  # for debug on crash |  | ||||||
|             ce.boxed_type == trio.Cancelled |             ce.boxed_type == trio.Cancelled | ||||||
| 
 | 
 | ||||||
|             # the traceback should be informative |             # the traceback should be informative | ||||||
|             assert 'itself' in ce.tb_str |             assert 'itself' in ce.msgdata['tb_str'] | ||||||
|             assert ce.tb_str == ce.msgdata['tb_str'] |  | ||||||
| 
 | 
 | ||||||
|         # teardown the actor |         # teardown the actor | ||||||
|         await portal.cancel_actor() |         await portal.cancel_actor() | ||||||
|  | @ -1159,8 +1157,7 @@ def test_maybe_allow_overruns_stream( | ||||||
| 
 | 
 | ||||||
|         elif slow_side == 'parent': |         elif slow_side == 'parent': | ||||||
|             assert err.boxed_type == tractor.RemoteActorError |             assert err.boxed_type == tractor.RemoteActorError | ||||||
|             assert 'StreamOverrun' in err.tb_str |             assert 'StreamOverrun' in err.msgdata['tb_str'] | ||||||
|             assert err.tb_str == err.msgdata['tb_str'] |  | ||||||
| 
 | 
 | ||||||
|     else: |     else: | ||||||
|         # if this hits the logic blocks from above are not |         # if this hits the logic blocks from above are not | ||||||
|  |  | ||||||
|  | @ -185,10 +185,6 @@ async def sleep_a_bit_then_cancel_peer( | ||||||
|         await trio.sleep(cancel_after) |         await trio.sleep(cancel_after) | ||||||
|         await peer.cancel_actor() |         await peer.cancel_actor() | ||||||
| 
 | 
 | ||||||
|         # such that we're cancelled by our rent ctx-task |  | ||||||
|         await trio.sleep(3) |  | ||||||
|         print('CANCELLER RETURNING!') |  | ||||||
| 
 |  | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def stream_ints( | async def stream_ints( | ||||||
|  | @ -249,12 +245,6 @@ async def stream_from_peer( | ||||||
|         assert peer_ctx._remote_error is ctxerr |         assert peer_ctx._remote_error is ctxerr | ||||||
|         assert peer_ctx._remote_error.msgdata == ctxerr.msgdata |         assert peer_ctx._remote_error.msgdata == ctxerr.msgdata | ||||||
| 
 | 
 | ||||||
|         # XXX YES, bc exact same msg instances |  | ||||||
|         assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg |  | ||||||
| 
 |  | ||||||
|         # XXX NO, bc new one always created for property accesss |  | ||||||
|         assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg |  | ||||||
| 
 |  | ||||||
|         # the peer ctx is the canceller even though it's canceller |         # the peer ctx is the canceller even though it's canceller | ||||||
|         # is the "canceller" XD |         # is the "canceller" XD | ||||||
|         assert peer_name in peer_ctx.canceller |         assert peer_name in peer_ctx.canceller | ||||||
|  |  | ||||||
|  | @ -44,10 +44,9 @@ from ._state import ( | ||||||
|     is_root_process as is_root_process, |     is_root_process as is_root_process, | ||||||
| ) | ) | ||||||
| from ._exceptions import ( | from ._exceptions import ( | ||||||
|     ContextCancelled as ContextCancelled, |  | ||||||
|     ModuleNotExposed as ModuleNotExposed, |  | ||||||
|     MsgTypeError as MsgTypeError, |  | ||||||
|     RemoteActorError as RemoteActorError, |     RemoteActorError as RemoteActorError, | ||||||
|  |     ModuleNotExposed as ModuleNotExposed, | ||||||
|  |     ContextCancelled as ContextCancelled, | ||||||
| ) | ) | ||||||
| from .devx import ( | from .devx import ( | ||||||
|     breakpoint as breakpoint, |     breakpoint as breakpoint, | ||||||
|  |  | ||||||
|  | @ -1207,7 +1207,7 @@ class Context: | ||||||
|                 # XXX: (MEGA IMPORTANT) if this is a root opened process we |                 # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||||
|                 # wait for any immediate child in debug before popping the |                 # wait for any immediate child in debug before popping the | ||||||
|                 # context from the runtime msg loop otherwise inside |                 # context from the runtime msg loop otherwise inside | ||||||
|                 # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in |                 # ``Actor._push_result()`` the msg will be discarded and in | ||||||
|                 # the case where that msg is global debugger unlock (via |                 # the case where that msg is global debugger unlock (via | ||||||
|                 # a "stop" msg for a stream), this can result in a deadlock |                 # a "stop" msg for a stream), this can result in a deadlock | ||||||
|                 # where the root is waiting on the lock to clear but the |                 # where the root is waiting on the lock to clear but the | ||||||
|  | @ -1698,11 +1698,11 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         # raise any msg type error NO MATTER WHAT! |         # raise any msg type error NO MATTER WHAT! | ||||||
|         except msgspec.ValidationError as verr: |         except msgspec.ValidationError as verr: | ||||||
|             from tractor._ipc import _mk_msg_type_err |             from tractor._ipc import _raise_msg_type_err | ||||||
|             raise _mk_msg_type_err( |             _raise_msg_type_err( | ||||||
|                 msg=msg_bytes, |                 msg=msg_bytes, | ||||||
|                 codec=codec, |                 codec=codec, | ||||||
|                 src_validation_error=verr, |                 validation_err=verr, | ||||||
|                 verb_header='Trying to send payload' |                 verb_header='Trying to send payload' | ||||||
|                 # > 'invalid `Started IPC msgs\n' |                 # > 'invalid `Started IPC msgs\n' | ||||||
|             ) |             ) | ||||||
|  | @ -2415,7 +2415,7 @@ async def open_context_from_portal( | ||||||
|         # XXX: (MEGA IMPORTANT) if this is a root opened process we |         # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||||
|         # wait for any immediate child in debug before popping the |         # wait for any immediate child in debug before popping the | ||||||
|         # context from the runtime msg loop otherwise inside |         # context from the runtime msg loop otherwise inside | ||||||
|         # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in |         # ``Actor._push_result()`` the msg will be discarded and in | ||||||
|         # the case where that msg is global debugger unlock (via |         # the case where that msg is global debugger unlock (via | ||||||
|         # a "stop" msg for a stream), this can result in a deadlock |         # a "stop" msg for a stream), this can result in a deadlock | ||||||
|         # where the root is waiting on the lock to clear but the |         # where the root is waiting on the lock to clear but the | ||||||
|  |  | ||||||
|  | @ -31,10 +31,7 @@ import textwrap | ||||||
| import traceback | import traceback | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
| from msgspec import ( | from msgspec import structs | ||||||
|     structs, |  | ||||||
|     defstruct, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| from tractor._state import current_actor | from tractor._state import current_actor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
|  | @ -43,8 +40,6 @@ from tractor.msg import ( | ||||||
|     Msg, |     Msg, | ||||||
|     Stop, |     Stop, | ||||||
|     Yield, |     Yield, | ||||||
|     pretty_struct, |  | ||||||
|     types as msgtypes, |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|  | @ -69,38 +64,21 @@ class InternalError(RuntimeError): | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
| 
 | 
 | ||||||
|  | _body_fields: list[str] = [ | ||||||
|  |     'boxed_type', | ||||||
|  |     'src_type', | ||||||
|  |     # TODO: format this better if we're going to include it. | ||||||
|  |     # 'relay_path', | ||||||
|  |     'src_uid', | ||||||
| 
 | 
 | ||||||
| # NOTE: more or less should be close to these: |     # only in sub-types | ||||||
| # 'boxed_type', |     'canceller', | ||||||
| # 'src_type', |     'sender', | ||||||
| # 'src_uid', |  | ||||||
| # 'canceller', |  | ||||||
| # 'sender', |  | ||||||
| # TODO: format this better if we're going to include it. |  | ||||||
| # 'relay_path', |  | ||||||
| # |  | ||||||
| _ipcmsg_keys: list[str] = [ |  | ||||||
|     fi.name |  | ||||||
|     for fi, k, v |  | ||||||
|     in pretty_struct.iter_fields(Error) |  | ||||||
| 
 |  | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| _body_fields: list[str] = list( | _msgdata_keys: list[str] = [ | ||||||
|     set(_ipcmsg_keys) |     'boxed_type_str', | ||||||
| 
 | ] + _body_fields | ||||||
|     # NOTE: don't show fields that either don't provide |  | ||||||
|     # any extra useful info or that are already shown |  | ||||||
|     # as part of `.__repr__()` output. |  | ||||||
|     - { |  | ||||||
|         'src_type_str', |  | ||||||
|         'boxed_type_str', |  | ||||||
|         'tb_str', |  | ||||||
|         'relay_path', |  | ||||||
|         '_msg_dict', |  | ||||||
|         'cid', |  | ||||||
|     } |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_err_type(type_name: str) -> BaseException|None: | def get_err_type(type_name: str) -> BaseException|None: | ||||||
|  | @ -159,7 +137,7 @@ def pformat_boxed_tb( | ||||||
|         f'|\n' |         f'|\n' | ||||||
|         f' ------ - ------\n\n' |         f' ------ - ------\n\n' | ||||||
|         f'{tb_str}\n' |         f'{tb_str}\n' | ||||||
|         f'  ------ - ------\n' |         f' ------ - ------\n' | ||||||
|         f'_|\n' |         f'_|\n' | ||||||
|     ) |     ) | ||||||
|     if len(indent): |     if len(indent): | ||||||
|  | @ -174,40 +152,10 @@ def pformat_boxed_tb( | ||||||
|         + |         + | ||||||
|         body |         body | ||||||
|     ) |     ) | ||||||
|  |     # return body | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pack_from_raise( | # TODO: rename to just `RemoteError`? | ||||||
|     local_err: ( |  | ||||||
|         ContextCancelled |  | ||||||
|         |StreamOverrun |  | ||||||
|         |MsgTypeError |  | ||||||
|     ), |  | ||||||
|     cid: str, |  | ||||||
| 
 |  | ||||||
|     **rae_fields, |  | ||||||
| 
 |  | ||||||
| ) -> Error: |  | ||||||
|     ''' |  | ||||||
|     Raise the provided `RemoteActorError` subtype exception |  | ||||||
|     instance locally to get a traceback and pack it into an IPC |  | ||||||
|     `Error`-msg using `pack_error()` to extract the tb info. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     try: |  | ||||||
|         raise local_err |  | ||||||
|     except type(local_err) as local_err: |  | ||||||
|         err_msg: dict[str, dict] = pack_error( |  | ||||||
|             local_err, |  | ||||||
|             cid=cid, |  | ||||||
|             **rae_fields, |  | ||||||
|         ) |  | ||||||
|         return err_msg |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: better compat with IPC msg structs? |  | ||||||
| # -[ ] rename to just `RemoteError` like in `mp.manager`? |  | ||||||
| # -[ ] make a `Struct`-subtype by using the .__post_init__()`? |  | ||||||
| #  https://jcristharif.com/msgspec/structs.html#post-init-processing |  | ||||||
| class RemoteActorError(Exception): | class RemoteActorError(Exception): | ||||||
|     ''' |     ''' | ||||||
|     A box(ing) type which bundles a remote actor `BaseException` for |     A box(ing) type which bundles a remote actor `BaseException` for | ||||||
|  | @ -222,28 +170,12 @@ class RemoteActorError(Exception): | ||||||
|         'src_uid', |         'src_uid', | ||||||
|         # 'relay_path', |         # 'relay_path', | ||||||
|     ] |     ] | ||||||
|     extra_body_fields: list[str] = [ |  | ||||||
|         'cid', |  | ||||||
|         'boxed_type', |  | ||||||
|     ] |  | ||||||
| 
 | 
 | ||||||
|     def __init__( |     def __init__( | ||||||
|         self, |         self, | ||||||
|         message: str, |         message: str, | ||||||
|         ipc_msg: Error|None = None, |  | ||||||
|         boxed_type: Type[BaseException]|None = None, |         boxed_type: Type[BaseException]|None = None, | ||||||
| 
 |         **msgdata | ||||||
|         # NOTE: only provided by subtypes (ctxc and overruns) |  | ||||||
|         # wishing to both manually instantiate and add field |  | ||||||
|         # values defined on `Error` without having to construct an |  | ||||||
|         # `Error()` before the exception is processed by |  | ||||||
|         # `pack_error()`. |  | ||||||
|         # |  | ||||||
|         # TODO: a better way to support this without the extra |  | ||||||
|         # private `._extra_msgdata`? |  | ||||||
|         # -[ ] ctxc constructed inside `._rpc._invoke()` L:638 |  | ||||||
|         # -[ ] overrun @ `._context.Context._deliver_msg()` L:1958 |  | ||||||
|         **extra_msgdata, |  | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         super().__init__(message) |         super().__init__(message) | ||||||
|  | @ -256,24 +188,14 @@ class RemoteActorError(Exception): | ||||||
|         # - .remote_type |         # - .remote_type | ||||||
|         # also pertains to our long long oustanding issue XD |         # also pertains to our long long oustanding issue XD | ||||||
|         # https://github.com/goodboy/tractor/issues/5 |         # https://github.com/goodboy/tractor/issues/5 | ||||||
|  |         # | ||||||
|  |         # TODO: always set ._boxed_type` as `None` by default | ||||||
|  |         # and instead render if from `.boxed_type_str`? | ||||||
|         self._boxed_type: BaseException = boxed_type |         self._boxed_type: BaseException = boxed_type | ||||||
|         self._src_type: BaseException|None = None |         self._src_type: BaseException|None = None | ||||||
|         self._ipc_msg: Error|None = ipc_msg |  | ||||||
| 
 | 
 | ||||||
|         if ( |         # TODO: make this a `.errmsg: Error` throughout? | ||||||
|             extra_msgdata |         self.msgdata: dict[str, Any] = msgdata | ||||||
|             and ipc_msg |  | ||||||
|         ): |  | ||||||
|             # XXX mutate the orig msg directly from |  | ||||||
|             # manually provided input params. |  | ||||||
|             for k, v in extra_msgdata.items(): |  | ||||||
|                 setattr( |  | ||||||
|                     self._ipc_msg, |  | ||||||
|                     k, |  | ||||||
|                     v, |  | ||||||
|                 ) |  | ||||||
|         else: |  | ||||||
|             self._extra_msgdata = extra_msgdata |  | ||||||
| 
 | 
 | ||||||
|         # TODO: mask out eventually or place in `pack_error()` |         # TODO: mask out eventually or place in `pack_error()` | ||||||
|         # pre-`return` lines? |         # pre-`return` lines? | ||||||
|  | @ -292,56 +214,14 @@ class RemoteActorError(Exception): | ||||||
|         # either by customizing `ContextCancelled.__init__()` or |         # either by customizing `ContextCancelled.__init__()` or | ||||||
|         # through a special factor func? |         # through a special factor func? | ||||||
|         elif boxed_type: |         elif boxed_type: | ||||||
|             boxed_type_str: str = type(boxed_type).__name__ |             if not self.msgdata.get('boxed_type_str'): | ||||||
|             if ( |                 self.msgdata['boxed_type_str'] = str( | ||||||
|                 ipc_msg |                     type(boxed_type).__name__ | ||||||
|                 and not self._ipc_msg.boxed_type_str |                 ) | ||||||
|             ): |  | ||||||
|                 self._ipc_msg.boxed_type_str = boxed_type_str |  | ||||||
|                 assert self.boxed_type_str == self._ipc_msg.boxed_type_str |  | ||||||
| 
 |  | ||||||
|             else: |  | ||||||
|                 self._extra_msgdata['boxed_type_str'] = boxed_type_str |  | ||||||
| 
 | 
 | ||||||
|  |             assert self.boxed_type_str == self.msgdata['boxed_type_str'] | ||||||
|             assert self.boxed_type is boxed_type |             assert self.boxed_type is boxed_type | ||||||
| 
 | 
 | ||||||
|     @property |  | ||||||
|     def ipc_msg(self) -> pretty_struct.Struct: |  | ||||||
|         ''' |  | ||||||
|         Re-render the underlying `._ipc_msg: Msg` as |  | ||||||
|         a `pretty_struct.Struct` for introspection such that the |  | ||||||
|         returned value is a read-only copy of the original. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         if self._ipc_msg is None: |  | ||||||
|             return None |  | ||||||
| 
 |  | ||||||
|         msg_type: Msg = type(self._ipc_msg) |  | ||||||
|         fields: dict[str, Any] = { |  | ||||||
|             k: v for _, k, v in |  | ||||||
|             pretty_struct.iter_fields(self._ipc_msg) |  | ||||||
|         } |  | ||||||
|         return defstruct( |  | ||||||
|             msg_type.__name__, |  | ||||||
|             fields=fields.keys(), |  | ||||||
|             bases=(msg_type, pretty_struct.Struct), |  | ||||||
|         )(**fields) |  | ||||||
| 
 |  | ||||||
|     @property |  | ||||||
|     def msgdata(self) -> dict[str, Any]: |  | ||||||
|         ''' |  | ||||||
|         The (remote) error data provided by a merge of the |  | ||||||
|         `._ipc_msg: Error` msg and any input `._extra_msgdata: dict` |  | ||||||
|         (provided by subtypes via `.__init__()`). |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         msgdata: dict = ( |  | ||||||
|             structs.asdict(self._ipc_msg) |  | ||||||
|             if self._ipc_msg |  | ||||||
|             else {} |  | ||||||
|         ) |  | ||||||
|         return self._extra_msgdata | msgdata |  | ||||||
| 
 |  | ||||||
|     @property |     @property | ||||||
|     def src_type_str(self) -> str: |     def src_type_str(self) -> str: | ||||||
|         ''' |         ''' | ||||||
|  | @ -351,7 +231,7 @@ class RemoteActorError(Exception): | ||||||
|         at the first relay/hop's receiving actor. |         at the first relay/hop's receiving actor. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._ipc_msg.src_type_str |         return self.msgdata['src_type_str'] | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def src_type(self) -> str: |     def src_type(self) -> str: | ||||||
|  | @ -361,7 +241,7 @@ class RemoteActorError(Exception): | ||||||
|         ''' |         ''' | ||||||
|         if self._src_type is None: |         if self._src_type is None: | ||||||
|             self._src_type = get_err_type( |             self._src_type = get_err_type( | ||||||
|                 self._ipc_msg.src_type_str |                 self.msgdata['src_type_str'] | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         return self._src_type |         return self._src_type | ||||||
|  | @ -372,7 +252,7 @@ class RemoteActorError(Exception): | ||||||
|         String-name of the (last hop's) boxed error type. |         String-name of the (last hop's) boxed error type. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._ipc_msg.boxed_type_str |         return self.msgdata['boxed_type_str'] | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def boxed_type(self) -> str: |     def boxed_type(self) -> str: | ||||||
|  | @ -382,7 +262,7 @@ class RemoteActorError(Exception): | ||||||
|         ''' |         ''' | ||||||
|         if self._boxed_type is None: |         if self._boxed_type is None: | ||||||
|             self._boxed_type = get_err_type( |             self._boxed_type = get_err_type( | ||||||
|                 self._ipc_msg.boxed_type_str |                 self.msgdata['boxed_type_str'] | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         return self._boxed_type |         return self._boxed_type | ||||||
|  | @ -395,44 +275,40 @@ class RemoteActorError(Exception): | ||||||
|         actor's hop. |         actor's hop. | ||||||
| 
 | 
 | ||||||
|         NOTE: a `list` field with the same name is expected to be |         NOTE: a `list` field with the same name is expected to be | ||||||
|         passed/updated in `.ipc_msg`. |         passed/updated in `.msgdata`. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._ipc_msg.relay_path |         return self.msgdata['relay_path'] | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def relay_uid(self) -> tuple[str, str]|None: |     def relay_uid(self) -> tuple[str, str]|None: | ||||||
|         return tuple( |         return tuple( | ||||||
|             self._ipc_msg.relay_path[-1] |             self.msgdata['relay_path'][-1] | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def src_uid(self) -> tuple[str, str]|None: |     def src_uid(self) -> tuple[str, str]|None: | ||||||
|         if src_uid := ( |         if src_uid := ( | ||||||
|             self._ipc_msg.src_uid |             self.msgdata.get('src_uid') | ||||||
|         ): |         ): | ||||||
|             return tuple(src_uid) |             return tuple(src_uid) | ||||||
|         # TODO: use path lookup instead? |         # TODO: use path lookup instead? | ||||||
|         # return tuple( |         # return tuple( | ||||||
|         #     self._ipc_msg.relay_path[0] |         #     self.msgdata['relay_path'][0] | ||||||
|         # ) |         # ) | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def tb_str( |     def tb_str( | ||||||
|         self, |         self, | ||||||
|         indent: str = '', |         indent: str = ' ', | ||||||
|     ) -> str: |     ) -> str: | ||||||
|         remote_tb: str = '' |         if remote_tb := self.msgdata.get('tb_str'): | ||||||
|  |             return textwrap.indent( | ||||||
|  |                 remote_tb, | ||||||
|  |                 prefix=indent, | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|         if self._ipc_msg: |         return '' | ||||||
|             remote_tb: str = self._ipc_msg.tb_str |  | ||||||
|         else: |  | ||||||
|             remote_tb = self.msgdata.get('tb_str') |  | ||||||
| 
 |  | ||||||
|         return textwrap.indent( |  | ||||||
|             remote_tb or '', |  | ||||||
|             prefix=indent, |  | ||||||
|         ) |  | ||||||
| 
 | 
 | ||||||
|     def _mk_fields_str( |     def _mk_fields_str( | ||||||
|         self, |         self, | ||||||
|  | @ -444,17 +320,14 @@ class RemoteActorError(Exception): | ||||||
|             val: Any|None = ( |             val: Any|None = ( | ||||||
|                 getattr(self, key, None) |                 getattr(self, key, None) | ||||||
|                 or |                 or | ||||||
|                 getattr( |                 self.msgdata.get(key) | ||||||
|                     self._ipc_msg, |  | ||||||
|                     key, |  | ||||||
|                     None, |  | ||||||
|                 ) |  | ||||||
|             ) |             ) | ||||||
|             # TODO: for `.relay_path` on multiline? |             # TODO: for `.relay_path` on multiline? | ||||||
|             # if not isinstance(val, str): |             # if not isinstance(val, str): | ||||||
|             #     val_str = pformat(val) |             #     val_str = pformat(val) | ||||||
|             # else: |             # else: | ||||||
|             val_str: str = repr(val) |             val_str: str = repr(val) | ||||||
|  | 
 | ||||||
|             if val: |             if val: | ||||||
|                 _repr += f'{key}={val_str}{end_char}' |                 _repr += f'{key}={val_str}{end_char}' | ||||||
| 
 | 
 | ||||||
|  | @ -485,9 +358,7 @@ class RemoteActorError(Exception): | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         fields: str = self._mk_fields_str( |         fields: str = self._mk_fields_str( | ||||||
|             _body_fields |             _body_fields, | ||||||
|             + |  | ||||||
|             self.extra_body_fields, |  | ||||||
|         ) |         ) | ||||||
|         body: str = pformat_boxed_tb( |         body: str = pformat_boxed_tb( | ||||||
|             tb_str=self.tb_str, |             tb_str=self.tb_str, | ||||||
|  | @ -544,6 +415,15 @@ class RemoteActorError(Exception): | ||||||
|     #     raise NotImplementedError |     #     raise NotImplementedError | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class InternalActorError(RemoteActorError): | ||||||
|  |     ''' | ||||||
|  |     (Remote) internal `tractor` error indicating failure of some | ||||||
|  |     primitive, machinery state or lowlevel task that should never | ||||||
|  |     occur. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class ContextCancelled(RemoteActorError): | class ContextCancelled(RemoteActorError): | ||||||
|     ''' |     ''' | ||||||
|     Inter-actor task context was cancelled by either a call to |     Inter-actor task context was cancelled by either a call to | ||||||
|  | @ -553,10 +433,6 @@ class ContextCancelled(RemoteActorError): | ||||||
|     reprol_fields: list[str] = [ |     reprol_fields: list[str] = [ | ||||||
|         'canceller', |         'canceller', | ||||||
|     ] |     ] | ||||||
|     extra_body_fields: list[str] = [ |  | ||||||
|         'cid', |  | ||||||
|         'canceller', |  | ||||||
|     ] |  | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|         ''' |         ''' | ||||||
|  | @ -578,7 +454,7 @@ class ContextCancelled(RemoteActorError): | ||||||
|           |_`._cancel_task()` |           |_`._cancel_task()` | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         value: tuple[str, str]|None = self._ipc_msg.canceller |         value = self.msgdata.get('canceller') | ||||||
|         if value: |         if value: | ||||||
|             return tuple(value) |             return tuple(value) | ||||||
| 
 | 
 | ||||||
|  | @ -592,132 +468,6 @@ class ContextCancelled(RemoteActorError): | ||||||
|     # src_actor_uid = canceller |     # src_actor_uid = canceller | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class MsgTypeError( |  | ||||||
|     RemoteActorError, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     Equivalent of a runtime `TypeError` for IPC dialogs. |  | ||||||
| 
 |  | ||||||
|     Raise when any IPC wire-message is decoded to have invalid |  | ||||||
|     field values (due to type) or for other `MsgCodec` related |  | ||||||
|     violations such as having no extension-type for a field with |  | ||||||
|     a custom type but no `enc/dec_hook()` support. |  | ||||||
| 
 |  | ||||||
|     Can be raised on the send or recv side of an IPC `Channel` |  | ||||||
|     depending on the particular msg. |  | ||||||
| 
 |  | ||||||
|     Msgs which cause this to be raised on the `.send()` side (aka |  | ||||||
|     in the "ctl" dialog phase) include: |  | ||||||
|     - `Start` |  | ||||||
|     - `Started` |  | ||||||
|     - `Return` |  | ||||||
| 
 |  | ||||||
|     Those which cause it on on the `.recv()` side (aka the "nasty |  | ||||||
|     streaming" dialog phase) are: |  | ||||||
|     - `Yield` |  | ||||||
|     - TODO: any embedded `.pld` type defined by user code? |  | ||||||
| 
 |  | ||||||
|     Normally the source of an error is re-raised from some `.msg._codec` |  | ||||||
|     decode which itself raises in a backend interchange |  | ||||||
|     lib (eg. a `msgspec.ValidationError`). |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     reprol_fields: list[str] = [ |  | ||||||
|         'ipc_msg', |  | ||||||
|     ] |  | ||||||
|     extra_body_fields: list[str] = [ |  | ||||||
|         'cid', |  | ||||||
|         'payload_msg', |  | ||||||
|     ] |  | ||||||
| 
 |  | ||||||
|     @property |  | ||||||
|     def msg_dict(self) -> dict[str, Any]: |  | ||||||
|         ''' |  | ||||||
|         If the underlying IPC `Msg` was received from a remote |  | ||||||
|         actor but was unable to be decoded to a native |  | ||||||
|         `Yield`|`Started`|`Return` struct, the interchange backend |  | ||||||
|         native format decoder can be used to stash a `dict` |  | ||||||
|         version for introspection by the invalidating RPC task. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         return self.msgdata.get('_msg_dict') |  | ||||||
| 
 |  | ||||||
|     @property |  | ||||||
|     def payload_msg(self) -> Msg|None: |  | ||||||
|         ''' |  | ||||||
|         Attempt to construct what would have been the original |  | ||||||
|         `Msg`-with-payload subtype (i.e. an instance from the set |  | ||||||
|         of msgs in `.msg.types._payload_msgs`) which failed |  | ||||||
|         validation. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         msg_dict: dict = self.msg_dict.copy() |  | ||||||
|         name: str = msg_dict.pop('msg_type') |  | ||||||
|         msg_type: Msg = getattr( |  | ||||||
|             msgtypes, |  | ||||||
|             name, |  | ||||||
|             Msg, |  | ||||||
|         ) |  | ||||||
|         return msg_type(**msg_dict) |  | ||||||
| 
 |  | ||||||
|     @property |  | ||||||
|     def cid(self) -> str: |  | ||||||
|         # pre-packed using `.from_decode()` constructor |  | ||||||
|         return self.msgdata.get('cid') |  | ||||||
| 
 |  | ||||||
|     @classmethod |  | ||||||
|     def from_decode( |  | ||||||
|         cls, |  | ||||||
|         message: str, |  | ||||||
|         msgdict: dict, |  | ||||||
| 
 |  | ||||||
|     ) -> MsgTypeError: |  | ||||||
|         return cls( |  | ||||||
|             message=message, |  | ||||||
| 
 |  | ||||||
|             # NOTE: original "vanilla decode" of the msg-bytes |  | ||||||
|             # is placed inside a value readable from |  | ||||||
|             # `.msgdata['_msg_dict']` |  | ||||||
|             _msg_dict=msgdict, |  | ||||||
| 
 |  | ||||||
|             # expand and pack all RAE compat fields |  | ||||||
|             # into the `._extra_msgdata` aux `dict`. |  | ||||||
|             **{ |  | ||||||
|                 k: v |  | ||||||
|                 for k, v in msgdict.items() |  | ||||||
|                 if k in _ipcmsg_keys |  | ||||||
|             }, |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class StreamOverrun( |  | ||||||
|     RemoteActorError, |  | ||||||
|     trio.TooSlowError, |  | ||||||
| ): |  | ||||||
|     reprol_fields: list[str] = [ |  | ||||||
|         'sender', |  | ||||||
|     ] |  | ||||||
|     ''' |  | ||||||
|     This stream was overrun by its sender and can be optionally |  | ||||||
|     handled by app code using `MsgStream.send()/.receive()`. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     @property |  | ||||||
|     def sender(self) -> tuple[str, str] | None: |  | ||||||
|         value = self._ipc_msg.sender |  | ||||||
|         if value: |  | ||||||
|             return tuple(value) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # class InternalActorError(RemoteActorError): |  | ||||||
| #     ''' |  | ||||||
| #     Boxed (Remote) internal `tractor` error indicating failure of some |  | ||||||
| #     primitive, machinery state or lowlevel task that should never |  | ||||||
| #     occur. |  | ||||||
| 
 |  | ||||||
| #     ''' |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class TransportClosed(trio.ClosedResourceError): | class TransportClosed(trio.ClosedResourceError): | ||||||
|     "Underlying channel transport was closed prior to use" |     "Underlying channel transport was closed prior to use" | ||||||
| 
 | 
 | ||||||
|  | @ -734,6 +484,23 @@ class NoRuntime(RuntimeError): | ||||||
|     "The root actor has not been initialized yet" |     "The root actor has not been initialized yet" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class StreamOverrun( | ||||||
|  |     RemoteActorError, | ||||||
|  |     trio.TooSlowError, | ||||||
|  | ): | ||||||
|  |     reprol_fields: list[str] = [ | ||||||
|  |         'sender', | ||||||
|  |     ] | ||||||
|  |     ''' | ||||||
|  |     This stream was overrun by sender | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     @property | ||||||
|  |     def sender(self) -> tuple[str, str] | None: | ||||||
|  |         value = self.msgdata.get('sender') | ||||||
|  |         if value: | ||||||
|  |             return tuple(value) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class AsyncioCancelled(Exception): | class AsyncioCancelled(Exception): | ||||||
|     ''' |     ''' | ||||||
|  | @ -751,12 +518,23 @@ class MessagingError(Exception): | ||||||
|     ''' |     ''' | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class MsgTypeError(MessagingError): | ||||||
|  |     ''' | ||||||
|  |     Equivalent of a `TypeError` for an IPC wire-message | ||||||
|  |     due to an invalid field value (type). | ||||||
|  | 
 | ||||||
|  |     Normally this is re-raised from some `.msg._codec` | ||||||
|  |     decode error raised by a backend interchange lib | ||||||
|  |     like `msgspec` or `pycapnproto`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def pack_error( | def pack_error( | ||||||
|     exc: BaseException|RemoteActorError, |     exc: BaseException|RemoteActorError, | ||||||
| 
 | 
 | ||||||
|     tb: str|None = None, |     tb: str|None = None, | ||||||
|     cid: str|None = None, |     cid: str|None = None, | ||||||
|     src_uid: tuple[str, str]|None = None, |  | ||||||
| 
 | 
 | ||||||
| ) -> Error: | ) -> Error: | ||||||
|     ''' |     ''' | ||||||
|  | @ -782,8 +560,7 @@ def pack_error( | ||||||
|     ): |     ): | ||||||
|         error_msg.update(exc.msgdata) |         error_msg.update(exc.msgdata) | ||||||
| 
 | 
 | ||||||
|     # an onion/inception we need to pack as a nested and relayed |     # an onion/inception we need to pack | ||||||
|     # remotely boxed error. |  | ||||||
|     if ( |     if ( | ||||||
|         type(exc) is RemoteActorError |         type(exc) is RemoteActorError | ||||||
|         and (boxed := exc.boxed_type) |         and (boxed := exc.boxed_type) | ||||||
|  | @ -807,7 +584,7 @@ def pack_error( | ||||||
|         error_msg['boxed_type_str'] = 'RemoteActorError' |         error_msg['boxed_type_str'] = 'RemoteActorError' | ||||||
| 
 | 
 | ||||||
|     else: |     else: | ||||||
|         error_msg['src_uid'] = src_uid or our_uid |         error_msg['src_uid'] = our_uid | ||||||
|         error_msg['src_type_str'] =  type(exc).__name__ |         error_msg['src_type_str'] =  type(exc).__name__ | ||||||
|         error_msg['boxed_type_str'] = type(exc).__name__ |         error_msg['boxed_type_str'] = type(exc).__name__ | ||||||
| 
 | 
 | ||||||
|  | @ -819,7 +596,7 @@ def pack_error( | ||||||
| 
 | 
 | ||||||
|     # XXX NOTE: always ensure the traceback-str is from the |     # XXX NOTE: always ensure the traceback-str is from the | ||||||
|     # locally raised error (**not** the prior relay's boxed |     # locally raised error (**not** the prior relay's boxed | ||||||
|     # content's in `._ipc_msg.tb_str`). |     # content's `.msgdata`). | ||||||
|     error_msg['tb_str'] = tb_str |     error_msg['tb_str'] = tb_str | ||||||
| 
 | 
 | ||||||
|     if cid is not None: |     if cid is not None: | ||||||
|  | @ -829,7 +606,7 @@ def pack_error( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def unpack_error( | def unpack_error( | ||||||
|     msg: Error, |     msg: dict[str, Any]|Error, | ||||||
| 
 | 
 | ||||||
|     chan: Channel|None = None, |     chan: Channel|None = None, | ||||||
|     box_type: RemoteActorError = RemoteActorError, |     box_type: RemoteActorError = RemoteActorError, | ||||||
|  | @ -847,10 +624,16 @@ def unpack_error( | ||||||
|     ''' |     ''' | ||||||
|     __tracebackhide__: bool = hide_tb |     __tracebackhide__: bool = hide_tb | ||||||
| 
 | 
 | ||||||
|  |     error_dict: dict[str, dict]|None | ||||||
|     if not isinstance(msg, Error): |     if not isinstance(msg, Error): | ||||||
|  |     # if ( | ||||||
|  |     #     error_dict := msg.get('error') | ||||||
|  |     # ) is None: | ||||||
|  |         # no error field, nothing to unpack. | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     # retrieve the remote error's encoded details from fields |     # retrieve the remote error's msg encoded details | ||||||
|  |     # tb_str: str = error_dict.get('tb_str', '') | ||||||
|     tb_str: str = msg.tb_str |     tb_str: str = msg.tb_str | ||||||
|     message: str = ( |     message: str = ( | ||||||
|         f'{chan.uid}\n' |         f'{chan.uid}\n' | ||||||
|  | @ -868,10 +651,6 @@ def unpack_error( | ||||||
|         box_type = ContextCancelled |         box_type = ContextCancelled | ||||||
|         assert boxed_type is box_type |         assert boxed_type is box_type | ||||||
| 
 | 
 | ||||||
|     elif boxed_type_str == 'MsgTypeError': |  | ||||||
|         box_type = MsgTypeError |  | ||||||
|         assert boxed_type is box_type |  | ||||||
| 
 |  | ||||||
|     # TODO: already included by `_this_mod` in else loop right? |     # TODO: already included by `_this_mod` in else loop right? | ||||||
|     # |     # | ||||||
|     # we have an inception/onion-error so ensure |     # we have an inception/onion-error so ensure | ||||||
|  | @ -882,9 +661,12 @@ def unpack_error( | ||||||
|         # assert len(error_dict['relay_path']) >= 1 |         # assert len(error_dict['relay_path']) >= 1 | ||||||
|         assert len(msg.relay_path) >= 1 |         assert len(msg.relay_path) >= 1 | ||||||
| 
 | 
 | ||||||
|  |     # TODO: mk RAE just take the `Error` instance directly? | ||||||
|  |     error_dict: dict = structs.asdict(msg) | ||||||
|  | 
 | ||||||
|     exc = box_type( |     exc = box_type( | ||||||
|         message, |         message, | ||||||
|         ipc_msg=msg, |         **error_dict, | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     return exc |     return exc | ||||||
|  |  | ||||||
							
								
								
									
										175
									
								
								tractor/_ipc.py
								
								
								
								
							
							
						
						
									
										175
									
								
								tractor/_ipc.py
								
								
								
								
							|  | @ -54,8 +54,7 @@ from tractor.msg import ( | ||||||
|     _ctxvar_MsgCodec, |     _ctxvar_MsgCodec, | ||||||
|     _codec, |     _codec, | ||||||
|     MsgCodec, |     MsgCodec, | ||||||
|     types as msgtypes, |     types, | ||||||
|     pretty_struct, |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
|  | @ -73,7 +72,6 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple: | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: this should be our `Union[*msgtypes.__spec__]` now right? |  | ||||||
| MsgType = TypeVar("MsgType") | MsgType = TypeVar("MsgType") | ||||||
| 
 | 
 | ||||||
| # TODO: consider using a generic def and indexing with our eventual | # TODO: consider using a generic def and indexing with our eventual | ||||||
|  | @ -118,74 +116,6 @@ class MsgTransport(Protocol[MsgType]): | ||||||
|         ... |         ... | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _raise_msg_type_err( |  | ||||||
|     msg: Any|bytes, |  | ||||||
|     codec: MsgCodec, |  | ||||||
|     validation_err: msgspec.ValidationError|None = None, |  | ||||||
|     verb_header: str = '', |  | ||||||
| 
 |  | ||||||
| ) -> None: |  | ||||||
| 
 |  | ||||||
|     # if side == 'send': |  | ||||||
|     if validation_err is None:  # send-side |  | ||||||
| 
 |  | ||||||
|         import traceback |  | ||||||
|         from tractor._exceptions import pformat_boxed_tb |  | ||||||
| 
 |  | ||||||
|         fmt_spec: str = '\n'.join( |  | ||||||
|             map(str, codec.msg_spec.__args__) |  | ||||||
|         ) |  | ||||||
|         fmt_stack: str = ( |  | ||||||
|             '\n'.join(traceback.format_stack(limit=3)) |  | ||||||
|         ) |  | ||||||
|         tb_fmt: str = pformat_boxed_tb( |  | ||||||
|             tb_str=fmt_stack, |  | ||||||
|             # fields_str=header, |  | ||||||
|             field_prefix='  ', |  | ||||||
|             indent='', |  | ||||||
|         ) |  | ||||||
|         raise MsgTypeError( |  | ||||||
|             f'invalid msg -> {msg}: {type(msg)}\n\n' |  | ||||||
|             f'{tb_fmt}\n' |  | ||||||
|             f'Valid IPC msgs are:\n\n' |  | ||||||
|             # f'  ------ - ------\n' |  | ||||||
|             f'{fmt_spec}\n' |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     else: |  | ||||||
|         # decode the msg-bytes using the std msgpack |  | ||||||
|         # interchange-prot (i.e. without any |  | ||||||
|         # `msgspec.Struct` handling) so that we can |  | ||||||
|         # determine what `.msg.types.Msg` is the culprit |  | ||||||
|         # by reporting the received value. |  | ||||||
|         msg_dict: dict = msgspec.msgpack.decode(msg) |  | ||||||
|         msg_type_name: str = msg_dict['msg_type'] |  | ||||||
|         msg_type = getattr(msgtypes, msg_type_name) |  | ||||||
|         errmsg: str = ( |  | ||||||
|             f'invalid `{msg_type_name}` IPC msg\n\n' |  | ||||||
|         ) |  | ||||||
|         if verb_header: |  | ||||||
|             errmsg = f'{verb_header} ' + errmsg |  | ||||||
| 
 |  | ||||||
|         # XXX see if we can determine the exact invalid field |  | ||||||
|         # such that we can comprehensively report the |  | ||||||
|         # specific field's type problem |  | ||||||
|         msgspec_msg: str = validation_err.args[0].rstrip('`') |  | ||||||
|         msg, _, maybe_field = msgspec_msg.rpartition('$.') |  | ||||||
|         obj = object() |  | ||||||
|         if (field_val := msg_dict.get(maybe_field, obj)) is not obj: |  | ||||||
|             field_type: Union[Type] = msg_type.__signature__.parameters[ |  | ||||||
|                 maybe_field |  | ||||||
|             ].annotation |  | ||||||
|             errmsg += ( |  | ||||||
|                 f'{msg.rstrip("`")}\n\n' |  | ||||||
|                 f'{msg_type}\n' |  | ||||||
|                 f' |_.{maybe_field}: {field_type} = {field_val!r}\n' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         raise MsgTypeError(errmsg) from validation_err |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: not sure why we have to inherit here, but it seems to be an | # TODO: not sure why we have to inherit here, but it seems to be an | ||||||
| # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; | # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; | ||||||
| # probably should make a `mypy` issue? | # probably should make a `mypy` issue? | ||||||
|  | @ -245,10 +175,9 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|             or |             or | ||||||
|             _codec._ctxvar_MsgCodec.get() |             _codec._ctxvar_MsgCodec.get() | ||||||
|         ) |         ) | ||||||
|         # TODO: mask out before release? |         log.critical( | ||||||
|         log.runtime( |             '!?!: USING STD `tractor` CODEC !?!?\n' | ||||||
|             f'New {self} created with codec\n' |             f'{self._codec}\n' | ||||||
|             f'codec: {self._codec}\n' |  | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     async def _iter_packets(self) -> AsyncGenerator[dict, None]: |     async def _iter_packets(self) -> AsyncGenerator[dict, None]: | ||||||
|  | @ -292,18 +221,16 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|                 # NOTE: lookup the `trio.Task.context`'s var for |                 # NOTE: lookup the `trio.Task.context`'s var for | ||||||
|                 # the current `MsgCodec`. |                 # the current `MsgCodec`. | ||||||
|                 codec: MsgCodec = _ctxvar_MsgCodec.get() |                 codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||||
| 
 |  | ||||||
|                 # TODO: mask out before release? |  | ||||||
|                 if self._codec.pld_spec != codec.pld_spec: |                 if self._codec.pld_spec != codec.pld_spec: | ||||||
|                     # assert ( |                     # assert ( | ||||||
|                     #     task := trio.lowlevel.current_task() |                     #     task := trio.lowlevel.current_task() | ||||||
|                     # ) is not self._task |                     # ) is not self._task | ||||||
|                     # self._task = task |                     # self._task = task | ||||||
|                     self._codec = codec |                     self._codec = codec | ||||||
|                     log.runtime( |                     log.critical( | ||||||
|                         'Using new codec in {self}.recv()\n' |                         '.recv() USING NEW CODEC !?!?\n' | ||||||
|                         f'codec: {self._codec}\n\n' |                         f'{self._codec}\n\n' | ||||||
|                         f'msg_bytes: {msg_bytes}\n' |                         f'msg_bytes -> {msg_bytes}\n' | ||||||
|                     ) |                     ) | ||||||
|                 yield codec.decode(msg_bytes) |                 yield codec.decode(msg_bytes) | ||||||
| 
 | 
 | ||||||
|  | @ -325,13 +252,36 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|             # and always raise such that spec violations |             # and always raise such that spec violations | ||||||
|             # are never allowed to be caught silently! |             # are never allowed to be caught silently! | ||||||
|             except msgspec.ValidationError as verr: |             except msgspec.ValidationError as verr: | ||||||
|                 # re-raise as type error | 
 | ||||||
|                 _raise_msg_type_err( |                 # decode the msg-bytes using the std msgpack | ||||||
|                     msg=msg_bytes, |                 # interchange-prot (i.e. without any | ||||||
|                     codec=codec, |                 # `msgspec.Struct` handling) so that we can | ||||||
|                     validation_err=verr, |                 # determine what `.msg.types.Msg` is the culprit | ||||||
|  |                 # by reporting the received value. | ||||||
|  |                 msg_dict: dict = msgspec.msgpack.decode(msg_bytes) | ||||||
|  |                 msg_type_name: str = msg_dict['msg_type'] | ||||||
|  |                 msg_type = getattr(types, msg_type_name) | ||||||
|  |                 errmsg: str = ( | ||||||
|  |                     f'Received invalid IPC `{msg_type_name}` msg\n\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  |                 # XXX see if we can determine the exact invalid field | ||||||
|  |                 # such that we can comprehensively report the | ||||||
|  |                 # specific field's type problem | ||||||
|  |                 msgspec_msg: str = verr.args[0].rstrip('`') | ||||||
|  |                 msg, _, maybe_field = msgspec_msg.rpartition('$.') | ||||||
|  |                 if field_val := msg_dict.get(maybe_field): | ||||||
|  |                     field_type: Union[Type] = msg_type.__signature__.parameters[ | ||||||
|  |                         maybe_field | ||||||
|  |                     ].annotation | ||||||
|  |                     errmsg += ( | ||||||
|  |                         f'{msg.rstrip("`")}\n\n' | ||||||
|  |                         f'{msg_type}\n' | ||||||
|  |                         f' |_.{maybe_field}: {field_type} = {field_val}\n' | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 raise MsgTypeError(errmsg) from verr | ||||||
|  | 
 | ||||||
|             except ( |             except ( | ||||||
|                 msgspec.DecodeError, |                 msgspec.DecodeError, | ||||||
|                 UnicodeDecodeError, |                 UnicodeDecodeError, | ||||||
|  | @ -357,16 +307,12 @@ class MsgpackTCPStream(MsgTransport): | ||||||
| 
 | 
 | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         msg: msgtypes.Msg, |         msg: Any, | ||||||
| 
 | 
 | ||||||
|         strict_types: bool = True, |  | ||||||
|         # hide_tb: bool = False, |         # hide_tb: bool = False, | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Send a msgpack encoded py-object-blob-as-msg over TCP. |         Send a msgpack coded blob-as-msg over TCP. | ||||||
| 
 |  | ||||||
|         If `strict_types == True` then a `MsgTypeError` will be raised on any |  | ||||||
|         invalid msg type |  | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         # __tracebackhide__: bool = hide_tb |         # __tracebackhide__: bool = hide_tb | ||||||
|  | @ -375,40 +321,25 @@ class MsgpackTCPStream(MsgTransport): | ||||||
|             # NOTE: lookup the `trio.Task.context`'s var for |             # NOTE: lookup the `trio.Task.context`'s var for | ||||||
|             # the current `MsgCodec`. |             # the current `MsgCodec`. | ||||||
|             codec: MsgCodec = _ctxvar_MsgCodec.get() |             codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||||
| 
 |             # if self._codec != codec: | ||||||
|             # TODO: mask out before release? |  | ||||||
|             if self._codec.pld_spec != codec.pld_spec: |             if self._codec.pld_spec != codec.pld_spec: | ||||||
|                 self._codec = codec |                 self._codec = codec | ||||||
|                 log.runtime( |                 log.critical( | ||||||
|                     'Using new codec in {self}.send()\n' |                     '.send() using NEW CODEC !?!?\n' | ||||||
|                     f'codec: {self._codec}\n\n' |                     f'{self._codec}\n\n' | ||||||
|                     f'msg: {msg}\n' |                     f'OBJ -> {msg}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 |             if type(msg) not in types.__spec__: | ||||||
|             if type(msg) not in msgtypes.__msg_types__: |                 log.warning( | ||||||
|                 if strict_types: |                     'Sending non-`Msg`-spec msg?\n\n' | ||||||
|                     _raise_msg_type_err( |                     f'{msg}\n' | ||||||
|                         msg, |                 ) | ||||||
|                         codec=codec, |             bytes_data: bytes = codec.encode(msg) | ||||||
|                     ) |  | ||||||
|                 else: |  | ||||||
|                     log.warning( |  | ||||||
|                         'Sending non-`Msg`-spec msg?\n\n' |  | ||||||
|                         f'{msg}\n' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|             try: |  | ||||||
|                 bytes_data: bytes = codec.encode(msg) |  | ||||||
|             except TypeError as typerr: |  | ||||||
|                 raise MsgTypeError( |  | ||||||
|                     'A msg field violates the current spec\n' |  | ||||||
|                     f'{codec.pld_spec}\n\n' |  | ||||||
|                     f'{pretty_struct.Struct.pformat(msg)}' |  | ||||||
|                 ) from typerr |  | ||||||
| 
 | 
 | ||||||
|             # supposedly the fastest says, |             # supposedly the fastest says, | ||||||
|             # https://stackoverflow.com/a/54027962 |             # https://stackoverflow.com/a/54027962 | ||||||
|             size: bytes = struct.pack("<I", len(bytes_data)) |             size: bytes = struct.pack("<I", len(bytes_data)) | ||||||
|  | 
 | ||||||
|             return await self.stream.send_all(size + bytes_data) |             return await self.stream.send_all(size + bytes_data) | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|  | @ -636,6 +567,7 @@ class Channel: | ||||||
|             f'{pformat(payload)}\n' |             f'{pformat(payload)}\n' | ||||||
|         )  # type: ignore |         )  # type: ignore | ||||||
|         assert self._transport |         assert self._transport | ||||||
|  | 
 | ||||||
|         await self._transport.send( |         await self._transport.send( | ||||||
|             payload, |             payload, | ||||||
|             # hide_tb=hide_tb, |             # hide_tb=hide_tb, | ||||||
|  | @ -645,11 +577,6 @@ class Channel: | ||||||
|         assert self._transport |         assert self._transport | ||||||
|         return await self._transport.recv() |         return await self._transport.recv() | ||||||
| 
 | 
 | ||||||
|         # TODO: auto-reconnect features like 0mq/nanomsg? |  | ||||||
|         # -[ ] implement it manually with nods to SC prot |  | ||||||
|         #      possibly on multiple transport backends? |  | ||||||
|         #  -> seems like that might be re-inventing scalability |  | ||||||
|         #     prots tho no? |  | ||||||
|         # try: |         # try: | ||||||
|         #     return await self._transport.recv() |         #     return await self._transport.recv() | ||||||
|         # except trio.BrokenResourceError: |         # except trio.BrokenResourceError: | ||||||
|  |  | ||||||
|  | @ -502,7 +502,7 @@ async def open_portal( | ||||||
|     ''' |     ''' | ||||||
|     actor = current_actor() |     actor = current_actor() | ||||||
|     assert actor |     assert actor | ||||||
|     was_connected: bool = False |     was_connected = False | ||||||
| 
 | 
 | ||||||
|     async with maybe_open_nursery(nursery, shield=shield) as nursery: |     async with maybe_open_nursery(nursery, shield=shield) as nursery: | ||||||
| 
 | 
 | ||||||
|  | @ -533,7 +533,9 @@ async def open_portal( | ||||||
|             await portal.aclose() |             await portal.aclose() | ||||||
| 
 | 
 | ||||||
|             if was_connected: |             if was_connected: | ||||||
|                 await channel.aclose() |                 # gracefully signal remote channel-msg loop | ||||||
|  |                 await channel.send(None) | ||||||
|  |                 # await channel.aclose() | ||||||
| 
 | 
 | ||||||
|             # cancel background msg loop task |             # cancel background msg loop task | ||||||
|             if msg_loop_cs: |             if msg_loop_cs: | ||||||
|  |  | ||||||
							
								
								
									
										372
									
								
								tractor/_rpc.py
								
								
								
								
							
							
						
						
									
										372
									
								
								tractor/_rpc.py
								
								
								
								
							|  | @ -55,21 +55,20 @@ from ._exceptions import ( | ||||||
|     TransportClosed, |     TransportClosed, | ||||||
| ) | ) | ||||||
| from .devx import ( | from .devx import ( | ||||||
|  |     pause, | ||||||
|     maybe_wait_for_debugger, |     maybe_wait_for_debugger, | ||||||
|     _debug, |     _debug, | ||||||
| ) | ) | ||||||
| from . import _state | from . import _state | ||||||
| from .log import get_logger | from .log import get_logger | ||||||
| from tractor.msg.types import ( | from tractor.msg.types import ( | ||||||
|     CancelAck, |  | ||||||
|     Error, |  | ||||||
|     Msg, |  | ||||||
|     Return, |  | ||||||
|     Start, |     Start, | ||||||
|     StartAck, |     StartAck, | ||||||
|     Started, |     Started, | ||||||
|     Stop, |     Stop, | ||||||
|     Yield, |     Yield, | ||||||
|  |     Return, | ||||||
|  |     Error, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -91,7 +90,6 @@ async def _invoke_non_context( | ||||||
| 
 | 
 | ||||||
|     treat_as_gen: bool, |     treat_as_gen: bool, | ||||||
|     is_rpc: bool, |     is_rpc: bool, | ||||||
|     return_msg: Return|CancelAck = Return, |  | ||||||
| 
 | 
 | ||||||
|     task_status: TaskStatus[ |     task_status: TaskStatus[ | ||||||
|         Context | BaseException |         Context | BaseException | ||||||
|  | @ -100,6 +98,7 @@ async def _invoke_non_context( | ||||||
| 
 | 
 | ||||||
|     # TODO: can we unify this with the `context=True` impl below? |     # TODO: can we unify this with the `context=True` impl below? | ||||||
|     if inspect.isasyncgen(coro): |     if inspect.isasyncgen(coro): | ||||||
|  |         # await chan.send({ | ||||||
|         await chan.send( |         await chan.send( | ||||||
|             StartAck( |             StartAck( | ||||||
|                 cid=cid, |                 cid=cid, | ||||||
|  | @ -125,6 +124,11 @@ async def _invoke_non_context( | ||||||
|                     # to_send = await chan.recv_nowait() |                     # to_send = await chan.recv_nowait() | ||||||
|                     # if to_send is not None: |                     # if to_send is not None: | ||||||
|                     #     to_yield = await coro.asend(to_send) |                     #     to_yield = await coro.asend(to_send) | ||||||
|  |                     # await chan.send({ | ||||||
|  |                     #     # Yield() | ||||||
|  |                     #     'cid': cid, | ||||||
|  |                     #     'yield': item, | ||||||
|  |                     # }) | ||||||
|                     await chan.send( |                     await chan.send( | ||||||
|                         Yield( |                         Yield( | ||||||
|                             cid=cid, |                             cid=cid, | ||||||
|  | @ -139,6 +143,11 @@ async def _invoke_non_context( | ||||||
|         await chan.send( |         await chan.send( | ||||||
|             Stop(cid=cid) |             Stop(cid=cid) | ||||||
|         ) |         ) | ||||||
|  |         # await chan.send({ | ||||||
|  |         #     # Stop( | ||||||
|  |         #     'cid': cid, | ||||||
|  |         #     'stop': True, | ||||||
|  |         # }) | ||||||
| 
 | 
 | ||||||
|     # one way @stream func that gets treated like an async gen |     # one way @stream func that gets treated like an async gen | ||||||
|     # TODO: can we unify this with the `context=True` impl below? |     # TODO: can we unify this with the `context=True` impl below? | ||||||
|  | @ -149,6 +158,11 @@ async def _invoke_non_context( | ||||||
|                 functype='asyncgen', |                 functype='asyncgen', | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
|  |         # await chan.send({ | ||||||
|  |         #     # StartAck() | ||||||
|  |         #     'cid': cid, | ||||||
|  |         #     'functype': 'asyncgen', | ||||||
|  |         # }) | ||||||
|         # XXX: the async-func may spawn further tasks which push |         # XXX: the async-func may spawn further tasks which push | ||||||
|         # back values like an async-generator would but must |         # back values like an async-generator would but must | ||||||
|         # manualy construct the response dict-packet-responses as |         # manualy construct the response dict-packet-responses as | ||||||
|  | @ -164,6 +178,11 @@ async def _invoke_non_context( | ||||||
|             await chan.send( |             await chan.send( | ||||||
|                 Stop(cid=cid) |                 Stop(cid=cid) | ||||||
|             ) |             ) | ||||||
|  |             # await chan.send({ | ||||||
|  |             #     # Stop( | ||||||
|  |             #     'cid': cid, | ||||||
|  |             #     'stop': True, | ||||||
|  |             # }) | ||||||
|     else: |     else: | ||||||
|         # regular async function/method |         # regular async function/method | ||||||
|         # XXX: possibly just a scheduled `Actor._cancel_task()` |         # XXX: possibly just a scheduled `Actor._cancel_task()` | ||||||
|  | @ -181,6 +200,11 @@ async def _invoke_non_context( | ||||||
|                     functype='asyncfunc', |                     functype='asyncfunc', | ||||||
|                 ) |                 ) | ||||||
|             ) |             ) | ||||||
|  |             # await chan.send({ | ||||||
|  |             #     # StartAck() | ||||||
|  |             #     'cid': cid, | ||||||
|  |             #     'functype': 'asyncfunc', | ||||||
|  |             # }) | ||||||
|         except ( |         except ( | ||||||
|             trio.ClosedResourceError, |             trio.ClosedResourceError, | ||||||
|             trio.BrokenResourceError, |             trio.BrokenResourceError, | ||||||
|  | @ -214,8 +238,13 @@ async def _invoke_non_context( | ||||||
|                 and chan.connected() |                 and chan.connected() | ||||||
|             ): |             ): | ||||||
|                 try: |                 try: | ||||||
|  |                     # await chan.send({ | ||||||
|  |                     #     # Return() | ||||||
|  |                     #     'cid': cid, | ||||||
|  |                     #     'return': result, | ||||||
|  |                     # }) | ||||||
|                     await chan.send( |                     await chan.send( | ||||||
|                         return_msg( |                         Return( | ||||||
|                             cid=cid, |                             cid=cid, | ||||||
|                             pld=result, |                             pld=result, | ||||||
|                         ) |                         ) | ||||||
|  | @ -380,7 +409,6 @@ async def _invoke( | ||||||
| 
 | 
 | ||||||
|     is_rpc: bool = True, |     is_rpc: bool = True, | ||||||
|     hide_tb: bool = True, |     hide_tb: bool = True, | ||||||
|     return_msg: Return|CancelAck = Return, |  | ||||||
| 
 | 
 | ||||||
|     task_status: TaskStatus[ |     task_status: TaskStatus[ | ||||||
|         Context | BaseException |         Context | BaseException | ||||||
|  | @ -401,6 +429,8 @@ async def _invoke( | ||||||
|         # XXX for .pause_from_sync()` usage we need to make sure |         # XXX for .pause_from_sync()` usage we need to make sure | ||||||
|         # `greenback` is boostrapped in the subactor! |         # `greenback` is boostrapped in the subactor! | ||||||
|         await _debug.maybe_init_greenback() |         await _debug.maybe_init_greenback() | ||||||
|  |     # else: | ||||||
|  |     #     await pause() | ||||||
| 
 | 
 | ||||||
|     # TODO: possibly a specially formatted traceback |     # TODO: possibly a specially formatted traceback | ||||||
|     # (not sure what typing is for this..)? |     # (not sure what typing is for this..)? | ||||||
|  | @ -490,7 +520,6 @@ async def _invoke( | ||||||
|                 kwargs, |                 kwargs, | ||||||
|                 treat_as_gen, |                 treat_as_gen, | ||||||
|                 is_rpc, |                 is_rpc, | ||||||
|                 return_msg, |  | ||||||
|                 task_status, |                 task_status, | ||||||
|             ) |             ) | ||||||
|             # below is only for `@context` funcs |             # below is only for `@context` funcs | ||||||
|  | @ -521,6 +550,11 @@ async def _invoke( | ||||||
|                 functype='context', |                 functype='context', | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
|  |         # await chan.send({ | ||||||
|  |         #     # StartAck() | ||||||
|  |         #     'cid': cid, | ||||||
|  |         #     'functype': 'context', | ||||||
|  |         # }) | ||||||
| 
 | 
 | ||||||
|         # TODO: should we also use an `.open_context()` equiv |         # TODO: should we also use an `.open_context()` equiv | ||||||
|         # for this callee side by factoring the impl from |         # for this callee side by factoring the impl from | ||||||
|  | @ -545,11 +579,16 @@ async def _invoke( | ||||||
| 
 | 
 | ||||||
|                 # deliver final result to caller side. |                 # deliver final result to caller side. | ||||||
|                 await chan.send( |                 await chan.send( | ||||||
|                     return_msg( |                     Return( | ||||||
|                         cid=cid, |                         cid=cid, | ||||||
|                         pld=res, |                         pld=res, | ||||||
|                     ) |                     ) | ||||||
|                 ) |                 ) | ||||||
|  |                 # await chan.send({ | ||||||
|  |                 #     # Return() | ||||||
|  |                 #     'cid': cid, | ||||||
|  |                 #     'return': res, | ||||||
|  |                 # }) | ||||||
| 
 | 
 | ||||||
|             # NOTE: this happens IFF `ctx._scope.cancel()` is |             # NOTE: this happens IFF `ctx._scope.cancel()` is | ||||||
|             # called by any of, |             # called by any of, | ||||||
|  | @ -638,6 +677,7 @@ async def _invoke( | ||||||
|                     ctxc = ContextCancelled( |                     ctxc = ContextCancelled( | ||||||
|                         msg, |                         msg, | ||||||
|                         boxed_type=trio.Cancelled, |                         boxed_type=trio.Cancelled, | ||||||
|  |                         # boxed_type_str='Cancelled', | ||||||
|                         canceller=canceller, |                         canceller=canceller, | ||||||
|                     ) |                     ) | ||||||
|                     # assign local error so that the `.outcome` |                     # assign local error so that the `.outcome` | ||||||
|  | @ -738,12 +778,12 @@ async def try_ship_error_to_remote( | ||||||
|             trio.BrokenResourceError, |             trio.BrokenResourceError, | ||||||
|             BrokenPipeError, |             BrokenPipeError, | ||||||
|         ): |         ): | ||||||
|  |             # err_msg: dict = msg['error']['tb_str'] | ||||||
|             log.critical( |             log.critical( | ||||||
|                 'IPC transport failure -> ' |                 'IPC transport failure -> ' | ||||||
|                 f'failed to ship error to {remote_descr}!\n\n' |                 f'failed to ship error to {remote_descr}!\n\n' | ||||||
|                 f'X=> {channel.uid}\n\n' |                 f'X=> {channel.uid}\n\n' | ||||||
| 
 |                 # f'{err_msg}\n' | ||||||
|                 # TODO: use `.msg.preetty_struct` for this! |  | ||||||
|                 f'{msg}\n' |                 f'{msg}\n' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | @ -785,8 +825,6 @@ async def process_messages( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     assert actor._service_n  # state sanity |  | ||||||
| 
 |  | ||||||
|     # TODO: once `trio` get's an "obvious way" for req/resp we |     # TODO: once `trio` get's an "obvious way" for req/resp we | ||||||
|     # should use it? |     # should use it? | ||||||
|     # https://github.com/python-trio/trio/issues/467 |     # https://github.com/python-trio/trio/issues/467 | ||||||
|  | @ -796,7 +834,7 @@ async def process_messages( | ||||||
|         f'|_{chan}\n' |         f'|_{chan}\n' | ||||||
|     ) |     ) | ||||||
|     nursery_cancelled_before_task: bool = False |     nursery_cancelled_before_task: bool = False | ||||||
|     msg: Msg|None = None |     msg: dict | None = None | ||||||
|     try: |     try: | ||||||
|         # NOTE: this internal scope allows for keeping this |         # NOTE: this internal scope allows for keeping this | ||||||
|         # message loop running despite the current task having |         # message loop running despite the current task having | ||||||
|  | @ -805,148 +843,215 @@ async def process_messages( | ||||||
|         # using ``scope = Nursery.start()`` |         # using ``scope = Nursery.start()`` | ||||||
|         with CancelScope(shield=shield) as loop_cs: |         with CancelScope(shield=shield) as loop_cs: | ||||||
|             task_status.started(loop_cs) |             task_status.started(loop_cs) | ||||||
| 
 |  | ||||||
|             async for msg in chan: |             async for msg in chan: | ||||||
|                 log.transport(   # type: ignore |                 log.transport(   # type: ignore | ||||||
|                     f'<= IPC msg from peer: {chan.uid}\n\n' |                     f'<= IPC msg from peer: {chan.uid}\n\n' | ||||||
| 
 | 
 | ||||||
|                     # TODO: avoid fmting depending on loglevel for perf? |                     # TODO: conditionally avoid fmting depending | ||||||
|                     # -[ ] specifically `pformat()` sub-call..? |                     # on log level (for perf)? | ||||||
|                     # -[ ] use `.msg.pretty_struct` here now instead! |                     # => specifically `pformat()` sub-call..? | ||||||
|                     f'{pformat(msg)}\n' |                     f'{pformat(msg)}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 match msg: |                 match msg: | ||||||
|                     # msg for an ongoing IPC ctx session, deliver msg to | 
 | ||||||
|                     # local task. |                 # if msg is None: | ||||||
|  |                 # dedicated loop terminate sentinel | ||||||
|  |                     case None: | ||||||
|  | 
 | ||||||
|  |                         tasks: dict[ | ||||||
|  |                             tuple[Channel, str], | ||||||
|  |                             tuple[Context, Callable, trio.Event] | ||||||
|  |                         ] = actor._rpc_tasks.copy() | ||||||
|  |                         log.cancel( | ||||||
|  |                             f'Peer IPC channel terminated via `None` setinel msg?\n' | ||||||
|  |                             f'=> Cancelling all {len(tasks)} local RPC tasks..\n' | ||||||
|  |                             f'peer: {chan.uid}\n' | ||||||
|  |                             f'|_{chan}\n' | ||||||
|  |                         ) | ||||||
|  |                         for (channel, cid) in tasks: | ||||||
|  |                             if channel is chan: | ||||||
|  |                                 await actor._cancel_task( | ||||||
|  |                                     cid, | ||||||
|  |                                     channel, | ||||||
|  |                                     requesting_uid=channel.uid, | ||||||
|  | 
 | ||||||
|  |                                     ipc_msg=msg, | ||||||
|  |                                 ) | ||||||
|  |                         break | ||||||
|  | 
 | ||||||
|  |                 # cid = msg.get('cid') | ||||||
|  |                 # if cid: | ||||||
|                     case ( |                     case ( | ||||||
|                         StartAck(cid=cid) |                         StartAck(cid=cid) | ||||||
|                         | Started(cid=cid) |                         | Started(cid=cid) | ||||||
|                         | Yield(cid=cid) |                         | Yield(cid=cid) | ||||||
|                         | Stop(cid=cid) |                         | Stop(cid=cid) | ||||||
|                         | Return(cid=cid) |                         | Return(cid=cid) | ||||||
|                         | CancelAck(cid=cid) |                         | Error(cid=cid) | ||||||
|                         | Error(cid=cid)  # RPC-task ctx specific |  | ||||||
|                     ): |                     ): | ||||||
|                         # deliver response to local caller/waiter |                         # deliver response to local caller/waiter | ||||||
|                         # via its per-remote-context memory channel. |                         # via its per-remote-context memory channel. | ||||||
|                         await actor._deliver_ctx_payload( |                         await actor._push_result( | ||||||
|                             chan, |                             chan, | ||||||
|                             cid, |                             cid, | ||||||
|                             msg, |                             msg, | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|                     # `Actor`(-internal) runtime cancel requests |  | ||||||
|                     case Start( |  | ||||||
|                         ns='self', |  | ||||||
|                         func='cancel', |  | ||||||
|                         cid=cid, |  | ||||||
|                         kwargs=kwargs, |  | ||||||
|                     ): |  | ||||||
|                         kwargs |= {'req_chan': chan} |  | ||||||
| 
 |  | ||||||
|                         # XXX NOTE XXX don't start entire actor |  | ||||||
|                         # runtime cancellation if this actor is |  | ||||||
|                         # currently in debug mode! |  | ||||||
|                         pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete |  | ||||||
|                         if pdb_complete: |  | ||||||
|                             await pdb_complete.wait() |  | ||||||
| 
 |  | ||||||
|                         # Either of  `Actor.cancel()`/`.cancel_soon()` |  | ||||||
|                         # was called, so terminate this IPC msg |  | ||||||
|                         # loop, exit back out into `async_main()`, |  | ||||||
|                         # and immediately start the core runtime |  | ||||||
|                         # machinery shutdown! |  | ||||||
|                         with CancelScope(shield=True): |  | ||||||
|                             await _invoke( |  | ||||||
|                                 actor, |  | ||||||
|                                 cid, |  | ||||||
|                                 chan, |  | ||||||
|                                 actor.cancel, |  | ||||||
|                                 kwargs, |  | ||||||
|                                 is_rpc=False, |  | ||||||
|                                 return_msg=CancelAck, |  | ||||||
|                             ) |  | ||||||
| 
 |  | ||||||
|                         log.runtime( |                         log.runtime( | ||||||
|                             'Cancelling IPC transport msg-loop with peer:\n' |                             'Waiting on next IPC msg from\n' | ||||||
|  |                             f'peer: {chan.uid}:\n' | ||||||
|                             f'|_{chan}\n' |                             f'|_{chan}\n' | ||||||
|  | 
 | ||||||
|  |                             # f'last msg: {msg}\n' | ||||||
|                         ) |                         ) | ||||||
|                         loop_cs.cancel() |                         continue | ||||||
|                         break |  | ||||||
| 
 | 
 | ||||||
|                     case Start( |                     # process a 'cmd' request-msg upack | ||||||
|                         ns='self', |                     # TODO: impl with native `msgspec.Struct` support !! | ||||||
|                         func='_cancel_task', |                     # -[ ] implement with ``match:`` syntax? | ||||||
|                         cid=cid, |                     # -[ ] discard un-authed msgs as per, | ||||||
|                         kwargs=kwargs, |                     # <TODO put issue for typed msging structs> | ||||||
|                     ): |  | ||||||
|                         target_cid: str = kwargs['cid'] |  | ||||||
|                         kwargs |= { |  | ||||||
|                             'requesting_uid': chan.uid, |  | ||||||
|                             'ipc_msg': msg, |  | ||||||
| 
 |  | ||||||
|                             # XXX NOTE! ONLY the rpc-task-owning |  | ||||||
|                             # parent IPC channel should be able to |  | ||||||
|                             # cancel it! |  | ||||||
|                             'parent_chan': chan, |  | ||||||
|                         } |  | ||||||
|                         try: |  | ||||||
|                             await _invoke( |  | ||||||
|                                 actor, |  | ||||||
|                                 cid, |  | ||||||
|                                 chan, |  | ||||||
|                                 actor._cancel_task, |  | ||||||
|                                 kwargs, |  | ||||||
|                                 is_rpc=False, |  | ||||||
|                                 return_msg=CancelAck, |  | ||||||
|                             ) |  | ||||||
|                         except BaseException: |  | ||||||
|                             log.exception( |  | ||||||
|                                 'Failed to cancel task?\n' |  | ||||||
|                                 f'<= canceller: {chan.uid}\n' |  | ||||||
|                                 f'  |_{chan}\n\n' |  | ||||||
|                                 f'=> {actor}\n' |  | ||||||
|                                 f'  |_cid: {target_cid}\n' |  | ||||||
|                             ) |  | ||||||
| 
 |  | ||||||
|                     # the "MAIN" RPC endpoint to schedule-a-`trio.Task` |  | ||||||
|                     #                ------ - ------ |  | ||||||
|                     # -[x] discard un-authed msgs as per, |  | ||||||
|                     #    <TODO put issue for typed msging structs> |  | ||||||
|                     case Start( |                     case Start( | ||||||
|                         cid=cid, |                         cid=cid, | ||||||
|                         ns=ns, |                         ns=ns, | ||||||
|                         func=funcname, |                         func=funcname, | ||||||
|                         kwargs=kwargs,  # type-spec this? see `msg.types` |                         kwargs=kwargs, | ||||||
|                         uid=actorid, |                         uid=actorid, | ||||||
|                     ): |                     ): | ||||||
|  |                         # try: | ||||||
|  |                         #     ( | ||||||
|  |                         #         ns, | ||||||
|  |                         #         funcname, | ||||||
|  |                         #         kwargs, | ||||||
|  |                         #         actorid, | ||||||
|  |                         #         cid, | ||||||
|  |                         #     ) = msg['cmd'] | ||||||
|  | 
 | ||||||
|  |                         # # TODO: put in `case Error():` right? | ||||||
|  |                         # except KeyError: | ||||||
|  |                         #     # This is the non-rpc error case, that is, an | ||||||
|  |                         #     # error **not** raised inside a call to ``_invoke()`` | ||||||
|  |                         #     # (i.e. no cid was provided in the msg - see above). | ||||||
|  |                         #     # Push this error to all local channel consumers | ||||||
|  |                         #     # (normally portals) by marking the channel as errored | ||||||
|  |                         #     assert chan.uid | ||||||
|  |                         #     exc = unpack_error(msg, chan=chan) | ||||||
|  |                         #     chan._exc = exc | ||||||
|  |                         #     raise exc | ||||||
|  | 
 | ||||||
|                         log.runtime( |                         log.runtime( | ||||||
|                             'Handling RPC `Start` request from\n' |                             'Handling RPC `Start` request from\n' | ||||||
|                             f'peer: {actorid}\n' |                             f'peer: {actorid}\n' | ||||||
|                             '\n' |                             '\n' | ||||||
|                             f'=> {ns}.{funcname}({kwargs})\n' |                             f'=> {ns}.{funcname}({kwargs})\n' | ||||||
|                         ) |                         ) | ||||||
| 
 |                         # case Start( | ||||||
|                         # runtime-internal endpoint: `Actor.<funcname>` |                         #     ns='self', | ||||||
|                         # only registry methods exist now yah, |                         #     funcname='cancel', | ||||||
|                         # like ``.register_actor()`` etc. ? |                         # ): | ||||||
|                         if ns == 'self': |                         if ns == 'self': | ||||||
|                             func: Callable = getattr(actor, funcname) |                             if funcname == 'cancel': | ||||||
|  |                                 func: Callable = actor.cancel | ||||||
|  |                                 kwargs |= { | ||||||
|  |                                     'req_chan': chan, | ||||||
|  |                                 } | ||||||
| 
 | 
 | ||||||
|                         # application RPC endpoint |                                 # don't start entire actor runtime cancellation | ||||||
|                         else: |                                 # if this actor is currently in debug mode! | ||||||
|                             try: |                                 pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete | ||||||
|                                 func: Callable = actor._get_rpc_func( |                                 if pdb_complete: | ||||||
|                                     ns, |                                     await pdb_complete.wait() | ||||||
|                                     funcname, | 
 | ||||||
|  |                                 # Either of  `Actor.cancel()`/`.cancel_soon()` | ||||||
|  |                                 # was called, so terminate this IPC msg | ||||||
|  |                                 # loop, exit back out into `async_main()`, | ||||||
|  |                                 # and immediately start the core runtime | ||||||
|  |                                 # machinery shutdown! | ||||||
|  |                                 with CancelScope(shield=True): | ||||||
|  |                                     await _invoke( | ||||||
|  |                                         actor, | ||||||
|  |                                         cid, | ||||||
|  |                                         chan, | ||||||
|  |                                         func, | ||||||
|  |                                         kwargs, | ||||||
|  |                                         is_rpc=False, | ||||||
|  |                                     ) | ||||||
|  | 
 | ||||||
|  |                                 log.runtime( | ||||||
|  |                                     'Cancelling IPC transport msg-loop with peer:\n' | ||||||
|  |                                     f'|_{chan}\n' | ||||||
|                                 ) |                                 ) | ||||||
|  |                                 loop_cs.cancel() | ||||||
|  |                                 break | ||||||
|  | 
 | ||||||
|  |                         # case Start( | ||||||
|  |                         #     ns='self', | ||||||
|  |                         #     funcname='_cancel_task', | ||||||
|  |                         # ): | ||||||
|  |                             if funcname == '_cancel_task': | ||||||
|  |                                 func: Callable = actor._cancel_task | ||||||
|  | 
 | ||||||
|  |                                 # we immediately start the runtime machinery | ||||||
|  |                                 # shutdown | ||||||
|  |                                 # with CancelScope(shield=True): | ||||||
|  |                                 target_cid: str = kwargs['cid'] | ||||||
|  |                                 kwargs |= { | ||||||
|  |                                     # NOTE: ONLY the rpc-task-owning | ||||||
|  |                                     # parent IPC channel should be able to | ||||||
|  |                                     # cancel it! | ||||||
|  |                                     'parent_chan': chan, | ||||||
|  |                                     'requesting_uid': chan.uid, | ||||||
|  |                                     'ipc_msg': msg, | ||||||
|  |                                 } | ||||||
|  |                                 # TODO: remove? already have emit in meth. | ||||||
|  |                                 # log.runtime( | ||||||
|  |                                 #     f'Rx RPC task cancel request\n' | ||||||
|  |                                 #     f'<= canceller: {chan.uid}\n' | ||||||
|  |                                 #     f'  |_{chan}\n\n' | ||||||
|  |                                 #     f'=> {actor}\n' | ||||||
|  |                                 #     f'  |_cid: {target_cid}\n' | ||||||
|  |                                 # ) | ||||||
|  |                                 try: | ||||||
|  |                                     await _invoke( | ||||||
|  |                                         actor, | ||||||
|  |                                         cid, | ||||||
|  |                                         chan, | ||||||
|  |                                         func, | ||||||
|  |                                         kwargs, | ||||||
|  |                                         is_rpc=False, | ||||||
|  |                                     ) | ||||||
|  |                                 except BaseException: | ||||||
|  |                                     log.exception( | ||||||
|  |                                         'Failed to cancel task?\n' | ||||||
|  |                                         f'<= canceller: {chan.uid}\n' | ||||||
|  |                                         f'  |_{chan}\n\n' | ||||||
|  |                                         f'=> {actor}\n' | ||||||
|  |                                         f'  |_cid: {target_cid}\n' | ||||||
|  |                                     ) | ||||||
|  |                                 continue | ||||||
|  | 
 | ||||||
|  |                             # case Start( | ||||||
|  |                             #     ns='self', | ||||||
|  |                             #     funcname='register_actor', | ||||||
|  |                             # ): | ||||||
|  |                             else: | ||||||
|  |                                 # normally registry methods, eg. | ||||||
|  |                                 # ``.register_actor()`` etc. | ||||||
|  |                                 func: Callable = getattr(actor, funcname) | ||||||
|  | 
 | ||||||
|  |                         # case Start( | ||||||
|  |                         #     ns=str(), | ||||||
|  |                         #     funcname=funcname, | ||||||
|  |                         # ): | ||||||
|  |                         else: | ||||||
|  |                             # complain to client about restricted modules | ||||||
|  |                             try: | ||||||
|  |                                 func = actor._get_rpc_func(ns, funcname) | ||||||
|                             except ( |                             except ( | ||||||
|                                 ModuleNotExposed, |                                 ModuleNotExposed, | ||||||
|                                 AttributeError, |                                 AttributeError, | ||||||
|                             ) as err: |                             ) as err: | ||||||
|                                 # always complain to requester |  | ||||||
|                                 # client about un-enabled modules |  | ||||||
|                                 err_msg: dict[str, dict] = pack_error( |                                 err_msg: dict[str, dict] = pack_error( | ||||||
|                                     err, |                                     err, | ||||||
|                                     cid=cid, |                                     cid=cid, | ||||||
|  | @ -956,7 +1061,6 @@ async def process_messages( | ||||||
| 
 | 
 | ||||||
|                         # schedule a task for the requested RPC function |                         # schedule a task for the requested RPC function | ||||||
|                         # in the actor's main "service nursery". |                         # in the actor's main "service nursery". | ||||||
|                         # |  | ||||||
|                         # TODO: possibly a service-tn per IPC channel for |                         # TODO: possibly a service-tn per IPC channel for | ||||||
|                         # supervision isolation? would avoid having to |                         # supervision isolation? would avoid having to | ||||||
|                         # manage RPC tasks individually in `._rpc_tasks` |                         # manage RPC tasks individually in `._rpc_tasks` | ||||||
|  | @ -965,7 +1069,7 @@ async def process_messages( | ||||||
|                             f'Spawning task for RPC request\n' |                             f'Spawning task for RPC request\n' | ||||||
|                             f'<= caller: {chan.uid}\n' |                             f'<= caller: {chan.uid}\n' | ||||||
|                             f'  |_{chan}\n\n' |                             f'  |_{chan}\n\n' | ||||||
|                             # ^-TODO-^ maddr style repr? |                             # TODO: maddr style repr? | ||||||
|                             # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' |                             # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' | ||||||
|                             # f'cid="{cid[-16:]} .."\n\n' |                             # f'cid="{cid[-16:]} .."\n\n' | ||||||
| 
 | 
 | ||||||
|  | @ -973,6 +1077,7 @@ async def process_messages( | ||||||
|                             f'  |_cid: {cid}\n' |                             f'  |_cid: {cid}\n' | ||||||
|                             f'   |>> {func}()\n' |                             f'   |>> {func}()\n' | ||||||
|                         ) |                         ) | ||||||
|  |                         assert actor._service_n  # wait why? do it at top? | ||||||
|                         try: |                         try: | ||||||
|                             ctx: Context = await actor._service_n.start( |                             ctx: Context = await actor._service_n.start( | ||||||
|                                 partial( |                                 partial( | ||||||
|  | @ -1002,12 +1107,13 @@ async def process_messages( | ||||||
|                             log.warning( |                             log.warning( | ||||||
|                                 'Task for RPC failed?' |                                 'Task for RPC failed?' | ||||||
|                                 f'|_ {func}()\n\n' |                                 f'|_ {func}()\n\n' | ||||||
|  | 
 | ||||||
|                                 f'{err}' |                                 f'{err}' | ||||||
|                             ) |                             ) | ||||||
|                             continue |                             continue | ||||||
| 
 | 
 | ||||||
|                         else: |                         else: | ||||||
|                             # mark our global state with ongoing rpc tasks |                             # mark that we have ongoing rpc tasks | ||||||
|                             actor._ongoing_rpc_tasks = trio.Event() |                             actor._ongoing_rpc_tasks = trio.Event() | ||||||
| 
 | 
 | ||||||
|                             # store cancel scope such that the rpc task can be |                             # store cancel scope such that the rpc task can be | ||||||
|  | @ -1018,26 +1124,23 @@ async def process_messages( | ||||||
|                                 trio.Event(), |                                 trio.Event(), | ||||||
|                             ) |                             ) | ||||||
| 
 | 
 | ||||||
|                     # XXX remote (runtime scoped) error or uknown |                     case Error()|_: | ||||||
|                     # msg (type). |                         # This is the non-rpc error case, that is, an | ||||||
|                     case Error() | _: |                         # error **not** raised inside a call to ``_invoke()`` | ||||||
|                         # NOTE: this is the non-rpc error case, |                         # (i.e. no cid was provided in the msg - see above). | ||||||
|                         # that is, an error **not** raised inside |                         # Push this error to all local channel consumers | ||||||
|                         # a call to ``_invoke()`` (i.e. no cid was |                         # (normally portals) by marking the channel as errored | ||||||
|                         # provided in the msg - see above). Push |  | ||||||
|                         # this error to all local channel |  | ||||||
|                         # consumers (normally portals) by marking |  | ||||||
|                         # the channel as errored |  | ||||||
|                         log.exception( |                         log.exception( | ||||||
|                             f'Unhandled IPC msg:\n\n' |                             f'Unhandled IPC msg:\n\n' | ||||||
|                             f'{msg}\n' |                             f'{msg}\n' | ||||||
|                         ) |                         ) | ||||||
|                         # assert chan.uid |                         assert chan.uid | ||||||
|                         chan._exc: Exception = unpack_error( |                         exc = unpack_error( | ||||||
|                             msg, |                             msg, | ||||||
|                             chan=chan, |                             chan=chan, | ||||||
|                         ) |                         ) | ||||||
|                         raise chan._exc |                         chan._exc = exc | ||||||
|  |                         raise exc | ||||||
| 
 | 
 | ||||||
|                 log.runtime( |                 log.runtime( | ||||||
|                     'Waiting on next IPC msg from\n' |                     'Waiting on next IPC msg from\n' | ||||||
|  | @ -1045,12 +1148,10 @@ async def process_messages( | ||||||
|                     f'|_{chan}\n' |                     f'|_{chan}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # END-OF `async for`: |             # end of async for, channel disconnect vis | ||||||
|             # IPC disconnected via `trio.EndOfChannel`, likely |             # ``trio.EndOfChannel`` | ||||||
|             # due to a (graceful) `Channel.aclose()`. |  | ||||||
|             log.runtime( |             log.runtime( | ||||||
|                 f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' |                 f"{chan} for {chan.uid} disconnected, cancelling tasks" | ||||||
|                 f'|_{chan}\n' |  | ||||||
|             ) |             ) | ||||||
|             await actor.cancel_rpc_tasks( |             await actor.cancel_rpc_tasks( | ||||||
|                 req_uid=actor.uid, |                 req_uid=actor.uid, | ||||||
|  | @ -1067,10 +1168,9 @@ async def process_messages( | ||||||
|         # connection-reset) is ok since we don't have a teardown |         # connection-reset) is ok since we don't have a teardown | ||||||
|         # handshake for them (yet) and instead we simply bail out of |         # handshake for them (yet) and instead we simply bail out of | ||||||
|         # the message loop and expect the teardown sequence to clean |         # the message loop and expect the teardown sequence to clean | ||||||
|         # up.. |         # up. | ||||||
|         # TODO: add a teardown handshake? and, |         # TODO: don't show this msg if it's an emphemeral | ||||||
|         # -[ ] don't show this msg if it's an ephemeral discovery ep call? |         # discovery ep call? | ||||||
|         # -[ ] figure out how this will break with other transports? |  | ||||||
|         log.runtime( |         log.runtime( | ||||||
|             f'channel closed abruptly with\n' |             f'channel closed abruptly with\n' | ||||||
|             f'peer: {chan.uid}\n'  |             f'peer: {chan.uid}\n'  | ||||||
|  |  | ||||||
|  | @ -65,12 +65,7 @@ from trio import ( | ||||||
|     TaskStatus, |     TaskStatus, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from tractor.msg import ( | from .msg import NamespacePath | ||||||
|     pretty_struct, |  | ||||||
|     NamespacePath, |  | ||||||
|     types as msgtypes, |  | ||||||
|     Msg, |  | ||||||
| ) |  | ||||||
| from ._ipc import Channel | from ._ipc import Channel | ||||||
| from ._context import ( | from ._context import ( | ||||||
|     mk_context, |     mk_context, | ||||||
|  | @ -78,10 +73,9 @@ from ._context import ( | ||||||
| ) | ) | ||||||
| from .log import get_logger | from .log import get_logger | ||||||
| from ._exceptions import ( | from ._exceptions import ( | ||||||
|     ContextCancelled, |  | ||||||
|     ModuleNotExposed, |  | ||||||
|     MsgTypeError, |  | ||||||
|     unpack_error, |     unpack_error, | ||||||
|  |     ModuleNotExposed, | ||||||
|  |     ContextCancelled, | ||||||
|     TransportClosed, |     TransportClosed, | ||||||
| ) | ) | ||||||
| from .devx import ( | from .devx import ( | ||||||
|  | @ -97,6 +91,10 @@ from ._rpc import ( | ||||||
|     process_messages, |     process_messages, | ||||||
|     try_ship_error_to_remote, |     try_ship_error_to_remote, | ||||||
| ) | ) | ||||||
|  | from tractor.msg import ( | ||||||
|  |     types as msgtypes, | ||||||
|  |     pretty_struct, | ||||||
|  | ) | ||||||
| # from tractor.msg.types import ( | # from tractor.msg.types import ( | ||||||
| #     Aid, | #     Aid, | ||||||
| #     SpawnSpec, | #     SpawnSpec, | ||||||
|  | @ -166,15 +164,18 @@ class Actor: | ||||||
|     # Information about `__main__` from parent |     # Information about `__main__` from parent | ||||||
|     _parent_main_data: dict[str, str] |     _parent_main_data: dict[str, str] | ||||||
|     _parent_chan_cs: CancelScope|None = None |     _parent_chan_cs: CancelScope|None = None | ||||||
|     _spawn_spec: msgtypes.SpawnSpec|None = None |     _spawn_spec: SpawnSpec|None = None | ||||||
| 
 | 
 | ||||||
|     # syncs for setup/teardown sequences |     # syncs for setup/teardown sequences | ||||||
|     _server_down: trio.Event|None = None |     _server_down: trio.Event|None = None | ||||||
| 
 | 
 | ||||||
|  |     # user toggled crash handling (including monkey-patched in | ||||||
|  |     # `trio.open_nursery()` via `.trionics._supervisor` B) | ||||||
|  |     _debug_mode: bool = False | ||||||
|  | 
 | ||||||
|     # if started on ``asycio`` running ``trio`` in guest mode |     # if started on ``asycio`` running ``trio`` in guest mode | ||||||
|     _infected_aio: bool = False |     _infected_aio: bool = False | ||||||
| 
 | 
 | ||||||
|     # TODO: nursery tracking like `trio` does? |  | ||||||
|     # _ans: dict[ |     # _ans: dict[ | ||||||
|     #     tuple[str, str], |     #     tuple[str, str], | ||||||
|     #     list[ActorNursery], |     #     list[ActorNursery], | ||||||
|  | @ -395,9 +396,8 @@ class Actor: | ||||||
| 
 | 
 | ||||||
|             raise mne |             raise mne | ||||||
| 
 | 
 | ||||||
|     # TODO: maybe change to mod-func and rename for implied |  | ||||||
|     # multi-transport semantics? |  | ||||||
|     async def _stream_handler( |     async def _stream_handler( | ||||||
|  | 
 | ||||||
|         self, |         self, | ||||||
|         stream: trio.SocketStream, |         stream: trio.SocketStream, | ||||||
| 
 | 
 | ||||||
|  | @ -559,7 +559,7 @@ class Actor: | ||||||
|                         cid: str|None = msg.cid |                         cid: str|None = msg.cid | ||||||
|                         if cid: |                         if cid: | ||||||
|                             # deliver response to local caller/waiter |                             # deliver response to local caller/waiter | ||||||
|                             await self._deliver_ctx_payload( |                             await self._push_result( | ||||||
|                                 chan, |                                 chan, | ||||||
|                                 cid, |                                 cid, | ||||||
|                                 msg, |                                 msg, | ||||||
|  | @ -716,13 +716,43 @@ class Actor: | ||||||
|                         # TODO: figure out why this breaks tests.. |                         # TODO: figure out why this breaks tests.. | ||||||
|                         db_cs.cancel() |                         db_cs.cancel() | ||||||
| 
 | 
 | ||||||
|  |             # XXX: is this necessary (GC should do it)? | ||||||
|  |             # XXX WARNING XXX | ||||||
|  |             # Be AWARE OF THE INDENT LEVEL HERE | ||||||
|  |             # -> ONLY ENTER THIS BLOCK WHEN ._peers IS | ||||||
|  |             # EMPTY!!!! | ||||||
|  |             if ( | ||||||
|  |                 not self._peers | ||||||
|  |                 and chan.connected() | ||||||
|  |             ): | ||||||
|  |                     # if the channel is still connected it may mean the far | ||||||
|  |                     # end has not closed and we may have gotten here due to | ||||||
|  |                     # an error and so we should at least try to terminate | ||||||
|  |                     # the channel from this end gracefully. | ||||||
|  |                     log.runtime( | ||||||
|  |                         'Terminating channel with `None` setinel msg\n' | ||||||
|  |                         f'|_{chan}\n' | ||||||
|  |                     ) | ||||||
|  |                     try: | ||||||
|  |                         # send msg loop terminate sentinel which | ||||||
|  |                         # triggers cancellation of all remotely | ||||||
|  |                         # started tasks. | ||||||
|  |                         await chan.send(None) | ||||||
|  | 
 | ||||||
|  |                         # XXX: do we want this? no right? | ||||||
|  |                         # causes "[104] connection reset by peer" on other end | ||||||
|  |                         # await chan.aclose() | ||||||
|  | 
 | ||||||
|  |                     except trio.BrokenResourceError: | ||||||
|  |                         log.runtime(f"Channel {chan.uid} was already closed") | ||||||
|  | 
 | ||||||
|     # TODO: rename to `._deliver_payload()` since this handles |     # TODO: rename to `._deliver_payload()` since this handles | ||||||
|     # more then just `result` msgs now obvi XD |     # more then just `result` msgs now obvi XD | ||||||
|     async def _deliver_ctx_payload( |     async def _push_result( | ||||||
|         self, |         self, | ||||||
|         chan: Channel, |         chan: Channel, | ||||||
|         cid: str, |         cid: str, | ||||||
|         msg: Msg|MsgTypeError, |         msg: dict[str, Any], | ||||||
| 
 | 
 | ||||||
|     ) -> None|bool: |     ) -> None|bool: | ||||||
|         ''' |         ''' | ||||||
|  | @ -744,16 +774,12 @@ class Actor: | ||||||
|             log.warning( |             log.warning( | ||||||
|                 'Ignoring invalid IPC ctx msg!\n\n' |                 'Ignoring invalid IPC ctx msg!\n\n' | ||||||
|                 f'<= sender: {uid}\n' |                 f'<= sender: {uid}\n' | ||||||
|                 # XXX don't need right since it's always in msg? |                 f'=> cid: {cid}\n\n' | ||||||
|                 # f'=> cid: {cid}\n\n' |  | ||||||
| 
 | 
 | ||||||
|                 f'{pretty_struct.Struct.pformat(msg)}\n' |                 f'{msg}\n' | ||||||
|             ) |             ) | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|         # if isinstance(msg, MsgTypeError): |  | ||||||
|         #     return await ctx._deliver_bad_msg() |  | ||||||
| 
 |  | ||||||
|         return await ctx._deliver_msg(msg) |         return await ctx._deliver_msg(msg) | ||||||
| 
 | 
 | ||||||
|     def get_context( |     def get_context( | ||||||
|  | @ -1411,7 +1437,7 @@ class Actor: | ||||||
|             ) |             ) | ||||||
|         await self._ongoing_rpc_tasks.wait() |         await self._ongoing_rpc_tasks.wait() | ||||||
| 
 | 
 | ||||||
|     def cancel_server(self) -> bool: |     def cancel_server(self) -> None: | ||||||
|         ''' |         ''' | ||||||
|         Cancel the internal IPC transport server nursery thereby |         Cancel the internal IPC transport server nursery thereby | ||||||
|         preventing any new inbound IPC connections establishing. |         preventing any new inbound IPC connections establishing. | ||||||
|  | @ -1420,9 +1446,6 @@ class Actor: | ||||||
|         if self._server_n: |         if self._server_n: | ||||||
|             log.runtime("Shutting down channel server") |             log.runtime("Shutting down channel server") | ||||||
|             self._server_n.cancel_scope.cancel() |             self._server_n.cancel_scope.cancel() | ||||||
|             return True |  | ||||||
| 
 |  | ||||||
|         return False |  | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def accept_addrs(self) -> list[tuple[str, int]]: |     def accept_addrs(self) -> list[tuple[str, int]]: | ||||||
|  |  | ||||||
|  | @ -46,6 +46,7 @@ from .trionics import ( | ||||||
| from tractor.msg import ( | from tractor.msg import ( | ||||||
|     Stop, |     Stop, | ||||||
|     Yield, |     Yield, | ||||||
|  |     Error, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|  | @ -183,7 +184,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|         # - via a received `{'stop': ...}` msg from remote side. |         # - via a received `{'stop': ...}` msg from remote side. | ||||||
|         #   |_ NOTE: previously this was triggered by calling |         #   |_ NOTE: previously this was triggered by calling | ||||||
|         #   ``._rx_chan.aclose()`` on the send side of the channel inside |         #   ``._rx_chan.aclose()`` on the send side of the channel inside | ||||||
|         #   `Actor._deliver_ctx_payload()`, but now the 'stop' message handling |         #   `Actor._push_result()`, but now the 'stop' message handling | ||||||
|         #   has been put just above inside `_raise_from_no_key_in_msg()`. |         #   has been put just above inside `_raise_from_no_key_in_msg()`. | ||||||
|         except ( |         except ( | ||||||
|             trio.EndOfChannel, |             trio.EndOfChannel, | ||||||
|  | @ -390,11 +391,11 @@ class MsgStream(trio.abc.Channel): | ||||||
| 
 | 
 | ||||||
|         if not self._eoc: |         if not self._eoc: | ||||||
|             log.cancel( |             log.cancel( | ||||||
|                 'Stream closed by self before it received an EoC?\n' |                 'Stream closed before it received an EoC?\n' | ||||||
|                 'Setting eoc manually..\n..' |                 'Setting eoc manually..\n..' | ||||||
|             ) |             ) | ||||||
|             self._eoc: bool = trio.EndOfChannel( |             self._eoc: bool = trio.EndOfChannel( | ||||||
|                 f'Context stream closed by self({self._ctx.side})\n' |                 f'Context stream closed by {self._ctx.side}\n' | ||||||
|                 f'|_{self}\n' |                 f'|_{self}\n' | ||||||
|             ) |             ) | ||||||
|         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? |         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? | ||||||
|  |  | ||||||
|  | @ -454,10 +454,6 @@ _runtime_msgs: list[Msg] = [ | ||||||
|     # emission from `MsgStream.aclose()` |     # emission from `MsgStream.aclose()` | ||||||
|     Stop, |     Stop, | ||||||
| 
 | 
 | ||||||
|     # `Return` sub-type that we always accept from |  | ||||||
|     # runtime-internal cancel endpoints |  | ||||||
|     CancelAck, |  | ||||||
| 
 |  | ||||||
|     # box remote errors, normally subtypes |     # box remote errors, normally subtypes | ||||||
|     # of `RemoteActorError`. |     # of `RemoteActorError`. | ||||||
|     Error, |     Error, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue