Compare commits
7 Commits
0e8c60ee4a
...
8ea0f08386
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8ea0f08386 | |
Tyler Goodlet | 13ea500a44 | |
Tyler Goodlet | 2f854a3e86 | |
Tyler Goodlet | cdb1311e40 | |
Tyler Goodlet | fcd089c08f | |
Tyler Goodlet | 993281882b | |
Tyler Goodlet | bbb4d4e52c |
|
@ -0,0 +1,56 @@
|
|||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def name_error(
|
||||
ctx: tractor.Context,
|
||||
):
|
||||
'''
|
||||
Raise a `NameError`, catch it and enter `.post_mortem()`, then
|
||||
expect the `._rpc._invoke()` crash handler to also engage.
|
||||
|
||||
'''
|
||||
try:
|
||||
getattr(doggypants) # noqa (on purpose)
|
||||
except NameError:
|
||||
await tractor.post_mortem()
|
||||
raise
|
||||
|
||||
|
||||
async def main():
|
||||
'''
|
||||
Test 3 `PdbREPL` entries:
|
||||
- one in the child due to manual `.post_mortem()`,
|
||||
- another in the child due to runtime RPC crash handling.
|
||||
- final one here in parent from the RAE.
|
||||
|
||||
'''
|
||||
# XXX NOTE: ideally the REPL arrives at this frame in the parent
|
||||
# ONE UP FROM the inner ctx block below!
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
# loglevel='cancel',
|
||||
) as an:
|
||||
p: tractor.Portal = await an.start_actor(
|
||||
'child',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
# XXX should raise `RemoteActorError[NameError]`
|
||||
# AND be the active frame when REPL enters!
|
||||
try:
|
||||
async with p.open_context(name_error) as (ctx, first):
|
||||
assert first
|
||||
except tractor.RemoteActorError as rae:
|
||||
assert rae.boxed_type is NameError
|
||||
|
||||
# manually handle in root's parent task
|
||||
await tractor.post_mortem()
|
||||
raise
|
||||
else:
|
||||
raise RuntimeError('IPC ctx should have remote errored!?')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -0,0 +1,88 @@
|
|||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
async def cancellable_pause_loop(
|
||||
task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
with trio.CancelScope() as cs:
|
||||
task_status.started(cs)
|
||||
for _ in range(3):
|
||||
try:
|
||||
# ON first entry, there is no level triggered
|
||||
# cancellation yet, so this cp does a parent task
|
||||
# ctx-switch so that this scope raises for the NEXT
|
||||
# checkpoint we hit.
|
||||
await trio.lowlevel.checkpoint()
|
||||
await tractor.pause()
|
||||
|
||||
cs.cancel()
|
||||
|
||||
# parent should have called `cs.cancel()` by now
|
||||
await trio.lowlevel.checkpoint()
|
||||
|
||||
except trio.Cancelled:
|
||||
print('INSIDE SHIELDED PAUSE')
|
||||
await tractor.pause(shield=True)
|
||||
else:
|
||||
# should raise it again, bubbling up to parent
|
||||
print('BUBBLING trio.Cancelled to parent task-nursery')
|
||||
await trio.lowlevel.checkpoint()
|
||||
|
||||
|
||||
async def pm_on_cancelled():
|
||||
async with trio.open_nursery() as tn:
|
||||
tn.cancel_scope.cancel()
|
||||
try:
|
||||
await trio.sleep_forever()
|
||||
except trio.Cancelled:
|
||||
# should also raise `Cancelled` since
|
||||
# we didn't pass `shield=True`.
|
||||
try:
|
||||
await tractor.post_mortem(hide_tb=False)
|
||||
except trio.Cancelled as taskc:
|
||||
|
||||
# should enter just fine, in fact it should
|
||||
# be debugging the internals of the previous
|
||||
# sin-shield call above Bo
|
||||
await tractor.post_mortem(
|
||||
hide_tb=False,
|
||||
shield=True,
|
||||
)
|
||||
raise taskc
|
||||
|
||||
else:
|
||||
raise RuntimeError('Dint cancel as expected!?')
|
||||
|
||||
|
||||
async def cancelled_before_pause(
|
||||
):
|
||||
'''
|
||||
Verify that using a shielded pause works despite surrounding
|
||||
cancellation called state in the calling task.
|
||||
|
||||
'''
|
||||
async with trio.open_nursery() as tn:
|
||||
cs: trio.CancelScope = await tn.start(cancellable_pause_loop)
|
||||
await trio.sleep(0.1)
|
||||
|
||||
assert cs.cancelled_caught
|
||||
|
||||
await pm_on_cancelled()
|
||||
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
) as n:
|
||||
portal: tractor.Portal = await n.run_in_actor(
|
||||
cancelled_before_pause,
|
||||
)
|
||||
await portal.result()
|
||||
|
||||
# ensure the same works in the root actor!
|
||||
await pm_on_cancelled()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -161,6 +161,10 @@ def in_prompt_msg(
|
|||
|
||||
return True
|
||||
|
||||
|
||||
# TODO: todo support terminal color-chars stripping so we can match
|
||||
# against call stack frame output from the the 'll' command the like!
|
||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||
def assert_before(
|
||||
child,
|
||||
patts: list[str],
|
||||
|
@ -1125,7 +1129,187 @@ def test_pause_from_sync(
|
|||
child.expect(pexpect.EOF)
|
||||
|
||||
|
||||
# TODO!
|
||||
def test_post_mortem_api(
|
||||
spawn,
|
||||
ctlc: bool,
|
||||
):
|
||||
'''
|
||||
Verify the `tractor.post_mortem()` API works in an exception
|
||||
handler block.
|
||||
|
||||
'''
|
||||
child = spawn('pm_in_subactor')
|
||||
|
||||
# First entry is via manual `.post_mortem()`
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"<Task 'name_error'",
|
||||
"NameError",
|
||||
"('child'",
|
||||
"tractor.post_mortem()",
|
||||
]
|
||||
)
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
child.sendline('c')
|
||||
|
||||
# 2nd is RPC crash handler
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"<Task 'name_error'",
|
||||
"NameError",
|
||||
"('child'",
|
||||
]
|
||||
)
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
child.sendline('c')
|
||||
|
||||
# 3rd is via RAE bubbled to root's parent ctx task and
|
||||
# crash-handled via another manual pm call.
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"<Task '__main__.main'",
|
||||
"('root'",
|
||||
"NameError",
|
||||
"tractor.post_mortem()",
|
||||
"src_uid=('child'",
|
||||
]
|
||||
)
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
child.sendline('c')
|
||||
|
||||
# 4th and FINAL is via RAE bubbled to root's parent ctx task and
|
||||
# crash-handled via another manual pm call.
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"<Task '__main__.main'",
|
||||
"('root'",
|
||||
"NameError",
|
||||
"src_uid=('child'",
|
||||
]
|
||||
)
|
||||
if ctlc:
|
||||
do_ctlc(child)
|
||||
|
||||
|
||||
# TODO: ensure we're stopped and showing the right call stack frame
|
||||
# -[ ] need a way to strip the terminal color chars in order to
|
||||
# pattern match... see TODO around `assert_before()` above!
|
||||
# child.sendline('w')
|
||||
# child.expect(PROMPT)
|
||||
# assert_before(
|
||||
# child,
|
||||
# [
|
||||
# # error src block annot at ctx open
|
||||
# '-> async with p.open_context(name_error) as (ctx, first):',
|
||||
# ]
|
||||
# )
|
||||
|
||||
# # step up a frame to ensure the it's the root's nursery
|
||||
# child.sendline('u')
|
||||
# child.expect(PROMPT)
|
||||
# assert_before(
|
||||
# child,
|
||||
# [
|
||||
# # handler block annotation
|
||||
# '-> async with tractor.open_nursery(',
|
||||
# ]
|
||||
# )
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
|
||||
|
||||
def test_shield_pause(
|
||||
spawn,
|
||||
):
|
||||
'''
|
||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||
already cancelled `trio.CancelScope` and that you can step to the
|
||||
next checkpoint wherein the cancelled will get raised.
|
||||
|
||||
'''
|
||||
child = spawn('shielded_pause')
|
||||
|
||||
# First entry is via manual `.post_mortem()`
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_pause_msg,
|
||||
"cancellable_pause_loop'",
|
||||
"('cancelled_before_pause'", # actor name
|
||||
]
|
||||
)
|
||||
|
||||
# since 3 tries in ex. shield pause loop
|
||||
for i in range(3):
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_pause_msg,
|
||||
"INSIDE SHIELDED PAUSE",
|
||||
"('cancelled_before_pause'", # actor name
|
||||
]
|
||||
)
|
||||
|
||||
# back inside parent task that opened nursery
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"('cancelled_before_pause'", # actor name
|
||||
"Failed to engage debugger via `_pause()`",
|
||||
"trio.Cancelled",
|
||||
"raise Cancelled._create()",
|
||||
|
||||
# we should be handling a taskc inside
|
||||
# the first `.port_mortem()` sin-shield!
|
||||
'await DebugStatus.req_finished.wait()',
|
||||
]
|
||||
)
|
||||
|
||||
# same as above but in the root actor's task
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"('root'", # actor name
|
||||
"Failed to engage debugger via `_pause()`",
|
||||
"trio.Cancelled",
|
||||
"raise Cancelled._create()",
|
||||
|
||||
# handling a taskc inside the first unshielded
|
||||
# `.port_mortem()`.
|
||||
# BUT in this case in the root-proc path ;)
|
||||
'wait Lock._debug_lock.acquire()',
|
||||
]
|
||||
)
|
||||
child.sendline('c')
|
||||
child.expect(pexpect.EOF)
|
||||
|
||||
|
||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||
def test_correct_frames_below_hidden():
|
||||
'''
|
||||
Ensure that once a `tractor.pause()` enages, when the user
|
||||
|
@ -1138,4 +1322,11 @@ def test_correct_frames_below_hidden():
|
|||
|
||||
|
||||
def test_cant_pause_from_paused_task():
|
||||
'''
|
||||
Pausing from with an already paused task should raise an error.
|
||||
|
||||
Normally this should only happen in practise while debugging the call stack of `tractor.pause()` itself, likely
|
||||
by a `.pause()` line somewhere inside our runtime.
|
||||
|
||||
'''
|
||||
...
|
||||
|
|
|
@ -46,7 +46,7 @@ maybe_msg_spec = PldMsg|None
|
|||
async def maybe_expect_raises(
|
||||
raises: BaseException|None = None,
|
||||
ensure_in_message: list[str]|None = None,
|
||||
reraise: bool = False,
|
||||
post_mortem: bool = False,
|
||||
timeout: int = 3,
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -86,8 +86,8 @@ async def maybe_expect_raises(
|
|||
f'{inner_err.args}'
|
||||
)
|
||||
|
||||
if reraise:
|
||||
raise inner_err
|
||||
if post_mortem:
|
||||
await tractor.post_mortem()
|
||||
|
||||
else:
|
||||
if raises:
|
||||
|
@ -314,6 +314,8 @@ def test_basic_payload_spec(
|
|||
f"value: `{bad_value_str}` does not "
|
||||
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||
],
|
||||
# only for debug
|
||||
post_mortem=True,
|
||||
),
|
||||
p.open_context(
|
||||
child,
|
||||
|
|
|
@ -1190,6 +1190,7 @@ class Context:
|
|||
self,
|
||||
remote_error: Exception,
|
||||
|
||||
from_src_exc: BaseException|None|bool = False,
|
||||
raise_ctxc_from_self_call: bool = False,
|
||||
raise_overrun_from_self: bool = True,
|
||||
hide_tb: bool = True,
|
||||
|
@ -1284,7 +1285,10 @@ class Context:
|
|||
# runtime frames from the tb explicitly?
|
||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||
# https://stackoverflow.com/a/24752607
|
||||
raise remote_error # from None
|
||||
if from_src_exc is not False:
|
||||
raise remote_error from from_src_exc
|
||||
|
||||
raise remote_error
|
||||
|
||||
# TODO: change to `.wait_for_result()`?
|
||||
async def result(
|
||||
|
@ -2096,7 +2100,11 @@ async def open_context_from_portal(
|
|||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||
# we raise the underlying `._remote_error` directly
|
||||
# instead of bubbling that taskc.
|
||||
ctx.maybe_raise()
|
||||
ctx.maybe_raise(
|
||||
# mask the above taskc from the tb
|
||||
from_src_exc=None,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
|
||||
# OW, some other unexpected cancel condition
|
||||
# that should prolly never happen right?
|
||||
|
@ -2108,13 +2116,14 @@ async def open_context_from_portal(
|
|||
ctx._started_msg: bool = started_msg
|
||||
ctx._started_pld: bool = first
|
||||
|
||||
# deliver context instance and .started() msg value
|
||||
# in enter tuple.
|
||||
# deliver context ref and `.started()` msg payload value
|
||||
# in `__aenter__` tuple.
|
||||
yield ctx, first
|
||||
|
||||
# ??TODO??: do we still want to consider this or is
|
||||
# the `else:` block handling via a `.result()`
|
||||
# call below enough??
|
||||
#
|
||||
# -[ ] pretty sure `.result()` internals do the
|
||||
# same as our ctxc handler below so it ended up
|
||||
# being same (repeated?) behaviour, but ideally we
|
||||
|
@ -2123,33 +2132,13 @@ async def open_context_from_portal(
|
|||
# that we can re-use it around the `yield` ^ here
|
||||
# or vice versa?
|
||||
#
|
||||
# NOTE: between the caller exiting and arriving
|
||||
# here the far end may have sent a ctxc-msg or
|
||||
# other error, so check for it here immediately
|
||||
# and maybe raise so as to engage the ctxc
|
||||
# handling block below!
|
||||
# maybe TODO NOTE: between the caller exiting and
|
||||
# arriving here the far end may have sent a ctxc-msg or
|
||||
# other error, so the quetion is whether we should check
|
||||
# for it here immediately and maybe raise so as to engage
|
||||
# the ctxc handling block below ????
|
||||
#
|
||||
# if re := ctx._remote_error:
|
||||
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
|
||||
# re,
|
||||
# # TODO: do we want this to always raise?
|
||||
# # - means that on self-ctxc, if/when the
|
||||
# # block is exited before the msg arrives
|
||||
# # but then the msg during __exit__
|
||||
# # calling we may not activate the
|
||||
# # ctxc-handler block below? should we
|
||||
# # be?
|
||||
# # - if there's a remote error that arrives
|
||||
# # after the child has exited, we won't
|
||||
# # handle until the `finally:` block
|
||||
# # where `.result()` is always called,
|
||||
# # again in which case we handle it
|
||||
# # differently then in the handler block
|
||||
# # that would normally engage from THIS
|
||||
# # block?
|
||||
# raise_ctxc_from_self_call=True,
|
||||
# )
|
||||
# ctxc_from_callee = maybe_ctxc
|
||||
# self.maybe_raise()
|
||||
|
||||
# when in allow_overruns mode there may be
|
||||
# lingering overflow sender tasks remaining?
|
||||
|
@ -2460,7 +2449,7 @@ async def open_context_from_portal(
|
|||
#
|
||||
# NOTE: further, this should be the only place the
|
||||
# underlying feeder channel is
|
||||
# once-and-only-CLOSED!
|
||||
# once-forever-and-only-CLOSED!
|
||||
with trio.CancelScope(shield=True):
|
||||
await ctx._rx_chan.aclose()
|
||||
|
||||
|
|
|
@ -518,7 +518,6 @@ class RemoteActorError(Exception):
|
|||
def pformat(
|
||||
self,
|
||||
with_type_header: bool = True,
|
||||
# with_ascii_box: bool = True,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
|
@ -885,9 +884,9 @@ class MsgTypeError(
|
|||
extra_msgdata['_bad_msg'] = bad_msg
|
||||
extra_msgdata['cid'] = bad_msg.cid
|
||||
|
||||
extra_msgdata.setdefault('boxed_type', cls)
|
||||
return cls(
|
||||
message=message,
|
||||
boxed_type=cls,
|
||||
**extra_msgdata,
|
||||
)
|
||||
|
||||
|
@ -1111,7 +1110,7 @@ def is_multi_cancelled(
|
|||
def _raise_from_unexpected_msg(
|
||||
ctx: Context,
|
||||
msg: MsgType,
|
||||
src_err: AttributeError,
|
||||
src_err: Exception,
|
||||
log: StackLevelAdapter, # caller specific `log` obj
|
||||
|
||||
expect_msg: Type[MsgType],
|
||||
|
@ -1212,7 +1211,7 @@ def _raise_from_unexpected_msg(
|
|||
# in case there already is some underlying remote error
|
||||
# that arrived which is probably the source of this stream
|
||||
# closure
|
||||
ctx.maybe_raise()
|
||||
ctx.maybe_raise(from_src_exc=src_err)
|
||||
raise eoc from src_err
|
||||
|
||||
# TODO: our own transport/IPC-broke error subtype?
|
||||
|
@ -1361,6 +1360,7 @@ def _mk_msg_type_err(
|
|||
message=message,
|
||||
bad_msg=bad_msg,
|
||||
bad_msg_as_dict=msg_dict,
|
||||
boxed_type=type(src_validation_error),
|
||||
|
||||
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
|
||||
# - for the send-side `.started()` pld-validate
|
||||
|
|
|
@ -80,7 +80,6 @@ from tractor.msg.types import (
|
|||
Yield,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
||||
|
@ -328,7 +327,6 @@ async def _errors_relayed_via_ipc(
|
|||
f'|_{ctx}'
|
||||
)
|
||||
|
||||
|
||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||
if is_rpc:
|
||||
|
||||
|
@ -819,6 +817,12 @@ async def try_ship_error_to_remote(
|
|||
# TODO: use `.msg.preetty_struct` for this!
|
||||
f'{msg}\n'
|
||||
)
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Errored while attempting error shipment?'
|
||||
)
|
||||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
|
||||
async def process_messages(
|
||||
|
|
|
@ -233,6 +233,7 @@ class MsgStream(trio.abc.Channel):
|
|||
# ctx: Context = self._ctx
|
||||
ctx.maybe_raise(
|
||||
raise_ctxc_from_self_call=True,
|
||||
from_src_exc=src_err,
|
||||
)
|
||||
|
||||
# propagate any error but hide low-level frame details
|
||||
|
|
|
@ -1600,12 +1600,14 @@ async def _pause(
|
|||
f'REPL: {Lock.repl}\n'
|
||||
# TODO: use `._frame_stack` scanner to find the @api_frame
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
await trio.lowlevel.checkpoint()
|
||||
return
|
||||
|
||||
# XXX: since we need to enter pdb synchronously below,
|
||||
# we have to release the lock manually from pdb completion
|
||||
# callbacks. Can't think of a nicer way then this atm.
|
||||
with trio.CancelScope(shield=shield):
|
||||
if Lock._debug_lock.locked():
|
||||
log.warning(
|
||||
'attempting to shield-acquire active TTY lock owned by\n'
|
||||
|
@ -1614,7 +1616,7 @@ async def _pause(
|
|||
|
||||
# must shield here to avoid hitting a ``Cancelled`` and
|
||||
# a child getting stuck bc we clobbered the tty
|
||||
with trio.CancelScope(shield=True):
|
||||
# with trio.CancelScope(shield=True):
|
||||
await Lock._debug_lock.acquire()
|
||||
else:
|
||||
# may be cancelled
|
||||
|
@ -1659,6 +1661,7 @@ async def _pause(
|
|||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'ignoring..'
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
await trio.lowlevel.checkpoint()
|
||||
return
|
||||
|
||||
|
@ -1671,6 +1674,7 @@ async def _pause(
|
|||
f'{task}@{actor.uid} already has TTY lock\n'
|
||||
f'waiting for release..'
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
await DebugStatus.repl_release.wait()
|
||||
await trio.sleep(0.1)
|
||||
|
||||
|
@ -1683,6 +1687,7 @@ async def _pause(
|
|||
|
||||
'Waiting for previous request to complete..\n'
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
await DebugStatus.req_finished.wait()
|
||||
|
||||
# this **must** be awaited by the caller and is done using the
|
||||
|
@ -1721,6 +1726,7 @@ async def _pause(
|
|||
'Starting request task\n'
|
||||
f'|_{task}\n'
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
req_ctx: Context = await actor._service_n.start(
|
||||
partial(
|
||||
request_root_stdio_lock,
|
||||
|
@ -2147,6 +2153,13 @@ async def post_mortem(
|
|||
**_pause_kwargs,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
`tractor`'s builtin async equivalient of `pdb.post_mortem()`
|
||||
which can be used inside exception handlers.
|
||||
|
||||
It's also used for the crash handler when `debug_mode == True` ;)
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
tb: TracebackType = tb or sys.exc_info()[2]
|
||||
|
|
|
@ -167,7 +167,7 @@ class PldRx(Struct):
|
|||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
hide_tb: bool = False,
|
||||
**dec_msg_kwargs,
|
||||
**dec_pld_kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
@ -179,12 +179,12 @@ class PldRx(Struct):
|
|||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||
ipc._rx_chan.receive_nowait()
|
||||
)
|
||||
return self.dec_msg(
|
||||
return self.decode_pld(
|
||||
msg,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
hide_tb=hide_tb,
|
||||
**dec_msg_kwargs,
|
||||
**dec_pld_kwargs,
|
||||
)
|
||||
|
||||
async def recv_pld(
|
||||
|
@ -194,7 +194,7 @@ class PldRx(Struct):
|
|||
expect_msg: Type[MsgType]|None = None,
|
||||
hide_tb: bool = True,
|
||||
|
||||
**dec_msg_kwargs,
|
||||
**dec_pld_kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
'''
|
||||
|
@ -208,17 +208,14 @@ class PldRx(Struct):
|
|||
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||
await ipc._rx_chan.receive()
|
||||
)
|
||||
return self.dec_msg(
|
||||
return self.decode_pld(
|
||||
msg=msg,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
**dec_msg_kwargs,
|
||||
**dec_pld_kwargs,
|
||||
)
|
||||
|
||||
# TODO: rename to,
|
||||
# -[ ] `.decode_pld()`?
|
||||
# -[ ] `.dec_pld()`?
|
||||
def dec_msg(
|
||||
def decode_pld(
|
||||
self,
|
||||
msg: MsgType,
|
||||
ipc: Context|MsgStream,
|
||||
|
@ -299,9 +296,6 @@ class PldRx(Struct):
|
|||
if not is_started_send_side
|
||||
else ipc._actor.uid
|
||||
),
|
||||
# tb=valerr.__traceback__,
|
||||
# tb_str=mte._message,
|
||||
# message=mte._message,
|
||||
)
|
||||
mte._ipc_msg = err_msg
|
||||
|
||||
|
@ -317,29 +311,6 @@ class PldRx(Struct):
|
|||
# validation error.
|
||||
src_err = valerr
|
||||
|
||||
# TODO: should we instead make this explicit and
|
||||
# use the above masked `is_started_send_decode`,
|
||||
# expecting the `Context.started()` caller to set
|
||||
# it? Rn this is kinda, howyousayyy, implicitly
|
||||
# edge-case-y..
|
||||
# TODO: remove this since it's been added to
|
||||
# `_raise_from_unexpected_msg()`..?
|
||||
# if (
|
||||
# expect_msg is not Started
|
||||
# and not is_started_send_side
|
||||
# ):
|
||||
# # set emulated remote error more-or-less as the
|
||||
# # runtime would
|
||||
# ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
# ctx._maybe_cancel_and_set_remote_error(mte)
|
||||
|
||||
# XXX some other decoder specific failure?
|
||||
# except TypeError as src_error:
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
# raise src_error
|
||||
# ^-TODO-^ can remove?
|
||||
|
||||
# a runtime-internal RPC endpoint response.
|
||||
# always passthrough since (internal) runtime
|
||||
# responses are generally never exposed to consumer
|
||||
|
@ -435,6 +406,8 @@ class PldRx(Struct):
|
|||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
dec_msg = decode_pld
|
||||
|
||||
async def recv_msg_w_pld(
|
||||
self,
|
||||
ipc: Context|MsgStream,
|
||||
|
@ -463,7 +436,7 @@ class PldRx(Struct):
|
|||
# TODO: is there some way we can inject the decoded
|
||||
# payload into an existing output buffer for the original
|
||||
# msg instance?
|
||||
pld: PayloadT = self.dec_msg(
|
||||
pld: PayloadT = self.decode_pld(
|
||||
msg,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
|
@ -610,7 +583,10 @@ async def drain_to_final_msg(
|
|||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
# cancellation.
|
||||
ctx.maybe_raise()
|
||||
ctx.maybe_raise(
|
||||
# TODO: when use this/
|
||||
# from_src_exc=taskc,
|
||||
)
|
||||
|
||||
# CASE 1: we DID request the cancel we simply
|
||||
# continue to bubble up as normal.
|
||||
|
@ -783,7 +759,7 @@ def validate_payload_msg(
|
|||
try:
|
||||
roundtripped: Started = codec.decode(msg_bytes)
|
||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
pld: PayloadT = ctx.pld_rx.dec_msg(
|
||||
pld: PayloadT = ctx.pld_rx.decode_pld(
|
||||
msg=roundtripped,
|
||||
ipc=ipc,
|
||||
expect_msg=Started,
|
||||
|
|
Loading…
Reference in New Issue