Compare commits

...

14 Commits

Author SHA1 Message Date
Tyler Goodlet 74945f5163 Use shorthand nursery var-names per convention in codebase 2025-03-13 13:17:28 -04:00
Tyler Goodlet 8ae7aeccca Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2025-03-13 13:17:28 -04:00
Tyler Goodlet b01cf136f6 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision start).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
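
For orientation, a minimal usage sketch of the new mngr API (the
endpoint fn and the `tractor.hilevel` accessor are assumptions based
on this PR's new subpkg, not verbatim from the changeset):

.. code:: python

    import tractor
    import trio

    @tractor.context
    async def sleeper(ctx: tractor.Context) -> None:
        # hypothetical long-running "service" endpoint task
        await ctx.started()
        await trio.sleep_forever()

    async def main():
        # acquire the actor-global singleton instance
        async with tractor.hilevel.open_service_mngr() as mngr:
            # spawn a subactor daemon whose lifetime is synced to
            # the remotely scheduled `sleeper()` ctx-task
            await mngr.start_service(
                daemon_name='sleeper_daemon',
                ctx_ep=sleeper,
            )
            # ... do app work ...
            await mngr.cancel_service('sleeper_daemon')

    trio.run(main)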
2025-03-13 13:17:28 -04:00
Tyler Goodlet af1313cd9c Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-03-13 13:17:28 -04:00
Tyler Goodlet 6e4ae7ca86 Add `.runtime()`-emit to `._invoke()` to report final result msg in the child 2025-03-12 16:41:42 -04:00
Tyler Goodlet 5497401920 Add `MsgStream._stop_msg`, use new `PldRx` API
In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from
`.receive_nowait()` (which is called from `.aclose()`) such that we
ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs
on reception of a final `Return`!!

This fixes a final stream-teardown-race-condition-bug where prior we
normally didn't set the `Context._result/._outcome_msg` in such cases.
This is **precisely because**  `.receive_nowait()` only returns the
`pld` and when called from `.aclose()` this value is discarded, meaning
so is its boxing `Return` despite consuming it from the underlying
`._rx_chan`..

Longer term this should be solved differently by ensuring such race
cases are handled at a higher scope like inside `Context._deliver_msg()`
or the `Portal.open_context()` enter/exit blocks? Add a detailed warning
note and todos for all this around the special case block!
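
A distilled sketch of the race being fixed (endpoint fn name is
hypothetical, parent-side only shown):

.. code:: python

    @tractor.context
    async def child_ep(ctx: tractor.Context) -> str:
        await ctx.started()
        return 'final'

    # parent side:
    async with portal.open_context(child_ep) as (ctx, _):
        async with ctx.open_stream():
            # exiting calls `MsgStream.aclose()` which drains via
            # `.receive_nowait()`; prior to this fix a raced-in
            # `Return` could be consumed-n-discarded leaving
            # `Context._result/._outcome_msg` forever unset..
            pass

        # with this fix the final result now ALWAYS resolves!
        assert await ctx.result() == 'final'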
2025-03-12 16:39:52 -04:00
Tyler Goodlet 03c447df0d Add `Context._outcome_msg`, use new `PldRx` API
Such that any `Return` is always captured for each ctx instance and set
in `._deliver_msg()` normally; ensures we can at least introspect for it
when missing (like in a recently discovered stream teardown race bug).
Yes this augments the already existing `._result` which is dedicated for
the `._outcome_msg.pld` in the non-error case; we might want to see if
there's a nicer way to directly proxy ref to that without getting the
pre-pld-decoded `Raw` form with `msgspec`?

Also use the new `ctx._pld_rx.recv_msg()` and drop assigning
`pld_rx._ctx`.
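
Introspection-wise this amounts to (a sketch using the internal attrs
per this diff):

.. code:: python

    from tractor._context import Unresolved

    res = await ctx.wait_for_result()
    assert ctx._result is res
    # the boxing `Return` msg is now also always retained,
    assert ctx._outcome_msg is not Unresolved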
2025-03-12 15:15:30 -04:00
Tyler Goodlet 1ce99ae742 Slight `PldRx` rework to simplify
Namely renaming and tweaking the `MsgType` receiving methods,
- `.recv_msg()` from what was `.recv_msg_w_pld()` which both receives
  the IPC msg from the underlying `._rx_chan` and then decodes its
  payload with `.decode_pld()`; it now also log reports on the different
  "stage of SC dialog protocol" msg types via a `match/case`.
- a new `.recv_msg_nowait()` sync equivalent of ^ (*was*
  `.recv_pld_nowait()`) whose use was the source of a recently
  discovered bug where any final `Return.pld` is being
  consumed-n-discarded by `MsgStream.aclose()` depending on
  ctx/stream teardown race conditions..

Also,
- remove all the "instance persistent" ipc-ctx attrs, specifically the
  optional `_ipc`, `_ctx` and the `.wraps_ipc()` cm, since none of them
  were ever really needed/used; all methods which require
  a `Context/MsgStream` are explicitly always passed.
- update a buncha typing namely to use the more generic-styled
  `PayloadT` over `Any` and obviously `MsgType[PayloadT]`.
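
Shape-wise the renamed (internal) methods look like (per the call
sites in this diff, not a public API):

.. code:: python

    # async: rx the next `MsgType` AND decode its payload
    msg, pld = await ctx._pld_rx.recv_msg(
        ipc=ctx,             # a `Context|MsgStream`, passed explicitly now
        expect_msg=Started,  # the expected SC-dialog msg type
    )

    # sync, non-blocking equivalent used by `MsgStream.receive_nowait()`
    msg, pld = ctx._pld_rx.recv_msg_nowait(
        ipc=stream,
        expect_msg=Yield|Return,
    )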
2025-03-12 13:49:58 -04:00
Tyler Goodlet 96826854b7 Rename ext-types with `msgspec` suite module 2025-03-12 13:47:53 -04:00
Tyler Goodlet 434577953a Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we
aren't using the "caller"->"callee" semantics anymore.
2025-03-12 13:15:48 -04:00
Tyler Goodlet b6b001faad Mk `tests/__init__.py`, not sure where it went?
I must have had a local touched file but never committed or something?
Seems that new `pytest` requires a top level `tests` pkg in order for
relative `.conftest` imports to work.
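
I.e. with the pkg-init in place, intra-suite relative imports like
those switched-to in this changeset resolve correctly:

.. code:: python

    # in any `tests/test_*.py` module,
    from .conftest import no_windows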
2025-03-12 13:13:20 -04:00
Tyler Goodlet 3a3fd36890 Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated as a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task may be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
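
A hedged sketch of the repaired guard (the actual drain loop lives in
`tractor.msg._ops.drain_to_final_msg()`):

.. code:: python

    match msg:
        case Yield():
            if ctx._stream is None:
                # the peer may still be sending `Yield`s but has
                # not yet sent an outcome msg
                # (`Return/Error/ContextCancelled`); keep draining
                # instead of `break`-ing (or attr-erroring on the
                # `None`!) and warn explicitly per this fix.
                continue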
2025-03-11 14:31:53 -04:00
Tyler Goodlet f0561fc8c0 Extend ctx semantics suite for streaming edge cases!
Muchas grax to @guilledk for finding the first issue which kicked off
this further scrutiny of the `tractor.Context` and `MsgStream` semantics
test suite with a strange edge case where,
- if the parent opened and immediately closed a stream while the remote
  child task started and continued (without terminating) to send msgs
  the parent's `open_context().__aexit__()` would **not block** on the
  child to complete!
=> this was seemingly due to a bug discovered inside the
  `.msg._ops.drain_to_final_msg()` stream handling case logic where we
  are NOT checking if `Context._stream` is non-`None`!

As such this,
- extends the `test_caller_closes_ctx_after_callee_opens_stream` (now
  renamed, see below) to include cases for all combinations of the child
  and parent sending before receiving on the stream as well as all
  placements of `Context.cancel()` in the parent before, around and after
  the stream open.
- uses the new `expect_ctxc()` for expecting the taskc (`trio.Task`
  cancelled) cases.
- also extends the `test_callee_closes_ctx_after_stream_open` (also
  renamed) to include the case where the parent sends a msg before it
  receives.
=> this case has unveiled yet-another-bug where somehow the underlying
  `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the
  child's `Return[None]` msg to be consumed and NOT in a place where it is
  correctly set as `Context._result` resulting in the parent hanging
  forever inside `._ops.drain_to_final_msg()`..

Alongside,
- start renaming using the new "remote-task-peer-side" semantics
  throughout the test module: "caller" -> "parent", "callee" -> "child".
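
Distilled, the first discovered edge case boils down to (names
hypothetical):

.. code:: python

    # parent task:
    async with portal.open_context(still_streaming_child) as (ctx, _):
        async with ctx.open_stream():
            pass  # open then immediately close

        # BUG: with the child still sending, `__aexit__()` here
        # should block on the child's outcome msg but did NOT!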
2025-03-11 14:04:55 -04:00
Tyler Goodlet d1abe4da44 Deliver a `MaybeBoxedError` from `.expect_ctxc()`
Just like we do from the `.devx._debug.open_crash_handler()`, this
allows checking various attrs on the raised `ContextCancelled` much like
`with pytest.raises() as excinfo:`.
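
Usage now mirrors `pytest.raises()`, e.g. as per the updated
ctx-semantics suite in this changeset:

.. code:: python

    async with (
        expect_ctxc(
            yay=True,      # we expect a ctxc..
            reraise=True,  # ..and propagate it to `__aexit__()`
        ) as maybe_ctxc,
    ):
        async with ctx.open_stream() as stream:
            ...

    ctxc: tractor.ContextCancelled = maybe_ctxc.value
    assert ctxc.canceller == current_actor().uid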
2025-03-10 18:17:31 -04:00
15 changed files with 1041 additions and 213 deletions

View File

View File

@@ -22,7 +22,7 @@ from tractor.devx._debug import (
     _repl_fail_msg as _repl_fail_msg,
     _ctlc_ignore_header as _ctlc_ignore_header,
 )
-from conftest import (
+from ..conftest import (
     _ci_env,
 )

View File

@@ -14,7 +14,7 @@ import tractor
 from tractor._testing import (
     tractor_test,
 )
-from conftest import no_windows
+from .conftest import no_windows
 def is_win():

View File

@@ -38,9 +38,9 @@ from tractor._testing import (
 # - standard setup/teardown:
 #   ``Portal.open_context()`` starts a new
 #   remote task context in another actor. The target actor's task must
-#   call ``Context.started()`` to unblock this entry on the caller side.
-#   the callee task executes until complete and returns a final value
-#   which is delivered to the caller side and retreived via
+#   call ``Context.started()`` to unblock this entry on the parent side.
+#   the child task executes until complete and returns a final value
+#   which is delivered to the parent side and retreived via
 #   ``Context.result()``.
 # - cancel termination:
@@ -170,9 +170,9 @@ async def assert_state(value: bool):
     [False, ValueError, KeyboardInterrupt],
 )
 @pytest.mark.parametrize(
-    'callee_blocks_forever',
+    'child_blocks_forever',
     [False, True],
-    ids=lambda item: f'callee_blocks_forever={item}'
+    ids=lambda item: f'child_blocks_forever={item}'
 )
 @pytest.mark.parametrize(
     'pointlessly_open_stream',
@@ -181,7 +181,7 @@ async def assert_state(value: bool):
 )
 def test_simple_context(
     error_parent,
-    callee_blocks_forever,
+    child_blocks_forever,
     pointlessly_open_stream,
     debug_mode: bool,
 ):
@@ -204,13 +204,13 @@ def test_simple_context(
             portal.open_context(
                 simple_setup_teardown,
                 data=10,
-                block_forever=callee_blocks_forever,
+                block_forever=child_blocks_forever,
             ) as (ctx, sent),
         ):
             assert current_ipc_ctx() is ctx
             assert sent == 11
-            if callee_blocks_forever:
+            if child_blocks_forever:
                 await portal.run(assert_state, value=True)
             else:
                 assert await ctx.result() == 'yo'
@@ -220,7 +220,7 @@ def test_simple_context(
             if error_parent:
                 raise error_parent
-            if callee_blocks_forever:
+            if child_blocks_forever:
                 await ctx.cancel()
             else:
                 # in this case the stream will send a
@@ -259,9 +259,9 @@ def test_simple_context(
 @pytest.mark.parametrize(
-    'callee_returns_early',
+    'child_returns_early',
     [True, False],
-    ids=lambda item: f'callee_returns_early={item}'
+    ids=lambda item: f'child_returns_early={item}'
 )
 @pytest.mark.parametrize(
     'cancel_method',
@@ -273,14 +273,14 @@ def test_simple_context(
     [True, False],
     ids=lambda item: f'chk_ctx_result_before_exit={item}'
 )
-def test_caller_cancels(
+def test_parent_cancels(
     cancel_method: str,
     chk_ctx_result_before_exit: bool,
-    callee_returns_early: bool,
+    child_returns_early: bool,
     debug_mode: bool,
 ):
     '''
-    Verify that when the opening side of a context (aka the caller)
+    Verify that when the opening side of a context (aka the parent)
     cancels that context, the ctx does not raise a cancelled when
     either calling `.result()` or on context exit.
@@ -294,7 +294,7 @@ def test_caller_cancels(
         if (
             cancel_method == 'portal'
-            and not callee_returns_early
+            and not child_returns_early
         ):
             try:
                 res = await ctx.result()
@@ -318,7 +318,7 @@ def test_caller_cancels(
                 pytest.fail(f'should not have raised ctxc\n{ctxc}')
         # we actually get a result
-        if callee_returns_early:
+        if child_returns_early:
             assert res == 'yo'
             assert ctx.outcome is res
             assert ctx.maybe_error is None
@@ -362,14 +362,14 @@ def test_caller_cancels(
         )
         timeout: float = (
             0.5
-            if not callee_returns_early
+            if not child_returns_early
             else 2
         )
         with trio.fail_after(timeout):
             async with (
                 expect_ctxc(
                     yay=(
-                        not callee_returns_early
+                        not child_returns_early
                         and cancel_method == 'portal'
                     )
                 ),
@@ -377,13 +377,13 @@ def test_caller_cancels(
                 portal.open_context(
                     simple_setup_teardown,
                     data=10,
-                    block_forever=not callee_returns_early,
+                    block_forever=not child_returns_early,
                 ) as (ctx, sent),
             ):
-                if callee_returns_early:
+                if child_returns_early:
                     # ensure we block long enough before sending
-                    # a cancel such that the callee has already
+                    # a cancel such that the child has already
                     # returned it's result.
                     await trio.sleep(0.5)
@@ -421,7 +421,7 @@ def test_caller_cancels(
         # which should in turn cause `ctx._scope` to
         # catch any cancellation?
         if (
-            not callee_returns_early
+            not child_returns_early
             and cancel_method != 'portal'
         ):
             assert not ctx._scope.cancelled_caught
@@ -430,11 +430,11 @@ def test_caller_cancels(
 # basic stream terminations:
-# - callee context closes without using stream
-# - caller context closes without using stream
-# - caller context calls `Context.cancel()` while streaming
-#   is ongoing resulting in callee being cancelled
-# - callee calls `Context.cancel()` while streaming and caller
+# - child context closes without using stream
+# - parent context closes without using stream
+# - parent context calls `Context.cancel()` while streaming
+#   is ongoing resulting in child being cancelled
+# - child calls `Context.cancel()` while streaming and parent
 #   sees stream terminated in `RemoteActorError`
 # TODO: future possible features
@@ -443,7 +443,6 @@ def test_caller_cancels(
 @tractor.context
 async def close_ctx_immediately(
-
     ctx: Context,
 ) -> None:
@@ -454,13 +453,24 @@ async def close_ctx_immediately(
     async with ctx.open_stream():
         pass
+    print('child returning!')
+@pytest.mark.parametrize(
+    'parent_send_before_receive',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'child_send_before_receive={item}'
+)
 @tractor_test
-async def test_callee_closes_ctx_after_stream_open(
+async def test_child_exits_ctx_after_stream_open(
     debug_mode: bool,
+    parent_send_before_receive: bool,
 ):
     '''
-    callee context closes without using stream.
+    child context closes without using stream.
     This should result in a msg sequence
     |_<root>_
@@ -474,6 +484,9 @@ async def test_callee_closes_ctx_after_stream_open(
         => {'stop': True, 'cid': <str>}
     '''
+    timeout: float = (
+        0.5 if not debug_mode else 999
+    )
     async with tractor.open_nursery(
         debug_mode=debug_mode,
     ) as an:
@@ -482,7 +495,7 @@ async def test_callee_closes_ctx_after_stream_open(
             enable_modules=[__name__],
         )
-        with trio.fail_after(0.5):
+        with trio.fail_after(timeout):
             async with portal.open_context(
                 close_ctx_immediately,
@@ -494,41 +507,56 @@ async def test_callee_closes_ctx_after_stream_open(
                 with trio.fail_after(0.4):
                     async with ctx.open_stream() as stream:
+                        if parent_send_before_receive:
+                            print('sending first msg from parent!')
+                            await stream.send('yo')
                         # should fall through since ``StopAsyncIteration``
                         # should be raised through translation of
                         # a ``trio.EndOfChannel`` by
                         # ``trio.abc.ReceiveChannel.__anext__()``
-                        async for _ in stream:
+                        msg = 10
+                        async for msg in stream:
                             # trigger failure if we DO NOT
                             # get an EOC!
                             assert 0
                         else:
+                            # never should get anythinig new from
+                            # the underlying stream
+                            assert msg == 10
                             # verify stream is now closed
                             try:
                                 with trio.fail_after(0.3):
+                                    print('parent trying to `.receive()` on EoC stream!')
                                     await stream.receive()
+                                    assert 0, 'should have raised eoc!?'
                             except trio.EndOfChannel:
+                                print('parent got EoC as expected!')
                                 pass
+                                # raise
                 # TODO: should be just raise the closed resource err
                 # directly here to enforce not allowing a re-open
                 # of a stream to the context (at least until a time of
                 # if/when we decide that's a good idea?)
                 try:
-                    with trio.fail_after(0.5):
+                    with trio.fail_after(timeout):
                         async with ctx.open_stream() as stream:
                             pass
                 except trio.ClosedResourceError:
                     pass
+                # if ctx._rx_chan._state.data:
+                #     await tractor.pause()
         await portal.cancel_actor()
 @tractor.context
 async def expect_cancelled(
     ctx: Context,
+    send_before_receive: bool = False,
 ) -> None:
     global _state
@@ -538,6 +566,10 @@ async def expect_cancelled(
     try:
         async with ctx.open_stream() as stream:
+            if send_before_receive:
+                await stream.send('yo')
             async for msg in stream:
                 await stream.send(msg)  # echo server
@@ -564,26 +596,49 @@ async def expect_cancelled(
             raise
     else:
-        assert 0, "callee wasn't cancelled !?"
+        assert 0, "child wasn't cancelled !?"
+@pytest.mark.parametrize(
+    'child_send_before_receive',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'child_send_before_receive={item}'
+)
+@pytest.mark.parametrize(
+    'rent_wait_for_msg',
+    [
+        False,
+        True,
+    ],
+    ids=lambda item: f'rent_wait_for_msg={item}'
+)
 @pytest.mark.parametrize(
     'use_ctx_cancel_method',
-    [False, True],
+    [
+        False,
+        'pre_stream',
+        'post_stream_open',
+        'post_stream_close',
+    ],
+    ids=lambda item: f'use_ctx_cancel_method={item}'
 )
 @tractor_test
-async def test_caller_closes_ctx_after_callee_opens_stream(
-    use_ctx_cancel_method: bool,
+async def test_parent_exits_ctx_after_child_enters_stream(
+    use_ctx_cancel_method: bool|str,
     debug_mode: bool,
+    rent_wait_for_msg: bool,
+    child_send_before_receive: bool,
 ):
     '''
-    caller context closes without using/opening stream
+    Parent-side of IPC context closes without sending on `MsgStream`.
     '''
     async with tractor.open_nursery(
         debug_mode=debug_mode,
     ) as an:
         root: Actor = current_actor()
         portal = await an.start_actor(
             'ctx_cancelled',
@@ -592,41 +647,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
         async with portal.open_context(
             expect_cancelled,
+            send_before_receive=child_send_before_receive,
         ) as (ctx, sent):
             assert sent is None
             await portal.run(assert_state, value=True)
             # call `ctx.cancel()` explicitly
-            if use_ctx_cancel_method:
+            if use_ctx_cancel_method == 'pre_stream':
                 await ctx.cancel()
             # NOTE: means the local side `ctx._scope` will
             # have been cancelled by an ctxc ack and thus
             # `._scope.cancelled_caught` should be set.
-            try:
+            async with (
+                expect_ctxc(
+                    # XXX: the cause is US since we call
+                    # `Context.cancel()` just above!
+                    yay=True,
+                    # XXX: must be propagated to __aexit__
+                    # and should be silently absorbed there
+                    # since we called `.cancel()` just above ;)
+                    reraise=True,
+                ) as maybe_ctxc,
+            ):
                 async with ctx.open_stream() as stream:
-                    async for msg in stream:
-                        pass
-            except tractor.ContextCancelled as ctxc:
-                # XXX: the cause is US since we call
-                # `Context.cancel()` just above!
-                assert (
-                    ctxc.canceller
-                    ==
-                    current_actor().uid
-                    ==
-                    root.uid
-                )
-                # XXX: must be propagated to __aexit__
-                # and should be silently absorbed there
-                # since we called `.cancel()` just above ;)
-                raise
-            else:
-                assert 0, "Should have context cancelled?"
+                    if rent_wait_for_msg:
+                        async for msg in stream:
+                            print(f'PARENT rx: {msg!r}\n')
+                            break
+                    if use_ctx_cancel_method == 'post_stream_open':
+                        await ctx.cancel()
+                    if use_ctx_cancel_method == 'post_stream_close':
+                        await ctx.cancel()
+            ctxc: tractor.ContextCancelled = maybe_ctxc.value
+            assert (
+                ctxc.canceller
+                ==
+                current_actor().uid
+                ==
+                root.uid
+            )
             # channel should still be up
             assert portal.channel.connected()
@@ -637,13 +703,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
                     value=False,
                 )
+            # XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
+            # `.open_context()` before the child has returned,
+            # errored or been cancelled!
             else:
                 try:
-                    with trio.fail_after(0.2):
-                        await ctx.result()
+                    with trio.fail_after(
+                        0.5  # if not debug_mode else 999
+                    ):
+                        res = await ctx.wait_for_result()
+                        assert res is not tractor._context.Unresolved
                         assert 0, "Callee should have blocked!?"
                 except trio.TooSlowError:
-                    # NO-OP -> since already called above
+                    # NO-OP -> since already triggered by
+                    # `trio.fail_after()` above!
                     await ctx.cancel()
         # NOTE: local scope should have absorbed the cancellation since
@@ -683,7 +756,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
 @tractor_test
-async def test_multitask_caller_cancels_from_nonroot_task(
+async def test_multitask_parent_cancels_from_nonroot_task(
     debug_mode: bool,
 ):
     async with tractor.open_nursery(
@@ -735,7 +808,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
 @tractor.context
 async def cancel_self(
-
     ctx: Context,
 ) -> None:
@@ -775,11 +847,11 @@ async def cancel_self(
 @tractor_test
-async def test_callee_cancels_before_started(
+async def test_child_cancels_before_started(
     debug_mode: bool,
 ):
     '''
-    Callee calls `Context.cancel()` while streaming and caller
+    Callee calls `Context.cancel()` while streaming and parent
     sees stream terminated in `ContextCancelled`.
     '''
@@ -826,14 +898,13 @@ async def never_open_stream(
 @tractor.context
-async def keep_sending_from_callee(
+async def keep_sending_from_child(
     ctx: Context,
-
     msg_buffer_size: int|None = None,
 ) -> None:
     '''
-    Send endlessly on the calleee stream.
+    Send endlessly on the child stream.
     '''
     await ctx.started()
@@ -841,7 +912,7 @@ async def keep_sending_from_callee(
         msg_buffer_size=msg_buffer_size,
     ) as stream:
         for msg in count():
-            print(f'callee sending {msg}')
+            print(f'child sending {msg}')
             await stream.send(msg)
             await trio.sleep(0.01)
@@ -849,12 +920,12 @@ async def keep_sending_from_callee(
 @pytest.mark.parametrize(
     'overrun_by',
     [
-        ('caller', 1, never_open_stream),
-        ('callee', 0, keep_sending_from_callee),
+        ('parent', 1, never_open_stream),
+        ('child', 0, keep_sending_from_child),
     ],
     ids=[
-        ('caller_1buf_never_open_stream'),
-        ('callee_0buf_keep_sending_from_callee'),
+        ('parent_1buf_never_open_stream'),
+        ('child_0buf_keep_sending_from_child'),
     ]
 )
 def test_one_end_stream_not_opened(
@@ -885,8 +956,7 @@ def test_one_end_stream_not_opened(
         ) as (ctx, sent):
             assert sent is None
-            if 'caller' in overrunner:
-
+            if 'parent' in overrunner:
                 async with ctx.open_stream() as stream:
                     # itersend +1 msg more then the buffer size
@@ -901,7 +971,7 @@ def test_one_end_stream_not_opened(
                     await trio.sleep_forever()
             else:
-                # callee overruns caller case so we do nothing here
+                # child overruns parent case so we do nothing here
                 await trio.sleep_forever()
         await portal.cancel_actor()
@@ -909,19 +979,19 @@ def test_one_end_stream_not_opened(
     # 2 overrun cases and the no overrun case (which pushes right up to
     # the msg limit)
     if (
-        overrunner == 'caller'
+        overrunner == 'parent'
     ):
         with pytest.raises(tractor.RemoteActorError) as excinfo:
             trio.run(main)
         assert excinfo.value.boxed_type == StreamOverrun
-    elif overrunner == 'callee':
+    elif overrunner == 'child':
         with pytest.raises(tractor.RemoteActorError) as excinfo:
             trio.run(main)
         # TODO: embedded remote errors so that we can verify the source
-        # error? the callee delivers an error which is an overrun
+        # error? the child delivers an error which is an overrun
         # wrapped in a remote actor error.
         assert excinfo.value.boxed_type == tractor.RemoteActorError
@@ -931,8 +1001,7 @@ def test_one_end_stream_not_opened(
 @tractor.context
 async def echo_back_sequence(
-
     ctx: Context,
     seq: list[int],
     wait_for_cancel: bool,
     allow_overruns_side: str,
@@ -941,12 +1010,12 @@ async def echo_back_sequence(
 ) -> None:
     '''
-    Send endlessly on the calleee stream using a small buffer size
+    Send endlessly on the child stream using a small buffer size
     setting on the contex to simulate backlogging that would normally
     cause overruns.
     '''
-    # NOTE: ensure that if the caller is expecting to cancel this task
+    # NOTE: ensure that if the parent is expecting to cancel this task
     # that we stay echoing much longer then they are so we don't
     # return early instead of receive the cancel msg.
     total_batches: int = (
@@ -996,18 +1065,18 @@ async def echo_back_sequence(
                 if be_slow:
                     await trio.sleep(0.05)
-                print('callee waiting on next')
-                print(f'callee echoing back latest batch\n{batch}')
+                print('child waiting on next')
+                print(f'child echoing back latest batch\n{batch}')
                 for msg in batch:
-                    print(f'callee sending msg\n{msg}')
+                    print(f'child sending msg\n{msg}')
                     await stream.send(msg)
     try:
         return 'yo'
     finally:
         print(
-            'exiting callee with context:\n'
+            'exiting child with context:\n'
             f'{pformat(ctx)}\n'
         )
@@ -1061,7 +1130,7 @@ def test_maybe_allow_overruns_stream(
         debug_mode=debug_mode,
     ) as an:
         portal = await an.start_actor(
-            'callee_sends_forever',
+            'child_sends_forever',
             enable_modules=[__name__],
             loglevel=loglevel,
             debug_mode=debug_mode,

View File

@@ -10,7 +10,7 @@ import tractor
 from tractor._testing import (
     tractor_test,
 )
-from conftest import (
+from .conftest import (
     sig_prog,
     _INT_SIGNAL,
     _INT_RETURN_CODE,

View File

@@ -82,6 +82,7 @@ from .msg import (
     MsgType,
     NamespacePath,
     PayloadT,
+    Return,
     Started,
     Stop,
     Yield,
@@ -245,11 +246,13 @@ class Context:
     # a drain loop?
     # _res_scope: trio.CancelScope|None = None
+    _outcome_msg: Return|Error|ContextCancelled = Unresolved
     # on a clean exit there should be a final value
     # delivered from the far end "callee" task, so
     # this value is only set on one side.
     # _result: Any | int = None
-    _result: Any|Unresolved = Unresolved
+    _result: PayloadT|Unresolved = Unresolved
     # if the local "caller" task errors this value is always set
     # to the error that was captured in the
@@ -1199,9 +1202,11 @@ class Context:
         '''
         __tracebackhide__: bool = hide_tb
-        assert self._portal, (
-            '`Context.wait_for_result()` can not be called from callee side!'
-        )
+        if not self._portal:
+            raise RuntimeError(
+                'Invalid usage of `Context.wait_for_result()`!\n'
+                'Not valid on child-side IPC ctx!\n'
+            )
         if self._final_result_is_set():
             return self._result
@@ -1222,6 +1227,8 @@ class Context:
         # since every message should be delivered via the normal
         # `._deliver_msg()` route which will appropriately set
         # any `.maybe_error`.
+        outcome_msg: Return|Error|ContextCancelled
+        drained_msgs: list[MsgType]
         (
             outcome_msg,
             drained_msgs,
@@ -1229,11 +1236,19 @@ class Context:
             ctx=self,
             hide_tb=hide_tb,
         )
         drained_status: str = (
             'Ctx drained to final outcome msg\n\n'
             f'{outcome_msg}\n'
         )
+        # ?XXX, should already be set in `._deliver_msg()` right?
+        if self._outcome_msg is not Unresolved:
+            # from .devx import _debug
+            # await _debug.pause()
+            assert self._outcome_msg is outcome_msg
+        else:
+            self._outcome_msg = outcome_msg
         if drained_msgs:
             drained_status += (
                 '\n'
@@ -1741,7 +1756,6 @@ class Context:
                 f'{structfmt(msg)}\n'
             )
-
             # NOTE: if an error is deteced we should always still
             # send it through the feeder-mem-chan and expect
             # it to be raised by any context (stream) consumer
@@ -1753,6 +1767,21 @@ class Context:
             # normally the task that should get cancelled/error
             # from some remote fault!
             send_chan.send_nowait(msg)
+            match msg:
+                case Stop():
+                    if (stream := self._stream):
+                        stream._stop_msg = msg
+                case Return():
+                    if not self._outcome_msg:
+                        log.warning(
+                            f'Setting final outcome msg AFTER '
+                            f'`._rx_chan.send()`??\n'
+                            f'\n'
+                            f'{msg}'
+                        )
+                        self._outcome_msg = msg
             return True
         except trio.BrokenResourceError:
@@ -2009,7 +2038,7 @@ async def open_context_from_portal(
     # the dialog, the `Error` msg should be raised from the `msg`
     # handling block below.
     try:
-        started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
+        started_msg, first = await ctx._pld_rx.recv_msg(
             ipc=ctx,
             expect_msg=Started,
             passthrough_non_pld_msgs=False,
@@ -2374,7 +2403,8 @@ async def open_context_from_portal(
             # displaying `ContextCancelled` traces where the
             # cause of crash/exit IS due to something in
             # user/app code on either end of the context.
-            and not rxchan._closed
+            and
+            not rxchan._closed
         ):
             # XXX NOTE XXX: and again as per above, we mask any
             # `trio.Cancelled` raised here so as to NOT mask
@@ -2433,6 +2463,7 @@ async def open_context_from_portal(
         # FINALLY, remove the context from runtime tracking and
         # exit!
         log.runtime(
+        # log.cancel(
             f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
             f'uid: {uid}\n'
             f'cid: {ctx.cid}\n'
@@ -2488,7 +2519,6 @@ def mk_context(
         _caller_info=caller_info,
         **kwargs,
     )
-    pld_rx._ctx = ctx
     ctx._result = Unresolved
     return ctx

View File

@@ -184,7 +184,7 @@ class Portal:
             (
                 self._final_result_msg,
                 self._final_result_pld,
-            ) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
+            ) = await self._expect_result_ctx._pld_rx.recv_msg(
                 ipc=self._expect_result_ctx,
                 expect_msg=Return,
             )

View File

@@ -650,6 +650,10 @@ async def _invoke(
             )
             # set and shuttle final result to "parent"-side task.
             ctx._result = res
+            log.runtime(
+                f'Sending result msg and exiting {ctx.side!r}\n'
+                f'{return_msg}\n'
+            )
             await chan.send(return_msg)
     # NOTE: this happens IFF `ctx._scope.cancel()` is

View File

@@ -840,8 +840,10 @@ class Actor:
             )]
         except KeyError:
             report: str = (
-                'Ignoring invalid IPC ctx msg!\n\n'
-                f'<=? {uid}\n\n'
+                'Ignoring invalid IPC msg!?\n'
+                f'Ctx seems to not/no-longer exist??\n'
+                f'\n'
+                f'<=? {uid}\n'
                 f' |_{pretty_struct.pformat(msg)}\n'
             )
             match msg:

View File

@@ -45,9 +45,11 @@ from .trionics import (
     BroadcastReceiver,
 )
 from tractor.msg import (
-    # Return,
-    # Stop,
+    Error,
+    Return,
+    Stop,
     MsgType,
+    PayloadT,
     Yield,
 )
@@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel):
     A bidirectional message stream for receiving logically sequenced
     values over an inter-actor IPC `Channel`.
     This is the type returned to a local task which entered either
     `Portal.open_stream_from()` or `Context.open_stream()`.
     Termination rules:
@@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel):
         self._rx_chan = rx_chan
         self._broadcaster = _broadcaster
+        # any actual IPC msg which is effectively an `EndOfStream`
+        self._stop_msg: bool|Stop = False
         # flag to denote end of stream
         self._eoc: bool|trio.EndOfChannel = False
         self._closed: bool|trio.ClosedResourceError = False
@@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel):
     def receive_nowait(
         self,
         expect_msg: MsgType = Yield,
-    ):
+    ) -> PayloadT:
         ctx: Context = self._ctx
-        return ctx._pld_rx.recv_pld_nowait(
+        (
+            msg,
+            pld,
+        ) = ctx._pld_rx.recv_msg_nowait(
             ipc=self,
             expect_msg=expect_msg,
         )
+        # ?TODO, maybe factor this into a hyper-common `unwrap_pld()`
+        #
+        match msg:
+            # XXX, these never seems to ever hit? cool?
+            case Stop():
+                log.cancel(
+                    f'Msg-stream was ended via stop msg\n'
+                    f'{msg}'
+                )
+            case Error():
+                log.error(
+                    f'Msg-stream was ended via error msg\n'
+                    f'{msg}'
+                )
+            # XXX NOTE, always set any final result on the ctx to
+            # avoid teardown race conditions where previously this msg
+            # would be consumed silently (by `.aclose()` doing its
+            # own "msg drain loop" but WITHOUT those `drained: lists[MsgType]`
+            # being post-close-processed!
+            #
+            # !!TODO, see the equiv todo-comment in `.receive()`
+            # around the `if drained:` where we should prolly
+            # ACTUALLY be doing this post-close processing??
+            #
+            case Return(pld=pld):
+                log.warning(
+                    f'Msg-stream final result msg for IPC ctx?\n'
+                    f'{msg}'
+                )
+                # XXX TODO, this **should be covered** by higher
+                # scoped runtime-side method calls such as
+                # `Context._deliver_msg()`, so you should never
+                # really see the warning above or else something
+                # racy/out-of-order is likely going on between
+                # actor-runtime-side push tasks and the user-app-side
+                # consume tasks!
+                # -[ ] figure out that set of race cases and fix!
+                # -[ ] possibly return the `msg` given an input
+                #    arg-flag is set so we can process the `Return`
+                #    from the `.aclose()` caller?
+                #
+                # breakpoint()  # to debug this RACE CASE!
+                ctx._result = pld
+                ctx._outcome_msg = msg
+        return pld
     async def receive(
         self,
         hide_tb: bool = False,
     ):
         '''
@@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel):
         # except trio.EndOfChannel:
         #     raise StopAsyncIteration
         #
-        # see ``.aclose()`` for notes on the old behaviour prior to
+        # see `.aclose()` for notes on the old behaviour prior to
         # introducing this
         if self._eoc:
             raise self._eoc
@@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel):
         src_err: Exception|None = None  # orig tb
         try:
             ctx: Context = self._ctx
-            return await ctx._pld_rx.recv_pld(ipc=self)
+            pld = await ctx._pld_rx.recv_pld(
+                ipc=self,
+                expect_msg=Yield,
+            )
+            return pld
         # XXX: the stream terminates on either of:
         # - `self._rx_chan.receive()` raising after manual closure
@@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel):
         # - via a `Stop`-msg received from remote peer task.
         #   NOTE
         #   |_ previously this was triggered by calling
-        #   ``._rx_chan.aclose()`` on the send side of the channel
+        #   `._rx_chan.aclose()` on the send side of the channel
         #   inside `Actor._deliver_ctx_payload()`, but now the 'stop'
         #   message handling gets delegated to `PldRFx.recv_pld()`
         #   internals.
@@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel):
             # terminated and signal this local iterator to stop
             drained: list[Exception|dict] = await self.aclose()
             if drained:
-                # ?TODO? pass these to the `._ctx._drained_msgs: deque`
-                # and then iterate them as part of any `.wait_for_result()` call?
-                #
-                # from .devx import pause
-                # await pause()
+                # ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs:
+                # deque` and then iterate them as part of any
+                # `.wait_for_result()` call?
+                #
+                # -[ ] move the match-case processing from
+                #    `.receive_nowait()` instead to right here, use it from
+                #    a for msg in drained:` post-proc loop?
+                #
                 log.warning(
                     'Drained context msgs during closure\n\n'
                     f'{drained}'
@@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel):
         - more or less we try to maintain adherance to trio's `.aclose()` semantics:
           https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         '''
-        # rx_chan = self._rx_chan
         # XXX NOTE XXX
         # it's SUPER IMPORTANT that we ensure we don't DOUBLE
         # DRAIN msgs on closure so avoid getting stuck handing on
@@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel):
             # this stream has already been closed so silently succeed as
             # per ``trio.AsyncResource`` semantics.
             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
+            # import tractor
+            # await tractor.pause()
             return []
         ctx: Context = self._ctx
         drained: list[Exception|dict] = []
         while not drained:
             try:
-                maybe_final_msg = self.receive_nowait(
-                    # allow_msgs=[Yield, Return],
-                    expect_msg=Yield,
+                maybe_final_msg: Yield|Return = self.receive_nowait(
+                    expect_msg=Yield|Return,
                 )
                 if maybe_final_msg:
                     log.debug(
@@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel):
         # await rx_chan.aclose()
         if not self._eoc:
+            this_side: str = self._ctx.side
+            peer_side: str = self._ctx.peer_side
             message: str = (
-                f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
+                f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
                 # } bc a stream is a "scope"/msging-phase inside an IPC
                 f'x}}>\n'
                 f' |_{self}\n'
@@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel):
             log.cancel(message)
             self._eoc = trio.EndOfChannel(message)
+            if (
+                (rx_chan := self._rx_chan)
+                and
+                (stats := rx_chan.statistics()).tasks_waiting_receive
+            ):
+                log.cancel(
+                    f'Msg-stream is closing but there is still reader tasks,\n'
+                    f'{stats}\n'
+                )
         # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
         # => NO, DEFINITELY NOT! <=
-        # if we're a bi-dir ``MsgStream`` BECAUSE this same
+        # if we're a bi-dir `MsgStream` BECAUSE this same
         # core-msg-loop mem recv-chan is used to deliver the
         # potential final result from the surrounding inter-actor
         # `Context` so we don't want to close it until that

View File

@@ -26,6 +26,9 @@ import os
 import pathlib
 import tractor
+from tractor.devx._debug import (
+    BoxedMaybeException,
+)
 from .pytest import (
     tractor_test as tractor_test
 )
@@ -98,12 +101,13 @@ async def expect_ctxc(
     '''
     if yay:
         try:
-            yield
+            yield (maybe_exc := BoxedMaybeException())
             raise RuntimeError('Never raised ctxc?')
-        except tractor.ContextCancelled:
+        except tractor.ContextCancelled as ctxc:
+            maybe_exc.value = ctxc
             if reraise:
                 raise
             else:
                 return
     else:
-        yield
+        yield (maybe_exc := BoxedMaybeException())

View File

@@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@@ -0,0 +1,592 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the
supporting actor runtime is also cancelled; see
`_open_and_supervise_service_ctx()` for details.
'''
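# NOTE: the `complete: trio.Event` is not returned to the caller;
# it's stored in `.service_ctxs` below so `.cancel_service()` can
# wait on it during teardown.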
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start a new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The functionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
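# NOTE: a same-named daemon which is already running is simply
# returned as-is, making repeated `.start_service()` calls
# idempotent.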
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Service task for {name} not terminated?'
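
For orientation, a minimal usage sketch of the `ServiceMngr` API above; note that `open_service_mngr()` (and its import path) is assumed to be the actor-global singleton-acquisition helper and `sleeper()` is a purely hypothetical endpoint, so treat this as a sketch rather than verbatim library usage:

import tractor
import trio

from tractor.hilevel import open_service_mngr  # assumed import path


@tractor.context
async def sleeper(
    ctx: tractor.Context,
) -> None:
    # hypothetical service endpoint: signal readiness then idle
    # until the parent tears us down via `.cancel_service()`.
    await ctx.started('ready')
    await trio.sleep_forever()


async def main():
    async with open_service_mngr() as mngr:
        sub_ctx: tractor.Context = await mngr.start_service(
            daemon_name='sleeper_daemon',
            ctx_ep=sleeper,
        )
        # a second call with the same name returns the existing
        # ctx instead of spawning another daemon.
        assert sub_ctx is await mngr.start_service(
            daemon_name='sleeper_daemon',
            ctx_ep=sleeper,
        )
        # cancel the remote task, its subactor and runtime.
        await mngr.cancel_service('sleeper_daemon')


if __name__ == '__main__':
    trio.run(main)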


@@ -110,33 +110,11 @@ class PldRx(Struct):
     # TODO: better to bind it here?
     # _rx_mc: trio.MemoryReceiveChannel
     _pld_dec: MsgDec
-    _ctx: Context|None = None
-    _ipc: Context|MsgStream|None = None
 
     @property
     def pld_dec(self) -> MsgDec:
         return self._pld_dec
 
-    # TODO: a better name?
-    # -[ ] when would this be used as it avoids needingn to pass the
-    #   ipc prim to every method
-    @cm
-    def wraps_ipc(
-        self,
-        ipc_prim: Context|MsgStream,
-
-    ) -> PldRx:
-        '''
-        Apply this payload receiver to an IPC primitive type, one
-        of `Context` or `MsgStream`.
-
-        '''
-        self._ipc = ipc_prim
-        try:
-            yield self
-        finally:
-            self._ipc = None
-
     @cm
     def limit_plds(
         self,
@@ -169,7 +147,7 @@ class PldRx(Struct):
     def dec(self) -> msgpack.Decoder:
         return self._pld_dec.dec
 
-    def recv_pld_nowait(
+    def recv_msg_nowait(
         self,
         # TODO: make this `MsgStream` compat as well, see above^
         # ipc_prim: Context|MsgStream,
@@ -180,34 +158,95 @@ class PldRx(Struct):
         hide_tb: bool = False,
         **dec_pld_kwargs,
 
-    ) -> Any|Raw:
+    ) -> tuple[
+        MsgType[PayloadT],
+        PayloadT,
+    ]:
+        '''
+        Attempt to non-blocking receive a message from the `._rx_chan` and
+        unwrap its payload, delivering the pair to the caller.
+
+        '''
         __tracebackhide__: bool = hide_tb
         msg: MsgType = (
             ipc_msg
             or
             # sync-rx msg from underlying IPC feeder (mem-)chan
             ipc._rx_chan.receive_nowait()
         )
-        return self.decode_pld(
+        pld: PayloadT = self.decode_pld(
             msg,
             ipc=ipc,
             expect_msg=expect_msg,
             hide_tb=hide_tb,
             **dec_pld_kwargs,
         )
+        return (
+            msg,
+            pld,
+        )
+
+    async def recv_msg(
+        self,
+        ipc: Context|MsgStream,
+        expect_msg: MsgType,
+
+        # NOTE: ONLY for handling `Stop`-msgs that arrive during
+        # a call to `drain_to_final_msg()` above!
+        passthrough_non_pld_msgs: bool = True,
+        hide_tb: bool = True,
+
+        **decode_pld_kwargs,
+
+    ) -> tuple[MsgType, PayloadT]:
+        '''
+        Retrieve the next avail IPC msg, decode its payload, and
+        return the (msg, pld) pair.
+
+        '''
+        __tracebackhide__: bool = hide_tb
+        msg: MsgType = await ipc._rx_chan.receive()
+        match msg:
+            case Return()|Error():
+                log.runtime(
+                    f'Rxed final outcome msg\n'
+                    f'{msg}\n'
+                )
+            case Stop():
+                log.runtime(
+                    f'Rxed stream stopped msg\n'
+                    f'{msg}\n'
+                )
+                if passthrough_non_pld_msgs:
+                    return msg, None
+
+        # TODO: is there some way we can inject the decoded
+        # payload into an existing output buffer for the original
+        # msg instance?
+        pld: PayloadT = self.decode_pld(
+            msg,
+            ipc=ipc,
+            expect_msg=expect_msg,
+            hide_tb=hide_tb,
+            **decode_pld_kwargs,
+        )
+        return (
+            msg,
+            pld,
+        )
 
     async def recv_pld(
         self,
         ipc: Context|MsgStream,
-        ipc_msg: MsgType|None = None,
+        ipc_msg: MsgType[PayloadT]|None = None,
         expect_msg: Type[MsgType]|None = None,
         hide_tb: bool = True,
         **dec_pld_kwargs,
 
-    ) -> Any|Raw:
+    ) -> PayloadT:
         '''
         Receive a `MsgType`, then decode and return its `.pld` field.
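
Note the pair-return convention introduced here: both receivers now hand back the boxing msg alongside its decoded payload, where the old `recv_pld_nowait()` returned only the latter, so callers can retain the `Return` for outcome bookkeeping instead of discarding it. A rough caller-side fragment mirroring the drain loop further below (the `._result`/`._outcome_msg` attribute names are illustrative of the pattern):

# keep both the unboxed payload and its boxing msg around.
msg, pld = await ctx._pld_rx.recv_msg(
    ipc=ctx,
    expect_msg=Return,
)
if type(msg) is Return:
    ctx._result = pld         # final unboxed payload value
    ctx._outcome_msg = msg    # the boxing `Return` itself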
@@ -219,6 +258,13 @@ class PldRx(Struct):
             # async-rx msg from underlying IPC feeder (mem-)chan
             await ipc._rx_chan.receive()
         )
+        if (
+            type(msg) is Return
+        ):
+            log.info(
+                f'Rxed final result msg\n'
+                f'{msg}\n'
+            )
         return self.decode_pld(
             msg=msg,
             ipc=ipc,
@@ -407,45 +453,6 @@ class PldRx(Struct):
             __tracebackhide__: bool = False
             raise
 
-    dec_msg = decode_pld
-
-    async def recv_msg_w_pld(
-        self,
-        ipc: Context|MsgStream,
-        expect_msg: MsgType,
-
-        # NOTE: generally speaking only for handling `Stop`-msgs that
-        # arrive during a call to `drain_to_final_msg()` above!
-        passthrough_non_pld_msgs: bool = True,
-        hide_tb: bool = True,
-        **kwargs,
-
-    ) -> tuple[MsgType, PayloadT]:
-        '''
-        Retrieve the next avail IPC msg, decode it's payload, and return
-        the pair of refs.
-
-        '''
-        __tracebackhide__: bool = hide_tb
-        msg: MsgType = await ipc._rx_chan.receive()
-
-        if passthrough_non_pld_msgs:
-            match msg:
-                case Stop():
-                    return msg, None
-
-        # TODO: is there some way we can inject the decoded
-        # payload into an existing output buffer for the original
-        # msg instance?
-        pld: PayloadT = self.decode_pld(
-            msg,
-            ipc=ipc,
-            expect_msg=expect_msg,
-            hide_tb=hide_tb,
-            **kwargs,
-        )
-        return msg, pld
-
 
 @cm
 def limit_plds(
@@ -538,8 +545,8 @@ async def maybe_limit_plds(
 
 async def drain_to_final_msg(
     ctx: Context,
-    hide_tb: bool = True,
     msg_limit: int = 6,
+    hide_tb: bool = True,
 
 ) -> tuple[
     Return|None,
@@ -568,8 +575,8 @@ async def drain_to_final_msg(
     even after ctx closure and the `.open_context()` block exit.
 
     '''
-    __tracebackhide__: bool = hide_tb
     raise_overrun: bool = not ctx._allow_overruns
+    parent_never_opened_stream: bool = ctx._stream is None
 
     # wait for a final context result by collecting (but
     # basically ignoring) any bi-dir-stream msgs still in transit
@@ -578,13 +585,14 @@ async def drain_to_final_msg(
     result_msg: Return|Error|None = None
     while not (
         ctx.maybe_error
-        and not ctx._final_result_is_set()
+        and
+        not ctx._final_result_is_set()
     ):
         try:
             # receive all msgs, scanning for either a final result
             # or error; the underlying call should never raise any
             # remote error directly!
-            msg, pld = await ctx._pld_rx.recv_msg_w_pld(
+            msg, pld = await ctx._pld_rx.recv_msg(
                 ipc=ctx,
                 expect_msg=Return,
                 raise_error=False,
@@ -631,6 +639,11 @@ async def drain_to_final_msg(
                     )
                     __tracebackhide__: bool = False
 
+                else:
+                    log.cancel(
+                        f'IPC ctx cancelled externally during result drain ?\n'
+                        f'{ctx}'
+                    )
                 # CASE 2: mask the local cancelled-error(s)
                 # only when we are sure the remote error is
                 # the source cause of this local task's
@@ -662,17 +675,24 @@ async def drain_to_final_msg(
             case Yield():
                 pre_result_drained.append(msg)
                 if (
-                    (ctx._stream.closed
-                     and (reason := 'stream was already closed')
-                    )
-                    or (ctx.cancel_acked
-                        and (reason := 'ctx cancelled other side')
-                    )
-                    or (ctx._cancel_called
-                        and (reason := 'ctx called `.cancel()`')
-                    )
-                    or (len(pre_result_drained) > msg_limit
-                        and (reason := f'"yield" limit={msg_limit}')
+                    not parent_never_opened_stream
+                    and (
+                        (ctx._stream.closed
+                         and
+                         (reason := 'stream was already closed')
+                        ) or
+                        (ctx.cancel_acked
+                         and
+                         (reason := 'ctx cancelled other side')
+                        )
+                        or (ctx._cancel_called
+                            and
+                            (reason := 'ctx called `.cancel()`')
+                        )
+                        or (len(pre_result_drained) > msg_limit
+                            and
+                            (reason := f'"yield" limit={msg_limit}')
+                        )
                     )
                 ):
                     log.cancel(
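
The rewritten predicate chain above is more vertical but relies on the same short-circuit trick as before: each clause pairs a condition with an assignment expression so that whichever branch evaluates truthy also records *why* draining should stop. A standalone demo of the idiom:

# the first truthy clause binds `reason` via the walrus operator.
stream_closed: bool = False
cancel_called: bool = True

if (
    (stream_closed and (reason := 'stream was already closed'))
    or
    (cancel_called and (reason := 'ctx called `.cancel()`'))
):
    print(f'stopping drain: {reason}')  # -> ctx called `.cancel()`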
@@ -690,7 +710,7 @@ async def drain_to_final_msg(
                 # drain up to the `msg_limit` hoping to get
                 # a final result or error/ctxc.
                 else:
-                    log.warning(
+                    report: str = (
                         'Ignoring "yield" msg during `ctx.result()` drain..\n'
                         f'<= {ctx.chan.uid}\n'
                         f'  |_{ctx._nsf}()\n\n'
@@ -699,6 +719,14 @@ async def drain_to_final_msg(
                         f'{pretty_struct.pformat(msg)}\n'
                     )
 
+                    if parent_never_opened_stream:
+                        report = (
+                            f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
+                            f'\n'
+                            # f'{ctx}\n'
+                        ) + report
+
+                    log.warning(report)
                     continue
 
             # stream terminated, but no result yet..
@@ -790,6 +818,7 @@ async def drain_to_final_msg(
         f'{ctx.outcome}\n'
     )
 
+    __tracebackhide__: bool = hide_tb
     return (
         result_msg,
         pre_result_drained,