Compare commits
7 Commits
0e8c60ee4a
...
8ea0f08386
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8ea0f08386 | |
Tyler Goodlet | 13ea500a44 | |
Tyler Goodlet | 2f854a3e86 | |
Tyler Goodlet | cdb1311e40 | |
Tyler Goodlet | fcd089c08f | |
Tyler Goodlet | 993281882b | |
Tyler Goodlet | bbb4d4e52c |
|
@ -0,0 +1,56 @@
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def name_error(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Raise a `NameError`, catch it and enter `.post_mortem()`, then
|
||||||
|
expect the `._rpc._invoke()` crash handler to also engage.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
getattr(doggypants) # noqa (on purpose)
|
||||||
|
except NameError:
|
||||||
|
await tractor.post_mortem()
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
'''
|
||||||
|
Test 3 `PdbREPL` entries:
|
||||||
|
- one in the child due to manual `.post_mortem()`,
|
||||||
|
- another in the child due to runtime RPC crash handling.
|
||||||
|
- final one here in parent from the RAE.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# XXX NOTE: ideally the REPL arrives at this frame in the parent
|
||||||
|
# ONE UP FROM the inner ctx block below!
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=True,
|
||||||
|
# loglevel='cancel',
|
||||||
|
) as an:
|
||||||
|
p: tractor.Portal = await an.start_actor(
|
||||||
|
'child',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX should raise `RemoteActorError[NameError]`
|
||||||
|
# AND be the active frame when REPL enters!
|
||||||
|
try:
|
||||||
|
async with p.open_context(name_error) as (ctx, first):
|
||||||
|
assert first
|
||||||
|
except tractor.RemoteActorError as rae:
|
||||||
|
assert rae.boxed_type is NameError
|
||||||
|
|
||||||
|
# manually handle in root's parent task
|
||||||
|
await tractor.post_mortem()
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
raise RuntimeError('IPC ctx should have remote errored!?')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -0,0 +1,88 @@
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
async def cancellable_pause_loop(
|
||||||
|
task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
with trio.CancelScope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
|
for _ in range(3):
|
||||||
|
try:
|
||||||
|
# ON first entry, there is no level triggered
|
||||||
|
# cancellation yet, so this cp does a parent task
|
||||||
|
# ctx-switch so that this scope raises for the NEXT
|
||||||
|
# checkpoint we hit.
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
|
cs.cancel()
|
||||||
|
|
||||||
|
# parent should have called `cs.cancel()` by now
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
print('INSIDE SHIELDED PAUSE')
|
||||||
|
await tractor.pause(shield=True)
|
||||||
|
else:
|
||||||
|
# should raise it again, bubbling up to parent
|
||||||
|
print('BUBBLING trio.Cancelled to parent task-nursery')
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
|
||||||
|
async def pm_on_cancelled():
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
try:
|
||||||
|
await trio.sleep_forever()
|
||||||
|
except trio.Cancelled:
|
||||||
|
# should also raise `Cancelled` since
|
||||||
|
# we didn't pass `shield=True`.
|
||||||
|
try:
|
||||||
|
await tractor.post_mortem(hide_tb=False)
|
||||||
|
except trio.Cancelled as taskc:
|
||||||
|
|
||||||
|
# should enter just fine, in fact it should
|
||||||
|
# be debugging the internals of the previous
|
||||||
|
# sin-shield call above Bo
|
||||||
|
await tractor.post_mortem(
|
||||||
|
hide_tb=False,
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
|
raise taskc
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Dint cancel as expected!?')
|
||||||
|
|
||||||
|
|
||||||
|
async def cancelled_before_pause(
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that using a shielded pause works despite surrounding
|
||||||
|
cancellation called state in the calling task.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
cs: trio.CancelScope = await tn.start(cancellable_pause_loop)
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
assert cs.cancelled_caught
|
||||||
|
|
||||||
|
await pm_on_cancelled()
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=True,
|
||||||
|
) as n:
|
||||||
|
portal: tractor.Portal = await n.run_in_actor(
|
||||||
|
cancelled_before_pause,
|
||||||
|
)
|
||||||
|
await portal.result()
|
||||||
|
|
||||||
|
# ensure the same works in the root actor!
|
||||||
|
await pm_on_cancelled()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -161,6 +161,10 @@ def in_prompt_msg(
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: todo support terminal color-chars stripping so we can match
|
||||||
|
# against call stack frame output from the the 'll' command the like!
|
||||||
|
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||||
def assert_before(
|
def assert_before(
|
||||||
child,
|
child,
|
||||||
patts: list[str],
|
patts: list[str],
|
||||||
|
@ -1125,7 +1129,187 @@ def test_pause_from_sync(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
# TODO!
|
def test_post_mortem_api(
|
||||||
|
spawn,
|
||||||
|
ctlc: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify the `tractor.post_mortem()` API works in an exception
|
||||||
|
handler block.
|
||||||
|
|
||||||
|
'''
|
||||||
|
child = spawn('pm_in_subactor')
|
||||||
|
|
||||||
|
# First entry is via manual `.post_mortem()`
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"<Task 'name_error'",
|
||||||
|
"NameError",
|
||||||
|
"('child'",
|
||||||
|
"tractor.post_mortem()",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if ctlc:
|
||||||
|
do_ctlc(child)
|
||||||
|
child.sendline('c')
|
||||||
|
|
||||||
|
# 2nd is RPC crash handler
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"<Task 'name_error'",
|
||||||
|
"NameError",
|
||||||
|
"('child'",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if ctlc:
|
||||||
|
do_ctlc(child)
|
||||||
|
child.sendline('c')
|
||||||
|
|
||||||
|
# 3rd is via RAE bubbled to root's parent ctx task and
|
||||||
|
# crash-handled via another manual pm call.
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"<Task '__main__.main'",
|
||||||
|
"('root'",
|
||||||
|
"NameError",
|
||||||
|
"tractor.post_mortem()",
|
||||||
|
"src_uid=('child'",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if ctlc:
|
||||||
|
do_ctlc(child)
|
||||||
|
child.sendline('c')
|
||||||
|
|
||||||
|
# 4th and FINAL is via RAE bubbled to root's parent ctx task and
|
||||||
|
# crash-handled via another manual pm call.
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"<Task '__main__.main'",
|
||||||
|
"('root'",
|
||||||
|
"NameError",
|
||||||
|
"src_uid=('child'",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if ctlc:
|
||||||
|
do_ctlc(child)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: ensure we're stopped and showing the right call stack frame
|
||||||
|
# -[ ] need a way to strip the terminal color chars in order to
|
||||||
|
# pattern match... see TODO around `assert_before()` above!
|
||||||
|
# child.sendline('w')
|
||||||
|
# child.expect(PROMPT)
|
||||||
|
# assert_before(
|
||||||
|
# child,
|
||||||
|
# [
|
||||||
|
# # error src block annot at ctx open
|
||||||
|
# '-> async with p.open_context(name_error) as (ctx, first):',
|
||||||
|
# ]
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # step up a frame to ensure the it's the root's nursery
|
||||||
|
# child.sendline('u')
|
||||||
|
# child.expect(PROMPT)
|
||||||
|
# assert_before(
|
||||||
|
# child,
|
||||||
|
# [
|
||||||
|
# # handler block annotation
|
||||||
|
# '-> async with tractor.open_nursery(',
|
||||||
|
# ]
|
||||||
|
# )
|
||||||
|
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
def test_shield_pause(
|
||||||
|
spawn,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||||
|
already cancelled `trio.CancelScope` and that you can step to the
|
||||||
|
next checkpoint wherein the cancelled will get raised.
|
||||||
|
|
||||||
|
'''
|
||||||
|
child = spawn('shielded_pause')
|
||||||
|
|
||||||
|
# First entry is via manual `.post_mortem()`
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_pause_msg,
|
||||||
|
"cancellable_pause_loop'",
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# since 3 tries in ex. shield pause loop
|
||||||
|
for i in range(3):
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_pause_msg,
|
||||||
|
"INSIDE SHIELDED PAUSE",
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# back inside parent task that opened nursery
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"('cancelled_before_pause'", # actor name
|
||||||
|
"Failed to engage debugger via `_pause()`",
|
||||||
|
"trio.Cancelled",
|
||||||
|
"raise Cancelled._create()",
|
||||||
|
|
||||||
|
# we should be handling a taskc inside
|
||||||
|
# the first `.port_mortem()` sin-shield!
|
||||||
|
'await DebugStatus.req_finished.wait()',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# same as above but in the root actor's task
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(PROMPT)
|
||||||
|
assert_before(
|
||||||
|
child,
|
||||||
|
[
|
||||||
|
_crash_msg,
|
||||||
|
"('root'", # actor name
|
||||||
|
"Failed to engage debugger via `_pause()`",
|
||||||
|
"trio.Cancelled",
|
||||||
|
"raise Cancelled._create()",
|
||||||
|
|
||||||
|
# handling a taskc inside the first unshielded
|
||||||
|
# `.port_mortem()`.
|
||||||
|
# BUT in this case in the root-proc path ;)
|
||||||
|
'wait Lock._debug_lock.acquire()',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||||
def test_correct_frames_below_hidden():
|
def test_correct_frames_below_hidden():
|
||||||
'''
|
'''
|
||||||
Ensure that once a `tractor.pause()` enages, when the user
|
Ensure that once a `tractor.pause()` enages, when the user
|
||||||
|
@ -1138,4 +1322,11 @@ def test_correct_frames_below_hidden():
|
||||||
|
|
||||||
|
|
||||||
def test_cant_pause_from_paused_task():
|
def test_cant_pause_from_paused_task():
|
||||||
|
'''
|
||||||
|
Pausing from with an already paused task should raise an error.
|
||||||
|
|
||||||
|
Normally this should only happen in practise while debugging the call stack of `tractor.pause()` itself, likely
|
||||||
|
by a `.pause()` line somewhere inside our runtime.
|
||||||
|
|
||||||
|
'''
|
||||||
...
|
...
|
||||||
|
|
|
@ -46,7 +46,7 @@ maybe_msg_spec = PldMsg|None
|
||||||
async def maybe_expect_raises(
|
async def maybe_expect_raises(
|
||||||
raises: BaseException|None = None,
|
raises: BaseException|None = None,
|
||||||
ensure_in_message: list[str]|None = None,
|
ensure_in_message: list[str]|None = None,
|
||||||
reraise: bool = False,
|
post_mortem: bool = False,
|
||||||
timeout: int = 3,
|
timeout: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -86,8 +86,8 @@ async def maybe_expect_raises(
|
||||||
f'{inner_err.args}'
|
f'{inner_err.args}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if reraise:
|
if post_mortem:
|
||||||
raise inner_err
|
await tractor.post_mortem()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if raises:
|
if raises:
|
||||||
|
@ -314,6 +314,8 @@ def test_basic_payload_spec(
|
||||||
f"value: `{bad_value_str}` does not "
|
f"value: `{bad_value_str}` does not "
|
||||||
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||||
],
|
],
|
||||||
|
# only for debug
|
||||||
|
post_mortem=True,
|
||||||
),
|
),
|
||||||
p.open_context(
|
p.open_context(
|
||||||
child,
|
child,
|
||||||
|
|
|
@ -1190,6 +1190,7 @@ class Context:
|
||||||
self,
|
self,
|
||||||
remote_error: Exception,
|
remote_error: Exception,
|
||||||
|
|
||||||
|
from_src_exc: BaseException|None|bool = False,
|
||||||
raise_ctxc_from_self_call: bool = False,
|
raise_ctxc_from_self_call: bool = False,
|
||||||
raise_overrun_from_self: bool = True,
|
raise_overrun_from_self: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -1284,7 +1285,10 @@ class Context:
|
||||||
# runtime frames from the tb explicitly?
|
# runtime frames from the tb explicitly?
|
||||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||||
# https://stackoverflow.com/a/24752607
|
# https://stackoverflow.com/a/24752607
|
||||||
raise remote_error # from None
|
if from_src_exc is not False:
|
||||||
|
raise remote_error from from_src_exc
|
||||||
|
|
||||||
|
raise remote_error
|
||||||
|
|
||||||
# TODO: change to `.wait_for_result()`?
|
# TODO: change to `.wait_for_result()`?
|
||||||
async def result(
|
async def result(
|
||||||
|
@ -2096,7 +2100,11 @@ async def open_context_from_portal(
|
||||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||||
# we raise the underlying `._remote_error` directly
|
# we raise the underlying `._remote_error` directly
|
||||||
# instead of bubbling that taskc.
|
# instead of bubbling that taskc.
|
||||||
ctx.maybe_raise()
|
ctx.maybe_raise(
|
||||||
|
# mask the above taskc from the tb
|
||||||
|
from_src_exc=None,
|
||||||
|
hide_tb=hide_tb,
|
||||||
|
)
|
||||||
|
|
||||||
# OW, some other unexpected cancel condition
|
# OW, some other unexpected cancel condition
|
||||||
# that should prolly never happen right?
|
# that should prolly never happen right?
|
||||||
|
@ -2108,13 +2116,14 @@ async def open_context_from_portal(
|
||||||
ctx._started_msg: bool = started_msg
|
ctx._started_msg: bool = started_msg
|
||||||
ctx._started_pld: bool = first
|
ctx._started_pld: bool = first
|
||||||
|
|
||||||
# deliver context instance and .started() msg value
|
# deliver context ref and `.started()` msg payload value
|
||||||
# in enter tuple.
|
# in `__aenter__` tuple.
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
||||||
# ??TODO??: do we still want to consider this or is
|
# ??TODO??: do we still want to consider this or is
|
||||||
# the `else:` block handling via a `.result()`
|
# the `else:` block handling via a `.result()`
|
||||||
# call below enough??
|
# call below enough??
|
||||||
|
#
|
||||||
# -[ ] pretty sure `.result()` internals do the
|
# -[ ] pretty sure `.result()` internals do the
|
||||||
# same as our ctxc handler below so it ended up
|
# same as our ctxc handler below so it ended up
|
||||||
# being same (repeated?) behaviour, but ideally we
|
# being same (repeated?) behaviour, but ideally we
|
||||||
|
@ -2123,33 +2132,13 @@ async def open_context_from_portal(
|
||||||
# that we can re-use it around the `yield` ^ here
|
# that we can re-use it around the `yield` ^ here
|
||||||
# or vice versa?
|
# or vice versa?
|
||||||
#
|
#
|
||||||
# NOTE: between the caller exiting and arriving
|
# maybe TODO NOTE: between the caller exiting and
|
||||||
# here the far end may have sent a ctxc-msg or
|
# arriving here the far end may have sent a ctxc-msg or
|
||||||
# other error, so check for it here immediately
|
# other error, so the quetion is whether we should check
|
||||||
# and maybe raise so as to engage the ctxc
|
# for it here immediately and maybe raise so as to engage
|
||||||
# handling block below!
|
# the ctxc handling block below ????
|
||||||
#
|
#
|
||||||
# if re := ctx._remote_error:
|
# self.maybe_raise()
|
||||||
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
|
|
||||||
# re,
|
|
||||||
# # TODO: do we want this to always raise?
|
|
||||||
# # - means that on self-ctxc, if/when the
|
|
||||||
# # block is exited before the msg arrives
|
|
||||||
# # but then the msg during __exit__
|
|
||||||
# # calling we may not activate the
|
|
||||||
# # ctxc-handler block below? should we
|
|
||||||
# # be?
|
|
||||||
# # - if there's a remote error that arrives
|
|
||||||
# # after the child has exited, we won't
|
|
||||||
# # handle until the `finally:` block
|
|
||||||
# # where `.result()` is always called,
|
|
||||||
# # again in which case we handle it
|
|
||||||
# # differently then in the handler block
|
|
||||||
# # that would normally engage from THIS
|
|
||||||
# # block?
|
|
||||||
# raise_ctxc_from_self_call=True,
|
|
||||||
# )
|
|
||||||
# ctxc_from_callee = maybe_ctxc
|
|
||||||
|
|
||||||
# when in allow_overruns mode there may be
|
# when in allow_overruns mode there may be
|
||||||
# lingering overflow sender tasks remaining?
|
# lingering overflow sender tasks remaining?
|
||||||
|
@ -2460,7 +2449,7 @@ async def open_context_from_portal(
|
||||||
#
|
#
|
||||||
# NOTE: further, this should be the only place the
|
# NOTE: further, this should be the only place the
|
||||||
# underlying feeder channel is
|
# underlying feeder channel is
|
||||||
# once-and-only-CLOSED!
|
# once-forever-and-only-CLOSED!
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await ctx._rx_chan.aclose()
|
await ctx._rx_chan.aclose()
|
||||||
|
|
||||||
|
|
|
@ -518,7 +518,6 @@ class RemoteActorError(Exception):
|
||||||
def pformat(
|
def pformat(
|
||||||
self,
|
self,
|
||||||
with_type_header: bool = True,
|
with_type_header: bool = True,
|
||||||
# with_ascii_box: bool = True,
|
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -885,9 +884,9 @@ class MsgTypeError(
|
||||||
extra_msgdata['_bad_msg'] = bad_msg
|
extra_msgdata['_bad_msg'] = bad_msg
|
||||||
extra_msgdata['cid'] = bad_msg.cid
|
extra_msgdata['cid'] = bad_msg.cid
|
||||||
|
|
||||||
|
extra_msgdata.setdefault('boxed_type', cls)
|
||||||
return cls(
|
return cls(
|
||||||
message=message,
|
message=message,
|
||||||
boxed_type=cls,
|
|
||||||
**extra_msgdata,
|
**extra_msgdata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1111,7 +1110,7 @@ def is_multi_cancelled(
|
||||||
def _raise_from_unexpected_msg(
|
def _raise_from_unexpected_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
src_err: AttributeError,
|
src_err: Exception,
|
||||||
log: StackLevelAdapter, # caller specific `log` obj
|
log: StackLevelAdapter, # caller specific `log` obj
|
||||||
|
|
||||||
expect_msg: Type[MsgType],
|
expect_msg: Type[MsgType],
|
||||||
|
@ -1212,7 +1211,7 @@ def _raise_from_unexpected_msg(
|
||||||
# in case there already is some underlying remote error
|
# in case there already is some underlying remote error
|
||||||
# that arrived which is probably the source of this stream
|
# that arrived which is probably the source of this stream
|
||||||
# closure
|
# closure
|
||||||
ctx.maybe_raise()
|
ctx.maybe_raise(from_src_exc=src_err)
|
||||||
raise eoc from src_err
|
raise eoc from src_err
|
||||||
|
|
||||||
# TODO: our own transport/IPC-broke error subtype?
|
# TODO: our own transport/IPC-broke error subtype?
|
||||||
|
@ -1361,6 +1360,7 @@ def _mk_msg_type_err(
|
||||||
message=message,
|
message=message,
|
||||||
bad_msg=bad_msg,
|
bad_msg=bad_msg,
|
||||||
bad_msg_as_dict=msg_dict,
|
bad_msg_as_dict=msg_dict,
|
||||||
|
boxed_type=type(src_validation_error),
|
||||||
|
|
||||||
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||||
# - for the send-side `.started()` pld-validate
|
# - for the send-side `.started()` pld-validate
|
||||||
|
|
|
@ -80,7 +80,6 @@ from tractor.msg.types import (
|
||||||
Yield,
|
Yield,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
|
||||||
|
@ -328,7 +327,6 @@ async def _errors_relayed_via_ipc(
|
||||||
f'|_{ctx}'
|
f'|_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
|
|
||||||
|
@ -819,6 +817,12 @@ async def try_ship_error_to_remote(
|
||||||
# TODO: use `.msg.preetty_struct` for this!
|
# TODO: use `.msg.preetty_struct` for this!
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
|
except BaseException:
|
||||||
|
log.exception(
|
||||||
|
'Errored while attempting error shipment?'
|
||||||
|
)
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def process_messages(
|
async def process_messages(
|
||||||
|
|
|
@ -233,6 +233,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
# ctx: Context = self._ctx
|
# ctx: Context = self._ctx
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise(
|
||||||
raise_ctxc_from_self_call=True,
|
raise_ctxc_from_self_call=True,
|
||||||
|
from_src_exc=src_err,
|
||||||
)
|
)
|
||||||
|
|
||||||
# propagate any error but hide low-level frame details
|
# propagate any error but hide low-level frame details
|
||||||
|
|
|
@ -1600,25 +1600,27 @@ async def _pause(
|
||||||
f'REPL: {Lock.repl}\n'
|
f'REPL: {Lock.repl}\n'
|
||||||
# TODO: use `._frame_stack` scanner to find the @api_frame
|
# TODO: use `._frame_stack` scanner to find the @api_frame
|
||||||
)
|
)
|
||||||
await trio.lowlevel.checkpoint()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
# XXX: since we need to enter pdb synchronously below,
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
# we have to release the lock manually from pdb completion
|
# we have to release the lock manually from pdb completion
|
||||||
# callbacks. Can't think of a nicer way then this atm.
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
if Lock._debug_lock.locked():
|
with trio.CancelScope(shield=shield):
|
||||||
log.warning(
|
if Lock._debug_lock.locked():
|
||||||
'attempting to shield-acquire active TTY lock owned by\n'
|
log.warning(
|
||||||
f'{ctx}'
|
'attempting to shield-acquire active TTY lock owned by\n'
|
||||||
)
|
f'{ctx}'
|
||||||
|
)
|
||||||
|
|
||||||
# must shield here to avoid hitting a ``Cancelled`` and
|
# must shield here to avoid hitting a ``Cancelled`` and
|
||||||
# a child getting stuck bc we clobbered the tty
|
# a child getting stuck bc we clobbered the tty
|
||||||
with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
|
await Lock._debug_lock.acquire()
|
||||||
|
else:
|
||||||
|
# may be cancelled
|
||||||
await Lock._debug_lock.acquire()
|
await Lock._debug_lock.acquire()
|
||||||
else:
|
|
||||||
# may be cancelled
|
|
||||||
await Lock._debug_lock.acquire()
|
|
||||||
|
|
||||||
# enter REPL from root, no TTY locking IPC ctx necessary
|
# enter REPL from root, no TTY locking IPC ctx necessary
|
||||||
_enter_repl_sync(debug_func)
|
_enter_repl_sync(debug_func)
|
||||||
|
@ -1659,7 +1661,8 @@ async def _pause(
|
||||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||||
f'ignoring..'
|
f'ignoring..'
|
||||||
)
|
)
|
||||||
await trio.lowlevel.checkpoint()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -1671,8 +1674,9 @@ async def _pause(
|
||||||
f'{task}@{actor.uid} already has TTY lock\n'
|
f'{task}@{actor.uid} already has TTY lock\n'
|
||||||
f'waiting for release..'
|
f'waiting for release..'
|
||||||
)
|
)
|
||||||
await DebugStatus.repl_release.wait()
|
with trio.CancelScope(shield=shield):
|
||||||
await trio.sleep(0.1)
|
await DebugStatus.repl_release.wait()
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
elif (
|
elif (
|
||||||
req_task
|
req_task
|
||||||
|
@ -1683,7 +1687,8 @@ async def _pause(
|
||||||
|
|
||||||
'Waiting for previous request to complete..\n'
|
'Waiting for previous request to complete..\n'
|
||||||
)
|
)
|
||||||
await DebugStatus.req_finished.wait()
|
with trio.CancelScope(shield=shield):
|
||||||
|
await DebugStatus.req_finished.wait()
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
# root nursery so that the debugger can continue to run without
|
# root nursery so that the debugger can continue to run without
|
||||||
|
@ -1721,14 +1726,15 @@ async def _pause(
|
||||||
'Starting request task\n'
|
'Starting request task\n'
|
||||||
f'|_{task}\n'
|
f'|_{task}\n'
|
||||||
)
|
)
|
||||||
req_ctx: Context = await actor._service_n.start(
|
with trio.CancelScope(shield=shield):
|
||||||
partial(
|
req_ctx: Context = await actor._service_n.start(
|
||||||
request_root_stdio_lock,
|
partial(
|
||||||
actor_uid=actor.uid,
|
request_root_stdio_lock,
|
||||||
task_uid=(task.name, id(task)), # task uuid (effectively)
|
actor_uid=actor.uid,
|
||||||
shield=shield,
|
task_uid=(task.name, id(task)), # task uuid (effectively)
|
||||||
|
shield=shield,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
# XXX sanity, our locker task should be the one which
|
# XXX sanity, our locker task should be the one which
|
||||||
# entered a new IPC ctx with the root actor, NOT the one
|
# entered a new IPC ctx with the root actor, NOT the one
|
||||||
# that exists around the task calling into `._pause()`.
|
# that exists around the task calling into `._pause()`.
|
||||||
|
@ -2147,6 +2153,13 @@ async def post_mortem(
|
||||||
**_pause_kwargs,
|
**_pause_kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
`tractor`'s builtin async equivalient of `pdb.post_mortem()`
|
||||||
|
which can be used inside exception handlers.
|
||||||
|
|
||||||
|
It's also used for the crash handler when `debug_mode == True` ;)
|
||||||
|
|
||||||
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
tb: TracebackType = tb or sys.exc_info()[2]
|
tb: TracebackType = tb or sys.exc_info()[2]
|
||||||
|
|
|
@ -167,7 +167,7 @@ class PldRx(Struct):
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
**dec_msg_kwargs,
|
**dec_pld_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
@ -179,12 +179,12 @@ class PldRx(Struct):
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ipc._rx_chan.receive_nowait()
|
ipc._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.decode_pld(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
**dec_msg_kwargs,
|
**dec_pld_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def recv_pld(
|
async def recv_pld(
|
||||||
|
@ -194,7 +194,7 @@ class PldRx(Struct):
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
**dec_msg_kwargs,
|
**dec_pld_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
'''
|
'''
|
||||||
|
@ -208,17 +208,14 @@ class PldRx(Struct):
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.decode_pld(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**dec_msg_kwargs,
|
**dec_pld_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: rename to,
|
def decode_pld(
|
||||||
# -[ ] `.decode_pld()`?
|
|
||||||
# -[ ] `.dec_pld()`?
|
|
||||||
def dec_msg(
|
|
||||||
self,
|
self,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -299,9 +296,6 @@ class PldRx(Struct):
|
||||||
if not is_started_send_side
|
if not is_started_send_side
|
||||||
else ipc._actor.uid
|
else ipc._actor.uid
|
||||||
),
|
),
|
||||||
# tb=valerr.__traceback__,
|
|
||||||
# tb_str=mte._message,
|
|
||||||
# message=mte._message,
|
|
||||||
)
|
)
|
||||||
mte._ipc_msg = err_msg
|
mte._ipc_msg = err_msg
|
||||||
|
|
||||||
|
@ -317,29 +311,6 @@ class PldRx(Struct):
|
||||||
# validation error.
|
# validation error.
|
||||||
src_err = valerr
|
src_err = valerr
|
||||||
|
|
||||||
# TODO: should we instead make this explicit and
|
|
||||||
# use the above masked `is_started_send_decode`,
|
|
||||||
# expecting the `Context.started()` caller to set
|
|
||||||
# it? Rn this is kinda, howyousayyy, implicitly
|
|
||||||
# edge-case-y..
|
|
||||||
# TODO: remove this since it's been added to
|
|
||||||
# `_raise_from_unexpected_msg()`..?
|
|
||||||
# if (
|
|
||||||
# expect_msg is not Started
|
|
||||||
# and not is_started_send_side
|
|
||||||
# ):
|
|
||||||
# # set emulated remote error more-or-less as the
|
|
||||||
# # runtime would
|
|
||||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
|
||||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
|
||||||
# except TypeError as src_error:
|
|
||||||
# from .devx import mk_pdb
|
|
||||||
# mk_pdb().set_trace()
|
|
||||||
# raise src_error
|
|
||||||
# ^-TODO-^ can remove?
|
|
||||||
|
|
||||||
# a runtime-internal RPC endpoint response.
|
# a runtime-internal RPC endpoint response.
|
||||||
# always passthrough since (internal) runtime
|
# always passthrough since (internal) runtime
|
||||||
# responses are generally never exposed to consumer
|
# responses are generally never exposed to consumer
|
||||||
|
@ -435,6 +406,8 @@ class PldRx(Struct):
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
dec_msg = decode_pld
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -463,7 +436,7 @@ class PldRx(Struct):
|
||||||
# TODO: is there some way we can inject the decoded
|
# TODO: is there some way we can inject the decoded
|
||||||
# payload into an existing output buffer for the original
|
# payload into an existing output buffer for the original
|
||||||
# msg instance?
|
# msg instance?
|
||||||
pld: PayloadT = self.dec_msg(
|
pld: PayloadT = self.decode_pld(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
|
@ -610,7 +583,10 @@ async def drain_to_final_msg(
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
# cancellation.
|
# cancellation.
|
||||||
ctx.maybe_raise()
|
ctx.maybe_raise(
|
||||||
|
# TODO: when use this/
|
||||||
|
# from_src_exc=taskc,
|
||||||
|
)
|
||||||
|
|
||||||
# CASE 1: we DID request the cancel we simply
|
# CASE 1: we DID request the cancel we simply
|
||||||
# continue to bubble up as normal.
|
# continue to bubble up as normal.
|
||||||
|
@ -783,7 +759,7 @@ def validate_payload_msg(
|
||||||
try:
|
try:
|
||||||
roundtripped: Started = codec.decode(msg_bytes)
|
roundtripped: Started = codec.decode(msg_bytes)
|
||||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
pld: PayloadT = ctx.pld_rx.dec_msg(
|
pld: PayloadT = ctx.pld_rx.decode_pld(
|
||||||
msg=roundtripped,
|
msg=roundtripped,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
|
|
Loading…
Reference in New Issue