Compare commits

...

20 Commits

Author SHA1 Message Date
Tyler Goodlet 840c328f19 Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2024-12-11 14:24:49 -05:00
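
A rough usage sketch of the task-side split described above, pieced together from the method signatures in the diff further down; the `tractor.hilevel` import path is inferred from the new sub-pkg in this changeset and the `'ticker'`/`sleep_forever()` names are made up for illustration:
```
import trio
from tractor.hilevel import open_service_mngr

async def sleep_forever():
    # stand-in local service-task body
    await trio.sleep_forever()

async def main():
    async with open_service_mngr() as mngr:
        # plain `trio.Task` handle, tracked in `.service_tasks`
        cs, complete = await mngr.start_service_task(
            'ticker',
            fn=sleep_forever,
        )
        # ..later, cancel just that task via the new method
        # (no IPC ctx nor subactor involved).
        await mngr.cancel_service_task('ticker')

trio.run(main)
```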
Tyler Goodlet 46dbe6d2fc Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping it in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision strategy).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
2024-12-11 12:38:35 -05:00
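
A corresponding sketch of the subactor-daemon side of the API brief above (again the `tractor.hilevel` path is presumed from the new sub-pkg in this diff; `serve_quotes()`, `'quoter'` and the `symbol` kwarg are purely illustrative):
```
import functools
import trio
import tractor
from tractor.hilevel import open_service_mngr

@tractor.context
async def serve_quotes(
    ctx: tractor.Context,
    symbol: str,
):
    # illustrative long-running service endpoint
    await ctx.started(symbol)
    await trio.sleep_forever()

async def main():
    async with open_service_mngr() as mngr:
        # spawn a "service daemon subactor" whose lifetime is
        # synced to the remotely scheduled `serve_quotes()` task.
        sub_ctx = await mngr.start_service(
            daemon_name='quoter',
            ctx_ep=functools.partial(serve_quotes, symbol='XMRUSD'),
            enable_modules=[__name__],
        )
        assert sub_ctx
        # ..later, tear down the remote task AND its subactor.
        await mngr.cancel_service('quoter')

trio.run(main)
```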
Tyler Goodlet f08e888138 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2024-12-10 14:44:09 -05:00
Tyler Goodlet 441cf0962d TOSQUASH: 9002f60 howtorelease.md file 2024-12-10 14:43:39 -05:00
Tyler Goodlet fb04f74605 Draft a (pretty)`Struct.fields_diff()`
For comparing a `msgspec.Struct` against an input `dict` presumably to
be used as input for struct instantiation. The main difference from
`.__sub__()` is that fields missing on either side are reported
(loudly).
2024-12-10 14:12:37 -05:00
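
For a feel of what the draft reports, a tiny sketch presuming the module lives at `tractor.msg.pretty_struct` (as in the diff near the bottom of this page); the `Point` type and field values are invented:
```
from tractor.msg import pretty_struct

class Point(pretty_struct.Struct):
    x: int
    y: int

# compare the struct's schema against a would-be init `dict`;
# fields missing on either side land in the returned `DiffDump`.
diffs = Point.fields_diff({'x': 1, 'z': 3})
print(diffs)
# expect `y` reported as NOT-DEFINED in `other: dict`
# and `z` as NOT-DEFINED for `Point`.
```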
Tyler Goodlet aa1f6fa4b5 Spitballing how to expose custom `msgspec` type hooks
Such that maybe we can eventually offer a nicer higher-level API which
implements much of the boilerplate required by `msgspec` (like
type-matched branching to serialization logic) via a type-table
interface or something?

Not sure if the idea is that useful so leaving it all as TODOs for now
obviously.
2024-12-09 21:09:48 -05:00
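
For reference, the kind of boilerplate being alluded to, using only the stock `enc_hook`/`dec_hook` extension params from `msgspec` itself (no `tractor` API here; the `Point2D` type is just a stand-in custom type):
```
import msgspec

class Point2D:
    # a custom type `msgspec` can't serialize natively
    def __init__(self, x: float, y: float):
        self.x, self.y = x, y

def enc_hook(obj):
    # type-matched branching to serialization logic
    if isinstance(obj, Point2D):
        return (obj.x, obj.y)
    raise NotImplementedError(f'No encoder for {type(obj)}')

def dec_hook(typ, obj):
    if typ is Point2D:
        return Point2D(*obj)
    raise NotImplementedError(f'No decoder for {typ}')

enc = msgspec.msgpack.Encoder(enc_hook=enc_hook)
dec = msgspec.msgpack.Decoder(Point2D, dec_hook=dec_hook)
pt = dec.decode(enc.encode(Point2D(1, 2)))
assert (pt.x, pt.y) == (1, 2)
```
A type-table interface would presumably generate these branchy hooks from per-type codec entries.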
Tyler Goodlet 9002f608ee Add `notes_to_self/howtorelease.md` reminder doc 2024-12-09 18:14:11 -05:00
Tyler Goodlet 8ebc022535 Add TODO for a runtime-vars passing mechanism 2024-12-09 18:12:22 -05:00
Tyler Goodlet e26fa8330f Change masked `.pause()` line 2024-12-09 18:04:32 -05:00
Tyler Goodlet a2659069c5 Type the inter-loop chans 2024-12-09 17:37:32 -05:00
Tyler Goodlet 54699d7a0b Denoise duplicate chan logging for now 2024-12-09 17:36:52 -05:00
Tyler Goodlet b91ab9e3a8 Add TODO for a tb frame "filterer" sys.. 2024-12-09 17:14:51 -05:00
Tyler Goodlet cd14c4fe72 Set `RemoteActorError.pformat(boxer_header=self.relay_uid)` by def 2024-12-09 16:57:57 -05:00
Tyler Goodlet ad40fcd2bc Support custom `boxer_header: str` provided by `pformat_boxed_tb()` caller 2024-12-09 16:57:22 -05:00
Tyler Goodlet 508ba510a5 Expose a `_ctlc_ignore_header: str` for use in `sigint_shield()` 2024-12-09 16:56:30 -05:00
Tyler Goodlet b875b35b98 Change `tractor.breakpoint()` to new `.pause()` in test suite 2024-12-09 16:08:55 -05:00
Tyler Goodlet 46ddc214cd Wrap `asyncio_bp.py` ex into test suite
Ensuring we can at least use `breakpoint()` from an infected actor's
`asyncio.Task` spawned via a `.to_asyncio` API.

Also includes a little `tests/devx/` reorging,
- start splitting out non-`tractor.pause()` tests into a new
  `test_pause_from_non_trio.py` for all the `.pause_from_sync()`
  use in bg-threaded or `asyncio` applications.
- factor harness commonalities to the `devx/conftest` (namely
  the `do_ctlc()` masher).
- mv `test_pause_from_sync` to the new non-`trio` mod.

NOTE, the `ctlc=True` is still failing for
`test_pause_from_asyncio_task` which is a user-happiness bug but not
anything fundamentally broken - just need to handle the `asyncio` case
in `.devx._debug.sigint_shield()`!
2024-12-09 15:38:28 -05:00
Tyler Goodlet b3ee20d3b9 Add `breakpoint()` hook restoration example + test 2024-12-05 20:56:39 -05:00
Tyler Goodlet cf3e6c1218 Rename `n: trio.Nursery` -> `tn` (task nursery) 2024-12-04 14:01:38 -05:00
Tyler Goodlet 8af9b0201d Messy-teardown `DebugStatus` related fixes
Mostly fixing edge cases with `asyncio` and/or bg threads where the
`.repl_release: trio.Event` needs to be used from the main `trio`
thread, otherwise confusing-but-valid teardown tracebacks can show under various
races.

Also improve,
- log reporting for such internal bugs to make them more obvious on
  console via `log.exception()`.
- only restore the SIGINT handler when runtime is (still) active.
- reporting when `tractor.pause(shield=True)` should be used and
  unhiding the internal frames from the tb in that case.
- for `pause_from_sync()` some deep fixes..
 |_add a `allow_no_runtime: bool = False` flag to allow
   **not** requiring the actor runtime to be active.
 |_fix the `greenback` case-branch to only trigger on `not
   is_trio_thread`.
 |_add a scope-global `repl_owner: Task|Thread|None = None` to
   avoid ref errors..
2024-12-03 15:26:25 -05:00
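
A rough sketch of the sync-pause paths these fixes touch; the top-level `tractor.pause_from_sync` export and the `maybe_enable_greenback` kwarg usage are assumptions based on the examples/tests in this changeset (and the REPL obviously needs an interactive terminal):
```
import trio
import tractor

def sync_pause():
    # sync-code breakpoint; normally requires an enclosing actor
    # runtime unless the new `allow_no_runtime=True` flag is passed.
    tractor.pause_from_sync()

async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
        maybe_enable_greenback=True,
    ):
        # REPL from the main `trio` thread..
        sync_pause()
        # ..or from a bg thread spawned via `trio.to_thread`.
        await trio.to_thread.run_sync(sync_pause)

trio.run(main)
```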
28 changed files with 1396 additions and 306 deletions

View File

@ -1,3 +1,8 @@
'''
Examples of using the builtin `breakpoint()` from an `asyncio.Task`
running in a subactor spawned with `infect_asyncio=True`.
'''
import asyncio
import trio
@ -26,15 +31,16 @@ async def bp_then_error(
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
# some further, at least, meta-data about the task/actor in debug
# in terms of making it clear it's `asyncio` mucking about.
breakpoint()
# short checkpoint / delay
await asyncio.sleep(0.5)
await asyncio.sleep(0.5) # asyncio-side
if raise_after_bp:
raise ValueError('blah')
raise ValueError('asyncio side error!')
# TODO: test case with this so that it gets cancelled?
else:
@ -46,7 +52,7 @@ async def bp_then_error(
@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = True,
bp_before_started: bool = False,
):
# this will block until the ``asyncio`` task sends a "first"
@ -55,19 +61,19 @@ async def trio_ctx(
to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
# raise_after_bp=not bp_before_started,
) as (first, chan),
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
assert first == 'start'
if bp_before_started:
await tractor.breakpoint()
await tractor.pause()
await ctx.started(first)
await ctx.started(first) # trio-side
n.start_soon(
tn.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
@ -77,14 +83,18 @@ async def trio_ctx(
async def main(
bps_all_over: bool = True,
# TODO, WHICH OF THESE HAZ BUGZ?
cancel_from_root: bool = False,
err_from_root: bool = False,
) -> None:
async with tractor.open_nursery(
debug_mode=True,
maybe_enable_greenback=True,
# loglevel='devx',
) as n:
ptl: Portal = await n.start_actor(
) as an:
ptl: Portal = await an.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
@ -99,12 +109,18 @@ async def main(
assert first == 'start'
if bps_all_over:
await tractor.breakpoint()
# pause in parent to ensure no cross-actor
# locking problems exist!
await tractor.pause()
if cancel_from_root:
await ctx.cancel()
if err_from_root:
assert 0
else:
await trio.sleep_forever()
# await trio.sleep_forever()
await ctx.cancel()
assert 0
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?

View File

@ -1,5 +1,5 @@
'''
Fast fail test with a context.
Fast fail test with a `Context`.
Ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent

View File

@ -7,7 +7,7 @@ async def breakpoint_forever():
try:
while True:
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'

View File

@ -10,7 +10,7 @@ async def name_error():
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await tractor.breakpoint()
await tractor.pause()
# NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the

View File

@ -6,7 +6,7 @@ async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await trio.sleep(0.1)
await tractor.breakpoint()
await tractor.pause()
async def name_error():

View File

@ -6,19 +6,46 @@ import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# initially unset, no entry.
orig_pybp_var: str|None = os.environ.get('PYTHONBREAKPOINT')
assert orig_pybp_var in {None, "0"}
async with tractor.open_nursery(
debug_mode=True,
) as an:
assert an
assert (
(pybp_var := os.environ['PYTHONBREAKPOINT'])
==
'tractor.devx._debug._sync_pause_from_builtin'
)
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
assert (
(pybp_hook := sys.breakpointhook)
is not tractor.devx._debug._set_trace
)
print(
f'$PYTHONBREAKPOINT: {pybp_var!r}\n'
f'`sys.breakpointhook`: {pybp_hook!r}\n'
)
breakpoint()
pass # first bp, tractor hook set.
# TODO: an assert that verifies the hook is unhooked..
# XXX AFTER EXIT (of actor-runtime) verify the hook is unset..
#
# YES, this is weird but it's how stdlib docs say to do it..
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
assert os.environ.get('PYTHONBREAKPOINT') is orig_pybp_var
assert sys.breakpointhook
# now ensure a regular builtin pause still works
breakpoint()
pass # last bp, stdlib hook restored
if __name__ == '__main__':
trio.run(main)

View File

@ -10,7 +10,7 @@ async def main():
await trio.sleep(0.1)
await tractor.breakpoint()
await tractor.pause()
await trio.sleep(0.1)

View File

@ -11,7 +11,7 @@ async def main(
# loglevel='runtime',
):
while True:
await tractor.breakpoint()
await tractor.pause()
if __name__ == '__main__':

View File

@ -4,9 +4,9 @@ import trio
async def gen():
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
@tractor.context
@ -15,7 +15,7 @@ async def just_bp(
) -> None:
await ctx.started()
await tractor.breakpoint()
await tractor.pause()
# TODO: bps and errors in this call..
async for val in gen():

View File

@ -0,0 +1,18 @@
First generate a built dist:
```
python -m pip install --upgrade build
python -m build --sdist --outdir dist/alpha5/
```
Then try a test ``pypi`` upload:
```
python -m twine upload --repository testpypi dist/alpha5/*
```
Then push to `pypi` for realz.
```
python -m twine upload dist/alpha5/*
```

View File

@ -2,6 +2,7 @@
`tractor.devx.*` tooling sub-pkg test space.
'''
import time
from typing import (
Callable,
)
@ -11,9 +12,19 @@ from pexpect.exceptions import (
TIMEOUT,
)
from pexpect.spawnbase import SpawnBase
from tractor._testing import (
mk_cmd,
)
from tractor.devx._debug import (
_pause_msg as _pause_msg,
_crash_msg as _crash_msg,
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from conftest import (
_ci_env,
)
@pytest.fixture
@ -107,6 +118,9 @@ def expect(
raise
PROMPT = r"\(Pdb\+\)"
def in_prompt_msg(
child: SpawnBase,
parts: list[str],
@ -166,3 +180,40 @@ def assert_before(
err_on_false=True,
**kwargs
)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before

View File

@ -21,14 +21,13 @@ from pexpect.exceptions import (
EOF,
)
from tractor.devx._debug import (
from .conftest import (
do_ctlc,
PROMPT,
_pause_msg,
_crash_msg,
_repl_fail_msg,
)
from conftest import (
_ci_env,
)
from .conftest import (
expect,
in_prompt_msg,
@ -70,9 +69,6 @@ has_nested_actors = pytest.mark.has_nested_actors
# )
PROMPT = r"\(Pdb\+\)"
@pytest.mark.parametrize(
'user_in_out',
[
@ -123,8 +119,10 @@ def test_root_actor_error(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_bp(spawn, user_in_out):
"""Demonstrate breakpoint from in root actor.
"""
'''
Demonstrate breakpoint from in root actor.
'''
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
@ -146,43 +144,6 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
@ -919,138 +880,6 @@ def test_different_debug_mode_per_actor(
)
def test_pause_from_sync(
spawn,
ctlc: bool
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
assert not in_prompt_msg(
child,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
child.expect(EOF)
def test_post_mortem_api(
spawn,
ctlc: bool,

View File

@ -0,0 +1,329 @@
'''
That "foreign loop/thread" debug REPL support better ALSO WORK!
Same as `test_native_pause.py`.
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
'''
# from functools import partial
# import itertools
import time
# from typing import (
# Iterator,
# )
import pytest
from pexpect.exceptions import (
# TIMEOUT,
EOF,
)
from .conftest import (
# _ci_env,
do_ctlc,
PROMPT,
# expect,
in_prompt_msg,
assert_before,
_pause_msg,
_crash_msg,
_ctlc_ignore_header,
# _repl_fail_msg,
)
def test_pause_from_sync(
spawn,
ctlc: bool,
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
assert not in_prompt_msg(
child,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
child.expect(EOF)
def expect_any_of(
attach_patts: dict[str, list[str]],
child, # what type?
ctlc: bool = False,
prompt: str = _ctlc_ignore_header,
ctlc_delay: float = .4,
) -> list[str]:
'''
Receive any of a `list[str]` of patterns provided in
`attach_patts`.
Used to test racing prompts from multiple actors and/or
tasks using a common root process' `pdbp` REPL.
'''
assert attach_patts
child.expect(PROMPT)
before = str(child.before.decode())
for attach_key in attach_patts:
if attach_key in before:
expected_patts: str = attach_patts.pop(attach_key)
assert_before(
child,
expected_patts
)
break # from for
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=prompt,
# NOTE same as comment above
delay=ctlc_delay,
)
return expected_patts
# yield child
def test_pause_from_asyncio_task(
spawn,
ctlc: bool
# ^TODO, fix for `asyncio`!!
):
'''
Verify we can use the `pdbp` REPL from an `asyncio.Task` spawned using
APIs in `.to_asyncio`.
`examples/debugging/asycio_bp.py`
'''
child = spawn('asyncio_bp')
# RACE on whether trio/asyncio task bps first
attach_patts: dict[str, list[str]] = {
# first pause in guest-mode (aka "infecting")
# `trio.Task`.
'trio-side': [
_pause_msg,
"<Task 'trio_ctx'",
"('aio_daemon'",
],
# `breakpoint()` from `asyncio.Task`.
'asyncio-side': [
_pause_msg,
"<Task pending name='Task-2' coro=<greenback_shim()",
"('aio_daemon'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
# NOW in race order,
# - the asyncio-task will error
# - the root-actor parent task will pause
#
attach_patts: dict[str, list[str]] = {
# error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [
_crash_msg,
'return await chan.receive()', # `.to_asyncio` impl internals in tb
"<Task 'trio_ctx'",
"@ ('aio_daemon'",
"ValueError: asyncio side error!",
],
# parent-side propagation via actor-nursery/portal
# "tractor._exceptions.RemoteActorError: remote task raised a 'ValueError'": [
"remote task raised a 'ValueError'": [
_crash_msg,
"src_uid=('aio_daemon'",
"('aio_daemon'",
],
# a final pause in root-actor
"<Task '__main__.main'": [
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
assert not attach_patts
# final boxed error propagates to root
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"remote task raised a 'ValueError'",
"ValueError: asyncio side error!",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
child.sendline('c')
child.expect(EOF)

View File

@ -955,7 +955,7 @@ async def echo_back_sequence(
)
await ctx.started()
# await tractor.breakpoint()
# await tractor.pause()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,

View File

@ -271,7 +271,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
# the faster subtask was cancelled
break
# await tractor.breakpoint()
# await tractor.pause()
# await stream.receive()
print(f'final value: {value}')

View File

@ -1703,15 +1703,28 @@ class Context:
# TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun:
log.warning(
f'Queueing OVERRUN msg on caller task:\n\n'
report: str = (
f'{flow_body}'
f'{structfmt(msg)}\n'
)
over_q: deque = self._overflow_q
self._overflow_q.append(msg)
if len(over_q) == over_q.maxlen:
report = (
'FAILED to queue OVERRUN msg, OVERAN the OVERRUN QUEUE !!\n\n'
+ report
)
# log.error(report)
log.debug(report)
else:
report = (
'Queueing OVERRUN msg on caller task:\n\n'
+ report
)
log.debug(report)
# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False

View File

@ -609,6 +609,7 @@ class RemoteActorError(Exception):
# just after <Type(
# |___ ..
tb_body_indent=1,
boxer_header=self.relay_uid,
)
tail = ''

View File

@ -95,6 +95,10 @@ async def open_root_actor(
hide_tb: bool = True,
# TODO, a way for actors to augment passing derived
# read-only state to sublayers?
# extra_rt_vars: dict|None = None,
) -> Actor:
'''
Runtime init entry point for ``tractor``.

View File

@ -456,11 +456,14 @@ class Actor:
)
if _pre_chan:
log.warning(
# con_status += (
# ^TODO^ swap once we minimize conn duplication
f' -> Wait, we already have IPC with `{uid_short}`??\n'
f' |_{_pre_chan}\n'
# -[ ] last thing might be reg/unreg runtime reqs?
# log.warning(
log.debug(
f'?Wait?\n'
f'We already have IPC with peer {uid_short!r}\n'
f'|_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children:

View File

@ -730,6 +730,9 @@ class DebugStatus:
# -[ ] see if we can get our proto oco task-mngr to work for
# this?
repl_task: Task|None = None
# repl_thread: Thread|None = None
# ^TODO?
repl_release: trio.Event|None = None
req_task: Task|None = None
@ -839,11 +842,12 @@ class DebugStatus:
if (
not cls.is_main_trio_thread()
and
# not _state._runtime_vars.get(
# '_is_infected_aio',
# False,
# )
not current_actor().is_infected_aio()
not _state._runtime_vars.get(
'_is_infected_aio',
False,
)
# not current_actor().is_infected_aio()
# ^XXX, since for bg-thr case will always raise..
):
trio.from_thread.run_sync(
signal.signal,
@ -928,12 +932,27 @@ class DebugStatus:
try:
# sometimes the task might already be terminated in
# which case this call will raise an RTE?
if repl_release is not None:
# See below for reporting on that..
if (
repl_release is not None
and
not repl_release.is_set()
):
if cls.is_main_trio_thread():
repl_release.set()
elif current_actor().is_infected_aio():
elif (
_state._runtime_vars.get(
'_is_infected_aio',
False,
)
# ^XXX, again bc we need to not except
# but for bg-thread case it will always raise..
#
# TODO, is there a better api then using
# `err_on_no_runtime=False` in the below?
# current_actor().is_infected_aio()
):
async def _set_repl_release():
repl_release.set()
@ -949,6 +968,15 @@ class DebugStatus:
trio.from_thread.run_sync(
repl_release.set
)
except RuntimeError as rte:
log.exception(
f'Failed to release debug-request ??\n\n'
f'{cls.repr()}\n'
)
# pdbp.set_trace()
raise rte
finally:
# if req_ctx := cls.req_ctx:
# req_ctx._scope.cancel()
@ -976,9 +1004,10 @@ class DebugStatus:
# logging when we don't need to?
cls.repl = None
# restore original sigint handler
cls.unshield_sigint()
# maybe restore original sigint handler
# XXX requires runtime check to avoid crash!
if current_actor(err_on_no_runtime=False):
cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this!
@ -1066,7 +1095,7 @@ class PdbREPL(pdbp.Pdb):
# Lock.release(raise_on_thread=False)
Lock.release()
# XXX after `Lock.release()` for root local repl usage
# XXX AFTER `Lock.release()` for root local repl usage
DebugStatus.release()
def set_quit(self):
@ -1391,6 +1420,10 @@ def any_connected_locker_child() -> bool:
return False
_ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use'
)
def sigint_shield(
signum: int,
frame: 'frame', # type: ignore # noqa
@ -1472,7 +1505,9 @@ def sigint_shield(
# NOTE: don't emit this with `.pdb()` level in
# root without a higher level.
log.runtime(
f'Ignoring SIGINT while debug REPL in use by child '
_ctlc_ignore_header
+
f' by child '
f'{uid_in_debug}\n'
)
problem = None
@ -1506,7 +1541,9 @@ def sigint_shield(
# NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block!
log.pdb(
'Ignoring SIGINT while pdb REPL in use by root actor..\n'
_ctlc_ignore_header
+
f' by root actor..\n'
f'{DebugStatus.repl_task}\n'
f' |_{repl}\n'
)
@ -1567,16 +1604,20 @@ def sigint_shield(
repl
):
log.pdb(
f'Ignoring SIGINT while local task using debug REPL\n'
f'|_{repl_task}\n'
f' |_{repl}\n'
_ctlc_ignore_header
+
f' by local task\n\n'
f'{repl_task}\n'
f' |_{repl}\n'
)
elif req_task:
log.debug(
'Ignoring SIGINT while debug request task is open but either,\n'
'- someone else is already REPL-in and has the `Lock`, or\n'
'- some other local task already is replin?\n'
f'|_{req_task}\n'
_ctlc_ignore_header
+
f' by local request-task and either,\n'
f'- someone else is already REPL-in and has the `Lock`, or\n'
f'- some other local task already is replin?\n\n'
f'{req_task}\n'
)
# TODO can we remove this now?
@ -1672,7 +1713,7 @@ class DebugRequestError(RuntimeError):
'''
_repl_fail_msg: str = (
_repl_fail_msg: str|None = (
'Failed to REPl via `_pause()` '
)
@ -1712,6 +1753,7 @@ async def _pause(
'''
__tracebackhide__: bool = hide_tb
pause_err: BaseException|None = None
actor: Actor = current_actor()
try:
task: Task = current_task()
@ -2094,11 +2136,13 @@ async def _pause(
# TODO: prolly factor this plus the similar block from
# `_enter_repl_sync()` into a common @cm?
except BaseException as pause_err:
except BaseException as _pause_err:
pause_err: BaseException = _pause_err
if isinstance(pause_err, bdb.BdbQuit):
log.devx(
'REPL for pdb was quit!\n'
'REPL for pdb was explicitly quit!\n'
)
_repl_fail_msg = None
# when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn
@ -2117,13 +2161,18 @@ async def _pause(
)
return
else:
log.exception(
_repl_fail_msg
+
f'on behalf of {repl_task} ??\n'
elif isinstance(pause_err, trio.Cancelled):
_repl_fail_msg = (
'You called `tractor.pause()` from an already cancelled scope!\n\n'
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
)
else:
_repl_fail_msg += f'on behalf of {repl_task} ??\n'
if _repl_fail_msg:
log.exception(_repl_fail_msg)
if not actor.is_infected_aio():
DebugStatus.release(cancel_req_task=True)
@ -2152,6 +2201,8 @@ async def _pause(
DebugStatus.req_err
or
repl_err
or
pause_err
):
__tracebackhide__: bool = False
@ -2435,6 +2486,8 @@ def pause_from_sync(
called_from_builtin: bool = False,
api_frame: FrameType|None = None,
allow_no_runtime: bool = False,
# proxy to `._pause()`, for ex:
# shield: bool = False,
# api_frame: FrameType|None = None,
@ -2453,16 +2506,25 @@ def pause_from_sync(
'''
__tracebackhide__: bool = hide_tb
repl_owner: Task|Thread|None = None
try:
actor: tractor.Actor = current_actor(
err_on_no_runtime=False,
)
if not actor:
raise RuntimeError(
'Not inside the `tractor`-runtime?\n'
if (
not actor
and
not allow_no_runtime
):
raise NoRuntime(
'The actor runtime has not been opened?\n\n'
'`tractor.pause_from_sync()` is not functional without a wrapping\n'
'- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n'
'- `async with tractor.open_root_actor()`\n\n'
'If you are getting this from a builtin `breakpoint()` call\n'
'it might mean the runtime was started then '
'stopped prematurely?\n'
)
message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
@ -2485,6 +2547,7 @@ def pause_from_sync(
repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n'
is_trio_thread: bool = DebugStatus.is_main_trio_thread()
is_root: bool = is_root_process()
is_aio: bool = actor.is_infected_aio()
@ -2500,7 +2563,7 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special
# handling for root-actor caller usage.
if (
not DebugStatus.is_main_trio_thread()
not is_trio_thread
and
not is_aio # see below for this usage
):
@ -2574,7 +2637,11 @@ def pause_from_sync(
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task
elif is_aio:
elif (
not is_trio_thread
and
is_aio
):
greenback: ModuleType = maybe_import_greenback()
repl_owner: Task = asyncio.current_task()
DebugStatus.shield_sigint()
@ -2758,9 +2825,11 @@ def _post_mortem(
# ^TODO, instead a nice runtime-info + maddr + uid?
# -[ ] impl a `Actor.__repr()__`??
# |_ <task>:<thread> @ <actor>
# no_runtime: bool = False
except NoRuntime:
actor_repr: str = '<no-actor-runtime?>'
# no_runtime: bool = True
try:
task_repr: Task = current_task()
@ -2796,6 +2865,8 @@ def _post_mortem(
# Since we presume the post-mortem was enaged to a task-ending
# error, we MUST release the local REPL request so that not other
# local task nor the root remains blocked!
# if not no_runtime:
# DebugStatus.release()
DebugStatus.release()
@ -3033,6 +3104,7 @@ async def maybe_wait_for_debugger(
# pass
return False
# TODO: better naming and what additionals?
# - [ ] optional runtime plugging?
# - [ ] detection for sync vs. async code?

View File

@ -234,7 +234,7 @@ def find_caller_info(
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
# TODO: -[x] move all this into new `.devx._code`!
# TODO: -[x] move all this into new `.devx._frame_stack`!
# -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
@ -286,3 +286,18 @@ def api_frame(
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True
return wrapper(wrapped)
# TODO: something like this instead of the adhoc frame-unhiding
# blocks all over the runtime!! XD
# -[ ] ideally we can expect a certain error (set) and if something
# else is raised then all frames below the wrapped one will be
# un-hidden via `__tracebackhide__: bool = False`.
# |_ might need to dynamically mutate the code objs like
# `pdbp.hideframe()` does?
# -[ ] use this as a `@acm` decorator as introed in 3.10?
# @acm
# async def unhide_frame_when_not(
# error_set: set[BaseException],
# ) -> TracebackType:
# ...

View File

@ -53,6 +53,7 @@ def pformat_boxed_tb(
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
boxer_header: str = '-'
) -> str:
'''
@ -88,9 +89,9 @@ def pformat_boxed_tb(
tb_box: str = (
f'|\n'
f' ------ - ------\n'
f' ------ {boxer_header} ------\n'
f'{tb_body}'
f' ------ - ------\n'
f' ------ {boxer_header}- ------\n'
f'_|\n'
)
tb_box_indent: str = (

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -0,0 +1,596 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is a singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by a call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_n
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.service_n.start(_task_manager_start)
# store the cancel scope and completion event for later
# cancellation or restart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# drop the handle entry now that the task has terminated,
# otherwise the sanity check below will always trip.
self.service_tasks.pop(name, None)
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.service_n.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The functionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Serice task for {name} not terminated?'

View File

@ -41,8 +41,10 @@ import textwrap
from typing import (
Any,
Callable,
Protocol,
Type,
TYPE_CHECKING,
TypeVar,
Union,
)
from types import ModuleType
@ -181,7 +183,11 @@ def mk_dec(
dec_hook: Callable|None = None,
) -> MsgDec:
'''
Create an IPC msg decoder, normally used as the
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
'''
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]`
@ -227,6 +233,13 @@ def pformat_msgspec(
join_char: str = '\n',
) -> str:
'''
Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute
for display in (console) log messages as a nice (maybe multiline)
presentation of all supported `Struct`s (subtypes) available for
typed decoding.
'''
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join(
mk_msgspec_table(
@ -630,31 +643,57 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec
# yield ext_codec
#
# ^-TODO-^ is it impossible to make something like this orr!?
# TODO: make an auto-custom hook generator from a set of input custom
# types?
# -[ ] below is a proto design using a `TypeCodec` idea?
#
# type var for the expected interchange-lib's
# IPC-transport type when not available as a built-in
# serialization output.
WireT = TypeVar('WireT')
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[x] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
#
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
#
# '''
# return (
# # enc_to_dict,
# dec_from_dict,
# )
# TODO: some kinda (decorator) API for built-in subtypes
# that builds this implicitly by inspecting the `mro()`?
class TypeCodec(Protocol):
'''
A per-custom-type wire-transport serialization translator
description type.
'''
src_type: Type
wire_type: WireT
def encode(obj: Type) -> WireT:
...
def decode(
obj_type: Type[WireT],
obj: WireT,
) -> Type:
...
class MsgpackTypeCodec(TypeCodec):
...
def mk_codec_hooks(
type_codecs: list[TypeCodec],
) -> tuple[Callable, Callable]:
'''
Deliver an `enc_hook()`/`dec_hook()` pair which handle
manual conversion for an input `Type` set such that whenever
the `TypeCodec.filter()` predicate matches, the
`TypeCodec.decode()` is called on the input native object by
the `dec_hook()`, and whenever
`isinstance(obj, TypeCodec.src_type)` matches inside the
`enc_hook(obj=obj)`, the return value is taken from a
`TypeCodec.encode(obj)` callback.
'''
...

View File

@ -30,9 +30,9 @@ from msgspec import (
Struct as _Struct,
structs,
)
from pprint import (
saferepr,
)
# from pprint import (
# saferepr,
# )
from tractor.log import get_logger
@ -75,8 +75,8 @@ class DiffDump(UserList):
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
f' |_{repr(left)},\n'
f' |_{repr(right)},\n'
')\n'
)
repstr += ']\n'
@ -144,15 +144,22 @@ def pformat(
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
else:
val_str: str = repr(v)
# XXX LOL, below just seems to be f#$%in causing
# recursion errs..
#
# the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# try:
# val_str: str = saferepr(v)
# except Exception:
# log.exception(
# 'Failed to `saferepr({type(struct)})` !?\n'
# )
# raise
# return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -203,12 +210,7 @@ class Struct(
return sin_props
pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str:
try:
return pformat(self)
@ -218,6 +220,13 @@ class Struct(
)
return _Struct.__repr__(self)
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def copy(
self,
update: dict | None = None,
@ -267,13 +276,15 @@ class Struct(
fi.type(getattr(self, fi.name)),
)
# TODO: make a mod func instead and just point to it here for
# method impl?
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
Compare fields/items key-wise and return a `DiffDump`
for easy visual REPL comparison B)
'''
@ -290,3 +301,42 @@ class Struct(
))
return diffs
@classmethod
def fields_diff(
cls,
other: dict|Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Very similar to `PrettyStruct.__sub__()` except accepts an
input `other: dict` (presumably that would normally be called
like `Struct(**other)`) which returns a `DiffDump` of the
fields of the struct and the `dict`'s fields.
'''
nullish = object()
consumed: dict = other.copy()
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(cls):
field_name: str = fi.name
# ours: Any = getattr(self, field_name)
theirs: Any = consumed.pop(field_name, nullish)
if theirs is nullish:
diffs.append((
field_name,
f'{fi.type!r}',
'NOT-DEFINED in `other: dict`',
))
# when there are lingering fields in `other` that this struct
# DOES NOT define we also append those.
if consumed:
for k, v in consumed.items():
diffs.append((
k,
f'NOT-DEFINED for `{cls.__name__}`',
f'`other: dict` has value = {v!r}',
))
return diffs

View File

@ -245,14 +245,14 @@ def _run_asyncio_task(
result != orig and
aio_err is None and
# in the ``open_channel_from()`` case we don't
# in the `open_channel_from()` case we don't
# relay through the "return value".
not provide_channels
):
to_trio.send_nowait(result)
finally:
# if the task was spawned using ``open_channel_from()``
# if the task was spawned using `open_channel_from()`
# then we close the channels on exit.
if provide_channels:
# only close the sender side which will relay
@ -500,7 +500,7 @@ async def run_task(
'''
# simple async func
chan = _run_asyncio_task(
chan: LinkedTaskChannel = _run_asyncio_task(
func,
qsize=1,
**kwargs,
@ -530,7 +530,7 @@ async def open_channel_from(
spawned ``asyncio`` task and ``trio``.
'''
chan = _run_asyncio_task(
chan: LinkedTaskChannel = _run_asyncio_task(
target,
qsize=2**8,
provide_channels=True,

View File

@ -382,7 +382,7 @@ class BroadcastReceiver(ReceiveChannel):
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
# await tractor.pause()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'