Compare commits

..

20 Commits

Author SHA1 Message Date
Tyler Goodlet 63d5e65100 Handle cpython builds with `libedit` for `readline`
Since `uv`'s cpython distributions are built this way, `pdbp`'s tab
completion was breaking (as was vi-mode). This adds a new
`.devx._enable_readline_feats()` import hook which checks for the
appropriate library and applies settings accordingly.
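
As a hedged sketch, such a hook boils down to the classic libedit-detection idiom (plus py3.13's `readline.backend`); the actual `.devx` impl may differ:

```python
import readline

def _enable_readline_feats() -> str:
    '''
    Detect whether this cpython links GNU `readline` or BSD
    `libedit` (as in `uv`'s python-build-standalone dists) and
    apply the matching tab-completion bind syntax.
    '''
    # py3.13+ exposes the backend directly; older versions
    # require the long-standing docstring check.
    backend: str = getattr(readline, 'backend', '')
    if not backend:
        backend = (
            'editline'
            if 'libedit' in (readline.__doc__ or '')
            else 'readline'
        )
    if backend == 'editline':
        readline.parse_and_bind('bind ^I rl_complete')
    else:
        readline.parse_and_bind('tab: complete')
    return backend
```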
2025-03-10 12:15:56 -04:00
Tyler Goodlet e9d7f9cff4 Add in some dev deps for @goodboy
Namely, since I use `xonsh` as my main shell, this includes adding it as
well as related tooling. Obvi bump the `uv.lock`.

Some other stuff retained from `poetry` days,
- add usage-comments around various (optional) deps.
- add toml section separator lines.
- go with 2-space indent.
- add comment on `trio>0.27` needed for py3.13+
2025-03-10 12:15:56 -04:00
Tyler Goodlet 2a4f0b829b Disable invalid line in `ruff` config? 2025-03-10 12:15:56 -04:00
Tyler Goodlet 1d5521bcbc Add a `ruff.toml` with ignore set taken from old `pyproject.toml` content 2025-03-10 12:15:56 -04:00
Guillermo Rodriguez f9709e5aa2 Migrate to uv using "uvx migrate-to-uv", use msgspec from git due to python 3.13 compat 2025-03-10 12:15:56 -04:00
Tyler Goodlet a1d75625e4 Draft test-doc for "out-of-band" `asyncio.Task`..
Since there's no way to activate `greenback`'s portal in such cases, we
should at least have a test verifying our very loud error about the
inability to support this usage..
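
A hedged sketch of the core assertion such a test might make; the names and the exact (RTE) error type are assumptions drawn from this message and the portal-missing commit below:

```python
import asyncio
import pytest
import tractor

async def _oob_task() -> None:
    # this task was spawned via the raw `asyncio` API, NOT via
    # `tractor.to_asyncio`, so no `greenback` portal was ever
    # bestowed; sync-pausing must loudly error.
    with pytest.raises(RuntimeError):
        tractor.pause_from_sync()

async def _aio_main() -> None:
    # the "out-of-band" spawn which the runtime can't know about
    await asyncio.ensure_future(_oob_task())
```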
2025-03-10 12:14:40 -04:00
Tyler Goodlet 85c60095ba Raise "independent" task errors in an eg
The (rare) condition is heavily detailed in new comments in
the `cancel_trio()` callback but, more or less, the idea here is to be
extra pedantic in raising an `ExceptionGroup` of errors from each task
(both `asyncio` and `trio`) whenever the 2 tasks raise "independently"
- in the sense that it's not obviously one side's task causing an error
(or cancellation) in the other. In this case we set the error for each
side on the `LinkedTaskChannel` (via new attrs described later).

As a synopsis, most of this work was refined out of supporting
`infected_aio=True` mode in the **root actor** and in particular as part
of getting that to work inside the `modden` daemon which at the time of
writing was still using the `i3ipc` lib and thus `asyncio`.

Impl deats,
- extend the `LinkedTaskChannel` field/API set (and type it),
  - `._trio_task: trio.Task` for test/user introspection.
- also "stage" some ideas for a more refined interface,
  - `.started()` to deliver the value yielded to the `trio.Task` parent.
   |_ also includes some todos for how to implement this design
      underneath.
  - `._aio_first: Any|None = None` to hold that value ^.
  - `.wait_aio_complete()` for syncing to the asyncio task.
- some detailed logging around "asyncio cancelled trio" case.
- Move `AsyncioCancelled` into this module.

Styling changes,
- generally more explicit var naming.
- some todos for getting modern and fancy with typing..

NB, Let it be known this commit msg was written on a friday with the
help of various "mr. white" solns.
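
As a hedged consumer-side sketch (the `aio_ep` endpoint is hypothetical), such an "independent" failure pair would surface as:

```python
import tractor

async def consume(aio_ep) -> None:
    try:
        async with tractor.to_asyncio.open_channel_from(
            aio_ep,
        ) as (first, chan):
            await chan.send('go')
    except ExceptionGroup as eg:
        # neither task's error was caused by the other side, so
        # both arrive boxed together, one per task.
        trio_err, aio_err = eg.exceptions
        raise
```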
2025-03-10 12:14:40 -04:00
Tyler Goodlet e2b9c3e769 Add a `tests/test_root_infect_asyncio`
Might as well break apart the specific test set since there are some
(minor) subtleties and the orig test mod is already getting pretty big
XD

Includes both the new "independent"-event-loops test as well as the std
usage base case suite.
2025-03-10 12:04:51 -04:00
Tyler Goodlet ae18ceb633 Impl a proto "unmasker" `@acm` alongside our test
Such that the suite verifies the wip `maybe_raise_from_masking_exc()`
will raise from a `trio.Cancelled.__context__` since I can't think of
any reason a `Cancelled` should ever be raised in-place of
a non-`Cancelled` XD

Not sure what should be raised instead (or maybe just a `log.warning()`
emitted?) but this starts a draft for refinement at the least. Use the
new `@pytest.mark.parametrize` explicit tuple-of-params form with
a `pytest.param` + `.mark.xfail()` for the default behaviour case.
2025-03-10 12:04:51 -04:00
Tyler Goodlet 917699417f Add a "raise-from-`finally:`" example test
Since I wasted 2 days just to find an example of this inside an `@acm`,
figured I better reproduce for the purposes of maybe implementing
a warning sys (inside our wip proto `open_taskman()`) when a nursery
detects a single `Cancelled` in an eg where the `.__context__` is set to
some non-cancel error (which likely means a cancel-causing source
exception was suppressed by accident).

Left in a buncha commented code using `maybe_open_nursery()` which
I thought might be part of the issue but didn't end up being required;
will likely remove on a follow up refinement.
2025-03-10 12:04:51 -04:00
Tyler Goodlet 71a29d0106 Yield a boxed-maybe-error from `open_crash_handler()`
Along the lines of something like `pytest.raises()` where the handled
exception can be inspected from the `pdbp` REPL using its `.value` field
B)

This is super handy in particular for understanding
`BaseException[Group]`s without manually adding surrounding handler code
to assign the `except[*] Exception as exc_var:` clause, particularly
when trying to understand multi-cancelled eg trees.
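
A hedged usage sketch (the import path is an assumption, the `.value` field is per the above):

```python
from tractor.devx import open_crash_handler

# inside the `pdbp` REPL which opens on crash, the handled
# exception is inspectable as `boxed.value`, just like
# `pytest.raises()`'s `.value`.
with open_crash_handler() as boxed:
    raise ExceptionGroup(
        'multi-task failure',
        [RuntimeError('a'), ValueError('b')],
    )
```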
2025-03-10 12:04:51 -04:00
Tyler Goodlet 095bf28f5d Add an inter-leaved-task error test
Trying to replicate cases where errors are raised in both `trio` and
`asyncio` tasks independently (at least in `.to_asyncio` API terms) with
a new `test_trio_prestarted_task_bubbles` that generates 3 cases inside
an `@acm` call stack composing a `trio.Nursery` with
a `to_asyncio.open_channel_from()` call where a set of `trio` tasks are
started in a loop using `.start()` with various exc raising sequences,
- the aio task raising *before* the last `trio` task spawns.
- the aio task raising just after the last trio task spawns, but before
  it starts.
- after the last trio task `.start()` call returns control to the
  parent - but (for now) did not error.

TODO, still more cases to discover as I'm still fighting a `modden` bug
of this sort atm..

Other,
- tweak some other tests to have timeouts since some recent hangs were
  found..
- started mucking with py3.13 and thus adjustments for strict egs in
  some tests; full patchset to test suite likely coming soon!
2025-03-10 12:04:51 -04:00
Tyler Goodlet 129dff575f Hm, `asyncio.Task._fut_waiter.set_exception()`?
Since we can't use `Task.set_exception()` (that task method never
seems to work.. XD), and setting the private/internal `._fut_waiter`
always seems to do the desired raising in the task? I realize it's an
internal `asyncio` runtime field but I'd rather take the risk of it
breaking than having to rely on our own equivalent hack..

Also, it seems like in the case where the task's associated (and
internal) future-waiter field is null, we won't run into the (same?) prior hanging
issues (maybe since there's nothing for `asyncio` internals to use to
wait XD ??) when `Task.cancel()` is used..??

Main deats,
- add a new signal-exception
  `class TrioTaskExited(AsyncioCancelled):` and `Future.set_exception()`
  it whenever the trio-task exits gracefully while the asyncio-side task
  is still doing blocking work (of some sort), which *seems to* be
  predicated by a check that `._fut_waiter is not None`.
- always call `asyncio.Queue.shutdown()` for the same^ as well as
  whenever we decide to call `Task.cancel()`; in that case the shutdown
  relays correctly?

Some further refinements,
- only warn about `Task.cancel()` usage when actually used ;)
- more local scope vars setting in the exit phase of
  `translate_aio_errors()`.
- also in ^ use explicit caught-exc var names for each error-type.
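
A hedged sketch of the private-API poke described above; `._fut_waiter` is internal cpython runtime state and could change across releases:

```python
import asyncio

def _interrupt_blocked_task(
    task: asyncio.Task,
    exc: BaseException,
) -> None:
    # `._fut_waiter` is the (private) future the task is
    # currently suspended on, if any.
    fut: asyncio.Future|None = getattr(task, '_fut_waiter', None)
    if (
        fut is not None
        and not fut.done()
    ):
        # raises `exc` inside the task at its current await point
        fut.set_exception(exc)
    else:
        # nothing is being awaited; plain cancellation seems to
        # not hang in this case (see above).
        task.cancel()
```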
2025-03-10 12:04:51 -04:00
Tyler Goodlet 9167fbb0a8 Much more limited `asyncio.Task.cancel()` use
Since it can not only cause the guest-mode run to abandon but also in
some edge cases prevent `trio`-errors from propagating (at least on
py3.12-13?) as discovered as part of supporting this mode officially
in the *root actor*.

As such try to avoid that method as much as possible instead opting to
pass the `trio`-side error via the iter-task channel ref.

Deats,
- add a `LinkedTaskChannel._trio_err: BaseException|None` which gets set
  whenever the `trio.Task` error is caught; ONLY set `AsyncioCancelled`
  when the `trio` task was for sure the cause, whether itself cancelled
  or errored.
- always check for this error when exiting the `asyncio` side (even when
  terminated via a call to `asyncio.Task.cancel()` or during any other
  `CancelledError` handling) such that the `asyncio`-task can expect to
  handle `AsyncioCancelled` due to the above^^ cases.
- only `cs.cancel()` the `trio` side when that cancel scope's
  `.cancel_called` is not already set; re-cancelling is a noop anyway.
- only raise any exc from `asyncio.Task.result()` when `chan._aio_err`
  does not already match it since the existence of the pre-existing
  `task_err` means `asyncio` prolly intends (or has already) raised and
  interrupted the task elsewhere.

Various supporting tweaks,
- don't bother maybe-init-ing `greenback` from the actor entrypoint
  since we already need to (and do) bestow the portals to each `asyncio`
  task spawned using the `run_task()`/`open_channel_from()` API; further
  the init-ing should be done already by client code that enables
  infected mode (even in the root actor).
 |_we should prolly also codify it from any
   `run_daemon(infected_aio=True, debug_mode=True)` usage we offer.
- pass all the `_<field>`s to `LinkedTaskChannel` explicitly in named
  kwarg style.
- better sclang-style log reports throughout, particularly on teardowns.
- generally more/better comments and docs around (not well understood)
  edge cases.
- prep to just inline `maybe_raise_aio_side_err()` closure..
2025-03-10 12:04:51 -04:00
Tyler Goodlet b6608e1c46 Expose `debug_filter` from `open_root_actor()` also
Such that actor-runtime graceful cancel handling can be used throughout
any process tree.
2025-03-10 12:04:51 -04:00
Tyler Goodlet 33e5e2c06f Drop extra nl from boxed error fmt 2025-03-10 12:04:51 -04:00
Tyler Goodlet 52238ade28 Raise explicitly on missing `greenback` portal
When `.pause_from_sync()` is called from an `asyncio.Task` which was
never bestowed a portal we want to be mega pedantic about it; indicate
that the task was NOT spawned from our `.to_asyncio` API and likely by
some out-of-our-control code (normally using
`asyncio.ensure_future()/.create_task()`). Though `greenback` already
errors on such usage, it's not always clear why no portal exists;
explaining the situation of a 3rd-party-bg-spawned-task should avoid
dev confusion for most cases.

Impl deats,
- distinguish between an actor in infected mode versus the actual caller
  of `.pause_from_sync()` being an `asyncio.Task` with more explicit
  `asyncio_task` and `is_infected_aio` vars.
- ONLY in the case of being both an infected-mode-actor AND detecting
  that the caller is an `asyncio.Task`, check `greenback.has_portal()`
  such that when not bestowed we presume the aforementioned
  3rd-party-bg-task case above and raise a new explicit RTE with
  a detailed explanatory message.
- add some masked draft code for handling the special case of a root
  actor `asyncio.Task` caller which could (in theory) not actually
  require gb portal use since the `Lock` can be acquired directly
  without IPC.
 |_this will likely require factoring of various pause machinery funcs
   into a `_pause_from_root_task()` to mk the impl sane XD

Other,
- expose a new `debug_filter: Callable` which can be provided by the
  caller of `_maybe_enter_pm()` to predicate whether to enter the
  debugger REPL based on the caught `BaseException|BaseExceptionGroup`;
  this is handy for customizing the meaning of "graceful cancellations"
  so as to avoid crash handling on expected egs of more than
  `trio.Cancelled`, as sketched below.
|_ make the default as it was implemented: `not is_multi_cancelled(err)`
- pass-through a new `ignore: set[BaseException]` as
  `open_crash_handler(ignore_nested=ignore)` to allow for the same
  silent-cancellation-egs-swallowing as desired from outside the actor
  runtime.
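
A hedged usage sketch of the new hook (the import path for `is_multi_cancelled()` is an assumption):

```python
import tractor
from tractor._exceptions import is_multi_cancelled

async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
        # the default per above: only enter the REPL when the
        # raised eg is NOT purely "graceful" cancellations.
        debug_filter=lambda err: not is_multi_cancelled(err),
    ):
        ...
```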
2025-03-10 12:04:51 -04:00
Tyler Goodlet f7cd8739a5 Accept err-type override in `is_multi_cancelled()`
Such that equivalents of `trio.Cancelled` from other runtimes such as
`asyncio.CancelledError` and `subprocess.CalledProcessError` (with
a `.returncode == -2`) can be gracefully ignored as needed by the
caller.

For example this is handy if you want to avoid debug-mode REPL entry on
an exception-group full of only some subset of exception types since you
expect certain tasks to raise such errors after having been cancelled by
a request from some parent supervision sys (some "higher up"
`trio.CancelScope`, a remotely triggered `ContextCancelled` or just
an OS SIGINT).

Impl deats,
- offer a new `ignore_nested: set[BaseException]` param to which we add
  `trio.Cancelled` by default when no other types are provided.
- use `ExceptionGroup.subgroup(tuple(ignore_nested))` to filter to egs
  of the "ignored sub-errors set" and return any such match (instead of
  `True`); see the sketch below.
- detail a comment on exclusion case.
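
A minimal sketch of that filtering, assuming only the py3.11+ builtin `BaseExceptionGroup.subgroup()`; not the exact impl:

```python
import trio

def is_multi_cancelled(
    exc: BaseException,
    ignore_nested: set[type[BaseException]]|None = None,
) -> bool|BaseExceptionGroup:
    if not ignore_nested:
        # default to only matching `trio`-native cancels
        ignore_nested = {trio.Cancelled}

    if isinstance(exc, BaseExceptionGroup):
        # deliver the matching (truthy) subgroup when the eg
        # contains any of the "ignored" types, else `False`.
        return exc.subgroup(tuple(ignore_nested)) or False

    return False
```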
2025-03-10 12:04:51 -04:00
Tyler Goodlet 7537c6f053 Support passing pre-conf-ed `Logger`
Such that we can hook into 3rd-party-libs more easily to monkey them and
use our (prettier/hipper) console logging with something like (an
example from the client project `modden`),

```python
    connection_mod = i3ipc.connection
    tractor_style_i3ipc_logger: logging.LoggerAdapter = tractor.log.get_console_log(
        _root_name=connection_mod.__name__,
        logger=connection_mod.logger,
        level='info',
    )
    # monkey the instance-ref in the 3rd-party module
    connection_mod.logger = tractor_style_i3ipc_logger
```

Impl deats,
- expose as `get_console_log(logger: logging.Logger)` and add default
  failover logic.
- toss in more typing, also for mod-global instance.
2025-03-10 12:04:51 -04:00
Tyler Goodlet 9c83f02568 Support and test infected-`asyncio`-mode for root
Such that you can use,

```python

    tractor.to_asyncio.run_as_asyncio_guest(
        trio_main=_trio_main,
    )
```

to bootstrap the root actor (and thus main parent process) to embed
the actor-runtime into an `asyncio` loop. Prove it all works with a
subactor-free version of the aio echo-server test suite B)
2025-03-10 12:04:51 -04:00
6 changed files with 203 additions and 689 deletions

View File

@ -150,6 +150,18 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize("start_method", [spawn_backend], scope='module')
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)

View File

@ -218,10 +218,9 @@ def expect_any_of(
)
return expected_patts
# yield child
def test_pause_from_asyncio_task(
def test_sync_pause_from_aio_task(
spawn,
ctlc: bool
# ^TODO, fix for `asyncio`!!
@ -327,3 +326,25 @@ def test_pause_from_asyncio_task(
child.sendline('c')
child.expect(EOF)
def test_sync_pause_from_non_greenbacked_aio_task():
'''
Where the `breakpoint()` caller task is NOT spawned by
`tractor.to_asyncio` and thus never activates
a `greenback.ensure_portal()` beforehand, presumably bc the task
was started by some lib/dep as is often seen in the field.
Ensure sync pausing works when the pause is in,
- the root actor running in infected-mode?
|_ since we don't need any IPC to acquire the debug lock?
|_ is there some way to handle this like the non-main-thread case?
All other cases need to error out appropriately right?
- for any subactor we can't avoid needing the repl lock..
|_ is there a way to hook into `asyncio.ensure_future(obj)`?
'''
pass

View File

@ -47,9 +47,6 @@ from functools import partial
import inspect
from pprint import pformat
import textwrap
from types import (
UnionType,
)
from typing import (
Any,
AsyncGenerator,
@ -2547,14 +2544,7 @@ def context(
name: str
param: Type
for name, param in annots.items():
if (
param is Context
or (
isinstance(param, UnionType)
and
Context in param.__args__
)
):
if param is Context:
ctx_var_name: str = name
break
else:

View File

@ -1,26 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -1,592 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our outstanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixture semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoked IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoked IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is a singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by a call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension pack for
#   the library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexes
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The functionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Service task for {name} not terminated?'

View File

@ -34,7 +34,6 @@ from typing import (
import tractor
from tractor._exceptions import (
AsyncioCancelled,
is_multi_cancelled,
)
from tractor._state import (
@ -46,6 +45,11 @@ from tractor.log import (
get_logger,
StackLevelAdapter,
)
# TODO, write the equiv of `trio.abc.Channel` but without attrs..
# -[ ] `trionics.chan_types.ChanStruct` maybe?
# from tractor.msg import (
# pretty_struct,
# )
from tractor.trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,
@ -66,7 +70,12 @@ __all__ = [
@dataclass
class LinkedTaskChannel(trio.abc.Channel):
class LinkedTaskChannel(
trio.abc.Channel,
# XXX LAME! meta-base conflict..
# pretty_struct.Struct,
):
'''
A "linked task channel" which allows for two-way synchronized msg
passing between a ``trio``-in-guest-mode task and an ``asyncio``
@ -77,12 +86,14 @@ class LinkedTaskChannel(trio.abc.Channel):
_from_aio: trio.MemoryReceiveChannel
_to_trio: trio.MemorySendChannel
_trio_cs: trio.CancelScope
_trio_task: trio.Task
_aio_task_complete: trio.Event
_trio_err: BaseException|None = None
_trio_exited: bool = False
# set after ``asyncio.create_task()``
# _aio_first: Any|None = None
_aio_task: asyncio.Task|None = None
_aio_err: BaseException|None = None
_broadcaster: BroadcastReceiver|None = None
@ -90,6 +101,25 @@ class LinkedTaskChannel(trio.abc.Channel):
async def aclose(self) -> None:
await self._from_aio.aclose()
def started(
self,
val: Any = None,
) -> None:
self._aio_started_val = val
return self._to_trio.send_nowait(val)
# TODO, mk this side-agnostic?
#
# -[ ] add private meths for both sides and dynamically
# determine which to use based on task-type read at calltime?
# -[ ] `._recv_trio()`: receive to trio<-asyncio
# -[ ] `._send_trio()`: send from trio->asyncio
# -[ ] `._recv_aio()`: send from asyncio->trio
# -[ ] `._send_aio()`: receive to asyncio<-trio
#
# -[ ] pass the instance to the aio side instead of the separate
# per-side chan types?
#
async def receive(self) -> Any:
'''
Receive a value from the paired `asyncio.Task` with
@ -115,7 +145,16 @@ class LinkedTaskChannel(trio.abc.Channel):
):
raise err
async def wait_asyncio_complete(self) -> None:
async def send(self, item: Any) -> None:
'''
Send a value through to the asyncio task presuming
it defines a ``from_trio`` argument, if it does not
this method will raise an error.
'''
self._to_aio.put_nowait(item)
async def wait_aio_complete(self) -> None:
await self._aio_task_complete.wait()
def cancel_asyncio_task(
@ -126,15 +165,6 @@ class LinkedTaskChannel(trio.abc.Channel):
msg=msg,
)
async def send(self, item: Any) -> None:
'''
Send a value through to the asyncio task presuming
it defines a ``from_trio`` argument, if it does not
this method will raise an error.
'''
self._to_aio.put_nowait(item)
def closed(self) -> bool:
return self._from_aio._closed # type: ignore
@ -218,7 +248,8 @@ def _run_asyncio_task(
coro = func(**kwargs)
cancel_scope = trio.CancelScope()
trio_task: trio.Task = trio.lowlevel.current_task()
trio_cs = trio.CancelScope()
aio_task_complete = trio.Event()
aio_err: BaseException|None = None
@ -226,7 +257,8 @@ def _run_asyncio_task(
_to_aio=aio_q, # asyncio.Queue
_from_aio=from_aio, # recv chan
_to_trio=to_trio, # send chan
_trio_cs=cancel_scope,
_trio_cs=trio_cs,
_trio_task=trio_task,
_aio_task_complete=aio_task_complete,
)
@ -274,6 +306,9 @@ def _run_asyncio_task(
to_trio.send_nowait(result)
finally:
# breakpoint()
# import pdbp; pdbp.set_trace()
# if the task was spawned using `open_channel_from()`
# then we close the channels on exit.
if provide_channels:
@ -281,7 +316,6 @@ def _run_asyncio_task(
# a ``trio.EndOfChannel`` to the trio (consumer) side.
to_trio.close()
# import pdbp; pdbp.set_trace()
aio_task_complete.set()
# await asyncio.sleep(0.1)
log.info(
@ -325,14 +359,17 @@ def _run_asyncio_task(
)
greenback.bestow_portal(task)
def cancel_trio(task: asyncio.Task) -> None:
def cancel_trio(
task: asyncio.Task,
) -> None:
'''
Cancel the calling `trio` task on error.
Cancel the parent `trio` task on any error raised by the
`asyncio` side.
'''
nonlocal chan
aio_err: BaseException|None = chan._aio_err
task_err: BaseException|None = None
relayed_aio_err: BaseException|None = chan._aio_err
aio_err: BaseException|None = None
# only to avoid `asyncio` complaining about uncaptured
# task exceptions
@ -343,20 +380,20 @@ def _run_asyncio_task(
f'|_{res}\n'
)
except BaseException as _aio_err:
task_err: BaseException = _aio_err
aio_err: BaseException = _aio_err
# read again AFTER the `asyncio` side errors in case
# it was cancelled due to an error from `trio` (or
# some other out of band exc).
aio_err: BaseException|None = chan._aio_err
# some other out of band exc) and then set to something
# else?
relayed_aio_err: BaseException|None = chan._aio_err
# always true right?
assert (
type(_aio_err) is type(aio_err)
type(_aio_err) is type(relayed_aio_err)
), (
f'`asyncio`-side task errors mismatch?!?\n\n'
f'caught: {_aio_err}\n'
f'chan._aio_err: {aio_err}\n'
f'(caught) aio_err: {aio_err}\n'
f'chan._aio_err: {relayed_aio_err}\n'
)
msg: str = (
@ -381,12 +418,13 @@ def _run_asyncio_task(
msg.format(etype_str='errored')
)
if aio_err is not None:
trio_err: BaseException|None = chan._trio_err
if (
relayed_aio_err
or
trio_err
):
# import pdbp; pdbp.set_trace()
# XXX: uhh is this true?
# assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the `asyncio.CancelledError`
# case) since we have no way to directly trigger a `trio`
@ -394,8 +432,6 @@ def _run_asyncio_task(
# We might want to change this in the future though.
from_aio.close()
if task_err is None:
assert aio_err
# wait, wut?
# aio_err.with_traceback(aio_err.__traceback__)
@ -404,7 +440,7 @@ def _run_asyncio_task(
# elif (
# type(aio_err) is CancelledError
# and # trio was the cause?
# cancel_scope.cancel_called
# trio_cs.cancel_called
# ):
# log.cancel(
# 'infected task was cancelled by `trio`-side'
@ -415,26 +451,83 @@ def _run_asyncio_task(
# error in case the trio task is blocking on
# a checkpoint.
if (
not cancel_scope.cancelled_caught
not trio_cs.cancelled_caught
or
not cancel_scope.cancel_called
not trio_cs.cancel_called
):
# import pdbp; pdbp.set_trace()
cancel_scope.cancel()
trio_cs.cancel()
if task_err:
# maybe the `trio` task errored independent from the
# `asyncio` one and likely in between
# a guest-run-sched-tick.
#
# The obvious ex. is where one side errors during
# the current tick and then the other side immediately
# errors before its next checkpoint; i.e. the 2 errors
# are "independent".
#
# "Independent" here means in the sense that neither task
# was the explicit cause of the other side's exception
# according to our `tractor.to_asyncio` SC API's error
# relaying mechanism(s); the error pair is *possibly
# due-to* but **not necessarily** inter-related by some
# (subsys) state between the tasks,
#
# NOTE, also see the `test_trio_prestarted_task_bubbles`
# for reproducing detailed edge cases as per the above
# cases.
#
if (
not trio_cs.cancelled_caught
and
(trio_err := chan._trio_err)
and
type(trio_err) not in {
trio.Cancelled,
}
and (
aio_err
and
type(aio_err) not in {
asyncio.CancelledError
}
)
):
eg = ExceptionGroup(
'Both the `trio` and `asyncio` tasks errored independently!!\n',
(trio_err, aio_err),
)
chan._trio_err = eg
chan._aio_err = eg
raise eg
elif aio_err:
# XXX raise any `asyncio` side error IFF it doesn't
# match the one we just caught from the task above!
# (that would indicate something weird/very-wrong
# going on?)
if aio_err is not task_err:
# import pdbp; pdbp.set_trace()
raise aio_err from task_err
if aio_err is not relayed_aio_err:
raise aio_err from relayed_aio_err
raise aio_err
task.add_done_callback(cancel_trio)
return chan
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
@ -483,13 +576,12 @@ async def translate_aio_errors(
# import pdbp; pdbp.set_trace() # lolevel-debug
# relay cancel through to called ``asyncio`` task
# relay cancel through to called `asyncio` task
chan._aio_err = AsyncioCancelled(
f'trio`-side cancelled the `asyncio`-side,\n'
f'c)>\n'
f' |_{trio_task}\n\n'
f'{trio_err!r}\n'
)
@ -546,6 +638,7 @@ async def translate_aio_errors(
raise
except BaseException as _trio_err:
# await tractor.pause(shield=True)
trio_err = _trio_err
log.exception(
'`trio`-side task errored?'
@ -619,11 +712,17 @@ async def translate_aio_errors(
# pump the other side's task? needed?
await trio.lowlevel.checkpoint()
# from tractor._state import is_root_process
# if is_root_process():
# breakpoint()
if (
not chan._trio_err
and
(fut := aio_task._fut_waiter)
):
# await trio.lowlevel.checkpoint()
# import pdbp; pdbp.set_trace()
fut.set_exception(
TrioTaskExited(
f'The peer `asyncio` task is still blocking/running?\n'
@ -632,11 +731,6 @@ async def translate_aio_errors(
)
)
else:
# from tractor._state import is_root_process
# if is_root_process():
# breakpoint()
# import pdbp; pdbp.set_trace()
aio_taskc_warn: str = (
f'\n'
f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
@ -663,7 +757,7 @@ async def translate_aio_errors(
# it erroed out up there!
#
if wait_on_aio_task:
# await chan.wait_asyncio_complete()
# await chan.wait_aio_complete()
await chan._aio_task_complete.wait()
log.info(
'asyncio-task is done and unblocked trio-side!\n'
@ -771,11 +865,22 @@ async def open_channel_from(
# sync to a "started()"-like first delivered value from the
# ``asyncio`` task.
try:
with chan._trio_cs:
with (cs := chan._trio_cs):
first = await chan.receive()
# deliver stream handle upward
yield first, chan
except trio.Cancelled as taskc:
# await tractor.pause(shield=True) # ya it worx ;)
if cs.cancel_called:
log.cancel(
f'trio-side was manually cancelled by aio side\n'
f'|_c>}}{cs!r}?\n'
)
# TODO, maybe a special `TrioCancelled`???
raise taskc
finally:
chan._trio_exited = True
chan._to_trio.close()
@ -893,12 +998,12 @@ def run_as_asyncio_guest(
_sigint_loop_pump_delay: float = 0,
) -> None:
# ^-TODO-^ technically whatever `trio_main` returns.. we should
# try to use func-typevar-params at least by 3.13!
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#callback-protocols
# -[ ] https://peps.python.org/pep-0646/#using-type-variable-tuples-in-functions
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#unpack-for-keyword-arguments
# -[ ] https://peps.python.org/pep-0718/
'''
Entry for an "infected ``asyncio`` actor".
@ -957,15 +1062,15 @@ def run_as_asyncio_guest(
# force_reload=True,
# )
def trio_done_callback(main_outcome):
def trio_done_callback(main_outcome: Outcome):
log.runtime(
f'`trio` guest-run finishing with outcome\n'
f'>) {main_outcome}\n'
f'|_{trio_done_fute}\n'
)
# import pdbp; pdbp.set_trace()
if isinstance(main_outcome, Error):
# import pdbp; pdbp.set_trace()
error: BaseException = main_outcome.error
# show an dedicated `asyncio`-side tb from the error
@ -1165,6 +1270,10 @@ def run_as_asyncio_guest(
)
raise AsyncioRuntimeTranslationError(message) from state_err
# XXX, should never get here ;)
# else:
# import pdbp; pdbp.set_trace()
# might as well if it's installed.
try:
import uvloop