Compare commits

...

7 Commits

Author SHA1 Message Date
Tyler Goodlet 8ea0f08386 Finally, officially support shielded REPL-ing!
It's been a long time prepped and now finally implemented!

Offer a `shield: bool` argument from our async `._debug` APIs:
- `await tractor.pause(shield=True)`,
- `await tractor.post_mortem(shield=True)`

^-These-^ can now be used inside cancelled `trio.CancelScope`s,
something very handy when introspecting complex (distributed) system
tear/shut-downs particularly under remote error or (inter-peer)
cancellation conditions B)

Thanks to previous prepping in a prior attempt and various patches from
the rigorous rework of `.devx._debug` internals around typed msg specs,
there ain't much that was needed!

Impl deats
- obvi passthrough `shield` from the public API endpoints (was already
  done from a prior attempt).
- put ad-hoc internal `with trio.CancelScope(shield=shield):` around all
  checkpoints inside `._pause()` for both the root-process and subactor
  case branches.

Add a fairly rigorous example, `examples/debugging/shielded_pause.py`
with a wrapping `pexpect` test, `test_debugger.test_shield_pause()` and
ensure it covers as many cases as i can think of offhand:

- multiple `.pause()` entries in a loop despite parent scope
  cancellation in a subactor RPC task which itself spawns a sub-task.
- a `trio.Nursery.parent_task` which raises, is handled and
  tries to enter and unshielded `.post_mortem()`, which of course
  internally raises `Cancelled` in a `._pause()` checkpoint, so we catch
  the `Cancelled` again and then debug the debugger's internal
  cancellation with specific checks for the particular raising
  checkpoint-LOC.
- do ^- the latter -^ for both subactor and root cases to ensure we
  can debug `._pause()` itself when it tries to REPL engage from
  a cancelled task scope Bo
2024-05-30 17:52:24 -04:00
Tyler Goodlet 13ea500a44 Rename `PldRx.dec_msg()` -> `.decode_pld()`
Keep the old alias, but i think it's better form to use longer names for
internal public APIs and this name better reflects the functionality:
decoding and returning a `PayloadMsg.pld` field.
2024-05-30 16:09:59 -04:00
Tyler Goodlet 2f854a3e86 Add a `tractor.post_mortem()` API test + example
Since turns out we didn't have a single example using that API Bo

The test granular-ly checks all use cases:
- `.post_mortem()` manual calls in both subactor and root.
- ensuring built-in RPC crash handling activates after each manual one
  from ^.
- drafted some call-stack frame checking that i commented out for now
  since we need to first do ANSI escape code removal due to the
  colorization that `pdbp` does by default.
  |_ added a TODO with SO link on `assert_before()`.

Also todo-staged a shielded-pause test to match with the already
existing-but-needs-refinement example B)
2024-05-30 16:03:28 -04:00
Tyler Goodlet cdb1311e40 Change `reraise` to `post_mortem: bool` in `maybe_expect_raises()` 2024-05-30 16:02:59 -04:00
Tyler Goodlet fcd089c08f Always `.exception()` in `try_ship_error_to_remote()` on internal error 2024-05-30 16:02:25 -04:00
Tyler Goodlet 993281882b Pass `boxed_type` from `_mk_msg_type_err()`
Such that we're boxing the interchanged lib's specific error
`msgspec.ValidationError` in this case) type much like how
a `ContextCancelled[trio.Cancelled]` is composed; allows for seemless
multi-backend-codec support later as well B)

Pass `ctx.maybe_raise(from_src_exc=src_err)` where needed in a couple
spots; as `None` in the send-side `Started` MTE case to avoid showing
the `._scope1.cancel_called` result in the traceback from the
`.open_context()` child-sync phase.
2024-05-30 15:55:34 -04:00
Tyler Goodlet bbb4d4e52c Add `from_src_exc: BaseException` to maybe raisers
That is as a control to `Context._maybe_raise_remote_err()` such that
if set to anything other then the default (`False` value), we do
`raise remote_error from from_src_exc` such that caller can choose to
suppress or override the `.__cause__` tb.

Also tidy up and old masked TODO regarding calling `.maybe_raise()`
after the caller exits from the `yield` in `.open_context()`..
2024-05-30 15:24:25 -04:00
10 changed files with 423 additions and 103 deletions

