Compare commits


1 Commit

Author SHA1 Message Date
Tyler Goodlet 9eb7741822 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2024-11-27 20:29:16 -05:00
28 changed files with 327 additions and 1285 deletions

View File

@ -1,8 +1,3 @@
'''
Examples of using the builtin `breakpoint()` from an `asyncio.Task`
running in a subactor spawned with `infect_asyncio=True`.
'''
import asyncio import asyncio
import trio import trio
@ -31,16 +26,15 @@ async def bp_then_error(
# NOTE: what happens here inside the hook needs some refinement.. # NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but # => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want # we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actor in debug # some further, at least, meta-data about the task/actor in debug
# in terms of making it clear it's `asyncio` mucking about. # in terms of making it clear it's asyncio mucking about.
breakpoint() breakpoint()
# short checkpoint / delay # short checkpoint / delay
await asyncio.sleep(0.5) # asyncio-side await asyncio.sleep(0.5)
if raise_after_bp: if raise_after_bp:
raise ValueError('asyncio side error!') raise ValueError('blah')
# TODO: test case with this so that it gets cancelled? # TODO: test case with this so that it gets cancelled?
else: else:
@ -52,7 +46,7 @@ async def bp_then_error(
@tractor.context @tractor.context
async def trio_ctx( async def trio_ctx(
ctx: tractor.Context, ctx: tractor.Context,
bp_before_started: bool = False, bp_before_started: bool = True,
): ):
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
@ -61,19 +55,19 @@ async def trio_ctx(
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
bp_then_error, bp_then_error,
# raise_after_bp=not bp_before_started, raise_after_bp=not bp_before_started,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as tn, trio.open_nursery() as n,
): ):
assert first == 'start' assert first == 'start'
if bp_before_started: if bp_before_started:
await tractor.pause() await tractor.breakpoint()
await ctx.started(first) # trio-side await ctx.started(first)
tn.start_soon( n.start_soon(
to_asyncio.run_task, to_asyncio.run_task,
aio_sleep_forever, aio_sleep_forever,
) )
@ -83,18 +77,14 @@ async def trio_ctx(
async def main( async def main(
bps_all_over: bool = True, bps_all_over: bool = True,
# TODO, WHICH OF THESE HAZ BUGZ?
cancel_from_root: bool = False,
err_from_root: bool = False,
) -> None: ) -> None:
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
maybe_enable_greenback=True, maybe_enable_greenback=True,
# loglevel='devx', # loglevel='devx',
) as an: ) as n:
ptl: Portal = await an.start_actor( ptl: Portal = await n.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -109,18 +99,12 @@ async def main(
assert first == 'start' assert first == 'start'
# pause in parent to ensure no cross-actor if bps_all_over:
# locking problems exist! await tractor.breakpoint()
await tractor.pause()
if cancel_from_root: # await trio.sleep_forever()
await ctx.cancel() await ctx.cancel()
if err_from_root:
assert 0 assert 0
else:
await trio.sleep_forever()
# TODO: case where we cancel from trio-side while asyncio task # TODO: case where we cancel from trio-side while asyncio task
# has debugger lock? # has debugger lock?

View File

@ -1,5 +1,5 @@
''' '''
Fast fail test with a `Context`. Fast fail test with a context.
Ensure the partially initialized sub-actor process Ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent doesn't cause a hang on error/cancel of the parent

View File

@ -7,7 +7,7 @@ async def breakpoint_forever():
try: try:
while True: while True:
yield 'yo' yield 'yo'
await tractor.pause() await tractor.breakpoint()
except BaseException: except BaseException:
tractor.log.get_console_log().exception( tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!' 'Cancelled while trying to enter pause point!'

View File

@ -10,7 +10,7 @@ async def name_error():
async def breakpoint_forever(): async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
while True: while True:
await tractor.pause() await tractor.breakpoint()
# NOTE: if the test never sent 'q'/'quit' commands # NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the # on the pdb repl, without this checkpoint line the

View File

@ -6,7 +6,7 @@ async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
while True: while True:
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.pause() await tractor.breakpoint()
async def name_error(): async def name_error():

View File

@ -6,46 +6,19 @@ import tractor
async def main() -> None: async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
# intially unset, no entry. assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
orig_pybp_var: int = os.environ.get('PYTHONBREAKPOINT')
assert orig_pybp_var in {None, "0"}
async with tractor.open_nursery(
debug_mode=True,
) as an:
assert an
assert (
(pybp_var := os.environ['PYTHONBREAKPOINT'])
==
'tractor.devx._debug._sync_pause_from_builtin'
)
# TODO: an assert that verifies the hook has indeed been, hooked # TODO: an assert that verifies the hook has indeed been, hooked
# XD # XD
assert ( assert sys.breakpointhook is not tractor._debug._set_trace
(pybp_hook := sys.breakpointhook)
is not tractor.devx._debug._set_trace
)
print(
f'$PYTHONBREAKPOINT: {pybp_var!r}\n'
f'`sys.breakpointhook`: {pybp_hook!r}\n'
)
breakpoint() breakpoint()
pass # first bp, tractor hook set.
# XXX AFTER EXIT (of actor-runtime) verify the hook is unset.. # TODO: an assert that verifies the hook is unhooked..
#
# YES, this is weird but it's how stdlib docs say to do it..
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
assert os.environ.get('PYTHONBREAKPOINT') is orig_pybp_var
assert sys.breakpointhook assert sys.breakpointhook
# now ensure a regular builtin pause still works
breakpoint() breakpoint()
pass # last bp, stdlib hook restored
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)
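
For reference, the stdlib mechanism this test exercises: `breakpoint()` dispatches through `sys.breakpointhook`, and the *default* hook re-reads the `PYTHONBREAKPOINT` env var on every call ("0" disables it, otherwise it names an importable callable). A minimal sketch using a hypothetical `my_debug_hook` rather than tractor's actual hook:
```
import os
import sys

def my_debug_hook(*args, **kwargs) -> None:
    # stand-in for a library-provided REPL entry point (hypothetical)
    print('custom breakpoint hook engaged')

# the default `sys.breakpointhook` consults this env var on each
# `breakpoint()` call and imports/invokes the named callable in
# place of `pdb.set_trace()`.
os.environ['PYTHONBREAKPOINT'] = f'{__name__}.my_debug_hook'
breakpoint()  # -> my_debug_hook()

# restore stdlib behaviour per the `sys.breakpointhook` docs:
# drop the env var and re-assign the saved default hook.
os.environ.pop('PYTHONBREAKPOINT', None)
sys.breakpointhook = sys.__breakpointhook__
```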

View File

@ -10,7 +10,7 @@ async def main():
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.pause() await tractor.breakpoint()
await trio.sleep(0.1) await trio.sleep(0.1)

View File

@ -11,7 +11,7 @@ async def main(
# loglevel='runtime', # loglevel='runtime',
): ):
while True: while True:
await tractor.pause() await tractor.breakpoint()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -4,9 +4,9 @@ import trio
async def gen(): async def gen():
yield 'yo' yield 'yo'
await tractor.pause() await tractor.breakpoint()
yield 'yo' yield 'yo'
await tractor.pause() await tractor.breakpoint()
@tractor.context @tractor.context
@ -15,7 +15,7 @@ async def just_bp(
) -> None: ) -> None:
await ctx.started() await ctx.started()
await tractor.pause() await tractor.breakpoint()
# TODO: bps and errors in this call.. # TODO: bps and errors in this call..
async for val in gen(): async for val in gen():

View File

@ -1,18 +0,0 @@
First generate a built distribution:
```
python -m pip install --upgrade build
python -m build --sdist --outdir dist/alpha5/
```
Then try a test ``pypi`` upload:
```
python -m twine upload --repository testpypi dist/alpha5/*
```
Then push to `pypi` for realz:
```
python -m twine upload --repository testpypi dist/alpha5/*
```
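Presumably the final step was meant to drop the `--repository testpypi` flag, since `twine` targets the real index by default:
```
python -m twine upload dist/alpha5/*
```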

View File

@ -2,7 +2,6 @@
`tractor.devx.*` tooling sub-pkg test space. `tractor.devx.*` tooling sub-pkg test space.
''' '''
import time
from typing import ( from typing import (
Callable, Callable,
) )
@ -12,19 +11,9 @@ from pexpect.exceptions import (
TIMEOUT, TIMEOUT,
) )
from pexpect.spawnbase import SpawnBase from pexpect.spawnbase import SpawnBase
from tractor._testing import ( from tractor._testing import (
mk_cmd, mk_cmd,
) )
from tractor.devx._debug import (
_pause_msg as _pause_msg,
_crash_msg as _crash_msg,
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from conftest import (
_ci_env,
)
@pytest.fixture @pytest.fixture
@ -118,9 +107,6 @@ def expect(
raise raise
PROMPT = r"\(Pdb\+\)"
def in_prompt_msg( def in_prompt_msg(
child: SpawnBase, child: SpawnBase,
parts: list[str], parts: list[str],
@ -180,40 +166,3 @@ def assert_before(
err_on_false=True, err_on_false=True,
**kwargs **kwargs
) )
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before

View File

@ -21,13 +21,14 @@ from pexpect.exceptions import (
EOF, EOF,
) )
from .conftest import ( from tractor.devx._debug import (
do_ctlc,
PROMPT,
_pause_msg, _pause_msg,
_crash_msg, _crash_msg,
_repl_fail_msg, _repl_fail_msg,
) )
from conftest import (
_ci_env,
)
from .conftest import ( from .conftest import (
expect, expect,
in_prompt_msg, in_prompt_msg,
@ -69,6 +70,9 @@ has_nested_actors = pytest.mark.has_nested_actors
# ) # )
PROMPT = r"\(Pdb\+\)"
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -119,10 +123,8 @@ def test_root_actor_error(
ids=lambda item: f'{item[0]} -> {item[1]}', ids=lambda item: f'{item[0]} -> {item[1]}',
) )
def test_root_actor_bp(spawn, user_in_out): def test_root_actor_bp(spawn, user_in_out):
''' """Demonstrate breakpoint from in root actor.
Demonstrate breakpoint from in root actor. """
'''
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint') child = spawn('root_actor_breakpoint')
@ -144,6 +146,43 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before
def test_root_actor_bp_forever( def test_root_actor_bp_forever(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -880,6 +919,138 @@ def test_different_debug_mode_per_actor(
) )
def test_pause_from_sync(
spawn,
ctlc: bool
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
assert not in_prompt_msg(
child,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
child.expect(EOF)
def test_post_mortem_api( def test_post_mortem_api(
spawn, spawn,
ctlc: bool, ctlc: bool,

View File

@ -1,329 +0,0 @@
'''
That "foreign loop/thread" debug REPL support better ALSO WORK!
Same as `test_native_pause.py`.
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
'''
# from functools import partial
# import itertools
import time
# from typing import (
# Iterator,
# )
import pytest
from pexpect.exceptions import (
# TIMEOUT,
EOF,
)
from .conftest import (
# _ci_env,
do_ctlc,
PROMPT,
# expect,
in_prompt_msg,
assert_before,
_pause_msg,
_crash_msg,
_ctlc_ignore_header,
# _repl_fail_msg,
)
def test_pause_from_sync(
spawn,
ctlc: bool,
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
assert not in_prompt_msg(
child,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
child.expect(EOF)
def expect_any_of(
attach_patts: dict[str, list[str]],
child, # what type?
ctlc: bool = False,
prompt: str = _ctlc_ignore_header,
ctlc_delay: float = .4,
) -> list[str]:
'''
Receive any of a `list[str]` of patterns provided in
`attach_patts`.
Used to test racing prompts from multiple actors and/or
tasks using a common root process' `pdbp` REPL.
'''
assert attach_patts
child.expect(PROMPT)
before = str(child.before.decode())
for attach_key in attach_patts:
if attach_key in before:
expected_patts: str = attach_patts.pop(attach_key)
assert_before(
child,
expected_patts
)
break # from for
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=prompt,
# NOTE same as comment above
delay=ctlc_delay,
)
return expected_patts
# yield child
def test_pause_from_asyncio_task(
spawn,
ctlc: bool
# ^TODO, fix for `asyncio`!!
):
'''
Verify we can use the `pdbp` REPL from an `asyncio.Task` spawned using
APIs in `.to_asyncio`.
`examples/debugging/asycio_bp.py`
'''
child = spawn('asyncio_bp')
# RACE on whether trio/asyncio task bps first
attach_patts: dict[str, list[str]] = {
# first pause in guest-mode (aka "infecting")
# `trio.Task`.
'trio-side': [
_pause_msg,
"<Task 'trio_ctx'",
"('aio_daemon'",
],
# `breakpoint()` from `asyncio.Task`.
'asyncio-side': [
_pause_msg,
"<Task pending name='Task-2' coro=<greenback_shim()",
"('aio_daemon'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
# NOW in race order,
# - the asyncio-task will error
# - the root-actor parent task will pause
#
attach_patts: dict[str, list[str]] = {
# error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [
_crash_msg,
'return await chan.receive()', # `.to_asyncio` impl internals in tb
"<Task 'trio_ctx'",
"@ ('aio_daemon'",
"ValueError: asyncio side error!",
],
# parent-side propagation via actor-nursery/portal
# "tractor._exceptions.RemoteActorError: remote task raised a 'ValueError'": [
"remote task raised a 'ValueError'": [
_crash_msg,
"src_uid=('aio_daemon'",
"('aio_daemon'",
],
# a final pause in root-actor
"<Task '__main__.main'": [
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
assert not attach_patts
# final boxed error propagates to root
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"remote task raised a 'ValueError'",
"ValueError: asyncio side error!",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
child.sendline('c')
child.expect(EOF)

View File

@ -955,7 +955,7 @@ async def echo_back_sequence(
) )
await ctx.started() await ctx.started()
# await tractor.pause() # await tractor.breakpoint()
async with ctx.open_stream( async with ctx.open_stream(
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,

View File

@ -271,7 +271,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
# the faster subtask was cancelled # the faster subtask was cancelled
break break
# await tractor.pause() # await tractor.breakpoint()
# await stream.receive() # await stream.receive()
print(f'final value: {value}') print(f'final value: {value}')

View File

@ -1703,28 +1703,15 @@ class Context:
# TODO: expose as mod func instead! # TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat structfmt = pretty_struct.Struct.pformat
if self._in_overrun: if self._in_overrun:
report: str = ( log.warning(
f'Queueing OVERRUN msg on caller task:\n\n'
f'{flow_body}' f'{flow_body}'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
over_q: deque = self._overflow_q
self._overflow_q.append(msg) self._overflow_q.append(msg)
if len(over_q) == over_q.maxlen:
report = (
'FAILED to queue OVERRUN msg, OVERAN the OVERRUN QUEUE !!\n\n'
+ report
)
# log.error(report)
log.debug(report)
else:
report = (
'Queueing OVERRUN msg on caller task:\n\n'
+ report
)
log.debug(report)
# XXX NOTE XXX # XXX NOTE XXX
# overrun is the ONLY case where returning early is fine! # overrun is the ONLY case where returning early is fine!
return False return False

View File

@ -609,7 +609,6 @@ class RemoteActorError(Exception):
# just after <Type( # just after <Type(
# |___ .. # |___ ..
tb_body_indent=1, tb_body_indent=1,
boxer_header=self.relay_uid,
) )
tail = '' tail = ''

View File

@ -95,10 +95,6 @@ async def open_root_actor(
hide_tb: bool = True, hide_tb: bool = True,
# TODO, a way for actors to augment passing derived
# read-only state to sublayers?
# extra_rt_vars: dict|None = None,
) -> Actor: ) -> Actor:
''' '''
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.

View File

@ -456,13 +456,10 @@ class Actor:
) )
if _pre_chan: if _pre_chan:
log.warning(
# con_status += ( # con_status += (
# ^TODO^ swap once we minimize conn duplication # ^TODO^ swap once we minimize conn duplication
# -[ ] last thing might be reg/unreg runtime reqs? f' -> Wait, we already have IPC with `{uid_short}`??\n'
# log.warning(
log.debug(
f'?Wait?\n'
f'We already have IPC with peer {uid_short!r}\n'
f' |_{_pre_chan}\n' f' |_{_pre_chan}\n'
) )

View File

@ -730,9 +730,6 @@ class DebugStatus:
# -[ ] see if we can get our proto oco task-mngr to work for # -[ ] see if we can get our proto oco task-mngr to work for
# this? # this?
repl_task: Task|None = None repl_task: Task|None = None
# repl_thread: Thread|None = None
# ^TODO?
repl_release: trio.Event|None = None repl_release: trio.Event|None = None
req_task: Task|None = None req_task: Task|None = None
@ -842,12 +839,11 @@ class DebugStatus:
if ( if (
not cls.is_main_trio_thread() not cls.is_main_trio_thread()
and and
not _state._runtime_vars.get( # not _state._runtime_vars.get(
'_is_infected_aio', # '_is_infected_aio',
False, # False,
) # )
# not current_actor().is_infected_aio() not current_actor().is_infected_aio()
# ^XXX, since for bg-thr case will always raise..
): ):
trio.from_thread.run_sync( trio.from_thread.run_sync(
signal.signal, signal.signal,
@ -932,27 +928,12 @@ class DebugStatus:
try: try:
# sometimes the task might already be terminated in # sometimes the task might already be terminated in
# which case this call will raise an RTE? # which case this call will raise an RTE?
# See below for reporting on that.. if repl_release is not None:
if (
repl_release is not None
and
not repl_release.is_set()
):
if cls.is_main_trio_thread(): if cls.is_main_trio_thread():
repl_release.set() repl_release.set()
elif ( elif current_actor().is_infected_aio():
_state._runtime_vars.get(
'_is_infected_aio',
False,
)
# ^XXX, again bc we need to not except
# but for bg-thread case it will always raise..
#
# TODO, is there a better api then using
# `err_on_no_runtime=False` in the below?
# current_actor().is_infected_aio()
):
async def _set_repl_release(): async def _set_repl_release():
repl_release.set() repl_release.set()
@ -968,15 +949,6 @@ class DebugStatus:
trio.from_thread.run_sync( trio.from_thread.run_sync(
repl_release.set repl_release.set
) )
except RuntimeError as rte:
log.exception(
f'Failed to release debug-request ??\n\n'
f'{cls.repr()}\n'
)
# pdbp.set_trace()
raise rte
finally: finally:
# if req_ctx := cls.req_ctx: # if req_ctx := cls.req_ctx:
# req_ctx._scope.cancel() # req_ctx._scope.cancel()
@ -1004,12 +976,11 @@ class DebugStatus:
# logging when we don't need to? # logging when we don't need to?
cls.repl = None cls.repl = None
# maybe restore original sigint handler # restore original sigint handler
# XXX requires runtime check to avoid crash!
if current_actor(err_on_no_runtime=False):
cls.unshield_sigint() cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this! # TODO: use the new `@lowlevel.singleton` for this!
def get_debug_req() -> DebugStatus|None: def get_debug_req() -> DebugStatus|None:
return DebugStatus return DebugStatus
@ -1095,7 +1066,7 @@ class PdbREPL(pdbp.Pdb):
# Lock.release(raise_on_thread=False) # Lock.release(raise_on_thread=False)
Lock.release() Lock.release()
# XXX AFTER `Lock.release()` for root local repl usage # XXX after `Lock.release()` for root local repl usage
DebugStatus.release() DebugStatus.release()
def set_quit(self): def set_quit(self):
@ -1420,10 +1391,6 @@ def any_connected_locker_child() -> bool:
return False return False
_ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use'
)
def sigint_shield( def sigint_shield(
signum: int, signum: int,
frame: 'frame', # type: ignore # noqa frame: 'frame', # type: ignore # noqa
@ -1505,9 +1472,7 @@ def sigint_shield(
# NOTE: don't emit this with `.pdb()` level in # NOTE: don't emit this with `.pdb()` level in
# root without a higher level. # root without a higher level.
log.runtime( log.runtime(
_ctlc_ignore_header f'Ignoring SIGINT while debug REPL in use by child '
+
f' by child '
f'{uid_in_debug}\n' f'{uid_in_debug}\n'
) )
problem = None problem = None
@ -1541,9 +1506,7 @@ def sigint_shield(
# NOTE: since we emit this msg on ctl-c, we should # NOTE: since we emit this msg on ctl-c, we should
# also always re-print the prompt the tail block! # also always re-print the prompt the tail block!
log.pdb( log.pdb(
_ctlc_ignore_header 'Ignoring SIGINT while pdb REPL in use by root actor..\n'
+
f' by root actor..\n'
f'{DebugStatus.repl_task}\n' f'{DebugStatus.repl_task}\n'
f' |_{repl}\n' f' |_{repl}\n'
) )
@ -1604,20 +1567,16 @@ def sigint_shield(
repl repl
): ):
log.pdb( log.pdb(
_ctlc_ignore_header f'Ignoring SIGINT while local task using debug REPL\n'
+ f'|_{repl_task}\n'
f' by local task\n\n'
f'{repl_task}\n'
f' |_{repl}\n' f' |_{repl}\n'
) )
elif req_task: elif req_task:
log.debug( log.debug(
_ctlc_ignore_header 'Ignoring SIGINT while debug request task is open but either,\n'
+ '- someone else is already REPL-in and has the `Lock`, or\n'
f' by local request-task and either,\n' '- some other local task already is replin?\n'
f'- someone else is already REPL-in and has the `Lock`, or\n' f'|_{req_task}\n'
f'- some other local task already is replin?\n\n'
f'{req_task}\n'
) )
# TODO can we remove this now? # TODO can we remove this now?
@ -1713,7 +1672,7 @@ class DebugRequestError(RuntimeError):
''' '''
_repl_fail_msg: str|None = ( _repl_fail_msg: str = (
'Failed to REPL via `_pause()` ' 'Failed to REPL via `_pause()` '
) )
@ -1753,7 +1712,6 @@ async def _pause(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
pause_err: BaseException|None = None
actor: Actor = current_actor() actor: Actor = current_actor()
try: try:
task: Task = current_task() task: Task = current_task()
@ -2136,13 +2094,11 @@ async def _pause(
# TODO: prolly factor this plus the similar block from # TODO: prolly factor this plus the similar block from
# `_enter_repl_sync()` into a common @cm? # `_enter_repl_sync()` into a common @cm?
except BaseException as _pause_err: except BaseException as pause_err:
pause_err: BaseException = _pause_err
if isinstance(pause_err, bdb.BdbQuit): if isinstance(pause_err, bdb.BdbQuit):
log.devx( log.devx(
'REPL for pdb was explicitly quit!\n' 'REPL for pdb was quit!\n'
) )
_repl_fail_msg = None
# when the actor is mid-runtime cancellation the # when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn # `Actor._service_n` might get closed before we can spawn
@ -2161,17 +2117,12 @@ async def _pause(
) )
return return
elif isinstance(pause_err, trio.Cancelled):
_repl_fail_msg = (
'You called `tractor.pause()` from an already cancelled scope!\n\n'
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
)
else: else:
_repl_fail_msg += f'on behalf of {repl_task} ??\n' log.exception(
_repl_fail_msg
if _repl_fail_msg: +
log.exception(_repl_fail_msg) f'on behalf of {repl_task} ??\n'
)
if not actor.is_infected_aio(): if not actor.is_infected_aio():
DebugStatus.release(cancel_req_task=True) DebugStatus.release(cancel_req_task=True)
@ -2201,8 +2152,6 @@ async def _pause(
DebugStatus.req_err DebugStatus.req_err
or or
repl_err repl_err
or
pause_err
): ):
__tracebackhide__: bool = False __tracebackhide__: bool = False
@ -2486,8 +2435,6 @@ def pause_from_sync(
called_from_builtin: bool = False, called_from_builtin: bool = False,
api_frame: FrameType|None = None, api_frame: FrameType|None = None,
allow_no_runtime: bool = False,
# proxy to `._pause()`, for ex: # proxy to `._pause()`, for ex:
# shield: bool = False, # shield: bool = False,
# api_frame: FrameType|None = None, # api_frame: FrameType|None = None,
@ -2506,25 +2453,16 @@ def pause_from_sync(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
repl_owner: Task|Thread|None = None
try: try:
actor: tractor.Actor = current_actor( actor: tractor.Actor = current_actor(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
if ( if not actor:
not actor raise RuntimeError(
and 'Not inside the `tractor`-runtime?\n'
not allow_no_runtime
):
raise NoRuntime(
'The actor runtime has not been opened?\n\n'
'`tractor.pause_from_sync()` is not functional without a wrapping\n' '`tractor.pause_from_sync()` is not functional without a wrapping\n'
'- `async with tractor.open_nursery()` or,\n' '- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n\n' '- `async with tractor.open_root_actor()`\n'
'If you are getting this from a builtin `breakpoint()` call\n'
'it might mean the runtime was started then '
'stopped prematurely?\n'
) )
message: str = ( message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n' f'{actor.uid} task called `tractor.pause_from_sync()`\n'
@ -2547,7 +2485,6 @@ def pause_from_sync(
repl: PdbREPL = mk_pdb() repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n' # message += f'-> created local REPL {repl}\n'
is_trio_thread: bool = DebugStatus.is_main_trio_thread()
is_root: bool = is_root_process() is_root: bool = is_root_process()
is_aio: bool = actor.is_infected_aio() is_aio: bool = actor.is_infected_aio()
@ -2563,7 +2500,7 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special # thread which will call `._pause()` manually with special
# handling for root-actor caller usage. # handling for root-actor caller usage.
if ( if (
not is_trio_thread not DebugStatus.is_main_trio_thread()
and and
not is_aio # see below for this usage not is_aio # see below for this usage
): ):
@ -2637,11 +2574,7 @@ def pause_from_sync(
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task assert bg_task is not DebugStatus.repl_task
elif ( elif is_aio:
not is_trio_thread
and
is_aio
):
greenback: ModuleType = maybe_import_greenback() greenback: ModuleType = maybe_import_greenback()
repl_owner: Task = asyncio.current_task() repl_owner: Task = asyncio.current_task()
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
@ -2825,11 +2758,9 @@ def _post_mortem(
# ^TODO, instead a nice runtime-info + maddr + uid? # ^TODO, instead a nice runtime-info + maddr + uid?
# -[ ] impl a `Actor.__repr()__`?? # -[ ] impl a `Actor.__repr()__`??
# |_ <task>:<thread> @ <actor> # |_ <task>:<thread> @ <actor>
# no_runtime: bool = False
except NoRuntime: except NoRuntime:
actor_repr: str = '<no-actor-runtime?>' actor_repr: str = '<no-actor-runtime?>'
# no_runtime: bool = True
try: try:
task_repr: Task = current_task() task_repr: Task = current_task()
@ -2865,8 +2796,6 @@ def _post_mortem(
# Since we presume the post-mortem was engaged to a task-ending # Since we presume the post-mortem was engaged to a task-ending
# error, we MUST release the local REPL request so that no other # error, we MUST release the local REPL request so that no other
# local task nor the root remains blocked! # local task nor the root remains blocked!
# if not no_runtime:
# DebugStatus.release()
DebugStatus.release() DebugStatus.release()
@ -3104,7 +3033,6 @@ async def maybe_wait_for_debugger(
# pass # pass
return False return False
# TODO: better naming and what additionals? # TODO: better naming and what additionals?
# - [ ] optional runtime plugging? # - [ ] optional runtime plugging?
# - [ ] detection for sync vs. async code? # - [ ] detection for sync vs. async code?

View File

@ -234,7 +234,7 @@ def find_caller_info(
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
# TODO: -[x] move all this into new `.devx._frame_stack`! # TODO: -[x] move all this into new `.devx._code`!
# -[ ] consider rename to _callstack? # -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec? # -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better? # |_ @api_frame seems better?
@ -286,18 +286,3 @@ def api_frame(
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True wrapped.__api_func__: bool = True
return wrapper(wrapped) return wrapper(wrapped)
# TODO: something like this instead of the adhoc frame-unhiding
# blocks all over the runtime!! XD
# -[ ] ideally we can expect a certain error (set) and if something
# else is raised then all frames below the wrapped one will be
# un-hidden via `__tracebackhide__: bool = False`.
# |_ might need to dynamically mutate the code objs like
# `pdbp.hideframe()` does?
# -[ ] use this as a `@acm` decorator as introed in 3.10?
# @acm
# async def unhide_frame_when_not(
# error_set: set[BaseException],
# ) -> TracebackType:
# ...
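
A minimal sketch of the decorator idea above, covering only the simple per-frame case; fully un-hiding *nested* frames would likely need the code-object mutation trick mentioned in the TODO (à la `pdbp.hideframe()`):
```
from contextlib import asynccontextmanager as acm

@acm
async def unhide_frame_when_not(
    error_set: tuple[type[BaseException], ...],
):
    '''
    Re-raise "expected" errors untouched; for anything unexpected
    flip `__tracebackhide__` off so this frame shows in rendered tbs.
    '''
    __tracebackhide__: bool = True
    try:
        yield
    except error_set:
        raise
    except BaseException:
        __tracebackhide__: bool = False  # un-hide this frame only
        raise
```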

View File

@ -53,7 +53,6 @@ def pformat_boxed_tb(
tb_box_indent: int|None = None, tb_box_indent: int|None = None,
tb_body_indent: int = 1, tb_body_indent: int = 1,
boxer_header: str = '-'
) -> str: ) -> str:
''' '''
@ -89,9 +88,9 @@ def pformat_boxed_tb(
tb_box: str = ( tb_box: str = (
f'|\n' f'|\n'
f' ------ {boxer_header} ------\n' f' ------ - ------\n'
f'{tb_body}' f'{tb_body}'
f' ------ {boxer_header}- ------\n' f' ------ - ------\n'
f'_|\n' f'_|\n'
) )
tb_box_indent: str = ( tb_box_indent: str = (

View File

@ -1,26 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -21,26 +21,18 @@ and API.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, # asynccontextmanager as acm,
# contextmanager as cm, contextmanager as cm,
) )
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import tractor
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from tractor import ( from tractor import (
log,
ActorNursery, ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
@ -48,7 +40,9 @@ from tractor import (
Portal, Portal,
) )
log = log.get_logger('tractor') from ._util import (
log, # sub-sys logger
)
# TODO: implement a `@singleton` deco-API for wrapping the below # TODO: implement a `@singleton` deco-API for wrapping the below
@ -99,30 +93,11 @@ log = log.get_logger('tractor')
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API # TODO: singleton factory API instead of a class API
@acm @cm
async def open_service_mngr( def open_service_mngr(
*, *,
debug_mode: bool = False, _singleton: list[ServiceMngr|None] = [None],
# NOTE; since default values for keyword-args are effectively # NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from, # module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
@ -131,466 +106,27 @@ async def open_service_mngr(
# a difference when the default is a mutable object such as # a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes" # a list, dictionary, or instances of most classes"
# #
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs, **init_kwargs,
) -> ServiceMngr: ) -> ServiceMngr:
''' '''
Open an actor-global "service-manager" for supervising a tree Open a multi-subactor-as-service-daemon tree supervisor.
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is a singleton instance for each The delivered `ServiceMngr` is a singleton instance for each
actor-process, that is, allocated on first open and never actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by a call to de-allocated unless explicitly deleted by a call to
`del_service_mngr()`. `del_service_mngr()`.
''' '''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None: if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!') log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs) mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else: else:
assert (
mngr.actor_n
and
mngr.service_n
)
log.info( log.info(
'Using extant service mngr!\n\n' 'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
) )
try: with mngr:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.service_n.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.service_n.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The functionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Service task for {name} not terminated?'
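
The singleton trick shared by both versions of `open_service_mngr()` above: a mutable keyword-default acts as a process-lifetime storage slot, and the getter peeks at it via `inspect.signature()` without entering the factory. A stripped-down sketch with hypothetical names (`open_thing`/`get_thing`):
```
import inspect
from contextlib import asynccontextmanager as acm

@acm
async def open_thing(
    *,
    # mutable default = one shared slot for the process lifetime
    _singleton: list[object|None] = [None],
):
    if (thing := _singleton[0]) is None:
        thing = _singleton[0] = object()  # allocate once, first entry only
    yield thing

def get_thing() -> object:
    # peek at the default-arg slot without entering the factory
    maybe = inspect.signature(open_thing).parameters['_singleton'].default[0]
    if maybe is None:
        raise RuntimeError('`open_thing()` has not been entered yet!')
    return maybe
```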

View File

@ -41,10 +41,8 @@ import textwrap
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Protocol,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
TypeVar,
Union, Union,
) )
from types import ModuleType from types import ModuleType
@ -183,11 +181,7 @@ def mk_dec(
dec_hook: Callable|None = None, dec_hook: Callable|None = None,
) -> MsgDec: ) -> MsgDec:
'''
Create an IPC msg decoder, normally used as the
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
'''
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]` type=spec, # like `MsgType[Any]`
@ -233,13 +227,6 @@ def pformat_msgspec(
join_char: str = '\n', join_char: str = '\n',
) -> str: ) -> str:
'''
Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute
for display in (console) log messages as a nice (maybe multiline)
presentation of all supported `Struct`s (subtypes) available for
typed decoding.
'''
dec: msgpack.Decoder = getattr(codec, 'dec', codec) dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join( return join_char.join(
mk_msgspec_table( mk_msgspec_table(
@ -643,57 +630,31 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace() # # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec # assert ext_codec.pld_spec == extended_spec
# yield ext_codec # yield ext_codec
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[x] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
# #
# ^-TODO-^ is it impossible to make something like this orr!? # def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# TODO: make an auto-custom hook generator from a set of input custom # Deliver a `enc_hook()`/`dec_hook()` pair which does
# types? # manual convertion from our above native `Msg` set
# -[ ] below is a proto design using a `TypeCodec` idea? # to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
# #
# type var for the expected interchange-lib's # Note: this is is/was primarly used while moving the core
# IPC-transport type when not available as a built-in # runtime over to using native `Msg`-struct types wherein we
# serialization output. # start with the send side emitting without loading
WireT = TypeVar('WireT') # a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
# TODO: some kinda (decorator) API for built-in subtypes #
# that builds this implicitly by inspecting the `mro()`? # '''
class TypeCodec(Protocol): # return (
''' # # enc_to_dict,
A per-custom-type wire-transport serialization translator # dec_from_dict,
description type. # )
'''
src_type: Type
wire_type: WireT
def encode(obj: Type) -> WireT:
...
def decode(
obj_type: Type[WireT],
obj: WireT,
) -> Type:
...
class MsgpackTypeCodec(TypeCodec):
...
def mk_codec_hooks(
type_codecs: list[TypeCodec],
) -> tuple[Callable, Callable]:
'''
Deliver a `enc_hook()`/`dec_hook()` pair which handle
manual convertion from an input `Type` set such that whenever
the `TypeCodec.filter()` predicate matches the
`TypeCodec.decode()` is called on the input native object by
the `dec_hook()` and whenever the
`isiinstance(obj, TypeCodec.type)` matches against an
`enc_hook(obj=obj)` the return value is taken from a
`TypeCodec.encode(obj)` callback.
'''
...
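
For context on how such hook pairs typically plug into `msgspec`: `enc_hook()` converts an unsupported native object to a wire-friendly form and `dec_hook(type, obj)` rebuilds it on decode. A minimal sketch with a made-up `Point` type (not part of this diff):
```
import msgspec

class Point:
    '''A custom type the codec does not natively support.'''
    def __init__(self, x: float, y: float):
        self.x, self.y = x, y
    def __eq__(self, other) -> bool:
        return (self.x, self.y) == (other.x, other.y)

def enc_hook(obj: object) -> object:
    # translate unsupported native objects to a wire-friendly form
    if isinstance(obj, Point):
        return {'x': obj.x, 'y': obj.y}
    raise NotImplementedError(f'No encoder for {type(obj)!r}')

def dec_hook(typ: type, obj: object) -> object:
    # rebuild the native type from its decoded wire form
    if typ is Point:
        return Point(**obj)
    raise NotImplementedError(f'No decoder for {typ!r}')

enc = msgspec.msgpack.Encoder(enc_hook=enc_hook)
dec = msgspec.msgpack.Decoder(type=Point, dec_hook=dec_hook)
assert dec.decode(enc.encode(Point(1, 2))) == Point(1, 2)
```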

View File

@ -30,9 +30,9 @@ from msgspec import (
Struct as _Struct, Struct as _Struct,
structs, structs,
) )
# from pprint import ( from pprint import (
# saferepr, saferepr,
# ) )
from tractor.log import get_logger from tractor.log import get_logger
@ -75,8 +75,8 @@ class DiffDump(UserList):
for k, left, right in self: for k, left, right in self:
repstr += ( repstr += (
f'({k},\n' f'({k},\n'
f' |_{repr(left)},\n' f'\t{repr(left)},\n'
f' |_{repr(right)},\n' f'\t{repr(right)},\n'
')\n' ')\n'
) )
repstr += ']\n' repstr += ']\n'
@ -144,22 +144,15 @@ def pformat(
field_indent=indent + field_indent, field_indent=indent + field_indent,
) )
else: else: # the `pprint` recursion-safe format:
val_str: str = repr(v)
# XXX LOL, below just seems to be f#$%in causing
# recursion errs..
#
# the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
# try: try:
# val_str: str = saferepr(v) val_str: str = saferepr(v)
# except Exception: except Exception:
# log.exception( log.exception(
# 'Failed to `saferepr({type(struct)})` !?\n' 'Failed to `saferepr({type(struct)})` !?\n'
# ) )
# raise return _Struct.__repr__(struct)
# return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -210,7 +203,12 @@ class Struct(
return sin_props return sin_props
pformat = pformat pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str: def __repr__(self) -> str:
try: try:
return pformat(self) return pformat(self)
@ -220,13 +218,6 @@ class Struct(
) )
return _Struct.__repr__(self) return _Struct.__repr__(self)
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def copy( def copy(
self, self,
update: dict | None = None, update: dict | None = None,
@ -276,15 +267,13 @@ class Struct(
fi.type(getattr(self, fi.name)), fi.type(getattr(self, fi.name)),
) )
# TODO: make a mod func instead and just point to it here for
# method impl?
def __sub__( def __sub__(
self, self,
other: Struct, other: Struct,
) -> DiffDump[tuple[str, Any, Any]]: ) -> DiffDump[tuple[str, Any, Any]]:
''' '''
Compare fields/items key-wise and return a `DiffDump` Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B) for easy visual REPL comparison B)
''' '''
@ -301,42 +290,3 @@ class Struct(
)) ))
return diffs return diffs
@classmethod
def fields_diff(
cls,
other: dict|Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Very similar to `PrettyStruct.__sub__()` except accepts an
input `other: dict` (presumably that would normally be called
like `Struct(**other)`) which returns a `DiffDump` of the
fields of the struct and the `dict`'s fields.
'''
nullish = object()
consumed: dict = other.copy()
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(cls):
field_name: str = fi.name
# ours: Any = getattr(self, field_name)
theirs: Any = consumed.pop(field_name, nullish)
if theirs is nullish:
diffs.append((
field_name,
f'{fi.type!r}',
'NOT-DEFINED in `other: dict`',
))
# when there are lingering fields in `other` that this struct
# DOES NOT define we also append those.
if consumed:
for k, v in consumed.items():
diffs.append((
k,
f'NOT-DEFINED for `{cls.__name__}`',
f'`other: dict` has value = {v!r}',
))
return diffs
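
A quick usage sketch of the struct-diffing above, assuming the file shown is importable as `tractor.msg.pretty_struct` (the `Point` subclass is hypothetical):
```
from tractor.msg import pretty_struct  # assumed module path for the file above

class Point(pretty_struct.Struct):
    x: int
    y: int

# key-wise comparison via the `__sub__()` impl shown above; the
# result is a `DiffDump` of (field, left, right) entries.
diffs = Point(x=1, y=2) - Point(x=1, y=3)
print(diffs)
```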

View File

@ -245,14 +245,14 @@ def _run_asyncio_task(
result != orig and result != orig and
aio_err is None and aio_err is None and
# in the `open_channel_from()` case we don't # in the ``open_channel_from()`` case we don't
# relay through the "return value". # relay through the "return value".
not provide_channels not provide_channels
): ):
to_trio.send_nowait(result) to_trio.send_nowait(result)
finally: finally:
# if the task was spawned using `open_channel_from()` # if the task was spawned using ``open_channel_from()``
# then we close the channels on exit. # then we close the channels on exit.
if provide_channels: if provide_channels:
# only close the sender side which will relay # only close the sender side which will relay
@ -500,7 +500,7 @@ async def run_task(
''' '''
# simple async func # simple async func
chan: LinkedTaskChannel = _run_asyncio_task( chan = _run_asyncio_task(
func, func,
qsize=1, qsize=1,
**kwargs, **kwargs,
@ -530,7 +530,7 @@ async def open_channel_from(
spawned ``asyncio`` task and ``trio``. spawned ``asyncio`` task and ``trio``.
''' '''
chan: LinkedTaskChannel = _run_asyncio_task( chan = _run_asyncio_task(
target, target,
qsize=2**8, qsize=2**8,
provide_channels=True, provide_channels=True,

View File

@ -382,7 +382,7 @@ class BroadcastReceiver(ReceiveChannel):
# likely it makes sense to unwind back to the # likely it makes sense to unwind back to the
# underlying? # underlying?
# import tractor # import tractor
# await tractor.pause() # await tractor.breakpoint()
log.warning( log.warning(
f'Only one sub left for {self}?\n' f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?' 'We can probably unwind from breceiver?'