Compare commits

...

22 Commits

Author SHA1 Message Date
Tyler Goodlet 18923817c8 Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and a way for
eventually linking with `tractor`'s own debug mode flag.
2025-03-13 13:52:09 -04:00
Tyler Goodlet da69284685 Go all in on "task manager" naming 2025-03-13 13:52:09 -04:00
Tyler Goodlet 23dff1117c More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2025-03-13 13:52:09 -04:00
Tyler Goodlet f5b8bde2ff Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask all the nursery cs
allocating with the intention of removal.

Also add a test for per-task-cancellation by starting the crash task as
a `trio.sleep_forever()` but then cancel it via the user allocated cs
and ensure the crash propagates as expected 💥
2025-03-13 13:52:09 -04:00
Tyler Goodlet 4b4c8961da Facepalm, don't pass in unecessary cancel scope 2025-03-13 13:52:09 -04:00
Tyler Goodlet 2440f04a43 Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` into the user defined
`@task_scope_manager` an `Outcome` from the spawned task. In this case
the task manager wraps that in a user defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle it's underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that by default, if this was added to `trio`'s core, the
`@task_scope_manager` would simply be implemented as either a `None`
yielding single-yield-generator but more likely just entirely ignored
by the runtime (as in no manual task outcome collecting, generator
calling and sending is done at all) by default if the user does not provide
the `task_scope_manager` to the nursery at open time.
2025-03-13 13:52:09 -04:00
Tyler Goodlet 53bd7a4acc Alias to `@acm` in broadcaster mod 2025-03-13 13:52:09 -04:00
Tyler Goodlet 99d38981e6 Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2025-03-13 13:52:09 -04:00
Tyler Goodlet f1cadd2b32 Use shorthand nursery var-names per convention in codebase 2025-03-13 13:52:09 -04:00
Tyler Goodlet c533c81e25 Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params througout.
- fill out method doc strings.
2025-03-13 13:52:09 -04:00
Tyler Goodlet eb833cc963 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision start).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straight forward mgmt of "service subactor daemons".
2025-03-13 13:52:09 -04:00
Tyler Goodlet 25db57477c Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-03-13 13:52:09 -04:00
Tyler Goodlet 6e4ae7ca86 Add `.runtime()`-emit to `._invoke()` to report final result msg in the child 2025-03-12 16:41:42 -04:00
Tyler Goodlet 5497401920 Add `MsgStream._stop_msg` use new `PldRx` API
In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from
`.receive_nowait()` (which is called from `.aclose()`) such that we
ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs
on reception of a final `Return`!!

This fixes a final stream-teardown-race-condition-bug where prior we
normally didn't set the `Context._result/._outcome_msg` in such cases.
This is **precisely because**  `.receive_nowait()` only returns the
`pld` and when called from `.aclose()` this value is discarded, meaning
so is its boxing `Return` despite consuming it from the underlying
`._rx_chan`..

Longer term this should be solved differently by ensuring such races
cases are handled at a higher scope like inside `Context._deliver_msg()`
or the `Portal.open_context()` enter/exit blocks? Add a detailed warning
note and todos for all this around the special case block!
2025-03-12 16:39:52 -04:00
Tyler Goodlet 03c447df0d Add `Context._outcome_msg` use new `PldRx` API
Such that any `Return` is always capture for each ctx instance and set
in `._deliver_msg()` normally; ensures we can at least introspect for it
when missing (like in a recently discovered stream teardown race bug).
Yes this augments the already existing `._result` which is dedicated for
the `._outcome_msg.pld` in the non-error case; we might want to see if
there's a nicer way to directly proxy ref to that without getting the
pre-pld-decoded `Raw` form with `msgspec`?

Also use the new `ctx._pld_rx.recv_msg()` and drop assigning
`pld_rx._ctx`.
2025-03-12 15:15:30 -04:00
Tyler Goodlet 1ce99ae742 Slight `PldRx` rework to simplify
Namely renaming and tweaking the `MsgType` receiving methods,
- `.recv_msg()` from what was `.recv_msg_w_pld()` which both receives
  the IPC msg from the underlying `._rx_chan` and then decodes its
  payload with `.decode_pld()`; it now also log reports on the different
  "stage of SC dialog protocol" msg types via a `match/case`.
- a new `.recv_msg_nowait()` sync equivalent of ^ (*was*
  `.recv_pld_nowait()`) who's use was the source of a recently
  discovered bug where any final `Return.pld` is being
  consumed-n-discarded by by `MsgStream.aclose()` depending on
  ctx/stream teardown race conditions..

Also,
- remove all the "instance persistent" ipc-ctx attrs, specifically the
  optional `_ipc`, `_ctx` and the `.wraps_ipc()` cm, since none of them
  were ever really needed/used; all methods which require
  a `Context/MsgStream` are explicitly always passed.
- update a buncha typing namely to use the more generic-styled
  `PayloadT` over `Any` and obviously `MsgType[PayloadT]`.
2025-03-12 13:49:58 -04:00
Tyler Goodlet 96826854b7 Rename ext-types with `msgspec` suite module 2025-03-12 13:47:53 -04:00
Tyler Goodlet 434577953a Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we
aren't using the "caller"->"callee" semantics anymore.
2025-03-12 13:15:48 -04:00
Tyler Goodlet b6b001faad Mk `tests/__init__.py`, not sure where it went?
I must have had a local touched file but never committed or something?
Seems that new `pytest` requires a top level `tests` pkg in order for
relative `.conftest` imports to work.
2025-03-12 13:13:20 -04:00
Tyler Goodlet 3a3fd36890 Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
2025-03-11 14:31:53 -04:00
Tyler Goodlet f0561fc8c0 Extend ctx semantics suite for streaming edge cases!
Muchas grax to @guilledk for finding the first issue which kicked of
this further scrutiny of the `tractor.Context` and `MsgStream` semantics
test suite with a strange edge case where,
- if the parent opened and immediately closed a stream while the remote
  child task started and continued (without terminating) to send msgs
  the parent's `open_context().__aexit__()` would **not block** on the
  child to complete!
=> this was seemingly due to a bug discovered inside the
  `.msg._ops.drain_to_final_msg()` stream handling case logic where we
  are NOT checking if `Context._stream` is non-`None`!

As such this,
- extends the `test_caller_closes_ctx_after_callee_opens_stream` (now
  renamed, see below) to include cases for all combinations of the child
  and parent sending before receiving on the stream as well as all
  placements of `Context.cancel()` in the parent before, around and after
  the stream open.
- uses the new `expect_ctxc()` for expecting the taskc (`trio.Task`
  cancelled)` cases.