View File

@ -0,0 +1,56 @@
import trio
import tractor
@tractor.context
async def name_error(
ctx: tractor.Context,
):
'''
Raise a `NameError`, catch it and enter `.post_mortem()`, then
expect the `._rpc._invoke()` crash handler to also engage.
'''
try:
getattr(doggypants) # noqa (on purpose)
except NameError:
await tractor.post_mortem()
raise
async def main():
'''
Test 3 `PdbREPL` entries:
- one in the child due to manual `.post_mortem()`,
- another in the child due to runtime RPC crash handling.
- final one here in parent from the RAE.
'''
# XXX NOTE: ideally the REPL arrives at this frame in the parent
# ONE UP FROM the inner ctx block below!
async with tractor.open_nursery(
debug_mode=True,
# loglevel='cancel',
) as an:
p: tractor.Portal = await an.start_actor(
'child',
enable_modules=[__name__],
)
# XXX should raise `RemoteActorError[NameError]`
# AND be the active frame when REPL enters!
try:
async with p.open_context(name_error) as (ctx, first):
assert first
except tractor.RemoteActorError as rae:
assert rae.boxed_type is NameError
# manually handle in root's parent task
await tractor.post_mortem()
raise
else:
raise RuntimeError('IPC ctx should have remote errored!?')
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,88 @@
import trio
import tractor
async def cancellable_pause_loop(
task_status: trio.TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
):
with trio.CancelScope() as cs:
task_status.started(cs)
for _ in range(3):
try:
# ON first entry, there is no level triggered
# cancellation yet, so this cp does a parent task
# ctx-switch so that this scope raises for the NEXT
# checkpoint we hit.
await trio.lowlevel.checkpoint()
await tractor.pause()
cs.cancel()
# parent should have called `cs.cancel()` by now
await trio.lowlevel.checkpoint()
except trio.Cancelled:
print('INSIDE SHIELDED PAUSE')
await tractor.pause(shield=True)
else:
# should raise it again, bubbling up to parent
print('BUBBLING trio.Cancelled to parent task-nursery')
await trio.lowlevel.checkpoint()
async def pm_on_cancelled():
async with trio.open_nursery() as tn:
tn.cancel_scope.cancel()
try:
await trio.sleep_forever()
except trio.Cancelled:
# should also raise `Cancelled` since
# we didn't pass `shield=True`.
try:
await tractor.post_mortem(hide_tb=False)
except trio.Cancelled as taskc:
# should enter just fine, in fact it should
# be debugging the internals of the previous
# sin-shield call above Bo
await tractor.post_mortem(
hide_tb=False,
shield=True,
)
raise taskc
else:
raise RuntimeError('Dint cancel as expected!?')
async def cancelled_before_pause(
):
'''
Verify that using a shielded pause works despite surrounding
cancellation called state in the calling task.
'''
async with trio.open_nursery() as tn:
cs: trio.CancelScope = await tn.start(cancellable_pause_loop)
await trio.sleep(0.1)
assert cs.cancelled_caught
await pm_on_cancelled()
async def main():
async with tractor.open_nursery(
debug_mode=True,
) as n:
portal: tractor.Portal = await n.run_in_actor(
cancelled_before_pause,
)
await portal.result()
# ensure the same works in the root actor!
await pm_on_cancelled()
if __name__ == '__main__':
trio.run(main)

View File

