Compare commits
	
		
			7 Commits 
		
	
	
		
			0e8c60ee4a
			...
			8ea0f08386
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 8ea0f08386 | |
|  | 13ea500a44 | |
|  | 2f854a3e86 | |
|  | cdb1311e40 | |
|  | fcd089c08f | |
|  | 993281882b | |
|  | bbb4d4e52c | 
|  | @ -0,0 +1,56 @@ | ||||||
|  | import trio | ||||||
|  | import tractor | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def name_error( | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Raise a `NameError`, catch it and enter `.post_mortem()`, then | ||||||
|  |     expect the `._rpc._invoke()` crash handler to also engage. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     try: | ||||||
|  |         getattr(doggypants)  # noqa (on purpose) | ||||||
|  |     except NameError: | ||||||
|  |         await tractor.post_mortem() | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def main(): | ||||||
|  |     ''' | ||||||
|  |     Test 3 `PdbREPL` entries: | ||||||
|  |       - one in the child due to manual `.post_mortem()`, | ||||||
|  |       - another in the child due to runtime RPC crash handling. | ||||||
|  |       - final one here in parent from the RAE. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # XXX NOTE: ideally the REPL arrives at this frame in the parent | ||||||
|  |     # ONE UP FROM the inner ctx block below! | ||||||
|  |     async with tractor.open_nursery( | ||||||
|  |         debug_mode=True, | ||||||
|  |         # loglevel='cancel', | ||||||
|  |     ) as an: | ||||||
|  |         p: tractor.Portal = await an.start_actor( | ||||||
|  |             'child', | ||||||
|  |             enable_modules=[__name__], | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         # XXX should raise `RemoteActorError[NameError]` | ||||||
|  |         # AND be the active frame when REPL enters! | ||||||
|  |         try: | ||||||
|  |             async with p.open_context(name_error) as (ctx, first): | ||||||
|  |                 assert first | ||||||
|  |         except tractor.RemoteActorError as rae: | ||||||
|  |             assert rae.boxed_type is NameError | ||||||
|  | 
 | ||||||
|  |             # manually handle in root's parent task | ||||||
|  |             await tractor.post_mortem() | ||||||
|  |             raise | ||||||
|  |         else: | ||||||
|  |             raise RuntimeError('IPC ctx should have remote errored!?') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |     trio.run(main) | ||||||
|  | @ -0,0 +1,88 @@ | ||||||
|  | import trio | ||||||
|  | import tractor | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def cancellable_pause_loop( | ||||||
|  |     task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED | ||||||
|  | ): | ||||||
|  |     with trio.CancelScope() as cs: | ||||||
|  |         task_status.started(cs) | ||||||
|  |         for _ in range(3): | ||||||
|  |             try: | ||||||
|  |                 # ON first entry, there is no level triggered | ||||||
|  |                 # cancellation yet, so this cp does a parent task | ||||||
|  |                 # ctx-switch so that this scope raises for the NEXT | ||||||
|  |                 # checkpoint we hit. | ||||||
|  |                 await trio.lowlevel.checkpoint() | ||||||
|  |                 await tractor.pause() | ||||||
|  | 
 | ||||||
|  |                 cs.cancel() | ||||||
|  | 
 | ||||||
|  |                 # parent should have called `cs.cancel()` by now | ||||||
|  |                 await trio.lowlevel.checkpoint() | ||||||
|  | 
 | ||||||
|  |             except trio.Cancelled: | ||||||
|  |                 print('INSIDE SHIELDED PAUSE') | ||||||
|  |                 await tractor.pause(shield=True) | ||||||
|  |         else: | ||||||
|  |             # should raise it again, bubbling up to parent | ||||||
|  |             print('BUBBLING trio.Cancelled to parent task-nursery') | ||||||
|  |             await trio.lowlevel.checkpoint() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def pm_on_cancelled(): | ||||||
|  |     async with trio.open_nursery() as tn: | ||||||
|  |         tn.cancel_scope.cancel() | ||||||
|  |         try: | ||||||
|  |             await trio.sleep_forever() | ||||||
|  |         except trio.Cancelled: | ||||||
|  |             # should also raise `Cancelled` since | ||||||
|  |             # we didn't pass `shield=True`. | ||||||
|  |             try: | ||||||
|  |                 await tractor.post_mortem(hide_tb=False) | ||||||
|  |             except trio.Cancelled as taskc: | ||||||
|  | 
 | ||||||
|  |                 # should enter just fine, in fact it should | ||||||
|  |                 # be debugging the internals of the previous | ||||||
|  |                 # sin-shield call above Bo | ||||||
|  |                 await tractor.post_mortem( | ||||||
|  |                     hide_tb=False, | ||||||
|  |                     shield=True, | ||||||
|  |                 ) | ||||||
|  |                 raise taskc | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             raise RuntimeError('Dint cancel as expected!?') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def cancelled_before_pause( | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Verify that using a shielded pause works despite surrounding | ||||||
|  |     cancellation called state in the calling task. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     async with trio.open_nursery() as tn: | ||||||
|  |         cs: trio.CancelScope = await tn.start(cancellable_pause_loop) | ||||||
|  |         await trio.sleep(0.1) | ||||||
|  | 
 | ||||||
|  |     assert cs.cancelled_caught | ||||||
|  | 
 | ||||||
|  |     await pm_on_cancelled() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def main(): | ||||||
|  |     async with tractor.open_nursery( | ||||||
|  |         debug_mode=True, | ||||||
|  |     ) as n: | ||||||
|  |         portal: tractor.Portal = await n.run_in_actor( | ||||||
|  |             cancelled_before_pause, | ||||||
|  |         ) | ||||||
|  |         await portal.result() | ||||||
|  | 
 | ||||||
|  |         # ensure the same works in the root actor! | ||||||
|  |         await pm_on_cancelled() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |     trio.run(main) | ||||||
|  | @ -161,6 +161,10 @@ def in_prompt_msg( | ||||||
| 
 | 
 | ||||||
|     return True |     return True | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: todo support terminal color-chars stripping so we can match | ||||||
|  | # against call stack frame output from the the 'll' command the like! | ||||||
|  | # -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789 | ||||||
| def assert_before( | def assert_before( | ||||||
|     child, |     child, | ||||||
|     patts: list[str], |     patts: list[str], | ||||||
|  | @ -1125,7 +1129,187 @@ def test_pause_from_sync( | ||||||
|     child.expect(pexpect.EOF) |     child.expect(pexpect.EOF) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO! | def test_post_mortem_api( | ||||||
|  |     spawn, | ||||||
|  |     ctlc: bool, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Verify the `tractor.post_mortem()` API works in an exception | ||||||
|  |     handler block. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     child = spawn('pm_in_subactor') | ||||||
|  | 
 | ||||||
|  |     # First entry is via manual `.post_mortem()` | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "<Task 'name_error'", | ||||||
|  |             "NameError", | ||||||
|  |             "('child'", | ||||||
|  |             "tractor.post_mortem()", | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  |     if ctlc: | ||||||
|  |         do_ctlc(child) | ||||||
|  |     child.sendline('c') | ||||||
|  | 
 | ||||||
|  |     # 2nd is RPC crash handler | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "<Task 'name_error'", | ||||||
|  |             "NameError", | ||||||
|  |             "('child'", | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  |     if ctlc: | ||||||
|  |         do_ctlc(child) | ||||||
|  |     child.sendline('c') | ||||||
|  | 
 | ||||||
|  |     # 3rd is via RAE bubbled to root's parent ctx task and | ||||||
|  |     # crash-handled via another manual pm call. | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "<Task '__main__.main'", | ||||||
|  |             "('root'", | ||||||
|  |             "NameError", | ||||||
|  |             "tractor.post_mortem()", | ||||||
|  |             "src_uid=('child'", | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  |     if ctlc: | ||||||
|  |         do_ctlc(child) | ||||||
|  |     child.sendline('c') | ||||||
|  | 
 | ||||||
|  |     # 4th and FINAL is via RAE bubbled to root's parent ctx task and | ||||||
|  |     # crash-handled via another manual pm call. | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "<Task '__main__.main'", | ||||||
|  |             "('root'", | ||||||
|  |             "NameError", | ||||||
|  |             "src_uid=('child'", | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  |     if ctlc: | ||||||
|  |         do_ctlc(child) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |     # TODO: ensure we're stopped and showing the right call stack frame | ||||||
|  |     # -[ ] need a way to strip the terminal color chars in order to | ||||||
|  |     #    pattern match... see TODO around `assert_before()` above! | ||||||
|  |     # child.sendline('w') | ||||||
|  |     # child.expect(PROMPT) | ||||||
|  |     # assert_before( | ||||||
|  |     #     child, | ||||||
|  |     #     [ | ||||||
|  |     #         # error src block annot at ctx open | ||||||
|  |     #         '-> async with p.open_context(name_error) as (ctx, first):', | ||||||
|  |     #     ] | ||||||
|  |     # ) | ||||||
|  | 
 | ||||||
|  |     # # step up a frame to ensure the it's the root's nursery | ||||||
|  |     # child.sendline('u') | ||||||
|  |     # child.expect(PROMPT) | ||||||
|  |     # assert_before( | ||||||
|  |     #     child, | ||||||
|  |     #     [ | ||||||
|  |     #         # handler block annotation | ||||||
|  |     #         '-> async with tractor.open_nursery(', | ||||||
|  |     #     ] | ||||||
|  |     # ) | ||||||
|  | 
 | ||||||
|  |     child.sendline('c') | ||||||
|  |     child.expect(pexpect.EOF) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_shield_pause( | ||||||
|  |     spawn, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Verify the `tractor.pause()/.post_mortem()` API works inside an | ||||||
|  |     already cancelled `trio.CancelScope` and that you can step to the | ||||||
|  |     next checkpoint wherein the cancelled will get raised. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     child = spawn('shielded_pause') | ||||||
|  | 
 | ||||||
|  |     # First entry is via manual `.post_mortem()` | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _pause_msg, | ||||||
|  |             "cancellable_pause_loop'", | ||||||
|  |             "('cancelled_before_pause'",  # actor name | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     # since 3 tries in ex. shield pause loop | ||||||
|  |     for i in range(3): | ||||||
|  |         child.sendline('c') | ||||||
|  |         child.expect(PROMPT) | ||||||
|  |         assert_before( | ||||||
|  |             child, | ||||||
|  |             [ | ||||||
|  |                 _pause_msg, | ||||||
|  |                 "INSIDE SHIELDED PAUSE", | ||||||
|  |                 "('cancelled_before_pause'",  # actor name | ||||||
|  |             ] | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     # back inside parent task that opened nursery | ||||||
|  |     child.sendline('c') | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "('cancelled_before_pause'",  # actor name | ||||||
|  |             "Failed to engage debugger via `_pause()`", | ||||||
|  |             "trio.Cancelled", | ||||||
|  |             "raise Cancelled._create()", | ||||||
|  | 
 | ||||||
|  |             # we should be handling a taskc inside | ||||||
|  |             # the first `.port_mortem()` sin-shield! | ||||||
|  |             'await DebugStatus.req_finished.wait()', | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     # same as above but in the root actor's task | ||||||
|  |     child.sendline('c') | ||||||
|  |     child.expect(PROMPT) | ||||||
|  |     assert_before( | ||||||
|  |         child, | ||||||
|  |         [ | ||||||
|  |             _crash_msg, | ||||||
|  |             "('root'",  # actor name | ||||||
|  |             "Failed to engage debugger via `_pause()`", | ||||||
|  |             "trio.Cancelled", | ||||||
|  |             "raise Cancelled._create()", | ||||||
|  | 
 | ||||||
|  |             # handling a taskc inside the first unshielded | ||||||
|  |             # `.port_mortem()`. | ||||||
|  |             # BUT in this case in the root-proc path ;) | ||||||
|  |             'wait Lock._debug_lock.acquire()', | ||||||
|  |         ] | ||||||
|  |     ) | ||||||
|  |     child.sendline('c') | ||||||
|  |     child.expect(pexpect.EOF) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: needs ANSI code stripping tho, see `assert_before()` # above! | ||||||
| def test_correct_frames_below_hidden(): | def test_correct_frames_below_hidden(): | ||||||
|     ''' |     ''' | ||||||
|     Ensure that once a `tractor.pause()` enages, when the user |     Ensure that once a `tractor.pause()` enages, when the user | ||||||
|  | @ -1138,4 +1322,11 @@ def test_correct_frames_below_hidden(): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_cant_pause_from_paused_task(): | def test_cant_pause_from_paused_task(): | ||||||
|  |     ''' | ||||||
|  |     Pausing from with an already paused task should raise an error. | ||||||
|  | 
 | ||||||
|  |     Normally this should only happen in practise while debugging the call stack of `tractor.pause()` itself, likely | ||||||
|  |     by a `.pause()` line somewhere inside our runtime. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     ... |     ... | ||||||
|  |  | ||||||
|  | @ -46,7 +46,7 @@ maybe_msg_spec = PldMsg|None | ||||||
| async def maybe_expect_raises( | async def maybe_expect_raises( | ||||||
|     raises: BaseException|None = None, |     raises: BaseException|None = None, | ||||||
|     ensure_in_message: list[str]|None = None, |     ensure_in_message: list[str]|None = None, | ||||||
|     reraise: bool = False, |     post_mortem: bool = False, | ||||||
|     timeout: int = 3, |     timeout: int = 3, | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|  | @ -86,8 +86,8 @@ async def maybe_expect_raises( | ||||||
|                                 f'{inner_err.args}' |                                 f'{inner_err.args}' | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|                 if reraise: |                 if post_mortem: | ||||||
|                     raise inner_err |                     await tractor.post_mortem() | ||||||
| 
 | 
 | ||||||
|         else: |         else: | ||||||
|             if raises: |             if raises: | ||||||
|  | @ -314,6 +314,8 @@ def test_basic_payload_spec( | ||||||
|                         f"value: `{bad_value_str}` does not " |                         f"value: `{bad_value_str}` does not " | ||||||
|                         f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`", |                         f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`", | ||||||
|                     ], |                     ], | ||||||
|  |                     # only for debug | ||||||
|  |                     post_mortem=True, | ||||||
|                 ), |                 ), | ||||||
|                 p.open_context( |                 p.open_context( | ||||||
|                     child, |                     child, | ||||||
|  |  | ||||||
|  | @ -1190,6 +1190,7 @@ class Context: | ||||||
|         self, |         self, | ||||||
|         remote_error: Exception, |         remote_error: Exception, | ||||||
| 
 | 
 | ||||||
|  |         from_src_exc: BaseException|None|bool = False, | ||||||
|         raise_ctxc_from_self_call: bool = False, |         raise_ctxc_from_self_call: bool = False, | ||||||
|         raise_overrun_from_self: bool = True, |         raise_overrun_from_self: bool = True, | ||||||
|         hide_tb: bool = True, |         hide_tb: bool = True, | ||||||
|  | @ -1284,7 +1285,10 @@ class Context: | ||||||
|         #       runtime frames from the tb explicitly? |         #       runtime frames from the tb explicitly? | ||||||
|         # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement |         # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement | ||||||
|         # https://stackoverflow.com/a/24752607 |         # https://stackoverflow.com/a/24752607 | ||||||
|         raise remote_error # from None |         if from_src_exc is not False: | ||||||
|  |             raise remote_error from from_src_exc | ||||||
|  | 
 | ||||||
|  |         raise remote_error | ||||||
| 
 | 
 | ||||||
|     # TODO: change  to `.wait_for_result()`? |     # TODO: change  to `.wait_for_result()`? | ||||||
|     async def result( |     async def result( | ||||||
|  | @ -2096,7 +2100,11 @@ async def open_context_from_portal( | ||||||
|                 # `._maybe_cancel_and_set_remote_error()` so ensure |                 # `._maybe_cancel_and_set_remote_error()` so ensure | ||||||
|                 # we raise the underlying `._remote_error` directly |                 # we raise the underlying `._remote_error` directly | ||||||
|                 # instead of bubbling that taskc. |                 # instead of bubbling that taskc. | ||||||
|                 ctx.maybe_raise() |                 ctx.maybe_raise( | ||||||
|  |                     # mask the above taskc from the tb | ||||||
|  |                     from_src_exc=None, | ||||||
|  |                     hide_tb=hide_tb, | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|                 # OW, some other unexpected cancel condition |                 # OW, some other unexpected cancel condition | ||||||
|                 # that should prolly never happen right? |                 # that should prolly never happen right? | ||||||
|  | @ -2108,13 +2116,14 @@ async def open_context_from_portal( | ||||||
|             ctx._started_msg: bool = started_msg |             ctx._started_msg: bool = started_msg | ||||||
|             ctx._started_pld: bool = first |             ctx._started_pld: bool = first | ||||||
| 
 | 
 | ||||||
|             # deliver context instance and .started() msg value |             # deliver context ref and `.started()` msg payload value | ||||||
|             # in enter tuple. |             # in `__aenter__` tuple. | ||||||
|             yield ctx, first |             yield ctx, first | ||||||
| 
 | 
 | ||||||
|             # ??TODO??: do we still want to consider this or is |             # ??TODO??: do we still want to consider this or is | ||||||
|             # the `else:` block handling via a `.result()` |             # the `else:` block handling via a `.result()` | ||||||
|             # call below enough?? |             # call below enough?? | ||||||
|  |             # | ||||||
|             # -[ ] pretty sure `.result()` internals do the |             # -[ ] pretty sure `.result()` internals do the | ||||||
|             # same as our ctxc handler below so it ended up |             # same as our ctxc handler below so it ended up | ||||||
|             # being same (repeated?) behaviour, but ideally we |             # being same (repeated?) behaviour, but ideally we | ||||||
|  | @ -2123,33 +2132,13 @@ async def open_context_from_portal( | ||||||
|             # that we can re-use it around the `yield` ^ here |             # that we can re-use it around the `yield` ^ here | ||||||
|             # or vice versa? |             # or vice versa? | ||||||
|             # |             # | ||||||
|             # NOTE: between the caller exiting and arriving |             # maybe TODO NOTE: between the caller exiting and | ||||||
|             # here the far end may have sent a ctxc-msg or |             # arriving here the far end may have sent a ctxc-msg or | ||||||
|             # other error, so check for it here immediately |             # other error, so the quetion is whether we should check | ||||||
|             # and maybe raise so as to engage the ctxc |             # for it here immediately and maybe raise so as to engage | ||||||
|             # handling block below! |             # the ctxc handling block below ???? | ||||||
|             # |             # | ||||||
|             # if re := ctx._remote_error: |             # self.maybe_raise() | ||||||
|             #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( |  | ||||||
|             #         re, |  | ||||||
|             #         # TODO: do we want this to always raise? |  | ||||||
|             #         # - means that on self-ctxc, if/when the |  | ||||||
|             #         #   block is exited before the msg arrives |  | ||||||
|             #         #   but then the msg during __exit__ |  | ||||||
|             #         #   calling we may not activate the |  | ||||||
|             #         #   ctxc-handler block below? should we |  | ||||||
|             #         #   be? |  | ||||||
|             #         # - if there's a remote error that arrives |  | ||||||
|             #         #   after the child has exited, we won't |  | ||||||
|             #         #   handle until the `finally:` block |  | ||||||
|             #         #   where `.result()` is always called, |  | ||||||
|             #         #   again in which case we handle it |  | ||||||
|             #         #   differently then in the handler block |  | ||||||
|             #         #   that would normally engage from THIS |  | ||||||
|             #         #   block? |  | ||||||
|             #         raise_ctxc_from_self_call=True, |  | ||||||
|             #     ) |  | ||||||
|             #     ctxc_from_callee = maybe_ctxc |  | ||||||
| 
 | 
 | ||||||
|             # when in allow_overruns mode there may be |             # when in allow_overruns mode there may be | ||||||
|             # lingering overflow sender tasks remaining? |             # lingering overflow sender tasks remaining? | ||||||
|  | @ -2460,7 +2449,7 @@ async def open_context_from_portal( | ||||||
|             # |             # | ||||||
|             # NOTE: further, this should be the only place the |             # NOTE: further, this should be the only place the | ||||||
|             # underlying feeder channel is |             # underlying feeder channel is | ||||||
|             # once-and-only-CLOSED! |             # once-forever-and-only-CLOSED! | ||||||
|             with trio.CancelScope(shield=True): |             with trio.CancelScope(shield=True): | ||||||
|                 await ctx._rx_chan.aclose() |                 await ctx._rx_chan.aclose() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -518,7 +518,6 @@ class RemoteActorError(Exception): | ||||||
|     def pformat( |     def pformat( | ||||||
|         self, |         self, | ||||||
|         with_type_header: bool = True, |         with_type_header: bool = True, | ||||||
|         # with_ascii_box: bool = True, |  | ||||||
| 
 | 
 | ||||||
|     ) -> str: |     ) -> str: | ||||||
|         ''' |         ''' | ||||||
|  | @ -885,9 +884,9 @@ class MsgTypeError( | ||||||
|             extra_msgdata['_bad_msg'] = bad_msg |             extra_msgdata['_bad_msg'] = bad_msg | ||||||
|             extra_msgdata['cid'] = bad_msg.cid |             extra_msgdata['cid'] = bad_msg.cid | ||||||
| 
 | 
 | ||||||
|  |         extra_msgdata.setdefault('boxed_type', cls) | ||||||
|         return cls( |         return cls( | ||||||
|             message=message, |             message=message, | ||||||
|             boxed_type=cls, |  | ||||||
|             **extra_msgdata, |             **extra_msgdata, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  | @ -1111,7 +1110,7 @@ def is_multi_cancelled( | ||||||
| def _raise_from_unexpected_msg( | def _raise_from_unexpected_msg( | ||||||
|     ctx: Context, |     ctx: Context, | ||||||
|     msg: MsgType, |     msg: MsgType, | ||||||
|     src_err: AttributeError, |     src_err: Exception, | ||||||
|     log: StackLevelAdapter,  # caller specific `log` obj |     log: StackLevelAdapter,  # caller specific `log` obj | ||||||
| 
 | 
 | ||||||
|     expect_msg: Type[MsgType], |     expect_msg: Type[MsgType], | ||||||
|  | @ -1212,7 +1211,7 @@ def _raise_from_unexpected_msg( | ||||||
|             # in case there already is some underlying remote error |             # in case there already is some underlying remote error | ||||||
|             # that arrived which is probably the source of this stream |             # that arrived which is probably the source of this stream | ||||||
|             # closure |             # closure | ||||||
|             ctx.maybe_raise() |             ctx.maybe_raise(from_src_exc=src_err) | ||||||
|             raise eoc from src_err |             raise eoc from src_err | ||||||
| 
 | 
 | ||||||
|         # TODO: our own transport/IPC-broke error subtype? |         # TODO: our own transport/IPC-broke error subtype? | ||||||
|  | @ -1361,6 +1360,7 @@ def _mk_msg_type_err( | ||||||
|             message=message, |             message=message, | ||||||
|             bad_msg=bad_msg, |             bad_msg=bad_msg, | ||||||
|             bad_msg_as_dict=msg_dict, |             bad_msg_as_dict=msg_dict, | ||||||
|  |             boxed_type=type(src_validation_error), | ||||||
| 
 | 
 | ||||||
|             # NOTE: for pld-spec MTEs we set the `._ipc_msg` manually: |             # NOTE: for pld-spec MTEs we set the `._ipc_msg` manually: | ||||||
|             # - for the send-side `.started()` pld-validate |             # - for the send-side `.started()` pld-validate | ||||||
|  |  | ||||||
|  | @ -80,7 +80,6 @@ from tractor.msg.types import ( | ||||||
|     Yield, |     Yield, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     from ._runtime import Actor |     from ._runtime import Actor | ||||||
| 
 | 
 | ||||||
|  | @ -328,7 +327,6 @@ async def _errors_relayed_via_ipc( | ||||||
|                         f'|_{ctx}' |                         f'|_{ctx}' | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|         # ALWAYS try to ship RPC errors back to parent/caller task |         # ALWAYS try to ship RPC errors back to parent/caller task | ||||||
|         if is_rpc: |         if is_rpc: | ||||||
| 
 | 
 | ||||||
|  | @ -819,6 +817,12 @@ async def try_ship_error_to_remote( | ||||||
|                 # TODO: use `.msg.preetty_struct` for this! |                 # TODO: use `.msg.preetty_struct` for this! | ||||||
|                 f'{msg}\n' |                 f'{msg}\n' | ||||||
|             ) |             ) | ||||||
|  |         except BaseException: | ||||||
|  |             log.exception( | ||||||
|  |                 'Errored while attempting error shipment?' | ||||||
|  |             ) | ||||||
|  |             __tracebackhide__: bool = False | ||||||
|  |             raise | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def process_messages( | async def process_messages( | ||||||
|  |  | ||||||
|  | @ -233,6 +233,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|         # ctx: Context = self._ctx |         # ctx: Context = self._ctx | ||||||
|         ctx.maybe_raise( |         ctx.maybe_raise( | ||||||
|             raise_ctxc_from_self_call=True, |             raise_ctxc_from_self_call=True, | ||||||
|  |             from_src_exc=src_err, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         # propagate any error but hide low-level frame details |         # propagate any error but hide low-level frame details | ||||||
|  |  | ||||||
|  | @ -1600,25 +1600,27 @@ async def _pause( | ||||||
|                     f'REPL: {Lock.repl}\n' |                     f'REPL: {Lock.repl}\n' | ||||||
|                     # TODO: use `._frame_stack` scanner to find the @api_frame |                     # TODO: use `._frame_stack` scanner to find the @api_frame | ||||||
|                 ) |                 ) | ||||||
|                 await trio.lowlevel.checkpoint() |                 with trio.CancelScope(shield=shield): | ||||||
|  |                     await trio.lowlevel.checkpoint() | ||||||
|                 return |                 return | ||||||
| 
 | 
 | ||||||
|             # XXX: since we need to enter pdb synchronously below, |             # XXX: since we need to enter pdb synchronously below, | ||||||
|             # we have to release the lock manually from pdb completion |             # we have to release the lock manually from pdb completion | ||||||
|             # callbacks. Can't think of a nicer way then this atm. |             # callbacks. Can't think of a nicer way then this atm. | ||||||
|             if Lock._debug_lock.locked(): |             with trio.CancelScope(shield=shield): | ||||||
|                 log.warning( |                 if Lock._debug_lock.locked(): | ||||||
|                     'attempting to shield-acquire active TTY lock owned by\n' |                     log.warning( | ||||||
|                     f'{ctx}' |                         'attempting to shield-acquire active TTY lock owned by\n' | ||||||
|                 ) |                         f'{ctx}' | ||||||
|  |                     ) | ||||||
| 
 | 
 | ||||||
|                 # must shield here to avoid hitting a ``Cancelled`` and |                     # must shield here to avoid hitting a ``Cancelled`` and | ||||||
|                 # a child getting stuck bc we clobbered the tty |                     # a child getting stuck bc we clobbered the tty | ||||||
|                 with trio.CancelScope(shield=True): |                     # with trio.CancelScope(shield=True): | ||||||
|  |                     await Lock._debug_lock.acquire() | ||||||
|  |                 else: | ||||||
|  |                     # may be cancelled | ||||||
|                     await Lock._debug_lock.acquire() |                     await Lock._debug_lock.acquire() | ||||||
|             else: |  | ||||||
|                 # may be cancelled |  | ||||||
|                 await Lock._debug_lock.acquire() |  | ||||||
| 
 | 
 | ||||||
|             # enter REPL from root, no TTY locking IPC ctx necessary |             # enter REPL from root, no TTY locking IPC ctx necessary | ||||||
|             _enter_repl_sync(debug_func) |             _enter_repl_sync(debug_func) | ||||||
|  | @ -1659,7 +1661,8 @@ async def _pause( | ||||||
|                             f'{task.name}@{actor.uid} already has TTY lock\n' |                             f'{task.name}@{actor.uid} already has TTY lock\n' | ||||||
|                             f'ignoring..' |                             f'ignoring..' | ||||||
|                         ) |                         ) | ||||||
|                         await trio.lowlevel.checkpoint() |                         with trio.CancelScope(shield=shield): | ||||||
|  |                             await trio.lowlevel.checkpoint() | ||||||
|                         return |                         return | ||||||
| 
 | 
 | ||||||
|                     else: |                     else: | ||||||
|  | @ -1671,8 +1674,9 @@ async def _pause( | ||||||
|                             f'{task}@{actor.uid} already has TTY lock\n' |                             f'{task}@{actor.uid} already has TTY lock\n' | ||||||
|                             f'waiting for release..' |                             f'waiting for release..' | ||||||
|                         ) |                         ) | ||||||
|                         await DebugStatus.repl_release.wait() |                         with trio.CancelScope(shield=shield): | ||||||
|                         await trio.sleep(0.1) |                             await DebugStatus.repl_release.wait() | ||||||
|  |                             await trio.sleep(0.1) | ||||||
| 
 | 
 | ||||||
|                 elif ( |                 elif ( | ||||||
|                     req_task |                     req_task | ||||||
|  | @ -1683,7 +1687,8 @@ async def _pause( | ||||||
| 
 | 
 | ||||||
|                         'Waiting for previous request to complete..\n' |                         'Waiting for previous request to complete..\n' | ||||||
|                     ) |                     ) | ||||||
|                     await DebugStatus.req_finished.wait() |                     with trio.CancelScope(shield=shield): | ||||||
|  |                         await DebugStatus.req_finished.wait() | ||||||
| 
 | 
 | ||||||
|             # this **must** be awaited by the caller and is done using the |             # this **must** be awaited by the caller and is done using the | ||||||
|             # root nursery so that the debugger can continue to run without |             # root nursery so that the debugger can continue to run without | ||||||
|  | @ -1721,14 +1726,15 @@ async def _pause( | ||||||
|                 'Starting request task\n' |                 'Starting request task\n' | ||||||
|                 f'|_{task}\n' |                 f'|_{task}\n' | ||||||
|             ) |             ) | ||||||
|             req_ctx: Context = await actor._service_n.start( |             with trio.CancelScope(shield=shield): | ||||||
|                 partial( |                 req_ctx: Context = await actor._service_n.start( | ||||||
|                     request_root_stdio_lock, |                     partial( | ||||||
|                     actor_uid=actor.uid, |                         request_root_stdio_lock, | ||||||
|                     task_uid=(task.name, id(task)),  # task uuid (effectively) |                         actor_uid=actor.uid, | ||||||
|                     shield=shield, |                         task_uid=(task.name, id(task)),  # task uuid (effectively) | ||||||
|  |                         shield=shield, | ||||||
|  |                     ) | ||||||
|                 ) |                 ) | ||||||
|             ) |  | ||||||
|             # XXX sanity, our locker task should be the one which |             # XXX sanity, our locker task should be the one which | ||||||
|             # entered a new IPC ctx with the root actor, NOT the one |             # entered a new IPC ctx with the root actor, NOT the one | ||||||
|             # that exists around the task calling into `._pause()`. |             # that exists around the task calling into `._pause()`. | ||||||
|  | @ -2147,6 +2153,13 @@ async def post_mortem( | ||||||
|     **_pause_kwargs, |     **_pause_kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|  |     ''' | ||||||
|  |     `tractor`'s builtin async equivalient of `pdb.post_mortem()` | ||||||
|  |     which can be used inside exception handlers. | ||||||
|  | 
 | ||||||
|  |     It's also used for the crash handler when `debug_mode == True` ;) | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     __tracebackhide__: bool = hide_tb |     __tracebackhide__: bool = hide_tb | ||||||
| 
 | 
 | ||||||
|     tb: TracebackType = tb or sys.exc_info()[2] |     tb: TracebackType = tb or sys.exc_info()[2] | ||||||
|  |  | ||||||
|  | @ -167,7 +167,7 @@ class PldRx(Struct): | ||||||
|         ipc_msg: MsgType|None = None, |         ipc_msg: MsgType|None = None, | ||||||
|         expect_msg: Type[MsgType]|None = None, |         expect_msg: Type[MsgType]|None = None, | ||||||
|         hide_tb: bool = False, |         hide_tb: bool = False, | ||||||
|         **dec_msg_kwargs, |         **dec_pld_kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> Any|Raw: |     ) -> Any|Raw: | ||||||
|         __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|  | @ -179,12 +179,12 @@ class PldRx(Struct): | ||||||
|             # sync-rx msg from underlying IPC feeder (mem-)chan |             # sync-rx msg from underlying IPC feeder (mem-)chan | ||||||
|             ipc._rx_chan.receive_nowait() |             ipc._rx_chan.receive_nowait() | ||||||
|         ) |         ) | ||||||
|         return self.dec_msg( |         return self.decode_pld( | ||||||
|             msg, |             msg, | ||||||
|             ipc=ipc, |             ipc=ipc, | ||||||
|             expect_msg=expect_msg, |             expect_msg=expect_msg, | ||||||
|             hide_tb=hide_tb, |             hide_tb=hide_tb, | ||||||
|             **dec_msg_kwargs, |             **dec_pld_kwargs, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     async def recv_pld( |     async def recv_pld( | ||||||
|  | @ -194,7 +194,7 @@ class PldRx(Struct): | ||||||
|         expect_msg: Type[MsgType]|None = None, |         expect_msg: Type[MsgType]|None = None, | ||||||
|         hide_tb: bool = True, |         hide_tb: bool = True, | ||||||
| 
 | 
 | ||||||
|         **dec_msg_kwargs, |         **dec_pld_kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> Any|Raw: |     ) -> Any|Raw: | ||||||
|         ''' |         ''' | ||||||
|  | @ -208,17 +208,14 @@ class PldRx(Struct): | ||||||
|             # async-rx msg from underlying IPC feeder (mem-)chan |             # async-rx msg from underlying IPC feeder (mem-)chan | ||||||
|             await ipc._rx_chan.receive() |             await ipc._rx_chan.receive() | ||||||
|         ) |         ) | ||||||
|         return self.dec_msg( |         return self.decode_pld( | ||||||
|             msg=msg, |             msg=msg, | ||||||
|             ipc=ipc, |             ipc=ipc, | ||||||
|             expect_msg=expect_msg, |             expect_msg=expect_msg, | ||||||
|             **dec_msg_kwargs, |             **dec_pld_kwargs, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # TODO: rename to, |     def decode_pld( | ||||||
|     # -[ ] `.decode_pld()`? |  | ||||||
|     # -[ ] `.dec_pld()`? |  | ||||||
|     def dec_msg( |  | ||||||
|         self, |         self, | ||||||
|         msg: MsgType, |         msg: MsgType, | ||||||
|         ipc: Context|MsgStream, |         ipc: Context|MsgStream, | ||||||
|  | @ -299,9 +296,6 @@ class PldRx(Struct): | ||||||
|                             if not is_started_send_side |                             if not is_started_send_side | ||||||
|                             else ipc._actor.uid |                             else ipc._actor.uid | ||||||
|                         ), |                         ), | ||||||
|                         # tb=valerr.__traceback__, |  | ||||||
|                         # tb_str=mte._message, |  | ||||||
|                         # message=mte._message, |  | ||||||
|                     ) |                     ) | ||||||
|                     mte._ipc_msg = err_msg |                     mte._ipc_msg = err_msg | ||||||
| 
 | 
 | ||||||
|  | @ -317,29 +311,6 @@ class PldRx(Struct): | ||||||
|                     # validation error. |                     # validation error. | ||||||
|                     src_err = valerr |                     src_err = valerr | ||||||
| 
 | 
 | ||||||
|                     # TODO: should we instead make this explicit and |  | ||||||
|                     # use the above masked `is_started_send_decode`, |  | ||||||
|                     # expecting the `Context.started()` caller to set |  | ||||||
|                     # it? Rn this is kinda, howyousayyy, implicitly |  | ||||||
|                     # edge-case-y.. |  | ||||||
|                     # TODO: remove this since it's been added to |  | ||||||
|                     # `_raise_from_unexpected_msg()`..? |  | ||||||
|                     # if ( |  | ||||||
|                     #     expect_msg is not Started |  | ||||||
|                     #     and not is_started_send_side |  | ||||||
|                     # ): |  | ||||||
|                     #     # set emulated remote error more-or-less as the |  | ||||||
|                     #     # runtime would |  | ||||||
|                     #     ctx: Context = getattr(ipc, 'ctx', ipc) |  | ||||||
|                     #     ctx._maybe_cancel_and_set_remote_error(mte) |  | ||||||
| 
 |  | ||||||
|                 # XXX some other decoder specific failure? |  | ||||||
|                 # except TypeError as src_error: |  | ||||||
|                 #     from .devx import mk_pdb |  | ||||||
|                 #     mk_pdb().set_trace() |  | ||||||
|                 #     raise src_error |  | ||||||
|                 # ^-TODO-^ can remove? |  | ||||||
| 
 |  | ||||||
|             # a runtime-internal RPC endpoint response. |             # a runtime-internal RPC endpoint response. | ||||||
|             # always passthrough since (internal) runtime |             # always passthrough since (internal) runtime | ||||||
|             # responses are generally never exposed to consumer |             # responses are generally never exposed to consumer | ||||||
|  | @ -435,6 +406,8 @@ class PldRx(Struct): | ||||||
|             __tracebackhide__: bool = False |             __tracebackhide__: bool = False | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|  |     dec_msg = decode_pld | ||||||
|  | 
 | ||||||
|     async def recv_msg_w_pld( |     async def recv_msg_w_pld( | ||||||
|         self, |         self, | ||||||
|         ipc: Context|MsgStream, |         ipc: Context|MsgStream, | ||||||
|  | @ -463,7 +436,7 @@ class PldRx(Struct): | ||||||
|         # TODO: is there some way we can inject the decoded |         # TODO: is there some way we can inject the decoded | ||||||
|         # payload into an existing output buffer for the original |         # payload into an existing output buffer for the original | ||||||
|         # msg instance? |         # msg instance? | ||||||
|         pld: PayloadT = self.dec_msg( |         pld: PayloadT = self.decode_pld( | ||||||
|             msg, |             msg, | ||||||
|             ipc=ipc, |             ipc=ipc, | ||||||
|             expect_msg=expect_msg, |             expect_msg=expect_msg, | ||||||
|  | @ -610,7 +583,10 @@ async def drain_to_final_msg( | ||||||
|             # only when we are sure the remote error is |             # only when we are sure the remote error is | ||||||
|             # the source cause of this local task's |             # the source cause of this local task's | ||||||
|             # cancellation. |             # cancellation. | ||||||
|             ctx.maybe_raise() |             ctx.maybe_raise( | ||||||
|  |                 # TODO: when use this/ | ||||||
|  |                 # from_src_exc=taskc, | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|             # CASE 1: we DID request the cancel we simply |             # CASE 1: we DID request the cancel we simply | ||||||
|             # continue to bubble up as normal. |             # continue to bubble up as normal. | ||||||
|  | @ -783,7 +759,7 @@ def validate_payload_msg( | ||||||
|     try: |     try: | ||||||
|         roundtripped: Started = codec.decode(msg_bytes) |         roundtripped: Started = codec.decode(msg_bytes) | ||||||
|         ctx: Context = getattr(ipc, 'ctx', ipc) |         ctx: Context = getattr(ipc, 'ctx', ipc) | ||||||
|         pld: PayloadT = ctx.pld_rx.dec_msg( |         pld: PayloadT = ctx.pld_rx.decode_pld( | ||||||
|             msg=roundtripped, |             msg=roundtripped, | ||||||
|             ipc=ipc, |             ipc=ipc, | ||||||
|             expect_msg=Started, |             expect_msg=Started, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue