Compare commits
No commits in common. "8ea0f08386ec62721581a792156f62d124b0d2aa" and "0e8c60ee4aab56c4668f192b541bd7804256e6f1" have entirely different histories.
8ea0f08386
...
0e8c60ee4a
|
@ -1,56 +0,0 @@
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def name_error(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Raise a `NameError`, catch it and enter `.post_mortem()`, then
|
|
||||||
expect the `._rpc._invoke()` crash handler to also engage.
|
|
||||||
|
|
||||||
'''
|
|
||||||
try:
|
|
||||||
getattr(doggypants) # noqa (on purpose)
|
|
||||||
except NameError:
|
|
||||||
await tractor.post_mortem()
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
'''
|
|
||||||
Test 3 `PdbREPL` entries:
|
|
||||||
- one in the child due to manual `.post_mortem()`,
|
|
||||||
- another in the child due to runtime RPC crash handling.
|
|
||||||
- final one here in parent from the RAE.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# XXX NOTE: ideally the REPL arrives at this frame in the parent
|
|
||||||
# ONE UP FROM the inner ctx block below!
|
|
||||||
async with tractor.open_nursery(
|
|
||||||
debug_mode=True,
|
|
||||||
# loglevel='cancel',
|
|
||||||
) as an:
|
|
||||||
p: tractor.Portal = await an.start_actor(
|
|
||||||
'child',
|
|
||||||
enable_modules=[__name__],
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX should raise `RemoteActorError[NameError]`
|
|
||||||
# AND be the active frame when REPL enters!
|
|
||||||
try:
|
|
||||||
async with p.open_context(name_error) as (ctx, first):
|
|
||||||
assert first
|
|
||||||
except tractor.RemoteActorError as rae:
|
|
||||||
assert rae.boxed_type is NameError
|
|
||||||
|
|
||||||
# manually handle in root's parent task
|
|
||||||
await tractor.post_mortem()
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
raise RuntimeError('IPC ctx should have remote errored!?')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
trio.run(main)
|
|
|
@ -1,88 +0,0 @@
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
|
|
||||||
async def cancellable_pause_loop(
|
|
||||||
task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
|
|
||||||
):
|
|
||||||
with trio.CancelScope() as cs:
|
|
||||||
task_status.started(cs)
|
|
||||||
for _ in range(3):
|
|
||||||
try:
|
|
||||||
# ON first entry, there is no level triggered
|
|
||||||
# cancellation yet, so this cp does a parent task
|
|
||||||
# ctx-switch so that this scope raises for the NEXT
|
|
||||||
# checkpoint we hit.
|
|
||||||
await trio.lowlevel.checkpoint()
|
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
cs.cancel()
|
|
||||||
|
|
||||||
# parent should have called `cs.cancel()` by now
|
|
||||||
await trio.lowlevel.checkpoint()
|
|
||||||
|
|
||||||
except trio.Cancelled:
|
|
||||||
print('INSIDE SHIELDED PAUSE')
|
|
||||||
await tractor.pause(shield=True)
|
|
||||||
else:
|
|
||||||
# should raise it again, bubbling up to parent
|
|
||||||
print('BUBBLING trio.Cancelled to parent task-nursery')
|
|
||||||
await trio.lowlevel.checkpoint()
|
|
||||||
|
|
||||||
|
|
||||||
async def pm_on_cancelled():
|
|
||||||
async with trio.open_nursery() as tn:
|
|
||||||
tn.cancel_scope.cancel()
|
|
||||||
try:
|
|
||||||
await trio.sleep_forever()
|
|
||||||
except trio.Cancelled:
|
|
||||||
# should also raise `Cancelled` since
|
|
||||||
# we didn't pass `shield=True`.
|
|
||||||
try:
|
|
||||||
await tractor.post_mortem(hide_tb=False)
|
|
||||||
except trio.Cancelled as taskc:
|
|
||||||
|
|
||||||
# should enter just fine, in fact it should
|
|
||||||
# be debugging the internals of the previous
|
|
||||||
# sin-shield call above Bo
|
|
||||||
await tractor.post_mortem(
|
|
||||||
hide_tb=False,
|
|
||||||
shield=True,
|
|
||||||
)
|
|
||||||
raise taskc
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise RuntimeError('Dint cancel as expected!?')
|
|
||||||
|
|
||||||
|
|
||||||
async def cancelled_before_pause(
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify that using a shielded pause works despite surrounding
|
|
||||||
cancellation called state in the calling task.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async with trio.open_nursery() as tn:
|
|
||||||
cs: trio.CancelScope = await tn.start(cancellable_pause_loop)
|
|
||||||
await trio.sleep(0.1)
|
|
||||||
|
|
||||||
assert cs.cancelled_caught
|
|
||||||
|
|
||||||
await pm_on_cancelled()
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
async with tractor.open_nursery(
|
|
||||||
debug_mode=True,
|
|
||||||
) as n:
|
|
||||||
portal: tractor.Portal = await n.run_in_actor(
|
|
||||||
cancelled_before_pause,
|
|
||||||
)
|
|
||||||
await portal.result()
|
|
||||||
|
|
||||||
# ensure the same works in the root actor!
|
|
||||||
await pm_on_cancelled()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
trio.run(main)
|
|
|
@ -161,10 +161,6 @@ def in_prompt_msg(
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
# TODO: todo support terminal color-chars stripping so we can match
|
|
||||||
# against call stack frame output from the the 'll' command the like!
|
|
||||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
|
||||||
def assert_before(
|
def assert_before(
|
||||||
child,
|
child,
|
||||||
patts: list[str],
|
patts: list[str],
|
||||||
|
@ -1129,187 +1125,7 @@ def test_pause_from_sync(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
def test_post_mortem_api(
|
# TODO!
|
||||||
spawn,
|
|
||||||
ctlc: bool,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify the `tractor.post_mortem()` API works in an exception
|
|
||||||
handler block.
|
|
||||||
|
|
||||||
'''
|
|
||||||
child = spawn('pm_in_subactor')
|
|
||||||
|
|
||||||
# First entry is via manual `.post_mortem()`
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"<Task 'name_error'",
|
|
||||||
"NameError",
|
|
||||||
"('child'",
|
|
||||||
"tractor.post_mortem()",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
if ctlc:
|
|
||||||
do_ctlc(child)
|
|
||||||
child.sendline('c')
|
|
||||||
|
|
||||||
# 2nd is RPC crash handler
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"<Task 'name_error'",
|
|
||||||
"NameError",
|
|
||||||
"('child'",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
if ctlc:
|
|
||||||
do_ctlc(child)
|
|
||||||
child.sendline('c')
|
|
||||||
|
|
||||||
# 3rd is via RAE bubbled to root's parent ctx task and
|
|
||||||
# crash-handled via another manual pm call.
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"<Task '__main__.main'",
|
|
||||||
"('root'",
|
|
||||||
"NameError",
|
|
||||||
"tractor.post_mortem()",
|
|
||||||
"src_uid=('child'",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
if ctlc:
|
|
||||||
do_ctlc(child)
|
|
||||||
child.sendline('c')
|
|
||||||
|
|
||||||
# 4th and FINAL is via RAE bubbled to root's parent ctx task and
|
|
||||||
# crash-handled via another manual pm call.
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"<Task '__main__.main'",
|
|
||||||
"('root'",
|
|
||||||
"NameError",
|
|
||||||
"src_uid=('child'",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
if ctlc:
|
|
||||||
do_ctlc(child)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: ensure we're stopped and showing the right call stack frame
|
|
||||||
# -[ ] need a way to strip the terminal color chars in order to
|
|
||||||
# pattern match... see TODO around `assert_before()` above!
|
|
||||||
# child.sendline('w')
|
|
||||||
# child.expect(PROMPT)
|
|
||||||
# assert_before(
|
|
||||||
# child,
|
|
||||||
# [
|
|
||||||
# # error src block annot at ctx open
|
|
||||||
# '-> async with p.open_context(name_error) as (ctx, first):',
|
|
||||||
# ]
|
|
||||||
# )
|
|
||||||
|
|
||||||
# # step up a frame to ensure the it's the root's nursery
|
|
||||||
# child.sendline('u')
|
|
||||||
# child.expect(PROMPT)
|
|
||||||
# assert_before(
|
|
||||||
# child,
|
|
||||||
# [
|
|
||||||
# # handler block annotation
|
|
||||||
# '-> async with tractor.open_nursery(',
|
|
||||||
# ]
|
|
||||||
# )
|
|
||||||
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(pexpect.EOF)
|
|
||||||
|
|
||||||
|
|
||||||
def test_shield_pause(
|
|
||||||
spawn,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
|
||||||
already cancelled `trio.CancelScope` and that you can step to the
|
|
||||||
next checkpoint wherein the cancelled will get raised.
|
|
||||||
|
|
||||||
'''
|
|
||||||
child = spawn('shielded_pause')
|
|
||||||
|
|
||||||
# First entry is via manual `.post_mortem()`
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_pause_msg,
|
|
||||||
"cancellable_pause_loop'",
|
|
||||||
"('cancelled_before_pause'", # actor name
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# since 3 tries in ex. shield pause loop
|
|
||||||
for i in range(3):
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_pause_msg,
|
|
||||||
"INSIDE SHIELDED PAUSE",
|
|
||||||
"('cancelled_before_pause'", # actor name
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# back inside parent task that opened nursery
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"('cancelled_before_pause'", # actor name
|
|
||||||
"Failed to engage debugger via `_pause()`",
|
|
||||||
"trio.Cancelled",
|
|
||||||
"raise Cancelled._create()",
|
|
||||||
|
|
||||||
# we should be handling a taskc inside
|
|
||||||
# the first `.port_mortem()` sin-shield!
|
|
||||||
'await DebugStatus.req_finished.wait()',
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# same as above but in the root actor's task
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(PROMPT)
|
|
||||||
assert_before(
|
|
||||||
child,
|
|
||||||
[
|
|
||||||
_crash_msg,
|
|
||||||
"('root'", # actor name
|
|
||||||
"Failed to engage debugger via `_pause()`",
|
|
||||||
"trio.Cancelled",
|
|
||||||
"raise Cancelled._create()",
|
|
||||||
|
|
||||||
# handling a taskc inside the first unshielded
|
|
||||||
# `.port_mortem()`.
|
|
||||||
# BUT in this case in the root-proc path ;)
|
|
||||||
'wait Lock._debug_lock.acquire()',
|
|
||||||
]
|
|
||||||
)
|
|
||||||
child.sendline('c')
|
|
||||||
child.expect(pexpect.EOF)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
|
||||||
def test_correct_frames_below_hidden():
|
def test_correct_frames_below_hidden():
|
||||||
'''
|
'''
|
||||||
Ensure that once a `tractor.pause()` enages, when the user
|
Ensure that once a `tractor.pause()` enages, when the user
|
||||||
|
@ -1322,11 +1138,4 @@ def test_correct_frames_below_hidden():
|
||||||
|
|
||||||
|
|
||||||
def test_cant_pause_from_paused_task():
|
def test_cant_pause_from_paused_task():
|
||||||
'''
|
|
||||||
Pausing from with an already paused task should raise an error.
|
|
||||||
|
|
||||||
Normally this should only happen in practise while debugging the call stack of `tractor.pause()` itself, likely
|
|
||||||
by a `.pause()` line somewhere inside our runtime.
|
|
||||||
|
|
||||||
'''
|
|
||||||
...
|
...
|
||||||
|
|
|
@ -46,7 +46,7 @@ maybe_msg_spec = PldMsg|None
|
||||||
async def maybe_expect_raises(
|
async def maybe_expect_raises(
|
||||||
raises: BaseException|None = None,
|
raises: BaseException|None = None,
|
||||||
ensure_in_message: list[str]|None = None,
|
ensure_in_message: list[str]|None = None,
|
||||||
post_mortem: bool = False,
|
reraise: bool = False,
|
||||||
timeout: int = 3,
|
timeout: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -86,8 +86,8 @@ async def maybe_expect_raises(
|
||||||
f'{inner_err.args}'
|
f'{inner_err.args}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if post_mortem:
|
if reraise:
|
||||||
await tractor.post_mortem()
|
raise inner_err
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if raises:
|
if raises:
|
||||||
|
@ -314,8 +314,6 @@ def test_basic_payload_spec(
|
||||||
f"value: `{bad_value_str}` does not "
|
f"value: `{bad_value_str}` does not "
|
||||||
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||||
],
|
],
|
||||||
# only for debug
|
|
||||||
post_mortem=True,
|
|
||||||
),
|
),
|
||||||
p.open_context(
|
p.open_context(
|
||||||
child,
|
child,
|
||||||
|
|
|
@ -1190,7 +1190,6 @@ class Context:
|
||||||
self,
|
self,
|
||||||
remote_error: Exception,
|
remote_error: Exception,
|
||||||
|
|
||||||
from_src_exc: BaseException|None|bool = False,
|
|
||||||
raise_ctxc_from_self_call: bool = False,
|
raise_ctxc_from_self_call: bool = False,
|
||||||
raise_overrun_from_self: bool = True,
|
raise_overrun_from_self: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -1285,10 +1284,7 @@ class Context:
|
||||||
# runtime frames from the tb explicitly?
|
# runtime frames from the tb explicitly?
|
||||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||||
# https://stackoverflow.com/a/24752607
|
# https://stackoverflow.com/a/24752607
|
||||||
if from_src_exc is not False:
|
raise remote_error # from None
|
||||||
raise remote_error from from_src_exc
|
|
||||||
|
|
||||||
raise remote_error
|
|
||||||
|
|
||||||
# TODO: change to `.wait_for_result()`?
|
# TODO: change to `.wait_for_result()`?
|
||||||
async def result(
|
async def result(
|
||||||
|
@ -2100,11 +2096,7 @@ async def open_context_from_portal(
|
||||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||||
# we raise the underlying `._remote_error` directly
|
# we raise the underlying `._remote_error` directly
|
||||||
# instead of bubbling that taskc.
|
# instead of bubbling that taskc.
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise()
|
||||||
# mask the above taskc from the tb
|
|
||||||
from_src_exc=None,
|
|
||||||
hide_tb=hide_tb,
|
|
||||||
)
|
|
||||||
|
|
||||||
# OW, some other unexpected cancel condition
|
# OW, some other unexpected cancel condition
|
||||||
# that should prolly never happen right?
|
# that should prolly never happen right?
|
||||||
|
@ -2116,14 +2108,13 @@ async def open_context_from_portal(
|
||||||
ctx._started_msg: bool = started_msg
|
ctx._started_msg: bool = started_msg
|
||||||
ctx._started_pld: bool = first
|
ctx._started_pld: bool = first
|
||||||
|
|
||||||
# deliver context ref and `.started()` msg payload value
|
# deliver context instance and .started() msg value
|
||||||
# in `__aenter__` tuple.
|
# in enter tuple.
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
||||||
# ??TODO??: do we still want to consider this or is
|
# ??TODO??: do we still want to consider this or is
|
||||||
# the `else:` block handling via a `.result()`
|
# the `else:` block handling via a `.result()`
|
||||||
# call below enough??
|
# call below enough??
|
||||||
#
|
|
||||||
# -[ ] pretty sure `.result()` internals do the
|
# -[ ] pretty sure `.result()` internals do the
|
||||||
# same as our ctxc handler below so it ended up
|
# same as our ctxc handler below so it ended up
|
||||||
# being same (repeated?) behaviour, but ideally we
|
# being same (repeated?) behaviour, but ideally we
|
||||||
|
@ -2132,13 +2123,33 @@ async def open_context_from_portal(
|
||||||
# that we can re-use it around the `yield` ^ here
|
# that we can re-use it around the `yield` ^ here
|
||||||
# or vice versa?
|
# or vice versa?
|
||||||
#
|
#
|
||||||
# maybe TODO NOTE: between the caller exiting and
|
# NOTE: between the caller exiting and arriving
|
||||||
# arriving here the far end may have sent a ctxc-msg or
|
# here the far end may have sent a ctxc-msg or
|
||||||
# other error, so the quetion is whether we should check
|
# other error, so check for it here immediately
|
||||||
# for it here immediately and maybe raise so as to engage
|
# and maybe raise so as to engage the ctxc
|
||||||
# the ctxc handling block below ????
|
# handling block below!
|
||||||
#
|
#
|
||||||
# self.maybe_raise()
|
# if re := ctx._remote_error:
|
||||||
|
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
|
||||||
|
# re,
|
||||||
|
# # TODO: do we want this to always raise?
|
||||||
|
# # - means that on self-ctxc, if/when the
|
||||||
|
# # block is exited before the msg arrives
|
||||||
|
# # but then the msg during __exit__
|
||||||
|
# # calling we may not activate the
|
||||||
|
# # ctxc-handler block below? should we
|
||||||
|
# # be?
|
||||||
|
# # - if there's a remote error that arrives
|
||||||
|
# # after the child has exited, we won't
|
||||||
|
# # handle until the `finally:` block
|
||||||
|
# # where `.result()` is always called,
|
||||||
|
# # again in which case we handle it
|
||||||
|
# # differently then in the handler block
|
||||||
|
# # that would normally engage from THIS
|
||||||
|
# # block?
|
||||||
|
# raise_ctxc_from_self_call=True,
|
||||||
|
# )
|
||||||
|
# ctxc_from_callee = maybe_ctxc
|
||||||
|
|
||||||
# when in allow_overruns mode there may be
|
# when in allow_overruns mode there may be
|
||||||
# lingering overflow sender tasks remaining?
|
# lingering overflow sender tasks remaining?
|
||||||
|
@ -2449,7 +2460,7 @@ async def open_context_from_portal(
|
||||||
#
|
#
|
||||||
# NOTE: further, this should be the only place the
|
# NOTE: further, this should be the only place the
|
||||||
# underlying feeder channel is
|
# underlying feeder channel is
|
||||||
# once-forever-and-only-CLOSED!
|
# once-and-only-CLOSED!
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await ctx._rx_chan.aclose()
|
await ctx._rx_chan.aclose()
|
||||||
|
|
||||||
|
|
|
@ -518,6 +518,7 @@ class RemoteActorError(Exception):
|
||||||
def pformat(
|
def pformat(
|
||||||
self,
|
self,
|
||||||
with_type_header: bool = True,
|
with_type_header: bool = True,
|
||||||
|
# with_ascii_box: bool = True,
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -884,9 +885,9 @@ class MsgTypeError(
|
||||||
extra_msgdata['_bad_msg'] = bad_msg
|
extra_msgdata['_bad_msg'] = bad_msg
|
||||||
extra_msgdata['cid'] = bad_msg.cid
|
extra_msgdata['cid'] = bad_msg.cid
|
||||||
|
|
||||||
extra_msgdata.setdefault('boxed_type', cls)
|
|
||||||
return cls(
|
return cls(
|
||||||
message=message,
|
message=message,
|
||||||
|
boxed_type=cls,
|
||||||
**extra_msgdata,
|
**extra_msgdata,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1110,7 +1111,7 @@ def is_multi_cancelled(
|
||||||
def _raise_from_unexpected_msg(
|
def _raise_from_unexpected_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
src_err: Exception,
|
src_err: AttributeError,
|
||||||
log: StackLevelAdapter, # caller specific `log` obj
|
log: StackLevelAdapter, # caller specific `log` obj
|
||||||
|
|
||||||
expect_msg: Type[MsgType],
|
expect_msg: Type[MsgType],
|
||||||
|
@ -1211,7 +1212,7 @@ def _raise_from_unexpected_msg(
|
||||||
# in case there already is some underlying remote error
|
# in case there already is some underlying remote error
|
||||||
# that arrived which is probably the source of this stream
|
# that arrived which is probably the source of this stream
|
||||||
# closure
|
# closure
|
||||||
ctx.maybe_raise(from_src_exc=src_err)
|
ctx.maybe_raise()
|
||||||
raise eoc from src_err
|
raise eoc from src_err
|
||||||
|
|
||||||
# TODO: our own transport/IPC-broke error subtype?
|
# TODO: our own transport/IPC-broke error subtype?
|
||||||
|
@ -1360,7 +1361,6 @@ def _mk_msg_type_err(
|
||||||
message=message,
|
message=message,
|
||||||
bad_msg=bad_msg,
|
bad_msg=bad_msg,
|
||||||
bad_msg_as_dict=msg_dict,
|
bad_msg_as_dict=msg_dict,
|
||||||
boxed_type=type(src_validation_error),
|
|
||||||
|
|
||||||
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||||
# - for the send-side `.started()` pld-validate
|
# - for the send-side `.started()` pld-validate
|
||||||
|
|
|
@ -80,6 +80,7 @@ from tractor.msg.types import (
|
||||||
Yield,
|
Yield,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
|
||||||
|
@ -327,6 +328,7 @@ async def _errors_relayed_via_ipc(
|
||||||
f'|_{ctx}'
|
f'|_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
|
|
||||||
|
@ -817,12 +819,6 @@ async def try_ship_error_to_remote(
|
||||||
# TODO: use `.msg.preetty_struct` for this!
|
# TODO: use `.msg.preetty_struct` for this!
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
except BaseException:
|
|
||||||
log.exception(
|
|
||||||
'Errored while attempting error shipment?'
|
|
||||||
)
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
async def process_messages(
|
async def process_messages(
|
||||||
|
|
|
@ -233,7 +233,6 @@ class MsgStream(trio.abc.Channel):
|
||||||
# ctx: Context = self._ctx
|
# ctx: Context = self._ctx
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise(
|
||||||
raise_ctxc_from_self_call=True,
|
raise_ctxc_from_self_call=True,
|
||||||
from_src_exc=src_err,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# propagate any error but hide low-level frame details
|
# propagate any error but hide low-level frame details
|
||||||
|
|
|
@ -1600,14 +1600,12 @@ async def _pause(
|
||||||
f'REPL: {Lock.repl}\n'
|
f'REPL: {Lock.repl}\n'
|
||||||
# TODO: use `._frame_stack` scanner to find the @api_frame
|
# TODO: use `._frame_stack` scanner to find the @api_frame
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
# XXX: since we need to enter pdb synchronously below,
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
# we have to release the lock manually from pdb completion
|
# we have to release the lock manually from pdb completion
|
||||||
# callbacks. Can't think of a nicer way then this atm.
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
if Lock._debug_lock.locked():
|
if Lock._debug_lock.locked():
|
||||||
log.warning(
|
log.warning(
|
||||||
'attempting to shield-acquire active TTY lock owned by\n'
|
'attempting to shield-acquire active TTY lock owned by\n'
|
||||||
|
@ -1616,7 +1614,7 @@ async def _pause(
|
||||||
|
|
||||||
# must shield here to avoid hitting a ``Cancelled`` and
|
# must shield here to avoid hitting a ``Cancelled`` and
|
||||||
# a child getting stuck bc we clobbered the tty
|
# a child getting stuck bc we clobbered the tty
|
||||||
# with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await Lock._debug_lock.acquire()
|
await Lock._debug_lock.acquire()
|
||||||
else:
|
else:
|
||||||
# may be cancelled
|
# may be cancelled
|
||||||
|
@ -1661,7 +1659,6 @@ async def _pause(
|
||||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||||
f'ignoring..'
|
f'ignoring..'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1674,7 +1671,6 @@ async def _pause(
|
||||||
f'{task}@{actor.uid} already has TTY lock\n'
|
f'{task}@{actor.uid} already has TTY lock\n'
|
||||||
f'waiting for release..'
|
f'waiting for release..'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
await DebugStatus.repl_release.wait()
|
await DebugStatus.repl_release.wait()
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
@ -1687,7 +1683,6 @@ async def _pause(
|
||||||
|
|
||||||
'Waiting for previous request to complete..\n'
|
'Waiting for previous request to complete..\n'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
await DebugStatus.req_finished.wait()
|
await DebugStatus.req_finished.wait()
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
|
@ -1726,7 +1721,6 @@ async def _pause(
|
||||||
'Starting request task\n'
|
'Starting request task\n'
|
||||||
f'|_{task}\n'
|
f'|_{task}\n'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
|
||||||
req_ctx: Context = await actor._service_n.start(
|
req_ctx: Context = await actor._service_n.start(
|
||||||
partial(
|
partial(
|
||||||
request_root_stdio_lock,
|
request_root_stdio_lock,
|
||||||
|
@ -2153,13 +2147,6 @@ async def post_mortem(
|
||||||
**_pause_kwargs,
|
**_pause_kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
|
||||||
`tractor`'s builtin async equivalient of `pdb.post_mortem()`
|
|
||||||
which can be used inside exception handlers.
|
|
||||||
|
|
||||||
It's also used for the crash handler when `debug_mode == True` ;)
|
|
||||||
|
|
||||||
'''
|
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
tb: TracebackType = tb or sys.exc_info()[2]
|
tb: TracebackType = tb or sys.exc_info()[2]
|
||||||
|
|
|
@ -167,7 +167,7 @@ class PldRx(Struct):
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
**dec_pld_kwargs,
|
**dec_msg_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
@ -179,12 +179,12 @@ class PldRx(Struct):
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ipc._rx_chan.receive_nowait()
|
ipc._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
return self.decode_pld(
|
return self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
**dec_pld_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def recv_pld(
|
async def recv_pld(
|
||||||
|
@ -194,7 +194,7 @@ class PldRx(Struct):
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
**dec_pld_kwargs,
|
**dec_msg_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
'''
|
'''
|
||||||
|
@ -208,14 +208,17 @@ class PldRx(Struct):
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
return self.decode_pld(
|
return self.dec_msg(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**dec_pld_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
def decode_pld(
|
# TODO: rename to,
|
||||||
|
# -[ ] `.decode_pld()`?
|
||||||
|
# -[ ] `.dec_pld()`?
|
||||||
|
def dec_msg(
|
||||||
self,
|
self,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -296,6 +299,9 @@ class PldRx(Struct):
|
||||||
if not is_started_send_side
|
if not is_started_send_side
|
||||||
else ipc._actor.uid
|
else ipc._actor.uid
|
||||||
),
|
),
|
||||||
|
# tb=valerr.__traceback__,
|
||||||
|
# tb_str=mte._message,
|
||||||
|
# message=mte._message,
|
||||||
)
|
)
|
||||||
mte._ipc_msg = err_msg
|
mte._ipc_msg = err_msg
|
||||||
|
|
||||||
|
@ -311,6 +317,29 @@ class PldRx(Struct):
|
||||||
# validation error.
|
# validation error.
|
||||||
src_err = valerr
|
src_err = valerr
|
||||||
|
|
||||||
|
# TODO: should we instead make this explicit and
|
||||||
|
# use the above masked `is_started_send_decode`,
|
||||||
|
# expecting the `Context.started()` caller to set
|
||||||
|
# it? Rn this is kinda, howyousayyy, implicitly
|
||||||
|
# edge-case-y..
|
||||||
|
# TODO: remove this since it's been added to
|
||||||
|
# `_raise_from_unexpected_msg()`..?
|
||||||
|
# if (
|
||||||
|
# expect_msg is not Started
|
||||||
|
# and not is_started_send_side
|
||||||
|
# ):
|
||||||
|
# # set emulated remote error more-or-less as the
|
||||||
|
# # runtime would
|
||||||
|
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
|
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||||
|
|
||||||
|
# XXX some other decoder specific failure?
|
||||||
|
# except TypeError as src_error:
|
||||||
|
# from .devx import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
# raise src_error
|
||||||
|
# ^-TODO-^ can remove?
|
||||||
|
|
||||||
# a runtime-internal RPC endpoint response.
|
# a runtime-internal RPC endpoint response.
|
||||||
# always passthrough since (internal) runtime
|
# always passthrough since (internal) runtime
|
||||||
# responses are generally never exposed to consumer
|
# responses are generally never exposed to consumer
|
||||||
|
@ -406,8 +435,6 @@ class PldRx(Struct):
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
dec_msg = decode_pld
|
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -436,7 +463,7 @@ class PldRx(Struct):
|
||||||
# TODO: is there some way we can inject the decoded
|
# TODO: is there some way we can inject the decoded
|
||||||
# payload into an existing output buffer for the original
|
# payload into an existing output buffer for the original
|
||||||
# msg instance?
|
# msg instance?
|
||||||
pld: PayloadT = self.decode_pld(
|
pld: PayloadT = self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
|
@ -583,10 +610,7 @@ async def drain_to_final_msg(
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
# cancellation.
|
# cancellation.
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise()
|
||||||
# TODO: when use this/
|
|
||||||
# from_src_exc=taskc,
|
|
||||||
)
|
|
||||||
|
|
||||||
# CASE 1: we DID request the cancel we simply
|
# CASE 1: we DID request the cancel we simply
|
||||||
# continue to bubble up as normal.
|
# continue to bubble up as normal.
|
||||||
|
@ -759,7 +783,7 @@ def validate_payload_msg(
|
||||||
try:
|
try:
|
||||||
roundtripped: Started = codec.decode(msg_bytes)
|
roundtripped: Started = codec.decode(msg_bytes)
|
||||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
pld: PayloadT = ctx.pld_rx.decode_pld(
|
pld: PayloadT = ctx.pld_rx.dec_msg(
|
||||||
msg=roundtripped,
|
msg=roundtripped,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
|
|
Loading…
Reference in New Issue