@ -161,6 +161,10 @@ def in_prompt_msg(
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
@ -1125,7 +1129,187 @@ def test_pause_from_sync(
child.expect(pexpect.EOF)
# TODO!
def test_post_mortem_api(
spawn,
ctlc: bool,
):
'''
Verify the `tractor.post_mortem()` API works in an exception
handler block.
'''
child = spawn('pm_in_subactor')
# First entry is via manual `.post_mortem()`
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"<Task 'name_error'",
"NameError",
"('child'",
"tractor.post_mortem()",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
# 2nd is RPC crash handler
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"<Task 'name_error'",
"NameError",
"('child'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
# 3rd is via RAE bubbled to root's parent ctx task and
# crash-handled via another manual pm call.
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"NameError",
"tractor.post_mortem()",
"src_uid=('child'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
# 4th and FINAL is via RAE bubbled to root's parent ctx task and
# crash-handled via another manual pm call.
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"NameError",
"src_uid=('child'",
]
)
if ctlc:
do_ctlc(child)
# TODO: ensure we're stopped and showing the right call stack frame
# -[ ] need a way to strip the terminal color chars in order to
# pattern match... see TODO around `assert_before()` above!
# child.sendline('w')
# child.expect(PROMPT)
# assert_before(
# child,
# [
# # error src block annot at ctx open
# '-> async with p.open_context(name_error) as (ctx, first):',
# ]
# )
# # step up a frame to ensure the it's the root's nursery
# child.sendline('u')
# child.expect(PROMPT)
# assert_before(
# child,
# [
# # handler block annotation
# '-> async with tractor.open_nursery(',
# ]
# )
child.sendline('c')
child.expect(pexpect.EOF)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn('shielded_pause')
# First entry is via manual `.post_mortem()`
child.expect(PROMPT)
assert_before(
child,
[
_pause_msg,
"cancellable_pause_loop'",
"('cancelled_before_pause'", # actor name
]
)
# since 3 tries in ex. shield pause loop
for i in range(3):
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_pause_msg,
"INSIDE SHIELDED PAUSE",
"('cancelled_before_pause'", # actor name
]
)
# back inside parent task that opened nursery
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('cancelled_before_pause'", # actor name
"Failed to engage debugger via `_pause()`",
"trio.Cancelled",
"raise Cancelled._create()",
# we should be handling a taskc inside
# the first `.port_mortem()` sin-shield!
'await DebugStatus.req_finished.wait()',
]
)
# same as above but in the root actor's task
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('root'", # actor name
"Failed to engage debugger via `_pause()`",
"trio.Cancelled",
"raise Cancelled._create()",
# handling a taskc inside the first unshielded
# `.port_mortem()`.
# BUT in this case in the root-proc path ;)
'wait Lock._debug_lock.acquire()',
]
)
child.sendline('c')
child.expect(pexpect.EOF)
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden():
'''
Ensure that once a `tractor.pause()` enages, when the user
@ -1138,4 +1322,11 @@ def test_correct_frames_below_hidden():
def test_cant_pause_from_paused_task():
'''
Pausing from with an already paused task should raise an error.
Normally this should only happen in practise while debugging the call stack of `tractor.pause()` itself, likely
by a `.pause()` line somewhere inside our runtime.
'''
...

View File

@ -46,7 +46,7 @@ maybe_msg_spec = PldMsg|None
async def maybe_expect_raises(
raises: BaseException|None = None,
ensure_in_message: list[str]|None = None,
reraise: bool = False,
post_mortem: bool = False,
timeout: int = 3,
) -> None:
'''
@ -86,8 +86,8 @@ async def maybe_expect_raises(
f'{inner_err.args}'
)
if reraise:
raise inner_err
if post_mortem:
await tractor.post_mortem()
else:
if raises:
@ -314,6 +314,8 @@ def test_basic_payload_spec(
f"value: `{bad_value_str}` does not "
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
],
# only for debug
post_mortem=True,
),
p.open_context(
child,

View File

@ -1190,6 +1190,7 @@ class Context:
self,
remote_error: Exception,
from_src_exc: BaseException|None|bool = False,
raise_ctxc_from_self_call: bool = False,
raise_overrun_from_self: bool = True,
hide_tb: bool = True,
@ -1284,7 +1285,10 @@ class Context:
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
raise remote_error # from None
if from_src_exc is not False:
raise remote_error from from_src_exc
raise remote_error
# TODO: change to `.wait_for_result()`?
async def result(
@ -2096,7 +2100,11 @@ async def open_context_from_portal(
# `._maybe_cancel_and_set_remote_error()` so ensure
# we raise the underlying `._remote_error` directly
# instead of bubbling that taskc.
ctx.maybe_raise()
ctx.maybe_raise(
# mask the above taskc from the tb
from_src_exc=None,
hide_tb=hide_tb,
)
# OW, some other unexpected cancel condition
# that should prolly never happen right?
@ -2108,13 +2116,14 @@ async def open_context_from_portal(
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
# deliver context instance and .started() msg value
# in enter tuple.
# deliver context ref and `.started()` msg payload value
# in `__aenter__` tuple.
yield ctx, first
# ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.result()`
# call below enough??
#
# -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
@ -2123,33 +2132,13 @@ async def open_context_from_portal(
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# NOTE: between the caller exiting and arriving
# here the far end may have sent a ctxc-msg or
# other error, so check for it here immediately
# and maybe raise so as to engage the ctxc
# handling block below!
# maybe TODO NOTE: between the caller exiting and
# arriving here the far end may have sent a ctxc-msg or
# other error, so the quetion is whether we should check
# for it here immediately and maybe raise so as to engage
# the ctxc handling block below ????
#
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# ctxc_from_callee = maybe_ctxc
# self.maybe_raise()
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
@ -2460,7 +2449,7 @@ async def open_context_from_portal(
#
# NOTE: further, this should be the only place the
# underlying feeder channel is
# once-and-only-CLOSED!
# once-forever-and-only-CLOSED!
with trio.CancelScope(shield=True):
await ctx._rx_chan.aclose()

View File

@ -518,7 +518,6 @@ class RemoteActorError(Exception):
def pformat(
self,
with_type_header: bool = True,
# with_ascii_box: bool = True,
) -> str:
'''
@ -885,9 +884,9 @@ class MsgTypeError(
extra_msgdata['_bad_msg'] = bad_msg
extra_msgdata['cid'] = bad_msg.cid
extra_msgdata.setdefault('boxed_type', cls)
return cls(
message=message,
boxed_type=cls,
**extra_msgdata,
)
@ -1111,7 +1110,7 @@ def is_multi_cancelled(
def _raise_from_unexpected_msg(
ctx: Context,
msg: MsgType,
src_err: AttributeError,
src_err: Exception,
log: StackLevelAdapter, # caller specific `log` obj
expect_msg: Type[MsgType],
@ -1212,7 +1211,7 @@ def _raise_from_unexpected_msg(
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
ctx.maybe_raise(from_src_exc=src_err)
raise eoc from src_err
# TODO: our own transport/IPC-broke error subtype?
@ -1361,6 +1360,7 @@ def _mk_msg_type_err(
message=message,
bad_msg=bad_msg,
bad_msg_as_dict=msg_dict,
boxed_type=type(src_validation_error),
# NOTE: for pld-spec MTEs we set the `._ipc_msg` manually:
# - for the send-side `.started()` pld-validate

View File

@ -80,7 +80,6 @@ from tractor.msg.types import (
Yield,
)
if TYPE_CHECKING:
from ._runtime import Actor
@ -328,7 +327,6 @@ async def _errors_relayed_via_ipc(
f'|_{ctx}'
)
# ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc:
@ -819,6 +817,12 @@ async def try_ship_error_to_remote(
# TODO: use `.msg.preetty_struct` for this!
f'{msg}\n'
)
except BaseException:
log.exception(
'Errored while attempting error shipment?'
)
__tracebackhide__: bool = False
raise
async def process_messages(

View File

@ -233,6 +233,7 @@ class MsgStream(trio.abc.Channel):
# ctx: Context = self._ctx
ctx.maybe_raise(
raise_ctxc_from_self_call=True,
from_src_exc=src_err,
)
# propagate any error but hide low-level frame details

View File

@ -1600,25 +1600,27 @@ async def _pause(
f'REPL: {Lock.repl}\n'
# TODO: use `._frame_stack` scanner to find the @api_frame
)
await trio.lowlevel.checkpoint()
with trio.CancelScope(shield=shield):
await trio.lowlevel.checkpoint()
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock owned by\n'
f'{ctx}'
)
with trio.CancelScope(shield=shield):
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock owned by\n'
f'{ctx}'
)
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
# with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
# enter REPL from root, no TTY locking IPC ctx necessary
_enter_repl_sync(debug_func)
@ -1659,7 +1661,8 @@ async def _pause(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
)
await trio.lowlevel.checkpoint()
with trio.CancelScope(shield=shield):
await trio.lowlevel.checkpoint()
return
else:
@ -1671,8 +1674,9 @@ async def _pause(
f'{task}@{actor.uid} already has TTY lock\n'
f'waiting for release..'
)
await DebugStatus.repl_release.wait()
await trio.sleep(0.1)
with trio.CancelScope(shield=shield):
await DebugStatus.repl_release.wait()
await trio.sleep(0.1)
elif (
req_task
@ -1683,7 +1687,8 @@ async def _pause(
'Waiting for previous request to complete..\n'
)
await DebugStatus.req_finished.wait()
with trio.CancelScope(shield=shield):
await DebugStatus.req_finished.wait()
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
@ -1721,14 +1726,15 @@ async def _pause(
'Starting request task\n'
f'|_{task}\n'
)
req_ctx: Context = await actor._service_n.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
task_uid=(task.name, id(task)), # task uuid (effectively)
shield=shield,
with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_n.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
task_uid=(task.name, id(task)), # task uuid (effectively)
shield=shield,
)
)
)
# XXX sanity, our locker task should be the one which
# entered a new IPC ctx with the root actor, NOT the one
# that exists around the task calling into `._pause()`.
@ -2147,6 +2153,13 @@ async def post_mortem(
**_pause_kwargs,
) -> None:
'''
`tractor`'s builtin async equivalient of `pdb.post_mortem()`
which can be used inside exception handlers.
It's also used for the crash handler when `debug_mode == True` ;)
'''
__tracebackhide__: bool = hide_tb
tb: TracebackType = tb or sys.exc_info()[2]

View File

@ -167,7 +167,7 @@ class PldRx(Struct):
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False,
**dec_msg_kwargs,
**dec_pld_kwargs,
) -> Any|Raw:
__tracebackhide__: bool = hide_tb
@ -179,12 +179,12 @@ class PldRx(Struct):
# sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait()
)
return self.dec_msg(
return self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_msg_kwargs,
**dec_pld_kwargs,
)
async def recv_pld(
@ -194,7 +194,7 @@ class PldRx(Struct):
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**dec_msg_kwargs,
**dec_pld_kwargs,
) -> Any|Raw:
'''
@ -208,17 +208,14 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive()
)
return self.dec_msg(
return self.decode_pld(
msg=msg,
ipc=ipc,
expect_msg=expect_msg,
**dec_msg_kwargs,
**dec_pld_kwargs,
)
# TODO: rename to,
# -[ ] `.decode_pld()`?
# -[ ] `.dec_pld()`?
def dec_msg(
def decode_pld(
self,
msg: MsgType,
ipc: Context|MsgStream,
@ -299,9 +296,6 @@ class PldRx(Struct):
if not is_started_send_side
else ipc._actor.uid
),
# tb=valerr.__traceback__,
# tb_str=mte._message,
# message=mte._message,
)
mte._ipc_msg = err_msg
@ -317,29 +311,6 @@ class PldRx(Struct):
# validation error.
src_err = valerr
# TODO: should we instead make this explicit and
# use the above masked `is_started_send_decode`,
# expecting the `Context.started()` caller to set
# it? Rn this is kinda, howyousayyy, implicitly
# edge-case-y..
# TODO: remove this since it's been added to
# `_raise_from_unexpected_msg()`..?
# if (
# expect_msg is not Started
# and not is_started_send_side
# ):
# # set emulated remote error more-or-less as the
# # runtime would
# ctx: Context = getattr(ipc, 'ctx', ipc)
# ctx._maybe_cancel_and_set_remote_error(mte)
# XXX some other decoder specific failure?
# except TypeError as src_error:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime
# responses are generally never exposed to consumer
@ -435,6 +406,8 @@ class PldRx(Struct):
__tracebackhide__: bool = False
raise
dec_msg = decode_pld
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
@ -463,7 +436,7 @@ class PldRx(Struct):
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.dec_msg(
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
@ -610,7 +583,10 @@ async def drain_to_final_msg(
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
ctx.maybe_raise()
ctx.maybe_raise(
# TODO: when use this/
# from_src_exc=taskc,
)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
@ -783,7 +759,7 @@ def validate_payload_msg(
try:
roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc)
pld: PayloadT = ctx.pld_rx.dec_msg(
pld: PayloadT = ctx.pld_rx.decode_pld(
msg=roundtripped,
ipc=ipc,
expect_msg=Started,