Compare commits


12 Commits

Author SHA1 Message Date
Tyler Goodlet eef540c94c Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and provides a hook for
eventually linking with `tractor`'s own debug-mode flag.
2025-03-10 12:33:36 -04:00
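
A minimal sketch of how such a flag can gate the debugger import; the
`maybe_import_debugger()` helper name is illustrative only, not the
branch's actual API.

```python
# Minimal sketch: only pull in `pdbp` when debug mode is requested.
from types import ModuleType


def maybe_import_debugger(debug_mode: bool) -> ModuleType|None:
    if not debug_mode:
        return None

    import pdbp  # deferred import, only when debugging is enabled
    return pdbp
```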
Tyler Goodlet 49a07e070e Go all in on "task manager" naming 2025-03-10 12:33:36 -04:00
Tyler Goodlet 0409566b65 More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2025-03-10 12:33:36 -04:00
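
A sketch of the "bypass" behaviour noted above, assuming a thin wrapper
API (`start_soon_maybe_managed()` is a made-up name): when no
`@task_scope_manager` generator is supplied, defer straight to vanilla
`trio.Nursery.start_soon()`.

```python
# Sketch only: fall back to plain `trio` semantics when the caller
# provides no task-scope-manager generator.
import trio


def start_soon_maybe_managed(
    nursery: trio.Nursery,
    async_fn,
    *args,
    task_scope_manager=None,  # optional user generator-factory
):
    if task_scope_manager is None:
        # no task-manager machinery at all; just spawn as normal
        nursery.start_soon(async_fn, *args)
        return None

    # otherwise enter the managed path (per-task scope + outcome
    # delivery), sketched under the `Outcome`-sending commit further down.
    raise NotImplementedError
```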
Tyler Goodlet 8d67af1273 Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per-task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask out all the
nursery-side cs allocation with the intention of removing it.

Also add a test for per-task cancellation by starting the crash task as
a `trio.sleep_forever()`, then cancelling it via the user-allocated cs
and ensuring the crash propagates as expected 💥
2025-03-10 12:33:36 -04:00
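
A hedged sketch of that per-task-cancellation idea: the user's
scope-manager generator allocates its own `CancelScope`, so a test can
start a hung `trio.sleep_forever()` task and cancel just that task via
the handle it gets back. The `tm.start_soon()` shape shown is assumed
from the commit messages, not copied from the branch.

```python
# Sketch, not the branch impl: the user generator owns the per-task cs.
import trio


def user_cs_manager():
    cs = trio.CancelScope()  # user-allocated, one per spawned task
    # the runtime (not shown) runs the task inside `cs` and later
    # `.send()`s the task's outcome back into this generator.
    outcome = yield cs
    print(f'task completed with {outcome!r}')


async def cancel_hung_task(tm) -> None:
    # `tm.start_soon()` stands in for the task-mngr API and is assumed
    # to hand back whatever the generator first yielded.
    cs = await tm.start_soon(trio.sleep_forever)
    cs.cancel()  # per-task cancellation; sibling tasks keep running
```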
Tyler Goodlet ce33a2d612 Facepalm, don't pass in unnecessary cancel scope 2025-03-10 12:33:36 -04:00
Tyler Goodlet f47179239c Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` an `Outcome` from the spawned task into
the user-defined `@task_scope_manager`. In this case the task manager
wraps that in a user-defined (and renamed) `TaskOutcome` and delivers
that + a containing `trio.CancelScope` to the `.start_soon()` caller.
Here the user-defined `TaskOutcome` defines a `.wait_for_result()`
method that can be used to await the task's exit and handle its
underlying returned value or raised error; the implementation could be
different and subject to the user's own whims.

Note that, if this were added to `trio`'s core, the
`@task_scope_manager` would by default either be implemented as
a single-yield generator yielding `None`, or more likely be ignored
entirely by the runtime (i.e. no manual task-outcome collecting,
generator calling or sending at all) whenever the user does not provide
a `task_scope_manager` to the nursery at open time.
2025-03-10 12:33:36 -04:00
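
A fuller (but still hypothetical) sketch of that manual-`Outcome`-send
protocol, using the `outcome` package that `trio` itself builds on; all
names here (`TaskOutcome`, `task_scope_manager`, `tm_start_soon`,
`_run_and_report`) are reconstructions from the commit message rather
than the branch code.

```python
# Sketch of the wiring: spawn the task, capture its `outcome.Outcome`,
# then manually `.send()` it into the user's single-yield generator.
from dataclasses import dataclass, field

import outcome  # the `Value`/`Error` capture types trio uses internally
import trio


@dataclass
class TaskOutcome:
    lowlevel_task: trio.lowlevel.Task|None = None
    _final: outcome.Outcome|None = None
    _done: trio.Event = field(default_factory=trio.Event)

    async def wait_for_result(self):
        # await the task's exit then return its value or re-raise
        await self._done.wait()
        return self._final.unwrap()


def task_scope_manager(task_outcome: TaskOutcome):
    # single-yield generator: hand (outcome-handle, cancel-scope) to the
    # `.start_soon()` caller, then wait to be sent the final outcome.
    cs = trio.CancelScope()
    final: outcome.Outcome = yield (task_outcome, cs)
    task_outcome._final = final
    task_outcome._done.set()


async def _run_and_report(async_fn, gen, task_outcome, cs):
    task_outcome.lowlevel_task = trio.lowlevel.current_task()
    with cs:
        result: outcome.Outcome = await outcome.acapture(async_fn)
    # NOTE: a real impl must take care re-raising any captured
    # `trio.Cancelled` outside of `cs`; elided here for brevity.
    try:
        gen.send(result)  # the manual `Outcome` send
    except StopIteration:
        pass


def tm_start_soon(nursery: trio.Nursery, async_fn):
    gen = task_scope_manager(TaskOutcome())
    handle = next(gen)  # prime the generator: (TaskOutcome, CancelScope)
    task_outcome, cs = handle
    nursery.start_soon(_run_and_report, async_fn, gen, task_outcome, cs)
    return handle
```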
Tyler Goodlet 3d7df41b2d Alias to `@acm` in broadcaster mod 2025-03-10 12:33:36 -04:00
Tyler Goodlet 947db4a2d2 Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2025-03-10 12:33:36 -04:00
Tyler Goodlet 0f35ca67aa Use shorthand nursery var-names per convention in codebase 2025-03-10 12:33:36 -04:00
Tyler Goodlet 6f1bc1e8a1 Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each into 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2025-03-10 12:33:36 -04:00
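
A rough sketch of that split, with one table per service kind; the
field names and tuple shapes are illustrative guesses, not the
`piker`/`tractor` code.

```python
# Illustrative shape only: separate registries for plain task services
# vs. IPC-context services, each with its own cancel path.
import trio


class ServiceTables:
    def __init__(self) -> None:
        # task-name -> (cancel-scope, completed-event)
        self.service_tasks: dict[str, tuple[trio.CancelScope, trio.Event]] = {}
        # actor-name -> (cancel-scope, ipc-ctx, portal) or similar
        self.service_ctxs: dict[str, tuple] = {}

    async def cancel_service_task(self, name: str) -> None:
        cs, complete = self.service_tasks.pop(name)
        cs.cancel()
        await complete.wait()  # wait for the task to actually exit
```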
Tyler Goodlet 10de89741e Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping them in the
interface of a "task-manager"-style nursery, aka a one-cancels-one
style of supervision).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
2025-03-10 12:33:36 -04:00
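
A hedged usage sketch for the API brief above; the factory signatures,
the import path and `run_my_daemon` are guesses based solely on the
commit message (the real names `open_service_mngr()`,
`get_service_mngr()`, `.start_service()` and `.cancel_service()` are
from the message itself).

```python
# Usage shape only; assumes `open_service_mngr()` is an acm yielding the
# actor-global singleton.
import tractor

# from tractor.<service-mod> import (  # module path intentionally vague
#     open_service_mngr,
#     get_service_mngr,
# )


async def main() -> None:
    async with (
        tractor.open_nursery(),
        open_service_mngr() as mngr,
    ):
        await mngr.start_service('my_daemon', run_my_daemon)  # args guessed
        # any other task in this actor can grab the same singleton
        assert get_service_mngr() is mngr
        await mngr.cancel_service('my_daemon')
```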
Tyler Goodlet 578b96c6e3 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-03-10 12:33:36 -04:00
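
One possible shape for the `@singleton` async-context-manager factory
idea mentioned above; this is a from-scratch sketch, not the gitter
snippet, and it elides the locking a real version would need around
concurrent first entry.

```python
# Sketch: share one entered instance of an async context manager across
# all concurrent users in a process; the last user out tears it down.
from contextlib import asynccontextmanager as acm


def singleton(factory):
    entered = None   # (mngr, yielded-value) once someone has entered
    users: int = 0

    @acm
    async def _shared(*args, **kwargs):
        nonlocal entered, users
        if entered is None:
            mngr = factory(*args, **kwargs)
            entered = (mngr, await mngr.__aenter__())
        users += 1
        try:
            yield entered[1]
        finally:
            users -= 1
            if users == 0:
                mngr, _ = entered
                entered = None
                await mngr.__aexit__(None, None, None)

    return _shared

# usage:
# @singleton
# @acm
# async def open_db(): ...
#
# async with open_db() as db:  # all concurrent users share one `db`
#     ...
```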
13 changed files with 213 additions and 423 deletions


@ -22,7 +22,7 @@ from tractor.devx._debug import (
_repl_fail_msg as _repl_fail_msg, _repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header, _ctlc_ignore_header as _ctlc_ignore_header,
) )
from ..conftest import ( from conftest import (
_ci_env, _ci_env,
) )


@ -14,7 +14,7 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from .conftest import no_windows from conftest import no_windows
def is_win(): def is_win():


@ -38,9 +38,9 @@ from tractor._testing import (
# - standard setup/teardown: # - standard setup/teardown:
# ``Portal.open_context()`` starts a new # ``Portal.open_context()`` starts a new
# remote task context in another actor. The target actor's task must # remote task context in another actor. The target actor's task must
# call ``Context.started()`` to unblock this entry on the parent side. # call ``Context.started()`` to unblock this entry on the caller side.
# the child task executes until complete and returns a final value # the callee task executes until complete and returns a final value
# which is delivered to the parent side and retreived via # which is delivered to the caller side and retreived via
# ``Context.result()``. # ``Context.result()``.
# - cancel termination: # - cancel termination:
@ -170,9 +170,9 @@ async def assert_state(value: bool):
[False, ValueError, KeyboardInterrupt], [False, ValueError, KeyboardInterrupt],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'child_blocks_forever', 'callee_blocks_forever',
[False, True], [False, True],
ids=lambda item: f'child_blocks_forever={item}' ids=lambda item: f'callee_blocks_forever={item}'
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'pointlessly_open_stream', 'pointlessly_open_stream',
@ -181,7 +181,7 @@ async def assert_state(value: bool):
) )
def test_simple_context( def test_simple_context(
error_parent, error_parent,
child_blocks_forever, callee_blocks_forever,
pointlessly_open_stream, pointlessly_open_stream,
debug_mode: bool, debug_mode: bool,
): ):
@ -204,13 +204,13 @@ def test_simple_context(
portal.open_context( portal.open_context(
simple_setup_teardown, simple_setup_teardown,
data=10, data=10,
block_forever=child_blocks_forever, block_forever=callee_blocks_forever,
) as (ctx, sent), ) as (ctx, sent),
): ):
assert current_ipc_ctx() is ctx assert current_ipc_ctx() is ctx
assert sent == 11 assert sent == 11
if child_blocks_forever: if callee_blocks_forever:
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
else: else:
assert await ctx.result() == 'yo' assert await ctx.result() == 'yo'
@ -220,7 +220,7 @@ def test_simple_context(
if error_parent: if error_parent:
raise error_parent raise error_parent
if child_blocks_forever: if callee_blocks_forever:
await ctx.cancel() await ctx.cancel()
else: else:
# in this case the stream will send a # in this case the stream will send a
@ -259,9 +259,9 @@ def test_simple_context(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'child_returns_early', 'callee_returns_early',
[True, False], [True, False],
ids=lambda item: f'child_returns_early={item}' ids=lambda item: f'callee_returns_early={item}'
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'cancel_method', 'cancel_method',
@ -273,14 +273,14 @@ def test_simple_context(
[True, False], [True, False],
ids=lambda item: f'chk_ctx_result_before_exit={item}' ids=lambda item: f'chk_ctx_result_before_exit={item}'
) )
def test_parent_cancels( def test_caller_cancels(
cancel_method: str, cancel_method: str,
chk_ctx_result_before_exit: bool, chk_ctx_result_before_exit: bool,
child_returns_early: bool, callee_returns_early: bool,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
Verify that when the opening side of a context (aka the parent) Verify that when the opening side of a context (aka the caller)
cancels that context, the ctx does not raise a cancelled when cancels that context, the ctx does not raise a cancelled when
either calling `.result()` or on context exit. either calling `.result()` or on context exit.
@ -294,7 +294,7 @@ def test_parent_cancels(
if ( if (
cancel_method == 'portal' cancel_method == 'portal'
and not child_returns_early and not callee_returns_early
): ):
try: try:
res = await ctx.result() res = await ctx.result()
@ -318,7 +318,7 @@ def test_parent_cancels(
pytest.fail(f'should not have raised ctxc\n{ctxc}') pytest.fail(f'should not have raised ctxc\n{ctxc}')
# we actually get a result # we actually get a result
if child_returns_early: if callee_returns_early:
assert res == 'yo' assert res == 'yo'
assert ctx.outcome is res assert ctx.outcome is res
assert ctx.maybe_error is None assert ctx.maybe_error is None
@ -362,14 +362,14 @@ def test_parent_cancels(
) )
timeout: float = ( timeout: float = (
0.5 0.5
if not child_returns_early if not callee_returns_early
else 2 else 2
) )
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with ( async with (
expect_ctxc( expect_ctxc(
yay=( yay=(
not child_returns_early not callee_returns_early
and cancel_method == 'portal' and cancel_method == 'portal'
) )
), ),
@ -377,13 +377,13 @@ def test_parent_cancels(
portal.open_context( portal.open_context(
simple_setup_teardown, simple_setup_teardown,
data=10, data=10,
block_forever=not child_returns_early, block_forever=not callee_returns_early,
) as (ctx, sent), ) as (ctx, sent),
): ):
if child_returns_early: if callee_returns_early:
# ensure we block long enough before sending # ensure we block long enough before sending
# a cancel such that the child has already # a cancel such that the callee has already
# returned it's result. # returned it's result.
await trio.sleep(0.5) await trio.sleep(0.5)
@ -421,7 +421,7 @@ def test_parent_cancels(
# which should in turn cause `ctx._scope` to # which should in turn cause `ctx._scope` to
# catch any cancellation? # catch any cancellation?
if ( if (
not child_returns_early not callee_returns_early
and cancel_method != 'portal' and cancel_method != 'portal'
): ):
assert not ctx._scope.cancelled_caught assert not ctx._scope.cancelled_caught
@ -430,11 +430,11 @@ def test_parent_cancels(
# basic stream terminations: # basic stream terminations:
# - child context closes without using stream # - callee context closes without using stream
# - parent context closes without using stream # - caller context closes without using stream
# - parent context calls `Context.cancel()` while streaming # - caller context calls `Context.cancel()` while streaming
# is ongoing resulting in child being cancelled # is ongoing resulting in callee being cancelled
# - child calls `Context.cancel()` while streaming and parent # - callee calls `Context.cancel()` while streaming and caller
# sees stream terminated in `RemoteActorError` # sees stream terminated in `RemoteActorError`
# TODO: future possible features # TODO: future possible features
@ -443,6 +443,7 @@ def test_parent_cancels(
@tractor.context @tractor.context
async def close_ctx_immediately( async def close_ctx_immediately(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -453,24 +454,13 @@ async def close_ctx_immediately(
async with ctx.open_stream(): async with ctx.open_stream():
pass pass
print('child returning!')
@pytest.mark.parametrize(
'parent_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@tractor_test @tractor_test
async def test_child_exits_ctx_after_stream_open( async def test_callee_closes_ctx_after_stream_open(
debug_mode: bool, debug_mode: bool,
parent_send_before_receive: bool,
): ):
''' '''
child context closes without using stream. callee context closes without using stream.
This should result in a msg sequence This should result in a msg sequence
|_<root>_ |_<root>_
@ -484,9 +474,6 @@ async def test_child_exits_ctx_after_stream_open(
=> {'stop': True, 'cid': <str>} => {'stop': True, 'cid': <str>}
''' '''
timeout: float = (
0.5 if not debug_mode else 999
)
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
@ -495,7 +482,7 @@ async def test_child_exits_ctx_after_stream_open(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(timeout): with trio.fail_after(0.5):
async with portal.open_context( async with portal.open_context(
close_ctx_immediately, close_ctx_immediately,
@ -507,56 +494,41 @@ async def test_child_exits_ctx_after_stream_open(
with trio.fail_after(0.4): with trio.fail_after(0.4):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if parent_send_before_receive:
print('sending first msg from parent!')
await stream.send('yo')
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
# should be raised through translation of # should be raised through translation of
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
msg = 10 async for _ in stream:
async for msg in stream:
# trigger failure if we DO NOT # trigger failure if we DO NOT
# get an EOC! # get an EOC!
assert 0 assert 0
else: else:
# never should get anythinig new from
# the underlying stream
assert msg == 10
# verify stream is now closed # verify stream is now closed
try: try:
with trio.fail_after(0.3): with trio.fail_after(0.3):
print('parent trying to `.receive()` on EoC stream!')
await stream.receive() await stream.receive()
assert 0, 'should have raised eoc!?'
except trio.EndOfChannel: except trio.EndOfChannel:
print('parent got EoC as expected!')
pass pass
# raise
# TODO: should be just raise the closed resource err # TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open # directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of # of a stream to the context (at least until a time of
# if/when we decide that's a good idea?) # if/when we decide that's a good idea?)
try: try:
with trio.fail_after(timeout): with trio.fail_after(0.5):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pass pass
except trio.ClosedResourceError: except trio.ClosedResourceError:
pass pass
# if ctx._rx_chan._state.data:
# await tractor.pause()
await portal.cancel_actor() await portal.cancel_actor()
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: Context, ctx: Context,
send_before_receive: bool = False,
) -> None: ) -> None:
global _state global _state
@ -566,10 +538,6 @@ async def expect_cancelled(
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if send_before_receive:
await stream.send('yo')
async for msg in stream: async for msg in stream:
await stream.send(msg) # echo server await stream.send(msg) # echo server
@ -596,49 +564,26 @@ async def expect_cancelled(
raise raise
else: else:
assert 0, "child wasn't cancelled !?" assert 0, "callee wasn't cancelled !?"
@pytest.mark.parametrize(
'child_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@pytest.mark.parametrize(
'rent_wait_for_msg',
[
False,
True,
],
ids=lambda item: f'rent_wait_for_msg={item}'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_ctx_cancel_method', 'use_ctx_cancel_method',
[ [False, True],
False,
'pre_stream',
'post_stream_open',
'post_stream_close',
],
ids=lambda item: f'use_ctx_cancel_method={item}'
) )
@tractor_test @tractor_test
async def test_parent_exits_ctx_after_child_enters_stream( async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool|str, use_ctx_cancel_method: bool,
debug_mode: bool, debug_mode: bool,
rent_wait_for_msg: bool,
child_send_before_receive: bool,
): ):
''' '''
Parent-side of IPC context closes without sending on `MsgStream`. caller context closes without using/opening stream
''' '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
@ -647,52 +592,41 @@ async def test_parent_exits_ctx_after_child_enters_stream(
async with portal.open_context( async with portal.open_context(
expect_cancelled, expect_cancelled,
send_before_receive=child_send_before_receive,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly # call `ctx.cancel()` explicitly
if use_ctx_cancel_method == 'pre_stream': if use_ctx_cancel_method:
await ctx.cancel() await ctx.cancel()
# NOTE: means the local side `ctx._scope` will # NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus # have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set. # `._scope.cancelled_caught` should be set.
async with ( try:
expect_ctxc(
# XXX: the cause is US since we call
# `Context.cancel()` just above!
yay=True,
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
reraise=True,
) as maybe_ctxc,
):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream:
pass
if rent_wait_for_msg: except tractor.ContextCancelled as ctxc:
async for msg in stream: # XXX: the cause is US since we call
print(f'PARENT rx: {msg!r}\n') # `Context.cancel()` just above!
break assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
if use_ctx_cancel_method == 'post_stream_open': # XXX: must be propagated to __aexit__
await ctx.cancel() # and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
if use_ctx_cancel_method == 'post_stream_close': else:
await ctx.cancel() assert 0, "Should have context cancelled?"
ctxc: tractor.ContextCancelled = maybe_ctxc.value
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# channel should still be up # channel should still be up
assert portal.channel.connected() assert portal.channel.connected()
@ -703,20 +637,13 @@ async def test_parent_exits_ctx_after_child_enters_stream(
value=False, value=False,
) )
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
# `.open_context()` before the child has returned,
# errored or been cancelled!
else: else:
try: try:
with trio.fail_after( with trio.fail_after(0.2):
0.5 # if not debug_mode else 999 await ctx.result()
):
res = await ctx.wait_for_result()
assert res is not tractor._context.Unresolved
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already triggered by # NO-OP -> since already called above
# `trio.fail_after()` above!
await ctx.cancel() await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since # NOTE: local scope should have absorbed the cancellation since
@ -756,7 +683,7 @@ async def test_parent_exits_ctx_after_child_enters_stream(
@tractor_test @tractor_test
async def test_multitask_parent_cancels_from_nonroot_task( async def test_multitask_caller_cancels_from_nonroot_task(
debug_mode: bool, debug_mode: bool,
): ):
async with tractor.open_nursery( async with tractor.open_nursery(
@ -808,6 +735,7 @@ async def test_multitask_parent_cancels_from_nonroot_task(
@tractor.context @tractor.context
async def cancel_self( async def cancel_self(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -847,11 +775,11 @@ async def cancel_self(
@tractor_test @tractor_test
async def test_child_cancels_before_started( async def test_callee_cancels_before_started(
debug_mode: bool, debug_mode: bool,
): ):
''' '''
Callee calls `Context.cancel()` while streaming and parent Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`. sees stream terminated in `ContextCancelled`.
''' '''
@ -898,13 +826,14 @@ async def never_open_stream(
@tractor.context @tractor.context
async def keep_sending_from_child( async def keep_sending_from_callee(
ctx: Context, ctx: Context,
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
) -> None: ) -> None:
''' '''
Send endlessly on the child stream. Send endlessly on the calleee stream.
''' '''
await ctx.started() await ctx.started()
@ -912,7 +841,7 @@ async def keep_sending_from_child(
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
) as stream: ) as stream:
for msg in count(): for msg in count():
print(f'child sending {msg}') print(f'callee sending {msg}')
await stream.send(msg) await stream.send(msg)
await trio.sleep(0.01) await trio.sleep(0.01)
@ -920,12 +849,12 @@ async def keep_sending_from_child(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'overrun_by', 'overrun_by',
[ [
('parent', 1, never_open_stream), ('caller', 1, never_open_stream),
('child', 0, keep_sending_from_child), ('callee', 0, keep_sending_from_callee),
], ],
ids=[ ids=[
('parent_1buf_never_open_stream'), ('caller_1buf_never_open_stream'),
('child_0buf_keep_sending_from_child'), ('callee_0buf_keep_sending_from_callee'),
] ]
) )
def test_one_end_stream_not_opened( def test_one_end_stream_not_opened(
@ -956,7 +885,8 @@ def test_one_end_stream_not_opened(
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
if 'parent' in overrunner: if 'caller' in overrunner:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size # itersend +1 msg more then the buffer size
@ -971,7 +901,7 @@ def test_one_end_stream_not_opened(
await trio.sleep_forever() await trio.sleep_forever()
else: else:
# child overruns parent case so we do nothing here # callee overruns caller case so we do nothing here
await trio.sleep_forever() await trio.sleep_forever()
await portal.cancel_actor() await portal.cancel_actor()
@ -979,19 +909,19 @@ def test_one_end_stream_not_opened(
# 2 overrun cases and the no overrun case (which pushes right up to # 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit) # the msg limit)
if ( if (
overrunner == 'parent' overrunner == 'caller'
): ):
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.boxed_type == StreamOverrun assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'child': elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# TODO: embedded remote errors so that we can verify the source # TODO: embedded remote errors so that we can verify the source
# error? the child delivers an error which is an overrun # error? the callee delivers an error which is an overrun
# wrapped in a remote actor error. # wrapped in a remote actor error.
assert excinfo.value.boxed_type == tractor.RemoteActorError assert excinfo.value.boxed_type == tractor.RemoteActorError
@ -1001,7 +931,8 @@ def test_one_end_stream_not_opened(
@tractor.context @tractor.context
async def echo_back_sequence( async def echo_back_sequence(
ctx: Context,
ctx: Context,
seq: list[int], seq: list[int],
wait_for_cancel: bool, wait_for_cancel: bool,
allow_overruns_side: str, allow_overruns_side: str,
@ -1010,12 +941,12 @@ async def echo_back_sequence(
) -> None: ) -> None:
''' '''
Send endlessly on the child stream using a small buffer size Send endlessly on the calleee stream using a small buffer size
setting on the contex to simulate backlogging that would normally setting on the contex to simulate backlogging that would normally
cause overruns. cause overruns.
''' '''
# NOTE: ensure that if the parent is expecting to cancel this task # NOTE: ensure that if the caller is expecting to cancel this task
# that we stay echoing much longer then they are so we don't # that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg. # return early instead of receive the cancel msg.
total_batches: int = ( total_batches: int = (
@ -1065,18 +996,18 @@ async def echo_back_sequence(
if be_slow: if be_slow:
await trio.sleep(0.05) await trio.sleep(0.05)
print('child waiting on next') print('callee waiting on next')
print(f'child echoing back latest batch\n{batch}') print(f'callee echoing back latest batch\n{batch}')
for msg in batch: for msg in batch:
print(f'child sending msg\n{msg}') print(f'callee sending msg\n{msg}')
await stream.send(msg) await stream.send(msg)
try: try:
return 'yo' return 'yo'
finally: finally:
print( print(
'exiting child with context:\n' 'exiting callee with context:\n'
f'{pformat(ctx)}\n' f'{pformat(ctx)}\n'
) )
@ -1130,7 +1061,7 @@ def test_maybe_allow_overruns_stream(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
'child_sends_forever', 'callee_sends_forever',
enable_modules=[__name__], enable_modules=[__name__],
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,


@ -10,7 +10,7 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from .conftest import ( from conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,


@ -82,7 +82,6 @@ from .msg import (
MsgType, MsgType,
NamespacePath, NamespacePath,
PayloadT, PayloadT,
Return,
Started, Started,
Stop, Stop,
Yield, Yield,
@ -246,13 +245,11 @@ class Context:
# a drain loop? # a drain loop?
# _res_scope: trio.CancelScope|None = None # _res_scope: trio.CancelScope|None = None
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value # on a clean exit there should be a final value
# delivered from the far end "callee" task, so # delivered from the far end "callee" task, so
# this value is only set on one side. # this value is only set on one side.
# _result: Any | int = None # _result: Any | int = None
_result: PayloadT|Unresolved = Unresolved _result: Any|Unresolved = Unresolved
# if the local "caller" task errors this value is always set # if the local "caller" task errors this value is always set
# to the error that was captured in the # to the error that was captured in the
@ -1202,11 +1199,9 @@ class Context:
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if not self._portal: assert self._portal, (
raise RuntimeError( '`Context.wait_for_result()` can not be called from callee side!'
'Invalid usage of `Context.wait_for_result()`!\n' )
'Not valid on child-side IPC ctx!\n'
)
if self._final_result_is_set(): if self._final_result_is_set():
return self._result return self._result
@ -1227,8 +1222,6 @@ class Context:
# since every message should be delivered via the normal # since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set # `._deliver_msg()` route which will appropriately set
# any `.maybe_error`. # any `.maybe_error`.
outcome_msg: Return|Error|ContextCancelled
drained_msgs: list[MsgType]
( (
outcome_msg, outcome_msg,
drained_msgs, drained_msgs,
@ -1236,19 +1229,11 @@ class Context:
ctx=self, ctx=self,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
drained_status: str = ( drained_status: str = (
'Ctx drained to final outcome msg\n\n' 'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n' f'{outcome_msg}\n'
) )
# ?XXX, should already be set in `._deliver_msg()` right?
if self._outcome_msg is not Unresolved:
# from .devx import _debug
# await _debug.pause()
assert self._outcome_msg is outcome_msg
else:
self._outcome_msg = outcome_msg
if drained_msgs: if drained_msgs:
drained_status += ( drained_status += (
'\n' '\n'
@ -1756,6 +1741,7 @@ class Context:
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
# NOTE: if an error is deteced we should always still # NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect # send it through the feeder-mem-chan and expect
# it to be raised by any context (stream) consumer # it to be raised by any context (stream) consumer
@ -1767,21 +1753,6 @@ class Context:
# normally the task that should get cancelled/error # normally the task that should get cancelled/error
# from some remote fault! # from some remote fault!
send_chan.send_nowait(msg) send_chan.send_nowait(msg)
match msg:
case Stop():
if (stream := self._stream):
stream._stop_msg = msg
case Return():
if not self._outcome_msg:
log.warning(
f'Setting final outcome msg AFTER '
f'`._rx_chan.send()`??\n'
f'\n'
f'{msg}'
)
self._outcome_msg = msg
return True return True
except trio.BrokenResourceError: except trio.BrokenResourceError:
@ -2038,7 +2009,7 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: try:
started_msg, first = await ctx._pld_rx.recv_msg( started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
passthrough_non_pld_msgs=False, passthrough_non_pld_msgs=False,
@ -2403,8 +2374,7 @@ async def open_context_from_portal(
# displaying `ContextCancelled` traces where the # displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in # cause of crash/exit IS due to something in
# user/app code on either end of the context. # user/app code on either end of the context.
and and not rxchan._closed
not rxchan._closed
): ):
# XXX NOTE XXX: and again as per above, we mask any # XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask # `trio.Cancelled` raised here so as to NOT mask
@ -2463,7 +2433,6 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime( log.runtime(
# log.cancel(
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n' f'uid: {uid}\n'
f'cid: {ctx.cid}\n' f'cid: {ctx.cid}\n'
@ -2519,6 +2488,7 @@ def mk_context(
_caller_info=caller_info, _caller_info=caller_info,
**kwargs, **kwargs,
) )
pld_rx._ctx = ctx
ctx._result = Unresolved ctx._result = Unresolved
return ctx return ctx


@ -184,7 +184,7 @@ class Portal:
( (
self._final_result_msg, self._final_result_msg,
self._final_result_pld, self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg( ) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx, ipc=self._expect_result_ctx,
expect_msg=Return, expect_msg=Return,
) )

View File

@ -650,10 +650,6 @@ async def _invoke(
) )
# set and shuttle final result to "parent"-side task. # set and shuttle final result to "parent"-side task.
ctx._result = res ctx._result = res
log.runtime(
f'Sending result msg and exiting {ctx.side!r}\n'
f'{return_msg}\n'
)
await chan.send(return_msg) await chan.send(return_msg)
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is


@ -840,10 +840,8 @@ class Actor:
)] )]
except KeyError: except KeyError:
report: str = ( report: str = (
'Ignoring invalid IPC msg!?\n' 'Ignoring invalid IPC ctx msg!\n\n'
f'Ctx seems to not/no-longer exist??\n' f'<=? {uid}\n\n'
f'\n'
f'<=? {uid}\n'
f' |_{pretty_struct.pformat(msg)}\n' f' |_{pretty_struct.pformat(msg)}\n'
) )
match msg: match msg:


@ -45,11 +45,9 @@ from .trionics import (
BroadcastReceiver, BroadcastReceiver,
) )
from tractor.msg import ( from tractor.msg import (
Error, # Return,
Return, # Stop,
Stop,
MsgType, MsgType,
PayloadT,
Yield, Yield,
) )
@ -72,7 +70,8 @@ class MsgStream(trio.abc.Channel):
A bidirectional message stream for receiving logically sequenced A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC `Channel`. values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either
`Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules: Termination rules:
@ -95,9 +94,6 @@ class MsgStream(trio.abc.Channel):
self._rx_chan = rx_chan self._rx_chan = rx_chan
self._broadcaster = _broadcaster self._broadcaster = _broadcaster
# any actual IPC msg which is effectively an `EndOfStream`
self._stop_msg: bool|Stop = False
# flag to denote end of stream # flag to denote end of stream
self._eoc: bool|trio.EndOfChannel = False self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
@ -129,67 +125,16 @@ class MsgStream(trio.abc.Channel):
def receive_nowait( def receive_nowait(
self, self,
expect_msg: MsgType = Yield, expect_msg: MsgType = Yield,
) -> PayloadT: ):
ctx: Context = self._ctx ctx: Context = self._ctx
( return ctx._pld_rx.recv_pld_nowait(
msg,
pld,
) = ctx._pld_rx.recv_msg_nowait(
ipc=self, ipc=self,
expect_msg=expect_msg, expect_msg=expect_msg,
) )
# ?TODO, maybe factor this into a hyper-common `unwrap_pld()`
#
match msg:
# XXX, these never seems to ever hit? cool?
case Stop():
log.cancel(
f'Msg-stream was ended via stop msg\n'
f'{msg}'
)
case Error():
log.error(
f'Msg-stream was ended via error msg\n'
f'{msg}'
)
# XXX NOTE, always set any final result on the ctx to
# avoid teardown race conditions where previously this msg
# would be consumed silently (by `.aclose()` doing its
# own "msg drain loop" but WITHOUT those `drained: lists[MsgType]`
# being post-close-processed!
#
# !!TODO, see the equiv todo-comment in `.receive()`
# around the `if drained:` where we should prolly
# ACTUALLY be doing this post-close processing??
#
case Return(pld=pld):
log.warning(
f'Msg-stream final result msg for IPC ctx?\n'
f'{msg}'
)
# XXX TODO, this **should be covered** by higher
# scoped runtime-side method calls such as
# `Context._deliver_msg()`, so you should never
# really see the warning above or else something
# racy/out-of-order is likely going on between
# actor-runtime-side push tasks and the user-app-side
# consume tasks!
# -[ ] figure out that set of race cases and fix!
# -[ ] possibly return the `msg` given an input
# arg-flag is set so we can process the `Return`
# from the `.aclose()` caller?
#
# breakpoint() # to debug this RACE CASE!
ctx._result = pld
ctx._outcome_msg = msg
return pld
async def receive( async def receive(
self, self,
hide_tb: bool = False, hide_tb: bool = False,
): ):
''' '''
@ -209,7 +154,7 @@ class MsgStream(trio.abc.Channel):
# except trio.EndOfChannel: # except trio.EndOfChannel:
# raise StopAsyncIteration # raise StopAsyncIteration
# #
# see `.aclose()` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc:
raise self._eoc raise self._eoc
@ -220,11 +165,7 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb src_err: Exception|None = None # orig tb
try: try:
ctx: Context = self._ctx ctx: Context = self._ctx
pld = await ctx._pld_rx.recv_pld( return await ctx._pld_rx.recv_pld(ipc=self)
ipc=self,
expect_msg=Yield,
)
return pld
# XXX: the stream terminates on either of: # XXX: the stream terminates on either of:
# - `self._rx_chan.receive()` raising after manual closure # - `self._rx_chan.receive()` raising after manual closure
@ -233,7 +174,7 @@ class MsgStream(trio.abc.Channel):
# - via a `Stop`-msg received from remote peer task. # - via a `Stop`-msg received from remote peer task.
# NOTE # NOTE
# |_ previously this was triggered by calling # |_ previously this was triggered by calling
# `._rx_chan.aclose()` on the send side of the channel # ``._rx_chan.aclose()`` on the send side of the channel
# inside `Actor._deliver_ctx_payload()`, but now the 'stop' # inside `Actor._deliver_ctx_payload()`, but now the 'stop'
# message handling gets delegated to `PldRFx.recv_pld()` # message handling gets delegated to `PldRFx.recv_pld()`
# internals. # internals.
@ -257,14 +198,11 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose() drained: list[Exception|dict] = await self.aclose()
if drained: if drained:
# ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs: # ?TODO? pass these to the `._ctx._drained_msgs: deque`
# deque` and then iterate them as part of any # and then iterate them as part of any `.wait_for_result()` call?
# `.wait_for_result()` call? #
# # from .devx import pause
# -[ ] move the match-case processing from # await pause()
# `.receive_nowait()` instead to right here, use it from
# a for msg in drained:` post-proc loop?
#
log.warning( log.warning(
'Drained context msgs during closure\n\n' 'Drained context msgs during closure\n\n'
f'{drained}' f'{drained}'
@ -327,6 +265,9 @@ class MsgStream(trio.abc.Channel):
- more or less we try to maintain adherance to trio's `.aclose()` semantics: - more or less we try to maintain adherance to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
''' '''
# rx_chan = self._rx_chan
# XXX NOTE XXX # XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE # it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure so avoid getting stuck handing on # DRAIN msgs on closure so avoid getting stuck handing on
@ -338,16 +279,15 @@ class MsgStream(trio.abc.Channel):
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics. # per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
# import tractor
# await tractor.pause()
return [] return []
ctx: Context = self._ctx ctx: Context = self._ctx
drained: list[Exception|dict] = [] drained: list[Exception|dict] = []
while not drained: while not drained:
try: try:
maybe_final_msg: Yield|Return = self.receive_nowait( maybe_final_msg = self.receive_nowait(
expect_msg=Yield|Return, # allow_msgs=[Yield, Return],
expect_msg=Yield,
) )
if maybe_final_msg: if maybe_final_msg:
log.debug( log.debug(
@ -432,10 +372,8 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose() # await rx_chan.aclose()
if not self._eoc: if not self._eoc:
this_side: str = self._ctx.side
peer_side: str = self._ctx.peer_side
message: str = ( message: str = (
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n' f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
# } bc a stream is a "scope"/msging-phase inside an IPC # } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n' f'x}}>\n'
f' |_{self}\n' f' |_{self}\n'
@ -443,19 +381,9 @@ class MsgStream(trio.abc.Channel):
log.cancel(message) log.cancel(message)
self._eoc = trio.EndOfChannel(message) self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
log.cancel(
f'Msg-stream is closing but there is still reader tasks,\n'
f'{stats}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <= # => NO, DEFINITELY NOT! <=
# if we're a bi-dir `MsgStream` BECAUSE this same # if we're a bi-dir ``MsgStream`` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the # core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor # potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that # `Context` so we don't want to close it until that


@ -26,9 +26,6 @@ import os
import pathlib import pathlib
import tractor import tractor
from tractor.devx._debug import (
BoxedMaybeException,
)
from .pytest import ( from .pytest import (
tractor_test as tractor_test tractor_test as tractor_test
) )
@ -101,13 +98,12 @@ async def expect_ctxc(
''' '''
if yay: if yay:
try: try:
yield (maybe_exc := BoxedMaybeException()) yield
raise RuntimeError('Never raised ctxc?') raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled:
maybe_exc.value = ctxc
if reraise: if reraise:
raise raise
else: else:
return return
else: else:
yield (maybe_exc := BoxedMaybeException()) yield


@ -110,11 +110,33 @@ class PldRx(Struct):
# TODO: better to bind it here? # TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel # _rx_mc: trio.MemoryReceiveChannel
_pld_dec: MsgDec _pld_dec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None
@property @property
def pld_dec(self) -> MsgDec: def pld_dec(self) -> MsgDec:
return self._pld_dec return self._pld_dec
# TODO: a better name?
# -[ ] when would this be used as it avoids needingn to pass the
# ipc prim to every method
@cm
def wraps_ipc(
self,
ipc_prim: Context|MsgStream,
) -> PldRx:
'''
Apply this payload receiver to an IPC primitive type, one
of `Context` or `MsgStream`.
'''
self._ipc = ipc_prim
try:
yield self
finally:
self._ipc = None
@cm @cm
def limit_plds( def limit_plds(
self, self,
@ -147,7 +169,7 @@ class PldRx(Struct):
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._pld_dec.dec return self._pld_dec.dec
def recv_msg_nowait( def recv_pld_nowait(
self, self,
# TODO: make this `MsgStream` compat as well, see above^ # TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream, # ipc_prim: Context|MsgStream,
@ -158,95 +180,34 @@ class PldRx(Struct):
hide_tb: bool = False, hide_tb: bool = False,
**dec_pld_kwargs, **dec_pld_kwargs,
) -> tuple[ ) -> Any|Raw:
MsgType[PayloadT],
PayloadT,
]:
'''
Attempt to non-blocking receive a message from the `._rx_chan` and
unwrap it's payload delivering the pair to the caller.
'''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
pld: PayloadT = self.decode_pld( return self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb, hide_tb=hide_tb,
**dec_pld_kwargs, **dec_pld_kwargs,
) )
return (
msg,
pld,
)
async def recv_msg(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: ONLY for handling `Stop`-msgs that arrive during
# a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**decode_pld_kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode its payload, and
return the (msg, pld) pair.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
match msg:
case Return()|Error():
log.runtime(
f'Rxed final outcome msg\n'
f'{msg}\n'
)
case Stop():
log.runtime(
f'Rxed stream stopped msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**decode_pld_kwargs,
)
return (
msg,
pld,
)
async def recv_pld( async def recv_pld(
self, self,
ipc: Context|MsgStream, ipc: Context|MsgStream,
ipc_msg: MsgType[PayloadT]|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True, hide_tb: bool = True,
**dec_pld_kwargs, **dec_pld_kwargs,
) -> PayloadT: ) -> Any|Raw:
''' '''
Receive a `MsgType`, then decode and return its `.pld` field. Receive a `MsgType`, then decode and return its `.pld` field.
@ -258,13 +219,6 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg=msg, msg=msg,
ipc=ipc, ipc=ipc,
@ -453,6 +407,45 @@ class PldRx(Struct):
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise
dec_msg = decode_pld
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode it's payload, and return
the pair of refs.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**kwargs,
)
return msg, pld
@cm @cm
def limit_plds( def limit_plds(
@ -545,8 +538,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg( async def drain_to_final_msg(
ctx: Context, ctx: Context,
msg_limit: int = 6,
hide_tb: bool = True, hide_tb: bool = True,
msg_limit: int = 6,
) -> tuple[ ) -> tuple[
Return|None, Return|None,
@ -575,8 +568,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit. even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
@ -585,14 +578,13 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None result_msg: Return|Error|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and and not ctx._final_result_is_set()
not ctx._final_result_is_set()
): ):
try: try:
# receive all msgs, scanning for either a final result # receive all msgs, scanning for either a final result
# or error; the underlying call should never raise any # or error; the underlying call should never raise any
# remote error directly! # remote error directly!
msg, pld = await ctx._pld_rx.recv_msg( msg, pld = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
raise_error=False, raise_error=False,
@ -639,11 +631,6 @@ async def drain_to_final_msg(
) )
__tracebackhide__: bool = False __tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
@ -675,24 +662,17 @@ async def drain_to_final_msg(
case Yield(): case Yield():
pre_result_drained.append(msg) pre_result_drained.append(msg)
if ( if (
not parent_never_opened_stream (ctx._stream.closed
and ( and (reason := 'stream was already closed')
(ctx._stream.closed )
and or (ctx.cancel_acked
(reason := 'stream was already closed') and (reason := 'ctx cancelled other side')
) or )
(ctx.cancel_acked or (ctx._cancel_called
and and (reason := 'ctx called `.cancel()`')
(reason := 'ctx cancelled other side') )
) or (len(pre_result_drained) > msg_limit
or (ctx._cancel_called and (reason := f'"yield" limit={msg_limit}')
and
(reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and
(reason := f'"yield" limit={msg_limit}')
)
) )
): ):
log.cancel( log.cancel(
@ -710,7 +690,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
else: else:
report: str = ( log.warning(
'Ignoring "yield" msg during `ctx.result()` drain..\n' 'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n\n'
@ -719,14 +699,6 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue continue
# stream terminated, but no result yet.. # stream terminated, but no result yet..
@ -818,7 +790,6 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
__tracebackhide__: bool = hide_tb
return ( return (
result_msg, result_msg,
pre_result_drained, pre_result_drained,