- also extends the `test_callee_closes_ctx_after_stream_open` (also
  renamed) to include the case where the parent sends a msg before it
  receives.
=> this case has unveiled yet-another-bug where somehow the underlying
  `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the
  child's `Return[None]` msg be consumed and NOT in a place where it is
  correctly set as `Context._result` resulting in the parent hanging
  forever inside `._ops.drain_to_final_msg()`..

Alongside,
- start renaming using the new "remote-task-peer-side" semantics
  throughout the test module: "caller" -> "parent", "callee" -> "child".
2025-03-11 14:04:55 -04:00
Tyler Goodlet d1abe4da44 Deliver a `MaybeBoxedError` from `.expect_ctxc()`
Just like we do from the `.devx._debug.open_crash_handler()`, this
allows checking various attrs on the raised `ContextCancelled` much like
`with pytest.raises() as excinfo:`.
2025-03-10 18:17:31 -04:00
17 changed files with 1365 additions and 215 deletions

View File

View File

@ -22,7 +22,7 @@ from tractor.devx._debug import (
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from conftest import (
from ..conftest import (
_ci_env,
)

View File

@ -14,7 +14,7 @@ import tractor
from tractor._testing import (
tractor_test,
)
from conftest import no_windows
from .conftest import no_windows
def is_win():

View File

@ -38,9 +38,9 @@ from tractor._testing import (
# - standard setup/teardown:
# ``Portal.open_context()`` starts a new
# remote task context in another actor. The target actor's task must
# call ``Context.started()`` to unblock this entry on the caller side.
# the callee task executes until complete and returns a final value
# which is delivered to the caller side and retreived via
# call ``Context.started()`` to unblock this entry on the parent side.
# the child task executes until complete and returns a final value
# which is delivered to the parent side and retreived via
# ``Context.result()``.
# - cancel termination:
@ -170,9 +170,9 @@ async def assert_state(value: bool):
[False, ValueError, KeyboardInterrupt],
)
@pytest.mark.parametrize(
'callee_blocks_forever',
'child_blocks_forever',
[False, True],
ids=lambda item: f'callee_blocks_forever={item}'
ids=lambda item: f'child_blocks_forever={item}'
)
@pytest.mark.parametrize(
'pointlessly_open_stream',
@ -181,7 +181,7 @@ async def assert_state(value: bool):
)
def test_simple_context(
error_parent,
callee_blocks_forever,
child_blocks_forever,
pointlessly_open_stream,
debug_mode: bool,
):
@ -204,13 +204,13 @@ def test_simple_context(
portal.open_context(
simple_setup_teardown,
data=10,
block_forever=callee_blocks_forever,
block_forever=child_blocks_forever,
) as (ctx, sent),
):
assert current_ipc_ctx() is ctx
assert sent == 11
if callee_blocks_forever:
if child_blocks_forever:
await portal.run(assert_state, value=True)
else:
assert await ctx.result() == 'yo'
@ -220,7 +220,7 @@ def test_simple_context(
if error_parent:
raise error_parent
if callee_blocks_forever:
if child_blocks_forever:
await ctx.cancel()
else:
# in this case the stream will send a
@ -259,9 +259,9 @@ def test_simple_context(
@pytest.mark.parametrize(
'callee_returns_early',
'child_returns_early',
[True, False],
ids=lambda item: f'callee_returns_early={item}'
ids=lambda item: f'child_returns_early={item}'
)
@pytest.mark.parametrize(
'cancel_method',
@ -273,14 +273,14 @@ def test_simple_context(
[True, False],
ids=lambda item: f'chk_ctx_result_before_exit={item}'
)
def test_caller_cancels(
def test_parent_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
child_returns_early: bool,
debug_mode: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
Verify that when the opening side of a context (aka the parent)
cancels that context, the ctx does not raise a cancelled when
either calling `.result()` or on context exit.
@ -294,7 +294,7 @@ def test_caller_cancels(
if (
cancel_method == 'portal'
and not callee_returns_early
and not child_returns_early
):
try:
res = await ctx.result()
@ -318,7 +318,7 @@ def test_caller_cancels(
pytest.fail(f'should not have raised ctxc\n{ctxc}')
# we actually get a result
if callee_returns_early:
if child_returns_early:
assert res == 'yo'
assert ctx.outcome is res
assert ctx.maybe_error is None
@ -362,14 +362,14 @@ def test_caller_cancels(
)
timeout: float = (
0.5
if not callee_returns_early
if not child_returns_early
else 2
)
with trio.fail_after(timeout):
async with (
expect_ctxc(
yay=(
not callee_returns_early
not child_returns_early
and cancel_method == 'portal'
)
),
@ -377,13 +377,13 @@ def test_caller_cancels(
portal.open_context(
simple_setup_teardown,
data=10,
block_forever=not callee_returns_early,
block_forever=not child_returns_early,
) as (ctx, sent),
):
if callee_returns_early:
if child_returns_early:
# ensure we block long enough before sending
# a cancel such that the callee has already
# a cancel such that the child has already
# returned it's result.
await trio.sleep(0.5)
@ -421,7 +421,7 @@ def test_caller_cancels(
# which should in turn cause `ctx._scope` to
# catch any cancellation?
if (
not callee_returns_early
not child_returns_early
and cancel_method != 'portal'
):
assert not ctx._scope.cancelled_caught
@ -430,11 +430,11 @@ def test_caller_cancels(
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
# - caller context calls `Context.cancel()` while streaming
# is ongoing resulting in callee being cancelled
# - callee calls `Context.cancel()` while streaming and caller
# - child context closes without using stream
# - parent context closes without using stream
# - parent context calls `Context.cancel()` while streaming
# is ongoing resulting in child being cancelled
# - child calls `Context.cancel()` while streaming and parent
# sees stream terminated in `RemoteActorError`
# TODO: future possible features
@ -443,7 +443,6 @@ def test_caller_cancels(
@tractor.context
async def close_ctx_immediately(
ctx: Context,
) -> None:
@ -454,13 +453,24 @@ async def close_ctx_immediately(
async with ctx.open_stream():
pass
print('child returning!')
@pytest.mark.parametrize(
'parent_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@tractor_test
async def test_callee_closes_ctx_after_stream_open(
async def test_child_exits_ctx_after_stream_open(
debug_mode: bool,
parent_send_before_receive: bool,
):
'''
callee context closes without using stream.
child context closes without using stream.
This should result in a msg sequence
|_<root>_
@ -474,6 +484,9 @@ async def test_callee_closes_ctx_after_stream_open(
=> {'stop': True, 'cid': <str>}
'''
timeout: float = (
0.5 if not debug_mode else 999
)
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
@ -482,7 +495,7 @@ async def test_callee_closes_ctx_after_stream_open(
enable_modules=[__name__],
)
with trio.fail_after(0.5):
with trio.fail_after(timeout):
async with portal.open_context(
close_ctx_immediately,
@ -494,41 +507,56 @@ async def test_callee_closes_ctx_after_stream_open(
with trio.fail_after(0.4):
async with ctx.open_stream() as stream:
if parent_send_before_receive:
print('sending first msg from parent!')
await stream.send('yo')
# should fall through since ``StopAsyncIteration``
# should be raised through translation of
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
msg = 10
async for msg in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0
else:
# never should get anythinig new from
# the underlying stream
assert msg == 10
# verify stream is now closed
try:
with trio.fail_after(0.3):
print('parent trying to `.receive()` on EoC stream!')
await stream.receive()
assert 0, 'should have raised eoc!?'
except trio.EndOfChannel:
print('parent got EoC as expected!')
pass
# raise
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
try:
with trio.fail_after(0.5):
with trio.fail_after(timeout):
async with ctx.open_stream() as stream:
pass
except trio.ClosedResourceError:
pass
# if ctx._rx_chan._state.data:
# await tractor.pause()
await portal.cancel_actor()
@tractor.context
async def expect_cancelled(
ctx: Context,
send_before_receive: bool = False,
) -> None:
global _state
@ -538,6 +566,10 @@ async def expect_cancelled(
try:
async with ctx.open_stream() as stream:
if send_before_receive:
await stream.send('yo')
async for msg in stream:
await stream.send(msg) # echo server
@ -564,26 +596,49 @@ async def expect_cancelled(
raise
else:
assert 0, "callee wasn't cancelled !?"
assert 0, "child wasn't cancelled !?"
@pytest.mark.parametrize(
'child_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@pytest.mark.parametrize(
'rent_wait_for_msg',
[
False,
True,
],
ids=lambda item: f'rent_wait_for_msg={item}'
)
@pytest.mark.parametrize(
'use_ctx_cancel_method',
[False, True],
[
False,
'pre_stream',
'post_stream_open',
'post_stream_close',
],
ids=lambda item: f'use_ctx_cancel_method={item}'
)
@tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
async def test_parent_exits_ctx_after_child_enters_stream(
use_ctx_cancel_method: bool|str,
debug_mode: bool,
rent_wait_for_msg: bool,
child_send_before_receive: bool,
):
'''
caller context closes without using/opening stream
Parent-side of IPC context closes without sending on `MsgStream`.
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
root: Actor = current_actor()
portal = await an.start_actor(
'ctx_cancelled',
@ -592,41 +647,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context(
expect_cancelled,
send_before_receive=child_send_before_receive,
) as (ctx, sent):
assert sent is None
await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly
if use_ctx_cancel_method:
if use_ctx_cancel_method == 'pre_stream':
await ctx.cancel()
# NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set.
try:
async with (
expect_ctxc(
# XXX: the cause is US since we call
# `Context.cancel()` just above!
yay=True,
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
reraise=True,
) as maybe_ctxc,
):
async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
if rent_wait_for_msg:
async for msg in stream:
print(f'PARENT rx: {msg!r}\n')
break
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
if use_ctx_cancel_method == 'post_stream_open':
await ctx.cancel()
else:
assert 0, "Should have context cancelled?"
if use_ctx_cancel_method == 'post_stream_close':
await ctx.cancel()
ctxc: tractor.ContextCancelled = maybe_ctxc.value
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# channel should still be up
assert portal.channel.connected()
@ -637,13 +703,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
value=False,
)
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
# `.open_context()` before the child has returned,
# errored or been cancelled!
else:
try:
with trio.fail_after(0.2):
await ctx.result()
with trio.fail_after(
0.5 # if not debug_mode else 999
):
res = await ctx.wait_for_result()
assert res is not tractor._context.Unresolved
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
# NO-OP -> since already called above
# NO-OP -> since already triggered by
# `trio.fail_after()` above!
await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since
@ -683,7 +756,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test
async def test_multitask_caller_cancels_from_nonroot_task(
async def test_multitask_parent_cancels_from_nonroot_task(
debug_mode: bool,
):
async with tractor.open_nursery(
@ -735,7 +808,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
@tractor.context
async def cancel_self(
ctx: Context,
) -> None:
@ -775,11 +847,11 @@ async def cancel_self(
@tractor_test
async def test_callee_cancels_before_started(
async def test_child_cancels_before_started(
debug_mode: bool,
):
'''
Callee calls `Context.cancel()` while streaming and caller
Callee calls `Context.cancel()` while streaming and parent
sees stream terminated in `ContextCancelled`.
'''
@ -826,14 +898,13 @@ async def never_open_stream(
@tractor.context
async def keep_sending_from_callee(
async def keep_sending_from_child(
ctx: Context,
msg_buffer_size: int|None = None,
) -> None:
'''
Send endlessly on the calleee stream.
Send endlessly on the child stream.
'''
await ctx.started()
@ -841,7 +912,7 @@ async def keep_sending_from_callee(
msg_buffer_size=msg_buffer_size,
) as stream:
for msg in count():
print(f'callee sending {msg}')
print(f'child sending {msg}')
await stream.send(msg)
await trio.sleep(0.01)
@ -849,12 +920,12 @@ async def keep_sending_from_callee(
@pytest.mark.parametrize(
'overrun_by',
[
('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_callee),
('parent', 1, never_open_stream),
('child', 0, keep_sending_from_child),
],
ids=[
('caller_1buf_never_open_stream'),
('callee_0buf_keep_sending_from_callee'),
('parent_1buf_never_open_stream'),
('child_0buf_keep_sending_from_child'),
]
)
def test_one_end_stream_not_opened(
@ -885,8 +956,7 @@ def test_one_end_stream_not_opened(
) as (ctx, sent):
assert sent is None
if 'caller' in overrunner:
if 'parent' in overrunner:
async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size
@ -901,7 +971,7 @@ def test_one_end_stream_not_opened(
await trio.sleep_forever()
else:
# callee overruns caller case so we do nothing here
# child overruns parent case so we do nothing here
await trio.sleep_forever()
await portal.cancel_actor()
@ -909,19 +979,19 @@ def test_one_end_stream_not_opened(
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if (
overrunner == 'caller'
overrunner == 'parent'
):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'callee':
elif overrunner == 'child':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun
# error? the child delivers an error which is an overrun
# wrapped in a remote actor error.
assert excinfo.value.boxed_type == tractor.RemoteActorError
@ -931,8 +1001,7 @@ def test_one_end_stream_not_opened(
@tractor.context
async def echo_back_sequence(
ctx: Context,
ctx: Context,
seq: list[int],
wait_for_cancel: bool,
allow_overruns_side: str,
@ -941,12 +1010,12 @@ async def echo_back_sequence(
) -> None:
'''
Send endlessly on the calleee stream using a small buffer size
Send endlessly on the child stream using a small buffer size
setting on the contex to simulate backlogging that would normally
cause overruns.
'''
# NOTE: ensure that if the caller is expecting to cancel this task
# NOTE: ensure that if the parent is expecting to cancel this task
# that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg.
total_batches: int = (
@ -996,18 +1065,18 @@ async def echo_back_sequence(
if be_slow:
await trio.sleep(0.05)
print('callee waiting on next')
print('child waiting on next')
print(f'callee echoing back latest batch\n{batch}')
print(f'child echoing back latest batch\n{batch}')
for msg in batch:
print(f'callee sending msg\n{msg}')
print(f'child sending msg\n{msg}')
await stream.send(msg)
try:
return 'yo'
finally:
print(
'exiting callee with context:\n'
'exiting child with context:\n'
f'{pformat(ctx)}\n'
)
@ -1061,7 +1130,7 @@ def test_maybe_allow_overruns_stream(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'callee_sends_forever',
'child_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
debug_mode=debug_mode,

View File

@ -10,7 +10,7 @@ import tractor
from tractor._testing import (
tractor_test,
)
from conftest import (
from .conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,

View File

@ -82,6 +82,7 @@ from .msg import (
MsgType,
NamespacePath,
PayloadT,
Return,
Started,
Stop,
Yield,
@ -245,11 +246,13 @@ class Context:
# a drain loop?
# _res_scope: trio.CancelScope|None = None
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# this value is only set on one side.
# _result: Any | int = None
_result: Any|Unresolved = Unresolved
_result: PayloadT|Unresolved = Unresolved
# if the local "caller" task errors this value is always set
# to the error that was captured in the
@ -1199,9 +1202,11 @@ class Context:
'''
__tracebackhide__: bool = hide_tb
assert self._portal, (
'`Context.wait_for_result()` can not be called from callee side!'
)
if not self._portal:
raise RuntimeError(
'Invalid usage of `Context.wait_for_result()`!\n'
'Not valid on child-side IPC ctx!\n'
)
if self._final_result_is_set():
return self._result
@ -1222,6 +1227,8 @@ class Context:
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
outcome_msg: Return|Error|ContextCancelled
drained_msgs: list[MsgType]
(
outcome_msg,
drained_msgs,
@ -1229,11 +1236,19 @@ class Context:
ctx=self,
hide_tb=hide_tb,
)
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n'
)
# ?XXX, should already be set in `._deliver_msg()` right?
if self._outcome_msg is not Unresolved:
# from .devx import _debug
# await _debug.pause()
assert self._outcome_msg is outcome_msg
else:
self._outcome_msg = outcome_msg
if drained_msgs:
drained_status += (
'\n'
@ -1741,7 +1756,6 @@ class Context:
f'{structfmt(msg)}\n'
)
# NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect
# it to be raised by any context (stream) consumer
@ -1753,6 +1767,21 @@ class Context:
# normally the task that should get cancelled/error
# from some remote fault!
send_chan.send_nowait(msg)
match msg:
case Stop():
if (stream := self._stream):
stream._stop_msg = msg
case Return():
if not self._outcome_msg:
log.warning(
f'Setting final outcome msg AFTER '
f'`._rx_chan.send()`??\n'
f'\n'
f'{msg}'
)
self._outcome_msg = msg
return True
except trio.BrokenResourceError:
@ -2009,7 +2038,7 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
try:
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
@ -2374,7 +2403,8 @@ async def open_context_from_portal(
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
and
not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
@ -2433,6 +2463,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
# log.cancel(
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n'
f'cid: {ctx.cid}\n'
@ -2488,7 +2519,6 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
pld_rx._ctx = ctx
ctx._result = Unresolved
return ctx

View File

@ -184,7 +184,7 @@ class Portal:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
) = await self._expect_result_ctx._pld_rx.recv_msg(
ipc=self._expect_result_ctx,
expect_msg=Return,
)

View File

@ -650,6 +650,10 @@ async def _invoke(
)
# set and shuttle final result to "parent"-side task.
ctx._result = res
log.runtime(
f'Sending result msg and exiting {ctx.side!r}\n'
f'{return_msg}\n'
)
await chan.send(return_msg)
# NOTE: this happens IFF `ctx._scope.cancel()` is

View File

@ -840,8 +840,10 @@ class Actor:
)]
except KeyError:
report: str = (
'Ignoring invalid IPC ctx msg!\n\n'
f'<=? {uid}\n\n'
'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n'
f'\n'
f'<=? {uid}\n'
f' |_{pretty_struct.pformat(msg)}\n'
)
match msg:

View File

@ -45,9 +45,11 @@ from .trionics import (
BroadcastReceiver,
)
from tractor.msg import (
# Return,
# Stop,
Error,
Return,
Stop,
MsgType,
PayloadT,
Yield,
)
@ -70,8 +72,7 @@ class MsgStream(trio.abc.Channel):
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either
`Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules:
@ -94,6 +95,9 @@ class MsgStream(trio.abc.Channel):
self._rx_chan = rx_chan
self._broadcaster = _broadcaster
# any actual IPC msg which is effectively an `EndOfStream`
self._stop_msg: bool|Stop = False
# flag to denote end of stream
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
@ -125,16 +129,67 @@ class MsgStream(trio.abc.Channel):
def receive_nowait(
self,
expect_msg: MsgType = Yield,
):
) -> PayloadT:
ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait(
(
msg,
pld,
) = ctx._pld_rx.recv_msg_nowait(
ipc=self,
expect_msg=expect_msg,
)
# ?TODO, maybe factor this into a hyper-common `unwrap_pld()`
#
match msg:
# XXX, these never seems to ever hit? cool?
case Stop():
log.cancel(
f'Msg-stream was ended via stop msg\n'
f'{msg}'
)
case Error():
log.error(
f'Msg-stream was ended via error msg\n'
f'{msg}'
)
# XXX NOTE, always set any final result on the ctx to
# avoid teardown race conditions where previously this msg
# would be consumed silently (by `.aclose()` doing its
# own "msg drain loop" but WITHOUT those `drained: lists[MsgType]`
# being post-close-processed!
#
# !!TODO, see the equiv todo-comment in `.receive()`
# around the `if drained:` where we should prolly
# ACTUALLY be doing this post-close processing??
#
case Return(pld=pld):
log.warning(
f'Msg-stream final result msg for IPC ctx?\n'
f'{msg}'
)
# XXX TODO, this **should be covered** by higher
# scoped runtime-side method calls such as
# `Context._deliver_msg()`, so you should never
# really see the warning above or else something
# racy/out-of-order is likely going on between
# actor-runtime-side push tasks and the user-app-side
# consume tasks!
# -[ ] figure out that set of race cases and fix!
# -[ ] possibly return the `msg` given an input
# arg-flag is set so we can process the `Return`
# from the `.aclose()` caller?
#
# breakpoint() # to debug this RACE CASE!
ctx._result = pld
ctx._outcome_msg = msg
return pld
async def receive(
self,
hide_tb: bool = False,
):
'''
@ -154,7 +209,7 @@ class MsgStream(trio.abc.Channel):
# except trio.EndOfChannel:
# raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to
# see `.aclose()` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise self._eoc
@ -165,7 +220,11 @@ class MsgStream(trio.abc.Channel):
src_err: Exception|None = None # orig tb
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self)
pld = await ctx._pld_rx.recv_pld(
ipc=self,
expect_msg=Yield,
)
return pld
# XXX: the stream terminates on either of:
# - `self._rx_chan.receive()` raising after manual closure
@ -174,7 +233,7 @@ class MsgStream(trio.abc.Channel):
# - via a `Stop`-msg received from remote peer task.
# NOTE
# |_ previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel
# `._rx_chan.aclose()` on the send side of the channel
# inside `Actor._deliver_ctx_payload()`, but now the 'stop'
# message handling gets delegated to `PldRFx.recv_pld()`
# internals.
@ -198,11 +257,14 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose()
if drained:
# ?TODO? pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.wait_for_result()` call?
#
# from .devx import pause
# await pause()
# ^^^^^^^^TODO? pass these to the `._ctx._drained_msgs:
# deque` and then iterate them as part of any
# `.wait_for_result()` call?
#
# -[ ] move the match-case processing from
# `.receive_nowait()` instead to right here, use it from
# a for msg in drained:` post-proc loop?
#
log.warning(
'Drained context msgs during closure\n\n'
f'{drained}'
@ -265,9 +327,6 @@ class MsgStream(trio.abc.Channel):
- more or less we try to maintain adherance to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
'''
# rx_chan = self._rx_chan
# XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure so avoid getting stuck handing on
@ -279,15 +338,16 @@ class MsgStream(trio.abc.Channel):
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
# import tractor
# await tractor.pause()
return []
ctx: Context = self._ctx
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait(
# allow_msgs=[Yield, Return],
expect_msg=Yield,
maybe_final_msg: Yield|Return = self.receive_nowait(
expect_msg=Yield|Return,
)
if maybe_final_msg:
log.debug(
@ -372,8 +432,10 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose()
if not self._eoc:
this_side: str = self._ctx.side
peer_side: str = self._ctx.peer_side
message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f' |_{self}\n'
@ -381,9 +443,19 @@ class MsgStream(trio.abc.Channel):
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
log.cancel(
f'Msg-stream is closing but there is still reader tasks,\n'
f'{stats}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same
# if we're a bi-dir `MsgStream` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that

View File

@ -26,6 +26,9 @@ import os
import pathlib
import tractor
from tractor.devx._debug import (
BoxedMaybeException,
)
from .pytest import (
tractor_test as tractor_test
)
@ -98,12 +101,13 @@ async def expect_ctxc(
'''
if yay:
try:
yield
yield (maybe_exc := BoxedMaybeException())
raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled:
except tractor.ContextCancelled as ctxc:
maybe_exc.value = ctxc
if reraise:
raise
else:
return
else:
yield
yield (maybe_exc := BoxedMaybeException())

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -0,0 +1,592 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The funcionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Serice task for {name} not terminated?'

View File

@ -110,33 +110,11 @@ class PldRx(Struct):
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_pld_dec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None
@property
def pld_dec(self) -> MsgDec:
return self._pld_dec
# TODO: a better name?
# -[ ] when would this be used as it avoids needingn to pass the
# ipc prim to every method
@cm
def wraps_ipc(
self,
ipc_prim: Context|MsgStream,
) -> PldRx:
'''
Apply this payload receiver to an IPC primitive type, one
of `Context` or `MsgStream`.
'''
self._ipc = ipc_prim
try:
yield self
finally:
self._ipc = None
@cm
def limit_plds(
self,
@ -169,7 +147,7 @@ class PldRx(Struct):
def dec(self) -> msgpack.Decoder:
return self._pld_dec.dec
def recv_pld_nowait(
def recv_msg_nowait(
self,
# TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream,
@ -180,34 +158,95 @@ class PldRx(Struct):
hide_tb: bool = False,
**dec_pld_kwargs,
) -> Any|Raw:
) -> tuple[
MsgType[PayloadT],
PayloadT,
]:
'''
Attempt to non-blocking receive a message from the `._rx_chan` and
unwrap it's payload delivering the pair to the caller.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = (
ipc_msg
or
# sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait()
)
return self.decode_pld(
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_pld_kwargs,
)
return (
msg,
pld,
)
async def recv_msg(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: ONLY for handling `Stop`-msgs that arrive during
# a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**decode_pld_kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode its payload, and
return the (msg, pld) pair.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
match msg:
case Return()|Error():
log.runtime(
f'Rxed final outcome msg\n'
f'{msg}\n'
)
case Stop():
log.runtime(
f'Rxed stream stopped msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs:
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**decode_pld_kwargs,
)
return (
msg,
pld,
)
async def recv_pld(
self,
ipc: Context|MsgStream,
ipc_msg: MsgType|None = None,
ipc_msg: MsgType[PayloadT]|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**dec_pld_kwargs,
) -> Any|Raw:
) -> PayloadT:
'''
Receive a `MsgType`, then decode and return its `.pld` field.
@ -219,6 +258,13 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive()
)
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld(
msg=msg,
ipc=ipc,
@ -407,45 +453,6 @@ class PldRx(Struct):
__tracebackhide__: bool = False
raise
dec_msg = decode_pld
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
hide_tb: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode it's payload, and return
the pair of refs.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.decode_pld(
msg,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**kwargs,
)
return msg, pld
@cm
def limit_plds(
@ -538,8 +545,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg(
ctx: Context,
hide_tb: bool = True,
msg_limit: int = 6,
hide_tb: bool = True,
) -> tuple[
Return|None,
@ -568,8 +575,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit.
'''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
@ -578,13 +585,14 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
and
not ctx._final_result_is_set()
):
try:
# receive all msgs, scanning for either a final result
# or error; the underlying call should never raise any
# remote error directly!
msg, pld = await ctx._pld_rx.recv_msg_w_pld(
msg, pld = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Return,
raise_error=False,
@ -631,6 +639,11 @@ async def drain_to_final_msg(
)
__tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
@ -662,17 +675,24 @@ async def drain_to_final_msg(
case Yield():
pre_result_drained.append(msg)
if (
(ctx._stream.closed
and (reason := 'stream was already closed')
)
or (ctx.cancel_acked
and (reason := 'ctx cancelled other side')
)
or (ctx._cancel_called
and (reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and (reason := f'"yield" limit={msg_limit}')
not parent_never_opened_stream
and (
(ctx._stream.closed
and
(reason := 'stream was already closed')
) or
(ctx.cancel_acked
and
(reason := 'ctx cancelled other side')
)
or (ctx._cancel_called
and
(reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and
(reason := f'"yield" limit={msg_limit}')
)
)
):
log.cancel(
@ -690,7 +710,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
else:
log.warning(
report: str = (
'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
@ -699,6 +719,14 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n'
)
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue
# stream terminated, but no result yet..
@ -790,6 +818,7 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n'
)
__tracebackhide__: bool = hide_tb
return (
result_msg,
pre_result_drained,

View File

@ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
from __future__ import annotations
from abc import abstractmethod
from collections import deque
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from functools import partial
from operator import ne
from typing import (
@ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel):
return await self._receive_from_underlying(key, state)
@asynccontextmanager
@acm
async def subscribe(
self,
raise_on_lag: bool = True,

View File

@ -0,0 +1,322 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Erlang-style (ish) "one-cancels-one" nursery.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial
from typing import (
Generator,
Any,
)
from outcome import (
Outcome,
acapture,
)
from msgspec import Struct
import trio
from trio._core._run import (
Task,
CancelScope,
Nursery,
)
class TaskOutcome(Struct):
'''
The outcome of a scheduled ``trio`` task which includes an interface
for synchronizing to the completion of the task's runtime and access
to the eventual boxed result/value or raised exception.
'''
lowlevel_task: Task
_exited = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value
@property
def result(self) -> Any:
'''
Either Any or None depending on whether the Outcome has compeleted.
'''
if self._outcome is None:
raise RuntimeError(
f'Task {self.lowlevel_task.name} is not complete.\n'
'First wait on `await TaskOutcome.wait_for_result()`!'
)
return self._result
def _set_outcome(
self,
outcome: Outcome,
):
'''
Set the ``Outcome`` for this task.
This method should only ever be called by the task's supervising
nursery implemenation.
'''
self._outcome = outcome
self._result = outcome.unwrap()
self._exited.set()
async def wait_for_result(self) -> Any:
'''
Unwind the underlying task's ``Outcome`` by async waiting for
the task to first complete and then unwrap it's result-value.
'''
if self._exited.is_set():
return self._result
await self._exited.wait()
out = self._outcome
if out is None:
raise ValueError(f'{out} is not an outcome!?')
return self.result
class TaskManagerNursery(Struct):
_n: Nursery
_scopes: dict[
Task,
tuple[CancelScope, Outcome]
] = {}
task_manager: Generator[Any, Outcome, None] | None = None
async def start_soon(
self,
async_fn,
*args,
name=None,
task_manager: Generator[Any, Outcome, None] | None = None
) -> tuple[CancelScope, Task]:
# NOTE: internals of a nursery don't let you know what
# the most recently spawned task is by order.. so we'd
# have to either change that or do set ops.
# pre_start_tasks: set[Task] = n._children.copy()
# new_tasks = n._children - pre_start_Tasks
# assert len(new_tasks) == 1
# task = new_tasks.pop()
n: Nursery = self._n
sm = self.task_manager
# we do default behavior of a scope-per-nursery
# if the user did not provide a task manager.
if sm is None:
return n.start_soon(async_fn, *args, name=None)
new_task: Task | None = None
to_return: tuple[Any] | None = None
# NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here?
mngr = sm(nursery=n)
async def _start_wrapped_in_scope(
task_status: TaskStatus[
tuple[CancelScope, Task]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this was working before?! and, do we need something
# like it to implement `.start()`?
# nonlocal to_return
# execute up to the first yield
try:
to_return: tuple[Any] = next(mngr)
except StopIteration:
raise RuntimeError("task manager didn't yield") from None
# TODO: how do we support `.start()` style?
# - relay through whatever the
# started task passes back via `.started()` ?
# seems like that won't work with also returning
# a "task handle"?
# - we were previously binding-out this `to_return` to
# the parent's lexical scope, why isn't that working
# now?
task_status.started(to_return)
# invoke underlying func now that cs is entered.
outcome = await acapture(async_fn, *args)
# execute from the 1st yield to return and expect
# generator-mngr `@task_scope_manager` thinger to
# terminate!
try:
mngr.send(outcome)
# I would presume it's better to have a handle to
# the `Outcome` entirely? This method sends *into*
# the mngr this `Outcome.value`; seems like kinda
# weird semantics for our purposes?
# outcome.send(mngr)
except StopIteration:
return
else:
raise RuntimeError(f"{mngr} didn't stop!")
to_return = await n.start(_start_wrapped_in_scope)
assert to_return is not None
# TODO: use the fancy type-check-time type signature stuff from
# mypy i guess..to like, relay the type of whatever the
# generator yielded through? betcha that'll be un-grokable XD
return to_return
# TODO: define a decorator to runtime type check that this a generator
# with a single yield that also delivers a value (of some std type) from
# the yield expression?
# @trio.task_manager
def add_task_handle_and_crash_handling(
nursery: Nursery,
debug_mode: bool = False,
) -> Generator[
Any,
Outcome,
None,
]:
'''
A customizable, user defined "task scope manager".
With this specially crafted single-yield generator function you can
add more granular controls around every task spawned by `trio` B)
'''
# if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task()
print(f'Spawning task: {task.name}')
# User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
task_outcome = TaskOutcome(task)
# NOTE: if wanted the user could wrap the output task handle however
# they want!
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome
# this yields back when the task is terminated, cancelled or returns.
try:
with CancelScope() as cs:
# the yielded value(s) here are what are returned to the
# nursery's `.start_soon()` caller B)
lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome)
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err:
print(f'{task.name} crashed, entering debugger!')
if debug_mode:
import pdbp
pdbp.xpm()
raise
finally:
print(f'{task.name} Exitted')
@acm
async def open_nursery(
task_manager: Generator[Any, Outcome, None] | None = None,
**lowlevel_nursery_kwargs,
):
async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse:
yield TaskManagerNursery(
nurse,
task_manager=task_manager,
)
async def sleep_then_return_val(val: str):
await trio.sleep(0.2)
return val
async def ensure_cancelled():
try:
await trio.sleep_forever()
except trio.Cancelled:
task = trio.lowlevel.current_task()
print(f'heyyo ONLY {task.name} was cancelled as expected B)')
assert 0
except BaseException:
raise RuntimeError("woa woa woa this ain't right!")
if __name__ == '__main__':
async def main():
async with open_nursery(
task_manager=partial(
add_task_handle_and_crash_handling,
debug_mode=True,
),
) as sn:
for _ in range(3):
outcome, _ = await sn.start_soon(trio.sleep_forever)
# extra task we want to engage in debugger post mortem.
err_outcome, cs = await sn.start_soon(ensure_cancelled)
val: str = 'yoyoyo'
val_outcome, _ = await sn.start_soon(
sleep_then_return_val,
val,
)
res = await val_outcome.wait_for_result()
assert res == val
print(f'{res} -> GOT EXPECTED TASK VALUE')
await trio.sleep(0.6)
print(
f'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)
cs.cancel()
trio.run(main)