Compare commits

..

43 Commits

Author SHA1 Message Date
Tyler Goodlet e271ebcb87 More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 6cac1fe22b Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-14 13:15:40 -04:00
Tyler Goodlet 42056f4f53 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
a level drop to `.runtime()` for rx-msg reports.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 5790ee9254 Couple more `._root` logging tweaks.. 2025-07-14 13:15:40 -04:00
Tyler Goodlet 57e25411ee Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
converting to multi-line code style for str-report-contents. Tweak
some imports to sub-mod level as well.
2025-07-14 13:15:40 -04:00
Tyler Goodlet ee9fa2e91d Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-07-14 13:15:40 -04:00
Tyler Goodlet cb8cb67680 Use `._supervise._shutdown_msg` in tooling test 2025-07-14 13:15:40 -04:00
Tyler Goodlet 51e95ebc3e Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict`, just merge in `req_chan` input at call time.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 517ae67d0e Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-07-14 13:15:40 -04:00
Tyler Goodlet acb09ab29c Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types throughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtime.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-07-14 13:15:40 -04:00
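A minimal sketch of the wrapped-vs-unwrapped addr convention above; `wrap_address()` is used elsewhere in this changeset (`tractor._addr`) but the `.unwrap()` round-trip shown here is an assumption of this note, not the verbatim impl:

    from tractor._addr import wrap_address

    uw_reg_addrs: list[tuple] = [('127.0.0.1', 1616)]  # uw: unwrapped

    # wrapped `Address` types for use inside the runtime + logging,
    reg_addrs = [wrap_address(uw_addr) for uw_addr in uw_reg_addrs]

    # unwrapped forms are what get passed down as rt-vars and as
    # `accept_addrs` to the IPC server,
    _runtime_vars = {'_root_addrs': uw_reg_addrs}
    assert [addr.unwrap() for addr in reg_addrs] == uw_reg_addrs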
Tyler Goodlet 5ffab73ced Add #TODO for `._context` to use `.msg.Aid` 2025-07-14 13:15:40 -04:00
Tyler Goodlet 473de28b67 Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-14 13:15:40 -04:00
Tyler Goodlet f3a5986db5 Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks and more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port).
2025-07-14 13:15:40 -04:00
Tyler Goodlet 21d64b91e3 More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-07-14 13:15:40 -04:00
Tyler Goodlet fbc4208439 Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-07-14 13:15:40 -04:00
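Taken together with the `.uid`/`.reprol()` extensions from `acc7e26f17` (further below), the `Aid` surface looks roughly like the following sketch; the real field set and method bodies may differ:

    from tractor.msg import pretty_struct

    class Aid(pretty_struct.Struct):
        name: str
        uuid: str

        def __hash__(self) -> int:
            # hash solely on the (unique) `.uuid`
            return hash(self.uuid)

        def __repr__(self) -> str:
            # delegate verbatim to the pretty-struct equiv
            return pretty_struct.Struct.__repr__(self)

        @property
        def uid(self) -> tuple[str, str]:
            # legacy tuple-style id, still keys `Actor._contexts`
            return (self.name, self.uuid)

        def reprol(self) -> str:
            # "repr-one-line": compact unique actor-id summary
            return f'{self.name}@{self.uuid[:6]}'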
Tyler Goodlet e6ff4561a7 .trionics: link in `finally`-footgun `trio` GH ish 2025-07-14 13:15:40 -04:00
Tyler Goodlet db7fd44751 .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-07-14 13:15:40 -04:00
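Handy for guarding expensive reprs; a sketch assuming the new method keeps the module-level fn's `level: str` param:

    from tractor.log import get_logger

    log = get_logger(__name__)

    def report_chan(chan) -> None:
        # only build the (expensive) repr when it will actually render
        if log.at_least_level(level='runtime'):
            log.runtime(f'Chan up,\n{chan.pformat()}\n')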
Tyler Goodlet 16fed20856 Drop `actor_info: str` from `._entry` logs 2025-07-14 13:15:40 -04:00
Tyler Goodlet 4723809a32 Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using the '<=)' sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 75cb17371d Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:`
style for meta-field info lines.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 7fc98d15b1 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-07-14 13:15:40 -04:00
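The new public cancel-state wrappers end up shaped roughly like this sketch (names per the bullets above, not the verbatim impl):

    from tractor.msg.types import Aid

    class _CancelState:
        _cancel_called: bool = False
        _cancel_called_by: Aid|None = None  # nee `._cancel_called_by_remote`
        cancel_complete: bool = False  # did `.cancel()` run to completion?

        @property
        def cancel_called(self) -> bool:
            # likely to be dropped in favour of `.cancel_caller`
            return self._cancel_called

        @property
        def cancel_caller(self) -> Aid|None:
            return self._cancel_called_by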
Tyler Goodlet acb1f905dc Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-07-14 13:15:40 -04:00
Tyler Goodlet acc7e26f17 Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-07-14 13:15:40 -04:00
Tyler Goodlet a9f3d8d9d5 Update `_runtime/_root` call-sigs to `nest_from_op()`
To match the new signature now already cherry-picked upstream.

The original commit was split after moving the original fn-params
changes into the upstream branch: `sclang_formatting`.

The orig commit was,

> ea53d03 Bah! just refine `devx.pformat.nest_from_op()` now

but was re-written to only contain the diff to calling code.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 53ae1ba271 Enforce named-args only to `.open_nursery()` 2025-07-14 13:15:40 -04:00
Tyler Goodlet d54d4e605f Hide `._rpc._errors_relayed_via_ipc()` frame by def 2025-07-14 13:15:40 -04:00
Tyler Goodlet 953abc6b7b Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-07-14 13:15:33 -04:00
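The corrected semantics as a trimmed-down sketch (not the verbatim impl):

    from contextlib import asynccontextmanager as acm

    def maybe_collapse_eg(
        beg: BaseExceptionGroup,
    ) -> BaseException|bool:
        # only collapse when exactly one sub-exc is contained; return
        # `False` otherwise to keep caller logic simple.
        if len(excs := beg.exceptions) == 1:
            return excs[0]
        return False

    @acm
    async def collapse_eg():
        try:
            yield
        except BaseExceptionGroup as beg:
            if exc := maybe_collapse_eg(beg):
                # NEVER `raise exc from beg`; always preserve the
                # embedded src-error's original context.
                raise exc from exc.__cause__
            raise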
Tyler Goodlet 69965224f2 Just import `._runtime` ns in `._root`; be a bit more explicit 2025-07-14 13:15:33 -04:00
Tyler Goodlet 5ec20ffe68 Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-07-14 13:15:33 -04:00
Tyler Goodlet 6dc5f4c914 Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 5f6240939f Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay a RAE-boxed
`bdb.BdbQuit`.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 135e9b40b1 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seem to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-07-14 13:15:33 -04:00
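The swap is mechanical at every call site, i.e. the pattern visible throughout the diffs below:

    import trio
    import tractor

    async def fan_out_eps(eps):
        # before: `trio.open_nursery(strict_exception_groups=False)`
        # after: a strict-eg nursery stacked under our collapser,
        async with (
            tractor.trionics.collapse_eg(),
            trio.open_nursery() as tn,
        ):
            for ep in eps:
                tn.start_soon(ep)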
Tyler Goodlet 0388eead6a Use collapser in rent side of `Context` 2025-07-14 13:15:33 -04:00
Tyler Goodlet 006ed72aea Flip to `collapse_eg()` use in `.trionics.gather_contexts()` 2025-07-14 13:15:33 -04:00
Tyler Goodlet 88b55d868f Always `Cancelled`-unmask ctx endpoint excs
To resolve the recently added and failing
`test_remote_exc_relay::test_unmasked_remote_exc`: never allow
`trio.Cancelled` to mask an underlying user-code exception, ever.

Our first real-world (runtime internal) use case for the new
`.trionics.maybe_raise_from_masking_exc()` such that the failing
test now passes with a properly relayed remote RTE unmasking B)

Details,
- flip the `Context._scope_nursery` to the default strict-eg behaviour
  and instead stack its outer scope with a `.trionics.collapse_eg()`.
- wrap the inner-most scope (after `msgops.maybe_limit_plds()`) with
  a `maybe_raise_from_masking_exc()` to ensure user-code errors are
  never masked by `trio.Cancelled`s.

Some err-reporting refinement,
- always capture any `scope_err` from the entire block for debug
  purposes; report it in the `finally` block's log.
- always capture any suppressed `maybe_re`, output from
  `ctx.maybe_raise()`, and `log.cancel()` report it.
2025-07-14 13:15:33 -04:00
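Roughly the new scope stacking, as a sketch; the nursery here plays `Context._scope_nursery`'s role and the `msgops.maybe_limit_plds()` layer is elided:

    import trio
    from tractor.trionics import (
        collapse_eg,
        maybe_raise_from_masking_exc,
    )

    async def ctx_scopes():
        async with (
            collapse_eg(),  # outer: collapse 1-exc begs from the
                            # now-strict scope nursery
            trio.open_nursery() as tn,
            maybe_raise_from_masking_exc(),  # inner-most: never let
                                             # taskc mask a user error
        ):
            tn.start_soon(trio.sleep_forever)
            raise RuntimeError('must never be masked by a `Cancelled`')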
Tyler Goodlet 8b1094a8d3 Adjust ep-masking-suite for the real-use-case
Namely that the more common-and-pertinent case is when
a `@context`-ep-fn contains the `finally`-footgun but without
a surrounding embedded `tn` (which currently still requires its own
scope embedded `trionics.maybe_raise_from_masking_exc()`) which can't
be compensated-for by `._rpc._invoke()` easily. Instead the test is
composed where the `._invoke()`-internal `tn` is the machinery being
addressed in terms of masking user-code excs with `trio.Cancelled`.

Deats,
- rename the test -> `test_unmasked_remote_exc` to reflect what the
  runtime should actually be addressing/solving.
- drop the embedded `tn` from `sleep_n_chkpt_in_finally()` (for now)
  since that case can't currently easily be addressed without the user
  code using its own `trionics.maybe_raise_from_masking_exc()` inside
  the nursery scope.
- as such drop all `tn` related params/logic/usage from the ep.
- add in a `Cancelled` handler block which checks for RTE masking and
  always prints the occurrence loudly.

Follow up,
- obvi this suite will currently fail until the appropriate adjustment
  is made to `._rpc._invoke()` to do the unmasking; coming next.
- we probably still need a case with an embedded user `tn` where if
  the default strict-eg mode is used then a ctxc from the parent might
  cause a non-graceful `Context.cancel()` outcome?
 |_since the embedded user-`tn` will raise
   `ExceptionGroup[trio.Cancelled]` upward despite the parent nursery's
   scope being the canceller, or will a `collapse_eg()` inside the
   `._invoke()` scope handle this as well?
2025-07-14 13:15:33 -04:00
Tyler Goodlet eddbedb97d Extend `._taskc.maybe_raise_from_masking_exc()`
To handle captured non-egs (when the now optional `tn` isn't provided)
as well as yield up a `BoxedMaybeException` which contains any detected
and un-masked `exc_ctx` as its `.value`.

Also add some additional tooling,
- a `raise_unmasked: bool` toggle for when the caller just wants to
  report the masked exc and not raise-it-in-place of the masker.
- `extra_note: str` which by default is tuned to the default
  `unmask_from = (trio.Cancelled,)` but which can be used to deliver
  custom exception msg content.
- `always_warn_on: tuple[BaseException]` which will always emit
  a warning log of what would have been the raised-in-place-of
  `exc_ctx`'s msg for special cases where you want to report
  a masking case that might not be otherwise noticed by the runtime
  (cough like a `Cancelled` masking another `Cancelled`) but which
  you'd still like to warn the caller about.
- factor out the masked-`exc_ctx` predicate logic into
  a `find_masked_excs()` and also use it for non-eg cases.

Still maybe todo?
- rewrapping multiple masked sub-excs in an eg back into an eg? left in
  #TODOs and a pause-point where applicable.
2025-07-14 13:15:33 -04:00
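All the new knobs at one (hypothetical) call site:

    import trio
    from tractor.trionics import maybe_raise_from_masking_exc

    async def watched_scope():
        async with maybe_raise_from_masking_exc(
            # `tn` is now optional; non-egs are handled when omitted
            unmask_from=(trio.Cancelled,),
            raise_unmasked=False,  # only warn-report, don't raise
            extra_note='dont checkpoint in `finally`!',
            always_warn_on=(trio.Cancelled,),  # taskc-masking-taskc
        ) as boxed:
            await trio.lowlevel.checkpoint()

        # any detected-and-unmasked exc is delivered as `boxed.value`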
Tyler Goodlet 9a587c6edb Mv `maybe_raise_from_masking_exc()` to `.trionics`
Factor the `@acm`-closure out of the
`test_trioisms::test_acm_embedded_nursery_propagates_enter_err` suite
for real use internally.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 537b77d74e Add ctx-ep suite for `trio`'s *finally-footgun*
Deats are documented within, but basically a subtlety we already
track: `trio`'s masking of excs by a checkpoint-in-`finally` can cause
compounded issues with our `@context` endpoints, mostly in terms of
remote error and cancel-ack relay semantics.
2025-07-14 13:15:33 -04:00
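The footgun itself distilled to pure `trio` (runnable as-is):

    import trio

    async def ep_with_footgun():
        try:
            raise RuntimeError('the real error')
        finally:
            # if this scope was cancelled, the checkpoint below raises
            # `trio.Cancelled` *in place of* the RTE above, which then
            # only survives as the `.__context__`..
            await trio.lowlevel.checkpoint()

    async def main():
        with trio.move_on_after(0):  # pre-cancelled scope
            try:
                await ep_with_footgun()
            except trio.Cancelled as taskc:
                # the real error is still here, but masked!
                assert type(taskc.__context__) is RuntimeError
                raise

    trio.run(main)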
Tyler Goodlet faec496686 Add some tooling params to `collapse_eg()` 2025-07-14 13:15:33 -04:00
Tyler Goodlet d262926773 Move `.is_multi_cancelled()` to `.trionics._beg`
Since it's for beg filtering, the current impl should be renamed anyway;
it's not just for filtering cancelled excs.

Deats,
- added a real doc string, links to official eg docs and fixed the
  return typing.
- adjust all internal imports to match.
2025-07-14 13:15:33 -04:00
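For reference, the predicate is roughly equivalent to an eg-`.split()` check (a sketch, not the actual impl):

    import trio

    def is_multi_cancelled(beg: BaseException) -> bool:
        # `True` iff splitting out `trio.Cancelled` leaves no
        # remainder, see,
        # https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.split
        if isinstance(beg, BaseExceptionGroup):
            return beg.split(trio.Cancelled)[1] is None
        return False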
Tyler Goodlet 93802486bb Drop stale comment from inter-peer suite 2025-07-14 13:15:27 -04:00
Tyler Goodlet 41877c476e Use `nest_from_op()` in some runtime logs for actor-state-repring 2025-07-14 13:15:27 -04:00
39 changed files with 493 additions and 1752 deletions

View File

@@ -16,7 +16,6 @@ from tractor import (
     ContextCancelled,
     MsgStream,
     _testing,
-    trionics,
 )
 import trio
 import pytest
@@ -63,8 +62,9 @@ async def recv_and_spawn_net_killers(
     await ctx.started()
     async with (
         ctx.open_stream() as stream,
-        trionics.collapse_eg(),
-        trio.open_nursery() as tn,
+        trio.open_nursery(
+            strict_exception_groups=False,
+        ) as tn,
     ):
         async for i in stream:
             print(f'child echoing {i}')

View File

@@ -1,35 +0,0 @@
-import trio
-import tractor
-
-
-async def main():
-    async with tractor.open_root_actor(
-        debug_mode=True,
-        loglevel='cancel',
-    ) as _root:
-
-        # manually trigger self-cancellation and wait
-        # for it to fully trigger.
-        _root.cancel_soon()
-        await _root._cancel_complete.wait()
-        print('root cancelled')
-
-        # now ensure we can still use the REPL
-        try:
-            await tractor.pause()
-        except trio.Cancelled as _taskc:
-            assert (root_cs := _root._root_tn.cancel_scope).cancel_called
-            # NOTE^^ above logic but inside `open_root_actor()` and
-            # passed to the `shield=` expression is effectively what
-            # we're testing here!
-            await tractor.pause(shield=root_cs.cancel_called)
-
-        # XXX, if shield logic *is wrong* inside `open_root_actor()`'s
-        # crash-handler block this should never be interacted,
-        # instead `trio.Cancelled` would be bubbled up: the original
-        # BUG.
-        assert 0
-
-
-if __name__ == '__main__':
-    trio.run(main)

View File

@@ -23,8 +23,9 @@ async def main():
             modules=[__name__]
         ) as portal_map,
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn,
+        trio.open_nursery(
+            strict_exception_groups=False,
+        ) as tn,
     ):

         for (name, portal) in portal_map.items():

View File

@@ -1,13 +1,13 @@
 """
 That "native" debug mode better work!

-All these tests can be understood (somewhat) by running the
-equivalent `examples/debugging/` scripts manually.
+All these tests can be understood (somewhat) by running the equivalent
+`examples/debugging/` scripts manually.

 TODO:
     - none of these tests have been run successfully on windows yet but
       there's been manual testing that verified it works.
     - wonder if any of it'll work on OS X?

 """
 from __future__ import annotations
@@ -925,7 +925,6 @@ def test_post_mortem_api(
             "<Task 'name_error'",
             "NameError",
             "('child'",
-            'getattr(doggypants)',  # exc-LoC
         ]
     )
     if ctlc:
@@ -942,8 +941,8 @@ def test_post_mortem_api(
             "<Task '__main__.main'",
             "('root'",
             "NameError",
-            "tractor.post_mortem()",
             "src_uid=('child'",
+            "tractor.post_mortem()",  # in `main()`-LoC
         ]
     )
     if ctlc:
@@ -961,10 +960,6 @@ def test_post_mortem_api(
             "('root'",
             "NameError",
             "src_uid=('child'",
-
-            # raising line in `main()` but from crash-handling
-            # in `tractor.open_nursery()`.
-            'async with p.open_context(name_error) as (ctx, first):',
         ]
     )
     if ctlc:
@@ -1156,54 +1151,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
     )


-def test_crash_handling_within_cancelled_root_actor(
-    spawn: PexpectSpawner,
-):
-    '''
-    Ensure that when only a root-actor is started via `open_root_actor()`
-    we can crash-handle in debug-mode despite self-cancellation.
-
-    More-or-less ensures we conditionally shield the pause in
-    `._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
-    call.
-
-    '''
-    child = spawn('root_self_cancelled_w_error')
-    child.expect(PROMPT)
-    assert_before(
-        child,
-        [
-            "`Actor.cancel_soon()` was called!",
-            "root cancelled",
-            _pause_msg,
-            "('root'",  # actor name
-        ]
-    )
-
-    child.sendline('c')
-    child.expect(PROMPT)
-    assert_before(
-        child,
-        [
-            _crash_msg,
-            "('root'",  # actor name
-            "AssertionError",
-            "assert 0",
-        ]
-    )
-
-    child.sendline('c')
-    child.expect(EOF)
-    assert_before(
-        child,
-        [
-            "AssertionError",
-            "assert 0",
-        ]
-    )
-
-
 # TODO: better error for "non-ideal" usage from the root actor.
 # -[ ] if called from an async scope emit a message that suggests
 #    using `await tractor.pause()` instead since it's less overhead

View File

@@ -14,9 +14,6 @@ TODO:
 '''
 from __future__ import annotations
-from contextlib import (
-    contextmanager as cm,
-)
 import os
 import signal
 import time
@@ -31,8 +28,6 @@ from .conftest import (
     PROMPT,
     _pause_msg,
 )
-import pytest
-
 from pexpect.exceptions import (
     # TIMEOUT,
     EOF,
@@ -190,117 +185,3 @@ def test_breakpoint_hook_restored(
     )
     child.sendline('c')
     child.expect(EOF)
-
-
-_to_raise = Exception('Triggering a crash')
-
-
-@pytest.mark.parametrize(
-    'to_raise',
-    [
-        None,
-        _to_raise,
-        RuntimeError('Never crash handle this!'),
-    ],
-)
-@pytest.mark.parametrize(
-    'raise_on_exit',
-    [
-        True,
-        [type(_to_raise)],
-        False,
-    ]
-)
-def test_crash_handler_cms(
-    debug_mode: bool,
-    to_raise: Exception,
-    raise_on_exit: bool|list[Exception],
-):
-    '''
-    Verify the `.devx.open_crash_handler()` API(s) by also
-    (conveniently enough) tesing its `repl_fixture: ContextManager`
-    param support which for this suite allows use to avoid use of
-    a `pexpect`-style-test since we use the fixture to avoid actually
-    entering `PdbpREPL.iteract()` :smirk:
-
-    '''
-    import tractor
-    # import trio
-
-    # state flags
-    repl_acquired: bool = False
-    repl_released: bool = False
-
-    @cm
-    def block_repl_ux(
-        repl: tractor.devx.debug.PdbREPL,
-        maybe_bxerr: (
-            tractor.devx._debug.BoxedMaybeException
-            |None
-        ) = None,
-        enter_repl: bool = True,
-
-    ) -> bool:
-        '''
-        Set pre/post-REPL state vars and bypass actual conole
-        interaction.
-
-        '''
-        nonlocal repl_acquired, repl_released
-
-        # task: trio.Task = trio.lowlevel.current_task()
-        # print(f'pre-REPL active_task={task.name}')
-        print('pre-REPL')
-        repl_acquired = True
-        yield False  # never actually .interact()
-        print('post-REPL')
-        repl_released = True
-
-    try:
-        # TODO, with runtime's `debug_mode` setting
-        # -[ ] need to open runtime tho obvi..
-        #
-        # with tractor.devx.maybe_open_crash_handler(
-        #     pdb=True,
-
-        with tractor.devx.open_crash_handler(
-            raise_on_exit=raise_on_exit,
-            repl_fixture=block_repl_ux
-        ) as bxerr:
-            if to_raise is not None:
-                raise to_raise
-
-    except Exception as _exc:
-        exc = _exc
-        if (
-            raise_on_exit is True
-            or
-            type(to_raise) in raise_on_exit
-        ):
-            assert (
-                exc
-                is
-                to_raise
-                is
-                bxerr.value
-            )
-        else:
-            raise
-
-    else:
-        assert (
-            to_raise is None
-            or
-            not raise_on_exit
-            or
-            type(to_raise) not in raise_on_exit
-        )
-        assert bxerr.value is to_raise
-        assert bxerr.raise_on_exit == raise_on_exit
-
-        if to_raise is not None:
-            assert repl_acquired
-            assert repl_released

View File

@@ -1,114 +0,0 @@
-'''
-Unit-ish tests for specific IPC transport protocol backends.
-
-'''
-from __future__ import annotations
-from pathlib import Path
-
-import pytest
-import trio
-import tractor
-from tractor import (
-    Actor,
-    _state,
-    _addr,
-)
-
-
-@pytest.fixture
-def bindspace_dir_str() -> str:
-
-    rt_dir: Path = tractor._state.get_rt_dir()
-    bs_dir: Path = rt_dir / 'doggy'
-    bs_dir_str: str = str(bs_dir)
-    assert not bs_dir.is_dir()
-
-    yield bs_dir_str
-
-    # delete it on suite teardown.
-    # ?TODO? should we support this internally
-    # or is leaking it ok?
-    if bs_dir.is_dir():
-        bs_dir.rmdir()
-
-
-def test_uds_bindspace_created_implicitly(
-    debug_mode: bool,
-    bindspace_dir_str: str,
-):
-    registry_addr: tuple = (
-        f'{bindspace_dir_str}',
-        'registry@doggy.sock',
-    )
-    bs_dir_str: str = registry_addr[0]
-
-    # XXX, ensure bindspace-dir DNE beforehand!
-    assert not Path(bs_dir_str).is_dir()
-
-    async def main():
-        async with tractor.open_nursery(
-            enable_transports=['uds'],
-            registry_addrs=[registry_addr],
-            debug_mode=debug_mode,
-        ) as _an:
-
-            # XXX MUST be created implicitly by
-            # `.ipc._uds.start_listener()`!
-            assert Path(bs_dir_str).is_dir()
-
-            root: Actor = tractor.current_actor()
-            assert root.is_registrar
-            assert registry_addr in root.reg_addrs
-
-            assert (
-                registry_addr
-                in
-                _state._runtime_vars['_registry_addrs']
-            )
-            assert (
-                _addr.wrap_address(registry_addr)
-                in
-                root.registry_addrs
-            )
-
-    trio.run(main)
-
-
-def test_uds_double_listen_raises_connerr(
-    debug_mode: bool,
-    bindspace_dir_str: str,
-):
-    registry_addr: tuple = (
-        f'{bindspace_dir_str}',
-        'registry@doggy.sock',
-    )
-
-    async def main():
-        async with tractor.open_nursery(
-            enable_transports=['uds'],
-            registry_addrs=[registry_addr],
-            debug_mode=debug_mode,
-        ) as _an:
-
-            # runtime up
-            root: Actor = tractor.current_actor()
-
-            from tractor.ipc._uds import (
-                start_listener,
-                UDSAddress,
-            )
-            ya_bound_addr: UDSAddress = root.registry_addrs[0]
-            try:
-                await start_listener(
-                    addr=ya_bound_addr,
-                )
-            except ConnectionError as connerr:
-                assert type(src_exc := connerr.__context__) is OSError
-                assert 'Address already in use' in src_exc.args
-                # complete, exit test.
-
-            else:
-                pytest.fail('It dint raise a connerr !?')
-
-    trio.run(main)

View File

@@ -313,8 +313,9 @@ async def inf_streamer(
         # `trio.EndOfChannel` doesn't propagate directly to the above
         # .open_stream() parent, resulting in it also raising instead
         # of gracefully absorbing as normal.. so how to handle?
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn,
+        trio.open_nursery(
+            strict_exception_groups=False,
+        ) as tn,
     ):
         async def close_stream_on_sentinel():
             async for msg in stream:

View File

@@ -236,10 +236,7 @@ async def stream_forever():
 async def test_cancel_infinite_streamer(start_method):

     # stream for at most 1 seconds
-    with (
-        trio.fail_after(4),
-        trio.move_on_after(1) as cancel_scope
-    ):
+    with trio.move_on_after(1) as cancel_scope:
         async with tractor.open_nursery() as n:
             portal = await n.start_actor(
                 'donny',
@@ -287,32 +284,20 @@ async def test_cancel_infinite_streamer(start_method):
     ],
 )
 @tractor_test
-async def test_some_cancels_all(
-    num_actors_and_errs: tuple,
-    start_method: str,
-    loglevel: str,
-):
-    '''
-    Verify a subset of failed subactors causes all others in
+async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
+    """Verify a subset of failed subactors causes all others in
     the nursery to be cancelled just like the strategy in trio.

     This is the first and only supervisory strategy at the moment.
+    """
+    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
-
-    '''
-    (
-        num_actors,
-        first_err,
-        err_type,
-        ria_func,
-        da_func,
-    ) = num_actors_and_errs

     try:
-        async with tractor.open_nursery() as an:
+        async with tractor.open_nursery() as n:

             # spawn the same number of deamon actors which should be cancelled
             dactor_portals = []
             for i in range(num_actors):
-                dactor_portals.append(await an.start_actor(
+                dactor_portals.append(await n.start_actor(
                     f'deamon_{i}',
                     enable_modules=[__name__],
                 ))
@@ -322,7 +307,7 @@ async def test_some_cancels_all(
             for i in range(num_actors):
                 # start actor(s) that will fail immediately
                 riactor_portals.append(
-                    await an.run_in_actor(
+                    await n.run_in_actor(
                         func,
                         name=f'actor_{i}',
                         **kwargs
@@ -352,8 +337,7 @@ async def test_some_cancels_all(

     # should error here with a ``RemoteActorError`` or ``MultiError``

-    except first_err as _err:
-        err = _err
+    except first_err as err:
         if isinstance(err, BaseExceptionGroup):
             assert len(err.exceptions) == num_actors
             for exc in err.exceptions:
@@ -364,8 +348,8 @@ async def test_some_cancels_all(
         elif isinstance(err, tractor.RemoteActorError):
             assert err.boxed_type == err_type

-        assert an.cancelled is True
-        assert not an._children
+        assert n.cancelled is True
+        assert not n._children
     else:
         pytest.fail("Should have gotten a remote assertion error?")
@@ -535,15 +519,10 @@ def test_cancel_via_SIGINT_other_task(
     async def main():
         # should never timeout since SIGINT should cancel the current program
         with trio.fail_after(timeout):
-            async with (
-
-                # XXX ?TODO? why no work!?
-                # tractor.trionics.collapse_eg(),
-                trio.open_nursery(
-                    strict_exception_groups=False,
-                ) as tn,
-            ):
-                await tn.start(spawn_and_sleep_forever)
+            async with trio.open_nursery(
+                strict_exception_groups=False,
+            ) as n:
+                await n.start(spawn_and_sleep_forever)
             if 'mp' in spawn_backend:
                 time.sleep(0.1)
             os.kill(pid, signal.SIGINT)
@@ -554,123 +533,38 @@ def test_cancel_via_SIGINT_other_task(
 async def spin_for(period=3):
     "Sync sleep."
-    print(f'sync sleeping in sub-sub for {period}\n')
     time.sleep(period)


-async def spawn_sub_with_sync_blocking_task():
-    async with tractor.open_nursery() as an:
-        print('starting sync blocking subactor..\n')
-        await an.run_in_actor(
+async def spawn():
+    async with tractor.open_nursery() as tn:
+        await tn.run_in_actor(
             spin_for,
             name='sleeper',
         )
-        print('exiting first subactor layer..\n')


-@pytest.mark.parametrize(
-    'man_cancel_outer',
-    [
-        False,  # passes if delay != 2
-
-        # always causes an unexpected eg-w-embedded-assert-err?
-        pytest.param(True,
-            marks=pytest.mark.xfail(
-                reason=(
-                    'always causes an unexpected eg-w-embedded-assert-err?'
-                )
-            ),
-        ),
-    ],
-)
 @no_windows
 def test_cancel_while_childs_child_in_sync_sleep(
-    loglevel: str,
-    start_method: str,
-    spawn_backend: str,
-    debug_mode: bool,
-    reg_addr: tuple,
-    man_cancel_outer: bool,
+    loglevel,
+    start_method,
+    spawn_backend,
 ):
-    '''
-    Verify that a child cancelled while executing sync code is torn
+    """Verify that a child cancelled while executing sync code is torn
     down even when that cancellation is triggered by the parent
     2 nurseries "up".
+    """
-
-    Though the grandchild should stay blocking its actor runtime, its
-    parent should issue a "zombie reaper" to hard kill it after
-    sufficient timeout.
-
-    '''
     if start_method == 'forkserver':
         pytest.skip("Forksever sux hard at resuming from sync sleep...")

     async def main():
-        #
-        # XXX BIG TODO NOTE XXX
-        #
-        # it seems there's a strange race that can happen
-        # where where the fail-after will trigger outer scope
-        # .cancel() which then causes the inner scope to raise,
-        #
-        # BaseExceptionGroup('Exceptions from Trio nursery', [
-        #   BaseExceptionGroup('Exceptions from Trio nursery',
-        #     [
-        #       Cancelled(),
-        #       Cancelled(),
-        #     ]
-        #   ),
-        #   AssertionError('assert 0')
-        # ])
-        #
-        # WHY THIS DOESN'T MAKE SENSE:
-        # ---------------------------
-        # - it should raise too-slow-error when too slow..
-        #  * verified that using simple-cs and manually cancelling
-        #    you get same outcome -> indicates that the fail-after
-        #    can have its TooSlowError overriden!
-        #  |_ to check this it's easy, simplly decrease the timeout
-        #     as per the var below.
-        #
-        # - when using the manual simple-cs the outcome is different
-        #   DESPITE the `assert 0` which means regardless of the
-        #   inner scope effectively failing in the same way, the
-        #   bubbling up **is NOT the same**.
-        #
-        # delays trigger diff outcomes..
-        # ---------------------------
-        # as seen by uncommenting various lines below there is from
-        # my POV an unexpected outcome due to the delay=2 case.
-        #
-        # delay = 1  # no AssertionError in eg, TooSlowError raised.
-        # delay = 2  # is AssertionError in eg AND no TooSlowError !?
-        delay = 4  # is AssertionError in eg AND no _cs cancellation.
-
-        with trio.fail_after(delay) as _cs:
-        # with trio.CancelScope() as cs:
-        # ^XXX^ can be used instead to see same outcome.
-
-            async with (
-                # tractor.trionics.collapse_eg(),  # doesn't help
-                tractor.open_nursery(
-                    hide_tb=False,
-                    debug_mode=debug_mode,
-                    registry_addrs=[reg_addr],
-                ) as an,
-            ):
-                await an.run_in_actor(
-                    spawn_sub_with_sync_blocking_task,
-                    name='sync_blocking_sub',
-                )
-                await trio.sleep(1)
-
-                if man_cancel_outer:
-                    print('Cancelling manually in root')
-                    _cs.cancel()
-
-                # trigger exc-srced taskc down
-                # the actor tree.
-                print('RAISING IN ROOT')
-                assert 0
+        # with trio.fail_after(2):
+        async with tractor.open_nursery() as tn:
+            await tn.run_in_actor(
+                spawn,
+                name='spawn',
+            )
+            await trio.sleep(1)
+            assert 0

     with pytest.raises(AssertionError):

View File

@@ -117,10 +117,9 @@ async def open_actor_local_nursery(
     ctx: tractor.Context,
 ):
     global _nursery
-    async with (
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as tn
-    ):
+    async with trio.open_nursery(
+        strict_exception_groups=False,
+    ) as tn:
         _nursery = tn
         await ctx.started()
         await trio.sleep(10)

View File

@@ -13,24 +13,26 @@ MESSAGE = 'tractoring at full speed'
 def test_empty_mngrs_input_raises() -> None:

     async def main():
-        with trio.fail_after(3):
+        with trio.fail_after(1):
             async with (
                 open_actor_cluster(
                     modules=[__name__],

                     # NOTE: ensure we can passthrough runtime opts
-                    loglevel='cancel',
-                    debug_mode=False,
+                    loglevel='info',
+                    # debug_mode=True,
                 ) as portals,

-                gather_contexts(mngrs=()),
+                gather_contexts(
+                    # NOTE: it's the use of inline-generator syntax
+                    # here that causes the empty input.
+                    mngrs=(
+                        p.open_context(worker) for p in portals.values()
+                    ),
+                ),
             ):
-                # should fail before this?
-                assert portals
-
-                # test should fail if we mk it here!
-                assert 0, 'Should have raised val-err !?'
+                assert 0

     with pytest.raises(ValueError):
         trio.run(main)

View File

@@ -11,7 +11,6 @@ import psutil
 import pytest
 import subprocess
 import tractor
-from tractor.trionics import collapse_eg
 from tractor._testing import tractor_test
 import trio

@@ -194,10 +193,10 @@ async def spawn_and_check_registry(
     try:
         async with tractor.open_nursery() as an:
-            async with (
-                collapse_eg(),
-                trio.open_nursery() as trion,
-            ):
+            async with trio.open_nursery(
+                strict_exception_groups=False,
+            ) as trion:

                 portals = {}
                 for i in range(3):
                     name = f'a{i}'

@@ -339,12 +338,11 @@ async def close_chans_before_nursery(
             async with portal2.open_stream_from(
                 stream_forever
             ) as agen2:
-                async with (
-                    collapse_eg(),
-                    trio.open_nursery() as tn,
-                ):
-                    tn.start_soon(streamer, agen1)
-                    tn.start_soon(cancel, use_signal, .5)
+                async with trio.open_nursery(
+                    strict_exception_groups=False,
+                ) as n:
+                    n.start_soon(streamer, agen1)
+                    n.start_soon(cancel, use_signal, .5)
                     try:
                         await streamer(agen2)
                     finally:

View File

@@ -234,8 +234,10 @@ async def trio_ctx(
     with trio.fail_after(1 + delay):
         try:
             async with (
-                tractor.trionics.collapse_eg(),
-                trio.open_nursery() as tn,
+                trio.open_nursery(
+                    # TODO, for new `trio` / py3.13
+                    # strict_exception_groups=False,
+                ) as tn,
                 tractor.to_asyncio.open_channel_from(
                     sleep_and_err,
                 ) as (first, chan),
@@ -571,16 +573,14 @@ def test_basic_interloop_channel_stream(
     fan_out: bool,
 ):
     async def main():
-        # TODO, figure out min timeout here!
-        with trio.fail_after(6):
-            async with tractor.open_nursery() as an:
-                portal = await an.run_in_actor(
-                    stream_from_aio,
-                    infect_asyncio=True,
-                    fan_out=fan_out,
-                )
-                # should raise RAE diectly
-                await portal.result()
+        async with tractor.open_nursery() as an:
+            portal = await an.run_in_actor(
+                stream_from_aio,
+                infect_asyncio=True,
+                fan_out=fan_out,
+            )
+            # should raise RAE diectly
+            await portal.result()

     trio.run(main)
@@ -1088,108 +1088,6 @@ def test_sigint_closes_lifetime_stack(
     trio.run(main)


-# ?TODO asyncio.Task fn-deco?
-# -[ ] do sig checkingat import time like @context?
-# -[ ] maybe name it @aio_task ??
-# -[ ] chan: to_asyncio.InterloopChannel ??
-async def raise_before_started(
-    # from_trio: asyncio.Queue,
-    # to_trio: trio.abc.SendChannel,
-    chan: to_asyncio.LinkedTaskChannel,
-) -> None:
-    '''
-    `asyncio.Task` entry point which RTEs before calling
-    `to_trio.send_nowait()`.
-
-    '''
-    await asyncio.sleep(0.2)
-    raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
-
-    # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
-    chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
-    await asyncio.sleep(float('inf'))
-
-
-@tractor.context
-async def caching_ep(
-    ctx: tractor.Context,
-):
-
-    log = tractor.log.get_logger('caching_ep')
-    log.info('syncing via `ctx.started()`')
-    await ctx.started()
-
-    # XXX, allocate the `open_channel_from()` inside
-    # a `.trionics.maybe_open_context()`.
-    chan: to_asyncio.LinkedTaskChannel
-    async with (
-        tractor.trionics.maybe_open_context(
-            acm_func=tractor.to_asyncio.open_channel_from,
-            kwargs={
-                'target': raise_before_started,
-                # ^XXX, kwarg to `open_channel_from()`
-            },
-
-            # lock around current actor task access
-            key=tractor.current_actor().uid,
-
-        ) as (cache_hit, (clients, chan)),
-    ):
-        if cache_hit:
-            log.error(
-                'Re-using cached `.open_from_channel()` call!\n'
-            )
-        else:
-            log.info(
-                'Allocating SHOULD-FAIL `.open_from_channel()`\n'
-            )
-
-        await trio.sleep_forever()
-
-
-def test_aio_side_raises_before_started(
-    reg_addr: tuple[str, int],
-    debug_mode: bool,
-    loglevel: str,
-):
-    '''
-    Simulates connection-err from `piker.brokers.ib.api`..
-
-    Ensure any error raised by child-`asyncio.Task` BEFORE
-    `chan.started()`
-
-    '''
-    # delay = 999 if debug_mode else 1
-    async def main():
-        with trio.fail_after(3):
-            an: tractor.ActorNursery
-            async with tractor.open_nursery(
-                debug_mode=debug_mode,
-                loglevel=loglevel,
-            ) as an:
-                p: tractor.Portal = await an.start_actor(
-                    'lchan_cacher_that_raises_fast',
-                    enable_modules=[__name__],
-                    infect_asyncio=True,
-                )
-                async with p.open_context(
-                    caching_ep,
-                ) as (ctx, first):
-                    assert not first
-
-    with pytest.raises(
-        expected_exception=(RemoteActorError),
-    ) as excinfo:
-        trio.run(main)
-
-    # ensure `asyncio.Task` exception is bubbled
-    # allll the way erp!!
-    rae = excinfo.value
-    assert rae.boxed_type is RuntimeError
-
-
 # TODO: debug_mode tests once we get support for `asyncio`!
 #
 # -[ ] need tests to wrap both scripts:

View File

@@ -235,16 +235,10 @@ async def cancel_after(wait, reg_addr):
 @pytest.fixture(scope='module')
-def time_quad_ex(
-    reg_addr: tuple,
-    ci_env: bool,
-    spawn_backend: str,
-):
+def time_quad_ex(reg_addr, ci_env, spawn_backend):
     if spawn_backend == 'mp':
-        '''
-        no idea but the mp *nix runs are flaking out here often...
-
-        '''
+        """no idea but the mp *nix runs are flaking out here often...
+        """
         pytest.skip("Test is too flaky on mp in CI")

     timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@@ -255,24 +249,12 @@ def time_quad_ex(
     return results, diff


-def test_a_quadruple_example(
-    time_quad_ex: tuple,
-    ci_env: bool,
-    spawn_backend: str,
-):
-    '''
-    This also serves as a kind of "we'd like to be this fast test".
-
-    '''
+def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
+    """This also serves as a kind of "we'd like to be this fast test"."""
     results, diff = time_quad_ex
     assert results
-    this_fast = (
-        6 if platform.system() in (
-            'Windows',
-            'Darwin',
-        )
-        else 3
-    )
+    this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
     assert diff < this_fast

View File

@@ -1,6 +1,5 @@
 '''
-Suites for our `.trionics.maybe_open_context()` multi-task
-shared-cached `@acm` API.
+Async context manager cache api testing: ``trionics.maybe_open_context():``

 '''
 from contextlib import asynccontextmanager as acm
@@ -10,15 +9,6 @@ from typing import Awaitable
 import pytest
 import trio
 import tractor
-from tractor.trionics import (
-    maybe_open_context,
-)
-from tractor.log import (
-    get_console_log,
-    get_logger,
-)
-log = get_logger(__name__)
-

 _resource: int = 0
@@ -62,7 +52,7 @@ def test_resource_only_entered_once(key_on):
             # different task names per task will be used
             kwargs = {'task_name': name}

-        async with maybe_open_context(
+        async with tractor.trionics.maybe_open_context(
             maybe_increment_counter,
             kwargs=kwargs,
             key=key,
@@ -82,13 +72,11 @@ def test_resource_only_entered_once(key_on):
         with trio.move_on_after(0.5):
             async with (
                 tractor.open_root_actor(),
-                trio.open_nursery() as tn,
+                trio.open_nursery() as n,
             ):
                 for i in range(10):
-                    tn.start_soon(
-                        enter_cached_mngr,
-                        f'task_{i}',
-                    )
+                    n.start_soon(enter_cached_mngr, f'task_{i}')
                     await trio.sleep(0.001)

     trio.run(main)
@@ -110,34 +98,23 @@ async def streamer(

 @acm
-async def open_stream() -> Awaitable[
-    tuple[
-        tractor.ActorNursery,
-        tractor.MsgStream,
-    ]
-]:
+async def open_stream() -> Awaitable[tractor.MsgStream]:
     try:
         async with tractor.open_nursery() as an:
             portal = await an.start_actor(
                 'streamer',
                 enable_modules=[__name__],
             )
-            try:
-                async with (
-                    portal.open_context(streamer) as (ctx, first),
-                    ctx.open_stream() as stream,
-                ):
-                    print('Entered open_stream() caller')
-                    yield an, stream
-                    print('Exited open_stream() caller')
-
-            finally:
-                print(
-                    'Cancelling streamer with,\n'
-                    '=> `Portal.cancel_actor()`'
-                )
-                await portal.cancel_actor()
-                print('Cancelled streamer')
+            async with (
+                portal.open_context(streamer) as (ctx, first),
+                ctx.open_stream() as stream,
+            ):
+                yield stream
+
+            print('Cancelling streamer')
+            await portal.cancel_actor()
+            print('Cancelled streamer')

     except Exception as err:
         print(
@@ -150,15 +127,11 @@ async def open_stream() -> Awaitable[

 @acm
 async def maybe_open_stream(taskname: str):
-    async with maybe_open_context(
+    async with tractor.trionics.maybe_open_context(
         # NOTE: all secondary tasks should cache hit on the same key
         acm_func=open_stream,
-    ) as (
-        cache_hit,
-        (an, stream)
-    ):
-        # when the actor + portal + ctx + stream has already been
-        # allocated we want to just bcast to this task.
+    ) as (cache_hit, stream):
+
         if cache_hit:
             print(f'{taskname} loaded from cache')
@@ -166,43 +139,10 @@ async def maybe_open_stream(taskname: str):
             # if this feed is already allocated by the first
             # task that entereed
             async with stream.subscribe() as bstream:
-                yield an, bstream
-                print(
-                    f'cached task exited\n'
-                    f')>\n'
-                    f' |_{taskname}\n'
-                )
-
-            # we should always unreg the "cloned" bcrc for this
-            # consumer-task
-            assert id(bstream) not in bstream._state.subs
+                yield bstream

         else:
             # yield the actual stream
-            try:
-                yield an, stream
-            finally:
-                print(
-                    f'NON-cached task exited\n'
-                    f')>\n'
-                    f' |_{taskname}\n'
-                )
-
-        first_bstream = stream._broadcaster
-        bcrx_state = first_bstream._state
-        subs: dict[int, int] = bcrx_state.subs
-        if len(subs) == 1:
-            assert id(first_bstream) in subs
-        # ^^TODO! the bcrx should always de-allocate all subs,
-        # including the implicit first one allocated on entry
-        # by the first subscribing peer task, no?
-        #
-        # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
-        #     |_ allows reverting `MsgStream.receive()` to the
-        #        non-bcaster method.
-        #     |_ we can decide whether to reset `._broadcaster`?
-        #
-        # await tractor.pause(shield=True)
+            yield stream


 def test_open_local_sub_to_stream(
@@ -219,24 +159,16 @@ def test_open_local_sub_to_stream(
     if debug_mode:
         timeout = 999
-        print(f'IN debug_mode, setting large timeout={timeout!r}..')

     async def main():

         full = list(range(1000))
-        an: tractor.ActorNursery|None = None
-        num_tasks: int = 10

         async def get_sub_and_pull(taskname: str):
-            nonlocal an

             stream: tractor.MsgStream
             async with (
-                maybe_open_stream(taskname) as (
-                    an,
-                    stream,
-                ),
+                maybe_open_stream(taskname) as stream,
             ):
                 if '0' in taskname:
                     assert isinstance(stream, tractor.MsgStream)
@@ -248,159 +180,34 @@ def test_open_local_sub_to_stream(
                 first = await stream.receive()
                 print(f'{taskname} started with value {first}')
-                seq: list[int] = []
+                seq = []
                 async for msg in stream:
                     seq.append(msg)

                 assert set(seq).issubset(set(full))

-            # end of @acm block
             print(f'{taskname} finished')

-        root: tractor.Actor
         with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
             async with tractor.open_root_actor(
                 debug_mode=debug_mode,
-                # maybe_enable_greenback=True,
-                #
-                # ^TODO? doesn't seem to mk breakpoint() usage work
-                # bc each bg task needs to open a portal??
-                # - [ ] we should consider making this part of
-                #      our taskman defaults?
-                #   |_see https://github.com/goodboy/tractor/pull/363
-                #
-            ) as root:
-                assert root.is_registrar
-
+            ):
                 async with (
                     trio.open_nursery() as tn,
                 ):
-                    for i in range(num_tasks):
+                    for i in range(10):
                         tn.start_soon(
                             get_sub_and_pull,
                             f'task_{i}',
                         )
                         await trio.sleep(0.001)

-                print('all consumer tasks finished!')
-
-                # ?XXX, ensure actor-nursery is shutdown or we might
-                # hang here due to a minor task deadlock/race-condition?
-                #
-                # - seems that all we need is a checkpoint to ensure
-                #   the last suspended task, which is inside
-                #   `.maybe_open_context()`, can do the
-                #   `Portal.cancel_actor()` call?
-                #
-                # - if that bg task isn't resumed, then this blocks
-                #   timeout might hit before that?
-                #
-                if root.ipc_server.has_peers():
-                    await trio.lowlevel.checkpoint()
-
-                    # alt approach, cancel the entire `an`
-                    # await tractor.pause()
-                    # await an.cancel()
-
-        # end of runtime scope
-        print('root actor terminated.')
+                print('all consumer tasks finished')

         if cs.cancelled_caught:
             pytest.fail(
                 'Should NOT time out in `open_root_actor()` ?'
             )

-        print('exiting main.')
-
-    trio.run(main)
-
-
-@acm
-async def cancel_outer_cs(
-    cs: trio.CancelScope|None = None,
-    delay: float = 0,
-):
-    # on first task delay this enough to block
-    # the 2nd task but then cancel it mid sleep
-    # so that the tn.start() inside the key-err handler block
-    # is cancelled and would previously corrupt the
-    # mutext state.
-    log.info(f'task entering sleep({delay})')
-    await trio.sleep(delay)
-    if cs:
-        log.info('task calling cs.cancel()')
-        cs.cancel()
-        trio.lowlevel.checkpoint()
-
-    yield
-    await trio.sleep_forever()
-
-
-def test_lock_not_corrupted_on_fast_cancel(
-    debug_mode: bool,
-    loglevel: str,
-):
-    '''
-    Verify that if the caching-task (the first to enter
-    `maybe_open_context()`) is cancelled mid-cache-miss, the embedded
-    mutex can never be left in a corrupted state.
-
-    That is, the lock is always eventually released ensuring a peer
-    (cache-hitting) task will never,
-     - be left to inf-block/hang on the `lock.acquire()`.
-     - try to release the lock when still owned by the caching-task
-       due to it having erronously exited without calling
-       `lock.release()`.
-
-    '''
-    delay: float = 1.
-
-    async def use_moc(
-        cs: trio.CancelScope|None,
-        delay: float,
-    ):
-        log.info('task entering moc')
-        async with maybe_open_context(
-            cancel_outer_cs,
-            kwargs={
-                'cs': cs,
-                'delay': delay,
-            },
-        ) as (cache_hit, _null):
-            if cache_hit:
-                log.info('2nd task entered')
-            else:
-                log.info('1st task entered')
-
-            await trio.sleep_forever()
-
-    async def main():
-        with trio.fail_after(delay + 2):
-            async with (
-                tractor.open_root_actor(
-                    debug_mode=debug_mode,
-                    loglevel=loglevel,
-                ),
-                trio.open_nursery() as tn,
-            ):
-                get_console_log('info')
-                log.info('yo starting')
-
-                cs = tn.cancel_scope
-                tn.start_soon(
-                    use_moc,
-                    cs,
-                    delay,
-                    name='child',
-                )
-                with trio.CancelScope() as rent_cs:
-                    await use_moc(
-                        cs=rent_cs,
-                        delay=delay,
-                    )
-
     trio.run(main)

View File

@@ -147,7 +147,8 @@ def test_trio_prestarted_task_bubbles(
         await trio.sleep_forever()

     async def _trio_main():
-        with trio.fail_after(2 if not debug_mode else 999):
+        # with trio.fail_after(2):
+        with trio.fail_after(999):
             first: str
             chan: to_asyncio.LinkedTaskChannel
             aio_ev = asyncio.Event()
@@ -216,25 +217,32 @@ def test_trio_prestarted_task_bubbles(
         ):
             aio_ev.set()

+    with pytest.raises(
+        expected_exception=ExceptionGroup,
+    ) as excinfo:
+        tractor.to_asyncio.run_as_asyncio_guest(
+            trio_main=_trio_main,
+        )
+
+    eg = excinfo.value
+    rte_eg, rest_eg = eg.split(RuntimeError)
+
     # ensure the trio-task's error bubbled despite the aio-side
     # having (maybe) errored first.
     if aio_err_trigger in (
         'after_trio_task_starts',
         'after_start_point',
     ):
-        patt: str = 'trio-side'
-        expect_exc = TypeError
+        assert len(errs := rest_eg.exceptions) == 1
+        typerr = errs[0]
+        assert (
+            type(typerr) is TypeError
+            and
+            'trio-side' in typerr.args
+        )

     # when aio errors BEFORE (last) trio task is scheduled, we should
     # never see anythinb but the aio-side.
     else:
-        patt: str = 'asyncio-side'
-        expect_exc = RuntimeError
-
-    with pytest.raises(expect_exc) as excinfo:
-        tractor.to_asyncio.run_as_asyncio_guest(
-            trio_main=_trio_main,
-        )
-
-    caught_exc = excinfo.value
-    assert patt in caught_exc.args
+        assert len(rtes := rte_eg.exceptions) == 1
+        assert 'asyncio-side' in rtes[0].args[0]

View File

@@ -8,7 +8,6 @@ from contextlib import (
 )

 import pytest
-from tractor.trionics import collapse_eg
 import trio
 from trio import TaskStatus

@@ -65,8 +64,9 @@ def test_stashed_child_nursery(use_start_soon):

     async def main():
         async with (
-            collapse_eg(),
-            trio.open_nursery() as pn,
+            trio.open_nursery(
+                strict_exception_groups=False,
+            ) as pn,
         ):
             cn = await pn.start(mk_child_nursery)
             assert cn

@@ -197,8 +197,10 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
         async with (
             # XXX should ensure ONLY the KBI
             # is relayed upward
-            collapse_eg(),
-            trio.open_nursery(),  # as tn,
+            trionics.collapse_eg(),
+            trio.open_nursery(
+                # strict_exception_groups=False,
+            ),  # as tn,

             trionics.gather_contexts([
                 open_memchan(),

View File

@@ -55,17 +55,10 @@ async def open_actor_cluster(
         raise ValueError(
             'Number of names is {len(names)} but count it {count}')

-    async with (
-        # tractor.trionics.collapse_eg(),
-        tractor.open_nursery(
-            **runtime_kwargs,
-        ) as an
-    ):
-        async with (
-            # tractor.trionics.collapse_eg(),
-            trio.open_nursery() as tn,
-            tractor.trionics.maybe_raise_from_masking_exc()
-        ):
+    async with tractor.open_nursery(
+        **runtime_kwargs,
+    ) as an:
+        async with trio.open_nursery() as n:
             uid = tractor.current_actor().uid

             async def _start(name: str) -> None:
@@ -76,8 +69,9 @@ async def open_actor_cluster(
                 )

             for name in names:
-                tn.start_soon(_start, name)
+                n.start_soon(_start, name)

             assert len(portals) == count
             yield portals

             await an.cancel(hard_kill=hard_kill)

View File

@@ -154,7 +154,7 @@ class Context:
     2 cancel-scope-linked, communicating and parallel executing
     `Task`s. Contexts are allocated on each side of any task
     RPC-linked msg dialog, i.e. for every request to a remote
-    actor from a `Portal`. On the "child" side a context is
+    actor from a `Portal`. On the "callee" side a context is
     always allocated inside `._rpc._invoke()`.

     TODO: more detailed writeup on cancellation, error and
@@ -222,8 +222,8 @@ class Context:
     # `._runtime.invoke()`.
     _remote_func_type: str | None = None

-    # NOTE: (for now) only set (a portal) on the parent side since
-    # the child doesn't generally need a ref to one and should
+    # NOTE: (for now) only set (a portal) on the caller side since
+    # the callee doesn't generally need a ref to one and should
     # normally need to explicitly ask for handle to its peer if
     # more the the `Context` is needed?
     _portal: Portal | None = None
@@ -252,12 +252,12 @@ class Context:
     _outcome_msg: Return|Error|ContextCancelled = Unresolved

     # on a clean exit there should be a final value
-    # delivered from the far end "child" task, so
+    # delivered from the far end "callee" task, so
     # this value is only set on one side.
     # _result: Any | int = None
     _result: PayloadT|Unresolved = Unresolved

-    # if the local "parent" task errors this value is always set
+    # if the local "caller" task errors this value is always set
     # to the error that was captured in the
     # `Portal.open_context().__aexit__()` teardown block OR, in
     # 2 special cases when an (maybe) expected remote error
@@ -293,7 +293,7 @@ class Context:
     # a `ContextCancelled` due to a call to `.cancel()` triggering
     # "graceful closure" on either side:
     # - `._runtime._invoke()` will check this flag before engaging
-    #   the crash handler REPL in such cases where the "child"
+    #   the crash handler REPL in such cases where the "callee"
     #   raises the cancellation,
     # - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if
     #   the global tty-lock has been configured to filter out some
@@ -307,8 +307,8 @@ class Context:
     _stream_opened: bool = False
     _stream: MsgStream|None = None

-    # the parent-task's calling-fn's frame-info, the frame above
-    # `Portal.open_context()`, for introspection/logging.
+    # caller of `Portal.open_context()` for
+    # logging purposes mostly
     _caller_info: CallerInfo|None = None

     # overrun handling machinery
@@ -529,11 +529,11 @@ class Context:
         '''
         Exactly the value of `self._scope.cancelled_caught`
         (delegation) and should only be (able to be read as)
-        `True` for a `.side == "parent"` ctx wherein the
+        `True` for a `.side == "caller"` ctx wherein the
         `Portal.open_context()` block was exited due to a call to
         `._scope.cancel()` - which should only ocurr in 2 cases:

-        - a parent side calls `.cancel()`, the far side cancels
+        - a caller side calls `.cancel()`, the far side cancels
           and delivers back a `ContextCancelled` (making
           `.cancel_acked == True`) and `._scope.cancel()` is
           called by `._maybe_cancel_and_set_remote_error()` which
@@ -542,20 +542,20 @@ class Context:
          => `._scope.cancelled_caught == True` by normal `trio`
           cs semantics.

-        - a parent side is delivered a `._remote_error:
+        - a caller side is delivered a `._remote_error:
           RemoteActorError` via `._deliver_msg()` and a transitive
           call to `_maybe_cancel_and_set_remote_error()` calls
           `._scope.cancel()` and that cancellation eventually
           results in `trio.Cancelled`(s) caught in the
           `.open_context()` handling around the @acm's `yield`.

-        Only as an FYI, in the "child" side case it can also be
+        Only as an FYI, in the "callee" side case it can also be
         set but never is readable by any task outside the RPC
         machinery in `._invoke()` since,:

-        - when a child side calls `.cancel()`, `._scope.cancel()`
+        - when a callee side calls `.cancel()`, `._scope.cancel()`
           is called immediately and handled specially inside
           `._invoke()` to raise a `ContextCancelled` which is then
-          sent to the parent side.
+          sent to the caller side.

         However, `._scope.cancelled_caught` can NEVER be
         accessed/read as `True` by any RPC invoked task since it
@@ -666,7 +666,7 @@ class Context:
         when called/closed by actor local task(s).

         NOTEs:
-          - It is expected that the parent has previously unwrapped
+          - It is expected that the caller has previously unwrapped
             the remote error using a call to `unpack_error()` and
             provides that output exception value as the input
             `error` argument *here*.
@@ -676,7 +676,7 @@ class Context:
             `Portal.open_context()` (ideally) we want to interrupt
             any ongoing local tasks operating within that
             `Context`'s cancel-scope so as to be notified ASAP of
-            the remote error and engage any parent handling (eg.
+            the remote error and engage any caller handling (eg.
             for cross-process task supervision).

           - In some cases we may want to raise the remote error
@@ -886,11 +886,6 @@ class Context:
     @property
     def repr_caller(self) -> str:
-        '''
-        Render a "namespace-path" style representation of the calling
-        task-fn.
-
-        '''
         ci: CallerInfo|None = self._caller_info
         if ci:
             return (
@@ -904,7 +899,7 @@ class Context:
     def repr_api(self) -> str:
         return 'Portal.open_context()'

-        # TODO: use `.dev._frame_stack` scanning to find caller fn!
+        # TODO: use `.dev._frame_stack` scanning to find caller!
         # ci: CallerInfo|None = self._caller_info
         # if ci:
         #     return (
@@ -939,7 +934,7 @@ class Context:
          => That is, an IPC `Context` (this) **does not**
             have the same semantics as a `trio.CancelScope`.

-        If the parent (who entered the `Portal.open_context()`)
+        If the caller (who entered the `Portal.open_context()`)
         desires that the internal block's cancel-scope be
         cancelled it should open its own `trio.CancelScope` and
         manage it as needed.
@@ -1011,6 +1006,7 @@ class Context:
         else:
             log.cancel(
                 f'Timed out on cancel request of remote task?\n'
+                f'\n'
                 f'{reminfo}'
             )
@@ -1021,7 +1017,7 @@ class Context:
         # `_invoke()` RPC task.
         #
         # NOTE: on this side we ALWAYS cancel the local scope
-        # since the parent expects a `ContextCancelled` to be sent
+        # since the caller expects a `ContextCancelled` to be sent
         # from `._runtime._invoke()` back to the other side. The
         # logic for catching the result of the below
         # `._scope.cancel()` is inside the `._runtime._invoke()`
@@ -1194,8 +1190,8 @@ class Context:
     ) -> Any|Exception:
         '''
-        From some (parent) side task, wait for and return the final
-        result from the remote (child) side's task.
+        From some (caller) side task, wait for and return the final
+        result from the remote (callee) side's task.

         This provides a mechanism for one task running in some actor to wait
         on another task at the other side, in some other actor, to terminate.
@@ -1491,12 +1487,6 @@ class Context:
             ):
                 status = 'peer-cancelled'

-            case (
-                Unresolved,
-                trio.Cancelled(),  # any error-type
-            ) if self.canceller:
-                status = 'actor-cancelled'
-
             # (remote) error condition
             case (
                 Unresolved,
@@ -1610,7 +1600,7 @@ class Context:
             raise err

         # TODO: maybe a flag to by-pass encode op if already done
-        # here in parent?
+        # here in caller?
         await self.chan.send(started_msg)

         # set msg-related internal runtime-state
@@ -1686,7 +1676,7 @@ class Context:
         XXX RULES XXX
         ------ - ------
-        - NEVER raise remote errors from this method; a calling runtime-task.
+        - NEVER raise remote errors from this method; a runtime task caller.
          An error "delivered" to a ctx should always be raised by
          the corresponding local task operating on the
          `Portal`/`Context` APIs.
@@ -1762,7 +1752,7 @@ class Context:
             else:
                 report = (
-                    'Queueing OVERRUN msg on parent task:\n\n'
+                    'Queueing OVERRUN msg on caller task:\n\n'
                     + report
                 )
                 log.debug(report)
@@ -1958,12 +1948,12 @@ async def open_context_from_portal(
     IPC protocol.

     The yielded `tuple` is a pair delivering a `tractor.Context`
-    and any first value "sent" by the "child" task via a call
+    and any first value "sent" by the "callee" task via a call
     to `Context.started(<value: Any>)`; this side of the
-    context does not unblock until the "child" task calls
+    context does not unblock until the "callee" task calls
     `.started()` in similar style to `trio.Nursery.start()`.

-    When the "child" (side that is "called"/started by a call
-    to *this* method) returns, the parent side (this) unblocks
+    When the "callee" (side that is "called"/started by a call
+    to *this* method) returns, the caller side (this) unblocks
     and any final value delivered from the other end can be
     retrieved using the `Contex.wait_for_result()` api.
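As the (re-worded) docstring above describes, the two sides synchronize via `Context.started()` in the style of `trio.Nursery.start()`. A minimal usage sketch (hypothetical fn names, per the documented API):

    import tractor

    @tractor.context
    async def child(
        ctx: tractor.Context,
        x: int,
    ) -> int:
        await ctx.started(x + 1)  # unblocks the opener's entry
        return x * 2              # delivered as the final result

    async def parent(portal: tractor.Portal) -> None:
        async with portal.open_context(child, x=1) as (ctx, first):
            assert first == 2  # the value passed to `.started()`
            result = await ctx.wait_for_result()
        assert result == 2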
@@ -1976,7 +1966,7 @@ async def open_context_from_portal(
     __tracebackhide__: bool = hide_tb

     # denote this frame as a "runtime frame" for stack
-    # introspection where we report the parent code in logging
+    # introspection where we report the caller code in logging
     # and error message content.
     # NOTE: 2 bc of the wrapping `@acm`
     __runtimeframe__: int = 2  # noqa
@@ -2035,7 +2025,7 @@ async def open_context_from_portal(
     # placeholder for any exception raised in the runtime
     # or by user tasks which cause this context's closure.
     scope_err: BaseException|None = None
-    ctxc_from_child: ContextCancelled|None = None
+    ctxc_from_callee: ContextCancelled|None = None
     try:
         async with (
             collapse_eg(),
@@ -2114,7 +2104,7 @@ async def open_context_from_portal(
             # that we can re-use it around the `yield` ^ here
             # or vice versa?
             #
-            # maybe TODO NOTE: between the parent exiting and
+            # maybe TODO NOTE: between the caller exiting and
             # arriving here the far end may have sent a ctxc-msg or
             # other error, so the quetion is whether we should check
             # for it here immediately and maybe raise so as to engage
@@ -2180,16 +2170,16 @@ async def open_context_from_portal(
         # request in which case we DO let the error bubble to the
         # opener.
         #
-        # 2-THIS "parent" task somewhere invoked `Context.cancel()`
-        #   and received a `ContextCanclled` from the "child"
+        # 2-THIS "caller" task somewhere invoked `Context.cancel()`
+        #   and received a `ContextCanclled` from the "callee"
         #   task, in which case we mask the `ContextCancelled` from
-        #   bubbling to this "parent" (much like how `trio.Nursery`
+        #   bubbling to this "caller" (much like how `trio.Nursery`
         #   swallows any `trio.Cancelled` bubbled by a call to
         #   `Nursery.cancel_scope.cancel()`)
         except ContextCancelled as ctxc:
             scope_err = ctxc
             ctx._local_error: BaseException = scope_err
-            ctxc_from_child = ctxc
+            ctxc_from_callee = ctxc

             # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
             # using this code and then resuming the REPL will
@@ -2226,11 +2216,11 @@ async def open_context_from_portal(
         # the above `._scope` can be cancelled due to:
         # 1. an explicit self cancel via `Context.cancel()` or
         #    `Actor.cancel()`,
-        # 2. any "child"-side remote error, possibly also a cancellation
+        # 2. any "callee"-side remote error, possibly also a cancellation
         #    request by some peer,
-        # 3. any "parent" (aka THIS scope's) local error raised in the above `yield`
+        # 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
         except (
-            # CASE 3: standard local error in this parent/yieldee
+            # CASE 3: standard local error in this caller/yieldee
             Exception,

             # CASES 1 & 2: can manifest as a `ctx._scope_nursery`
@@ -2244,9 +2234,9 @@ async def open_context_from_portal(
             # any `Context._maybe_raise_remote_err()` call.
             #
             # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
-            #    from any error delivered from the "child" side
+            #    from any error delivered from the "callee" side
             #    AND a group-exc is only raised if there was > 1
-            #    tasks started *here* in the "parent" / opener
+            #    tasks started *here* in the "caller" / opener
             #    block. If any one of those tasks calls
             #    `.wait_for_result()` or `MsgStream.receive()`
             #    `._maybe_raise_remote_err()` will be transitively
@@ -2259,8 +2249,8 @@ async def open_context_from_portal(
             trio.Cancelled,  # NOTE: NOT from inside the ctx._scope
             KeyboardInterrupt,

-        ) as rent_err:
-            scope_err = rent_err
+        ) as caller_err:
+            scope_err = caller_err
             ctx._local_error: BaseException = scope_err
@@ -2270,7 +2260,7 @@ async def open_context_from_portal(
             # await debug.pause()
             # log.cancel(
             match scope_err:
-                case trio.Cancelled():
+                case trio.Cancelled:
                     logmeth = log.cancel

                 # XXX explicitly report on any non-graceful-taskc cases
@@ -2278,7 +2268,7 @@ async def open_context_from_portal(
                     logmeth = log.exception

                 logmeth(
-                    f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
+                    f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
                 )

             if debug_mode():
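Worth flagging on the `match scope_err:` change above: the two variants are not equivalent under Python's pattern-matching semantics. A dotted name with parens, `case trio.Cancelled():`, is a class pattern (an `isinstance()` check) whereas a bare dotted name, `case trio.Cancelled:`, is a value pattern that tests `scope_err == trio.Cancelled` and so never matches an exception *instance*. A standalone demonstration (using a generic exception type, since `trio.Cancelled` can't be constructed directly):

    import builtins

    def classify(obj: object) -> str:
        match obj:
            case builtins.ValueError():  # class pattern: isinstance() check
                return 'an instance'
            case builtins.ValueError:    # value pattern: `obj == ValueError`
                return 'the class object itself'
            case _:
                return 'neither'

    assert classify(ValueError('boom')) == 'an instance'
    assert classify(ValueError) == 'the class object itself'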
@@ -2299,9 +2289,9 @@ async def open_context_from_portal(
                     'Calling `ctx.cancel()`!\n'
                 )

-                # we don't need to cancel the child if it already
+                # we don't need to cancel the callee if it already
                 # told us it's cancelled ;p
-                if ctxc_from_child is None:
+                if ctxc_from_callee is None:
                     try:
                         await ctx.cancel()
                     except (
@@ -2332,8 +2322,8 @@ async def open_context_from_portal(
         # via a call to
         # `Context._maybe_cancel_and_set_remote_error()`.
         # As per `Context._deliver_msg()`, that error IS
-        # ALWAYS SET any time "child" side fails and causes
-        # "parent side" cancellation via a `ContextCancelled` here.
+        # ALWAYS SET any time "callee" side fails and causes "caller
+        # side" cancellation via a `ContextCancelled` here.
         try:
             result_or_err: Exception|Any = await ctx.wait_for_result()
         except BaseException as berr:
@@ -2369,7 +2359,7 @@ async def open_context_from_portal(
                 )
             case (None, _):
                 log.runtime(
-                    'Context returned final result from child task:\n'
+                    'Context returned final result from callee task:\n'
                     f'<= peer: {uid}\n'
                     f'  |_ {nsf}()\n\n'
@@ -2464,7 +2454,7 @@ async def open_context_from_portal(
         )

         # TODO: should we add a `._cancel_req_received`
-        # flag to determine if the child manually called
+        # flag to determine if the callee manually called
         # `ctx.cancel()`?
         # -[ ] going to need a cid check no?
@@ -2520,7 +2510,7 @@ def mk_context(
     recv_chan: trio.MemoryReceiveChannel
     send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)

-    # TODO: only scan parent-info if log level so high!
+    # TODO: only scan caller-info if log level so high!
     from .devx._frame_stack import find_caller_info
     caller_info: CallerInfo|None = find_caller_info()


@@ -300,7 +300,7 @@ class Portal:
         )

         # XXX the one spot we set it?
-        chan._cancel_called: bool = True
+        self.channel._cancel_called: bool = True
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with


@@ -481,11 +481,10 @@ async def open_root_actor(
         collapse_eg(),
         trio.open_nursery() as root_tn,

-        # ?TODO? finally-footgun below?
+        # XXX, finally-footgun below?
         # -> see note on why shielding.
         # maybe_raise_from_masking_exc(),
     ):
-        actor._root_tn = root_tn

         # `_runtime.async_main()` creates an internal nursery
         # and blocks here until any underlying actor(-process)
         # tree has terminated thereby conducting so called
@@ -524,11 +523,6 @@ async def open_root_actor(
                 err,
                 api_frame=inspect.currentframe(),
                 debug_filter=debug_filter,
-
-                # XXX NOTE, required to debug root-actor
-                # crashes under cancellation conditions; so
-                # most of them!
-                shield=root_tn.cancel_scope.cancel_called,
             )

         if (
@@ -568,7 +562,6 @@ async def open_root_actor(
                 f'{op_nested_actor_repr}'
             )

         # XXX, THIS IS A *finally-footgun*!
-        # (also mentioned in with-block above)
         # -> though already shields iternally it can
         # taskc here and mask underlying errors raised in
         # the try-block above?


@@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
         # RPC task bookeeping.
         # since RPC tasks are scheduled inside a flat
-        # `Actor._service_tn`, we add "handles" to each such that
+        # `Actor._service_n`, we add "handles" to each such that
         # they can be individually ccancelled.
         finally:
@@ -462,7 +462,7 @@ async def _invoke(
     connected IPC channel.

     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_tn: Nursery`.
+    remotely invoked function, normally in `Actor._service_n: Nursery`.

     '''
     __tracebackhide__: bool = hide_tb
@@ -642,7 +642,7 @@ async def _invoke(
         tn: Nursery
         rpc_ctx_cs: CancelScope
         async with (
-            collapse_eg(hide_tb=False),
+            collapse_eg(),
             trio.open_nursery() as tn,
             msgops.maybe_limit_plds(
                 ctx=ctx,
@@ -823,44 +823,24 @@ async def _invoke(
                 f'after having {ctx.repr_state!r}\n'
             )
             if merr:
                 logmeth: Callable = log.error
-                if (
-                    # ctxc: by `Context.cancel()`
-                    isinstance(merr, ContextCancelled)
-
-                    # out-of-layer cancellation, one of:
-                    # - actorc: by `Portal.cancel_actor()`
-                    # - OSc: by SIGINT or `Process.signal()`
-                    or (
-                        isinstance(merr, trio.Cancelled)
-                        and
-                        ctx.canceller
-                    )
-                ):
-                    logmeth: Callable = log.cancel
-                    descr_str += (
-                        f' with {merr!r}\n'
-                    )
-
-                elif (
-                    not isinstance(merr, RemoteActorError)
-                ):
-                    tb_str: str = ''.join(
-                        traceback.format_exception(merr)
-                    )
+                if isinstance(merr, ContextCancelled):
+                    logmeth: Callable = log.runtime
+
+                if not isinstance(merr, RemoteActorError):
+                    tb_str: str = ''.join(traceback.format_exception(merr))
                     descr_str += (
                         f'\n{merr!r}\n'  # needed?
                         f'{tb_str}\n'
-                        f'\n'
-                        f'scope_error:\n'
-                        f'{scope_err!r}\n'
                     )
                 else:
-                    descr_str += (
-                        f'{merr!r}\n'
-                    )
+                    descr_str += f'\n{merr!r}\n'
             else:
-                descr_str += (
-                    f'\n'
-                    f'with final result {ctx.outcome!r}\n'
-                )
+                descr_str += f'\nwith final result {ctx.outcome!r}\n'

             logmeth(
                 f'{message}\n'
@@ -936,7 +916,7 @@ async def process_messages(
     Receive (multiplexed) per-`Channel` RPC requests as msgs from
     remote processes; schedule target async funcs as local
-    `trio.Task`s inside the `Actor._service_tn: Nursery`.
+    `trio.Task`s inside the `Actor._service_n: Nursery`.

     Depending on msg type, non-`cmd` (task spawning/starting)
     request payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +941,7 @@ async def process_messages(
     '''
     actor: Actor = _state.current_actor()
-    assert actor._service_tn  # runtime state sanity
+    assert actor._service_n  # runtime state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we
     # should use it?
@@ -1172,7 +1152,7 @@ async def process_messages(
                     start_status += '->( scheduling new task..\n'
                     log.runtime(start_status)
                     try:
-                        ctx: Context = await actor._service_tn.start(
+                        ctx: Context = await actor._service_n.start(
                             partial(
                                 _invoke,
                                 actor,
@@ -1312,7 +1292,7 @@ async def process_messages(
     ) as err:
         if nursery_cancelled_before_task:
-            sn: Nursery = actor._service_tn
+            sn: Nursery = actor._service_n
             assert sn and sn.cancel_scope.cancel_called  # sanity
             log.cancel(
                 f'Service nursery cancelled before it handled {funcname}'


@@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
   SC-transitive RPC via scheduling of `trio` tasks.
 - registration of newly spawned actors with the discovery sys.

-Glossary:
----------
- - tn: a `trio.Nursery` or "task nursery".
- - an: an `ActorNursery` or "actor nursery".
- - root: top/parent-most scope/task/process/actor (or other runtime
-   primitive) in a hierarchical tree.
- - parent-ish: "higher-up" in the runtime-primitive hierarchy.
- - child-ish: "lower-down" in the runtime-primitive hierarchy.

 '''
 from __future__ import annotations
 from contextlib import (
@@ -85,7 +76,6 @@ from tractor.msg import (
 )
 from .trionics import (
     collapse_eg,
-    maybe_open_nursery,
 )
 from .ipc import (
     Channel,
@@ -183,11 +173,10 @@ class Actor:
     msg_buffer_size: int = 2**6

-    # nursery placeholders filled in by `async_main()`,
-    # - after fork for subactors.
-    # - during boot for the root actor.
-    _root_tn: Nursery|None = None
-    _service_tn: Nursery|None = None
+    # nursery placeholders filled in by `async_main()` after fork
+    _root_n: Nursery|None = None
+    _service_n: Nursery|None = None
     _ipc_server: _server.IPCServer|None = None

     @property
@@ -1021,48 +1010,12 @@ class Actor:
         the RPC service nursery.

         '''
-        actor_repr: str = _pformat.nest_from_op(
-            input_op='>c(',
-            text=self.pformat(),
-            nest_indent=1,
-        )
-        log.cancel(
-            'Actor.cancel_soon()` was called!\n'
-            f'>> scheduling `Actor.cancel()`\n'
-            f'{actor_repr}'
-        )
-        assert self._service_tn
-        self._service_tn.start_soon(
+        assert self._service_n
+        self._service_n.start_soon(
             self.cancel,
             None,  # self cancel all rpc tasks
         )

-        # schedule a "canceller task" in the `._root_tn` once the
-        # `._service_tn` is fully shutdown; task waits for child-ish
-        # scopes to fully exit then finally cancels its parent,
-        # root-most, scope.
-        async def cancel_root_tn_after_services():
-            log.runtime(
-                'Waiting on service-tn to cancel..\n'
-                f'c>)\n'
-                f'|_{self._service_tn.cancel_scope!r}\n'
-            )
-            await self._cancel_complete.wait()
-            log.cancel(
-                f'`._service_tn` cancelled\n'
-                f'>c)\n'
-                f'|_{self._service_tn.cancel_scope!r}\n'
-                f'\n'
-                f'>> cancelling `._root_tn`\n'
-                f'c>(\n'
-                f' |_{self._root_tn.cancel_scope!r}\n'
-            )
-            self._root_tn.cancel_scope.cancel()
-
-        self._root_tn.start_soon(
-            cancel_root_tn_after_services
-        )

     @property
     def cancel_complete(self) -> bool:
         return self._cancel_complete.is_set()
@@ -1167,8 +1120,8 @@ class Actor:
             await ipc_server.wait_for_shutdown()

         # cancel all rpc tasks permanently
-        if self._service_tn:
-            self._service_tn.cancel_scope.cancel()
+        if self._service_n:
+            self._service_n.cancel_scope.cancel()

         log_meth(msg)
         self._cancel_complete.set()
@@ -1305,7 +1258,7 @@ class Actor:
         '''
         Cancel all ongoing RPC tasks owned/spawned for a given
         `parent_chan: Channel` or simply all tasks (inside
-        `._service_tn`) when `parent_chan=None`.
+        `._service_n`) when `parent_chan=None`.

         '''
         tasks: dict = self._rpc_tasks
@@ -1517,55 +1470,46 @@ async def async_main(
             accept_addrs.append(addr.unwrap())
     assert accept_addrs

-    ya_root_tn: bool = bool(actor._root_tn)
-    ya_service_tn: bool = bool(actor._service_tn)
-
-    # NOTE, a top-most "root" nursery in each actor-process
-    # enables a lifetime priority for the IPC-channel connection
-    # with a sub-actor's immediate parent. I.e. this connection
-    # is kept alive as a resilient service connection until all
-    # other machinery has exited, cancellation of all
-    # embedded/child scopes have completed. This helps ensure
-    # a deterministic (and thus "graceful")
-    # first-class-supervision style teardown where a parent actor
-    # (vs. say peers) is always the last to be contacted before
-    # disconnect.
+    # The "root" nursery ensures the channel with the immediate
+    # parent is kept alive as a resilient service until
+    # cancellation steps have (mostly) occurred in
+    # a deterministic way.
     root_tn: trio.Nursery
     async with (
         collapse_eg(),
-        maybe_open_nursery(
-            nursery=actor._root_tn,
-        ) as root_tn,
+        trio.open_nursery() as root_tn,
     ):
-        if ya_root_tn:
-            assert root_tn is actor._root_tn
-        else:
-            actor._root_tn = root_tn
+        actor._root_n = root_tn
+        assert actor._root_n

         ipc_server: _server.IPCServer
         async with (
             collapse_eg(),
-            maybe_open_nursery(
-                nursery=actor._service_tn,
-            ) as service_tn,
+            trio.open_nursery() as service_nursery,
             _server.open_ipc_server(
-                parent_tn=service_tn,  # ?TODO, why can't this be the root-tn
-                stream_handler_tn=service_tn,
+                parent_tn=service_nursery,
+                stream_handler_tn=service_nursery,
             ) as ipc_server,
-            # ) as actor._ipc_server,
-            # ^TODO? prettier?
         ):
-            if ya_service_tn:
-                assert service_tn is actor._service_tn
-            else:
-                # This nursery is used to handle all inbound
-                # connections to us such that if the TCP server
-                # is killed, connections can continue to process
-                # in the background until this nursery is cancelled.
-                actor._service_tn = service_tn
-
-            # set after allocate
+            # This nursery is used to handle all inbound
+            # connections to us such that if the TCP server
+            # is killed, connections can continue to process
+            # in the background until this nursery is cancelled.
+            actor._service_n = service_nursery
             actor._ipc_server = ipc_server
+            assert (
+                actor._service_n
+                and (
+                    actor._service_n
+                    is
+                    actor._ipc_server._parent_tn
+                    is
+                    ipc_server._stream_handler_tn
+                )
+            )

             # load exposed/allowed RPC modules
             # XXX: do this **after** establishing a channel to the parent
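The left side's `maybe_open_nursery()` helper (imported from `.trionics` above) reuses a pre-allocated nursery when one was stashed on the `Actor`, else opens a fresh one. A sketch of such a helper's likely shape (an assumption; the real `tractor.trionics` impl may differ in details):

    from contextlib import asynccontextmanager
    import trio

    @asynccontextmanager
    async def maybe_open_nursery(
        nursery: trio.Nursery | None = None,
    ):
        if nursery is not None:
            # caller-provided: don't manage its lifetime here
            yield nursery
        else:
            async with trio.open_nursery() as tn:
                yield tn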
@@ -1591,11 +1535,10 @@ async def async_main(
             # - root actor: the ``accept_addr`` passed to this method
             # TODO: why is this not with the root nursery?
-            # - see above that the `._service_tn` is what's used?
             try:
                 eps: list = await ipc_server.listen_on(
                     accept_addrs=accept_addrs,
-                    stream_handler_nursery=service_tn,
+                    stream_handler_nursery=service_nursery,
                 )
                 log.runtime(
                     f'Booted IPC server\n'
@@ -1603,7 +1546,7 @@ async def async_main(
                 )
                 assert (
                     (eps[0].listen_tn)
-                    is not service_tn
+                    is not service_nursery
                 )

             except OSError as oserr:
@@ -1765,7 +1708,7 @@ async def async_main(
         # XXX TODO but hard XXX
         # we can't actually do this bc the debugger uses the
-        # _service_tn to spawn the lock task, BUT, in theory if we had
+        # _service_n to spawn the lock task, BUT, in theory if we had
         # the root nursery surround this finally block it might be
         # actually possible to debug THIS machinery in the same way
         # as user task code?
@@ -1817,7 +1760,9 @@ async def async_main(
             f'  {pformat(ipc_server._peers)}'
         )
         log.runtime(teardown_report)
-        await ipc_server.wait_for_no_more_peers()
+        await ipc_server.wait_for_no_more_peers(
+            shield=True,
+        )

         teardown_report += (
             '-]> all peer channels are complete.\n'


@@ -236,6 +236,10 @@ async def hard_kill(
     # whilst also hacking on it XD
     # terminate_after: int = 99999,

+    # NOTE: for mucking with `.pause()`-ing inside the runtime
+    # whilst also hacking on it XD
+    # terminate_after: int = 99999,
+
 ) -> None:
     '''
     Un-gracefully terminate an OS level `trio.Process` after timeout.
@@ -297,23 +301,6 @@ async def hard_kill(
     # zombies (as a feature) we ask the OS to do send in the
     # removal swad as the last resort.
     if cs.cancelled_caught:
-        # TODO? attempt at intermediary-rent-sub
-        # with child in debug lock?
-        # |_https://github.com/goodboy/tractor/issues/320
-        #
-        # if not is_root_process():
-        #     log.warning(
-        #         'Attempting to acquire debug-REPL-lock before zombie reap!'
-        #     )
-        #     with trio.CancelScope(shield=True):
-        #         async with debug.acquire_debug_lock(
-        #             subactor_uid=current_actor().uid,
-        #         ) as _ctx:
-        #             log.warning(
-        #                 'Acquired debug lock, child ready to be killed ??\n'
-        #             )
-
         # TODO: toss in the skynet-logo face as ascii art?
         log.critical(
             # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
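The `cs.cancelled_caught` check above is the tail of the standard `trio` timeout idiom for escalating from graceful to hard process termination; roughly (hypothetical `proc` handle):

    import trio

    async def term_then_kill(
        proc: trio.Process,
        timeout: float = 3.0,
    ) -> None:
        proc.terminate()  # polite SIGTERM first
        with trio.move_on_after(timeout) as cs:
            await proc.wait()
        if cs.cancelled_caught:
            # no clean exit in time: reap via SIGKILL
            proc.kill()
            await proc.wait()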


@@ -117,6 +117,7 @@ class ActorNursery:
             ]
         ] = {}

+        self.cancelled: bool = False
         self._join_procs = trio.Event()
         self._at_least_one_child_in_debug: bool = False
         self.errors = errors
@@ -134,53 +135,10 @@ class ActorNursery:
         # TODO: remove the `.run_in_actor()` API and thus this 2ndary
         # nursery when that API get's moved outside this primitive!
         self._ria_nursery = ria_nursery
-        # TODO, factor this into a .hilevel api!
-        #
         # portals spawned with ``run_in_actor()`` are
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()

-        # trio.Nursery-like cancel (request) statuses
-        self._cancelled_caught: bool = False
-        self._cancel_called: bool = False
-
-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Records whether cancellation has been requested for this
-        actor-nursery by a call to `.cancel()` either due to,
-        - an explicit call by some actor-local-task,
-        - an implicit call due to an error/cancel emited inside
-          the `tractor.open_nursery()` block.
-
-        '''
-        return self._cancel_called
-
-    @property
-    def cancelled_caught(self) -> bool:
-        '''
-        Set when this nursery was able to cance all spawned subactors
-        gracefully via an (implicit) call to `.cancel()`.
-
-        '''
-        return self._cancelled_caught
-
-    # TODO! remove internal/test-suite usage!
-    @property
-    def cancelled(self) -> bool:
-        warnings.warn(
-            "`ActorNursery.cancelled` is now deprecated, use "
-            " `.cancel_called` instead.",
-            DeprecationWarning,
-            stacklevel=2,
-        )
-        return (
-            self._cancel_called
-            # and
-            # self._cancelled_caught
-        )
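The removed `.cancelled` shim above is the stock deprecation-property pattern; `stacklevel=2` makes the warning point at the *caller's* line rather than the property body. A standalone sketch of the same pattern (hypothetical class name):

    import warnings

    class Nursery:
        _cancel_called: bool = False

        @property
        def cancelled(self) -> bool:
            warnings.warn(
                '`.cancelled` is deprecated, use `.cancel_called`',
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller
            )
            return self._cancel_called

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        assert Nursery().cancelled is False
        assert caught[0].category is DeprecationWarning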
     async def start_actor(
         self,
         name: str,
@@ -358,7 +316,7 @@ class ActorNursery:
         '''
         __runtimeframe__: int = 1  # noqa
-        self._cancel_called = True
+        self.cancelled = True

         # TODO: impl a repr for spawn more compact
         # then `._children`..
@@ -436,8 +394,6 @@ class ActorNursery:
             ) in children.values():
                 log.warning(f"Hard killing process {proc}")
                 proc.terminate()
-        else:
-            self._cancelled_caught

         # mark ourselves as having (tried to have) cancelled all subactors
         self._join_procs.set()
@@ -643,9 +599,8 @@ _shutdown_msg: str = (
     'Actor-runtime-shutdown'
 )

-@acm
 # @api_frame
+@acm
 async def open_nursery(
     *,  # named params only!
     hide_tb: bool = True,


@@ -237,9 +237,9 @@ def enable_stack_on_sig(
     try:
         import stackscope
     except ImportError:
-        log.warning(
-            'The `stackscope` lib is not installed!\n'
-            '`Ignoring enable_stack_on_sig() call!\n'
+        log.error(
+            '`stackscope` not installed for use in debug mode!\n'
+            '`Ignoring {enable_stack_on_sig!r} call!\n'
         )
         return None


@@ -148,61 +148,59 @@ def _post_mortem(
         repl_fixture=repl_fixture,
         boxed_maybe_exc=boxed_maybe_exc,
     )
+    if not enter_repl:
+        return
     try:
-        if not enter_repl:
-            # XXX, trigger `.release()` below immediately!
-            return
-        try:
         actor: Actor = current_actor()
         actor_repr: str = str(actor.uid)
         # ^TODO, instead a nice runtime-info + maddr + uid?
         # -[ ] impl a `Actor.__repr()__`??
         # |_ <task>:<thread> @ <actor>
     except NoRuntime:
         actor_repr: str = '<no-actor-runtime?>'

     try:
         task_repr: Task = trio.lowlevel.current_task()
     except RuntimeError:
         task_repr: str = '<unknown-Task>'

     # TODO: print the actor supervion tree up to the root
     # here! Bo
     log.pdb(
         f'{_crash_msg}\n'
         f'x>(\n'
         f' |_ {task_repr} @ {actor_repr}\n'
     )

     # XXX NOTE(s) on `pdbp.xpm()` version..
     #
     # - seems to lose the up-stack tb-info?
     # - currently we're (only) replacing this from `pdbp.xpm()`
     #   to add the `end=''` to the print XD
     #
     print(traceback.format_exc(), end='')
     caller_frame: FrameType = api_frame.f_back

     # NOTE, see the impl details of these in the lib to
     # understand usage:
     # - `pdbp.post_mortem()`
     # - `pdbp.xps()`
     # - `bdb.interaction()`
     repl.reset()
     repl.interaction(
         frame=caller_frame,
         # frame=None,
         traceback=tb,
     )
-    finally:

     # XXX NOTE XXX: this is abs required to avoid hangs!
     #
     # Since we presume the post-mortem was enaged to
     # a task-ending error, we MUST release the local REPL request
     # so that not other local task nor the root remains blocked!
     DebugStatus.release()

 async def post_mortem(
@@ -250,7 +248,7 @@ async def _maybe_enter_pm(
     *,
     tb: TracebackType|None = None,
     api_frame: FrameType|None = None,
-    hide_tb: bool = True,
+    hide_tb: bool = False,

     # only enter debugger REPL when returns `True`
     debug_filter: Callable[


@@ -58,7 +58,6 @@ from tractor._context import Context
 from tractor import _state
 from tractor._exceptions import (
     NoRuntime,
-    InternalError,
 )
 from tractor._state import (
     current_actor,
@@ -80,9 +79,6 @@ from ._sigint import (
     sigint_shield as sigint_shield,
     _ctlc_ignore_header as _ctlc_ignore_header
 )
-from ..pformat import (
-    ppfmt,
-)

 if TYPE_CHECKING:
     from trio.lowlevel import Task
@@ -481,12 +477,12 @@ async def _pause(
     # we have to figure out how to avoid having the service nursery
     # cancel on this task start? I *think* this works below:
     # ```python
-    #   actor._service_tn.cancel_scope.shield = shield
+    #   actor._service_n.cancel_scope.shield = shield
     # ```
     # but not entirely sure if that's a sane way to implement it?

     # NOTE currently we spawn the lock request task inside this
-    # subactor's global `Actor._service_tn` so that the
+    # subactor's global `Actor._service_n` so that the
     # lifetime of the lock-request can outlive the current
     # `._pause()` scope while the user steps through their
     # application code and when they finally exit the
@@ -510,7 +506,7 @@ async def _pause(
             f'|_{task}\n'
         )
         with trio.CancelScope(shield=shield):
-            req_ctx: Context = await actor._service_tn.start(
+            req_ctx: Context = await actor._service_n.start(
                 partial(
                     request_root_stdio_lock,
                     actor_uid=actor.uid,
@@ -544,7 +540,7 @@ async def _pause(
         _repl_fail_report = None

     # when the actor is mid-runtime cancellation the
-    # `Actor._service_tn` might get closed before we can spawn
+    # `Actor._service_n` might get closed before we can spawn
     # the request task, so just ignore expected RTE.
     elif (
         isinstance(pause_err, RuntimeError)
@@ -989,7 +985,7 @@ def pause_from_sync(
                 # that output and assign the `repl` created above!
                 bg_task, _ = trio.from_thread.run(
                     afn=partial(
-                        actor._service_tn.start,
+                        actor._service_n.start,
                         partial(
                             _pause_from_bg_root_thread,
                             behalf_of_thread=thread,
@@ -1157,10 +1153,9 @@ def pause_from_sync(
                 'use_greenback',
                 False,
             ):
-                raise InternalError(
-                    f'`greenback` was never initialized in this actor?\n'
-                    f'\n'
-                    f'{ppfmt(_state._runtime_vars)}\n'
+                raise RuntimeError(
+                    '`greenback` was never initialized in this actor!?\n\n'
+                    f'{_state._runtime_vars}\n'
                 ) from rte
             raise


@@ -846,9 +846,9 @@ class DebugStatus:
         '''
         if not (
-            (rt_repl_fixture := _state._runtime_vars.get('repl_fixture'))
-            or
             repl_fixture
+            or
+            (rt_repl_fixture := _state._runtime_vars.get('repl_fixture'))
         ):
             return True  # YES always enter


@@ -101,27 +101,11 @@ class Channel:
         # ^XXX! ONLY set if a remote actor sends an `Error`-msg
         self._closed: bool = False

-        # flag set by `Portal.cancel_actor()` indicating remote
-        # (possibly peer) cancellation of the far end actor runtime.
+        # flag set by ``Portal.cancel_actor()`` indicating remote
+        # (possibly peer) cancellation of the far end actor
+        # runtime.
         self._cancel_called: bool = False

-    @property
-    def closed(self) -> bool:
-        '''
-        Was `.aclose()` successfully called?
-
-        '''
-        return self._closed
-
-    @property
-    def cancel_called(self) -> bool:
-        '''
-        Set when `Portal.cancel_actor()` is called on a portal which
-        wraps this IPC channel.
-
-        '''
-        return self._cancel_called

     @property
     def uid(self) -> tuple[str, str]:
         '''
@@ -185,9 +169,7 @@ class Channel:
             addr,
             **kwargs,
         )
-        # XXX, for UDS *no!* since we recv the peer-pid and build out
-        # a new addr..
-        # assert transport.raddr == addr
+        assert transport.raddr == addr
         chan = Channel(transport=transport)

         # ?TODO, compact this into adapter level-methods?
@@ -303,7 +285,7 @@ class Channel:
         self,
         payload: Any,

-        hide_tb: bool = False,
+        hide_tb: bool = True,

     ) -> None:
         '''


@@ -17,59 +17,36 @@
 Utils to tame mp non-SC madeness

 '''
-import platform
-
-# !TODO! in 3.13 this can be disabled (the-same/similarly) using
-# a flag,
-# - [ ] soo if it works like this, drop this module entirely for
-#   3.13+ B)
-#   |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
-#
 def disable_mantracker():
     '''
     Disable all `multiprocessing` "resource tracking" machinery since
     it's an absolute multi-threaded mess of non-SC madness.

     '''
-    from multiprocessing.shared_memory import SharedMemory
-
-    # 3.13+ only.. can pass `track=False` to disable
-    # all the resource tracker bs.
-    # https://docs.python.org/3/library/multiprocessing.shared_memory.html
-    if (_py_313 := (
-            platform.python_version_tuple()[:-1]
-            >=
-            ('3', '13')
-        )
-    ):
-        from functools import partial
-        return partial(
-            SharedMemory,
-            track=False,
-        )
-
-    # !TODO, once we drop 3.12- we can obvi remove all this!
-    else:
-        from multiprocessing import (
-            resource_tracker as mantracker,
-        )
-
-        # Tell the "resource tracker" thing to fuck off.
-        class ManTracker(mantracker.ResourceTracker):
-            def register(self, name, rtype):
-                pass
-
-            def unregister(self, name, rtype):
-                pass
-
-            def ensure_running(self):
-                pass
-
-        # "know your land and know your prey"
-        # https://www.dailymotion.com/video/x6ozzco
-        mantracker._resource_tracker = ManTracker()
-        mantracker.register = mantracker._resource_tracker.register
-        mantracker.ensure_running = mantracker._resource_tracker.ensure_running
-        mantracker.unregister = mantracker._resource_tracker.unregister
-        mantracker.getfd = mantracker._resource_tracker.getfd
-
-        # use std type verbatim
-        shmT = SharedMemory
-        return shmT
+    from multiprocessing import resource_tracker as mantracker
+
+    # Tell the "resource tracker" thing to fuck off.
+    class ManTracker(mantracker.ResourceTracker):
+        def register(self, name, rtype):
+            pass
+
+        def unregister(self, name, rtype):
+            pass
+
+        def ensure_running(self):
+            pass
+
+    # "know your land and know your prey"
+    # https://www.dailymotion.com/video/x6ozzco
+    mantracker._resource_tracker = ManTracker()
+    mantracker.register = mantracker._resource_tracker.register
+    mantracker.ensure_running = mantracker._resource_tracker.ensure_running
+    mantracker.unregister = mantracker._resource_tracker.unregister
+    mantracker.getfd = mantracker._resource_tracker.getfd
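The left side's 3.13+ branch leans on the `track` kwarg added to `SharedMemory` in CPython 3.13, which opts a segment out of the resource tracker per-allocation instead of monkey-patching the tracker globally. A usage sketch (3.13+ only, hypothetical segment name):

    from multiprocessing.shared_memory import SharedMemory

    shm = SharedMemory(
        name='demo_seg',
        create=True,
        size=4096,
        track=False,  # no resource-tracker registration (3.13+)
    )
    try:
        shm.buf[:5] = b'hello'
    finally:
        shm.close()
        shm.unlink()  # manual lifetime mgmt; nothing tracks it for us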


@@ -814,14 +814,10 @@ class Server(Struct):
     async def wait_for_no_more_peers(
         self,
-        # XXX, should this even be allowed?
-        # -> i've seen it cause hangs on teardown
-        #    in `test_resource_cache.py`
-        # _shield: bool = False,
+        shield: bool = False,
     ) -> None:
-        await self._no_more_peers.wait()
-        # with trio.CancelScope(shield=_shield):
-        #     await self._no_more_peers.wait()
+        with trio.CancelScope(shield=shield):
+            await self._no_more_peers.wait()

     async def wait_for_peer(
         self,
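The right-side `shield` param wraps the wait in the usual `trio` idiom for teardown code that must run to completion even while the surrounding task is being cancelled:

    import trio

    async def drain_on_teardown(evt: trio.Event) -> None:
        # a shielded scope ignores *outer* cancellation so this wait
        # completes during teardown; pair with a timeout if the event
        # might never fire.
        with trio.CancelScope(shield=True):
            await evt.wait()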
@@ -1001,11 +997,7 @@ class Server(Struct):
                 partial(
                     _serve_ipc_eps,
                     server=self,
-                    stream_handler_tn=(
-                        stream_handler_nursery
-                        or
-                        self._stream_handler_tn
-                    ),
+                    stream_handler_tn=stream_handler_nursery,
                     listen_addrs=accept_addrs,
                 )
             )
@@ -1149,17 +1141,13 @@ async def open_ipc_server(
     async with maybe_open_nursery(
         nursery=parent_tn,
-    ) as parent_tn:
+    ) as rent_tn:
         no_more_peers = trio.Event()
         no_more_peers.set()

         ipc_server = IPCServer(
-            _parent_tn=parent_tn,
-            _stream_handler_tn=(
-                stream_handler_tn
-                or
-                parent_tn
-            ),
+            _parent_tn=rent_tn,
+            _stream_handler_tn=stream_handler_tn or rent_tn,
             _no_more_peers=no_more_peers,
         )
         try:


@@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
 """
 from __future__ import annotations
-from multiprocessing import shared_memory as shm
-from multiprocessing.shared_memory import (
-    # SharedMemory,
-    ShareableList,
-)
-import platform
 from sys import byteorder
 import time
 from typing import Optional
+from multiprocessing import shared_memory as shm
+from multiprocessing.shared_memory import (
+    SharedMemory,
+    ShareableList,
+)

 from msgspec import (
     Struct,
@@ -62,7 +61,7 @@ except ImportError:
 log = get_logger(__name__)

-SharedMemory = disable_mantracker()
+disable_mantracker()

 class SharedInt:
@@ -798,15 +797,8 @@ def open_shm_list(
     # "close" attached shm on actor teardown
     try:
         actor = tractor.current_actor()
         actor.lifetime_stack.callback(shml.shm.close)
-
-        # XXX on 3.13+ we don't need to call this?
-        # -> bc we pass `track=False` for `SharedMemeory` orr?
-        if (
-            platform.python_version_tuple()[:-1] < ('3', '13')
-        ):
-            actor.lifetime_stack.callback(shml.shm.unlink)
+        actor.lifetime_stack.callback(shml.shm.unlink)
     except RuntimeError:
         log.warning('tractor runtime not active, skipping teardown steps')
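`Actor.lifetime_stack` is used here as an `ExitStack`-style registry: callbacks run LIFO at teardown. The stdlib equivalent of the hunk's close-then-unlink ordering:

    from contextlib import ExitStack
    from multiprocessing.shared_memory import ShareableList

    stack = ExitStack()  # stand-in for `actor.lifetime_stack`
    shml = ShareableList([0] * 8)

    # LIFO: register unlink first so close() runs *before* unlink()
    stack.callback(shml.shm.unlink)
    stack.callback(shml.shm.close)

    # ... use `shml` for the process's lifetime ...
    stack.close()  # runs close(), then unlink()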


@@ -430,25 +430,20 @@ class MsgpackTransport(MsgTransport):
             return await self.stream.send_all(size + bytes_data)
         except (
             trio.BrokenResourceError,
-            trio.ClosedResourceError,
-        ) as _re:
-            trans_err = _re
+        ) as bre:
+            trans_err = bre
             tpt_name: str = f'{type(self).__name__!r}'

             match trans_err:
-                # XXX, specifc to UDS transport and its,
-                # well, "speediness".. XD
-                # |_ likely todo with races related to how fast
-                #    the socket is setup/torn-down on linux
-                #    as it pertains to rando pings from the
-                #    `.discovery` subsys and protos.
                 case trio.BrokenResourceError() if (
-                    '[Errno 32] Broken pipe'
-                    in
-                    trans_err.args[0]
+                    '[Errno 32] Broken pipe' in trans_err.args[0]
+                    # ^XXX, specifc to UDS transport and its,
+                    # well, "speediness".. XD
+                    # |_ likely todo with races related to how fast
+                    #    the socket is setup/torn-down on linux
+                    #    as it pertains to rando pings from the
+                    #    `.discovery` subsys and protos.
                 ):
-                    tpt_closed = TransportClosed.from_src_exc(
+                    raise TransportClosed.from_src_exc(
                         message=(
                             f'{tpt_name} already closed by peer\n'
                         ),
@@ -456,31 +451,14 @@ class MsgpackTransport(MsgTransport):
                         src_exc=trans_err,
                         raise_on_report=True,
                         loglevel='transport',
-                    )
-                    raise tpt_closed from trans_err
-
-                # case trio.ClosedResourceError() if (
-                #     'this socket was already closed'
-                #     in
-                #     trans_err.args[0]
-                # ):
-                #     tpt_closed = TransportClosed.from_src_exc(
-                #         message=(
-                #             f'{tpt_name} already closed by peer\n'
-                #         ),
-                #         body=f'{self}\n',
-                #         src_exc=trans_err,
-                #         raise_on_report=True,
-                #         loglevel='transport',
-                #     )
-                #     raise tpt_closed from trans_err
+                    ) from bre

                 # unless the disconnect condition falls under "a
                 # normal operation breakage" we usualy console warn
                 # about it.
                 case _:
                     log.exception(
-                        f'{tpt_name} layer failed pre-send ??\n'
+                        '{tpt_name} layer failed pre-send ??\n'
                     )
                     raise trans_err
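The `case trio.BrokenResourceError() if ...` arm above combines a class pattern with a guard expression; the guard only runs once the type matches. A generic, runnable sketch of the same shape:

    def classify_send_err(exc: Exception) -> str:
        match exc:
            # class pattern narrows the type; the `if` guard then
            # inspects the matched object before taking the body
            case OSError() if 'Broken pipe' in str(exc):
                return 'peer-closed'
            case OSError():
                return 'other-os-error'
            case _:
                return 'unknown'

    assert classify_send_err(OSError(32, 'Broken pipe')) == 'peer-closed'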
@@ -525,7 +503,7 @@ class MsgpackTransport(MsgTransport):
     def pformat(self) -> str:
         return (
             f'<{type(self).__name__}(\n'
-            f' |_peers: 1\n'
+            f' |_peers: 2\n'
             f'  laddr: {self._laddr}\n'
             f'  raddr: {self._raddr}\n'
             # f'\n'


@@ -18,9 +18,6 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protocol

 '''
 from __future__ import annotations
-from contextlib import (
-    contextmanager as cm,
-)
 from pathlib import Path
 import os
 from socket import (
@@ -32,7 +29,6 @@ from socket import (
 )
 import struct
 from typing import (
-    Type,
     TYPE_CHECKING,
     ClassVar,
 )
@@ -103,6 +99,8 @@ class UDSAddress(
             self.filedir
             or
             self.def_bindspace
+            # or
+            # get_rt_dir()
         )

     @property
@@ -207,35 +205,12 @@ class UDSAddress(
         f']'
     )

-@cm
-def _reraise_as_connerr(
-    src_excs: tuple[Type[Exception]],
-    addr: UDSAddress,
-):
-    try:
-        yield
-    except src_excs as src_exc:
-        raise ConnectionError(
-            f'Bad UDS socket-filepath-as-address ??\n'
-            f'{addr}\n'
-            f' |_sockpath: {addr.sockpath}\n'
-            f'\n'
-            f'from src: {src_exc!r}\n'
-        ) from src_exc

 async def start_listener(
     addr: UDSAddress,
     **kwargs,
 ) -> SocketListener:
-    '''
-    Start listening for inbound connections via
-    a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
-
-    Note, if the `UDSAddress.bindspace: Path` directory dne it is
-    implicitly created.
-
-    '''
+    # sock = addr._sock = socket.socket(
     sock = socket.socket(
         socket.AF_UNIX,
         socket.SOCK_STREAM
@@ -246,25 +221,17 @@ async def start_listener(
         f'|_{addr}\n'
     )

-    # ?TODO? should we use the `actor.lifetime_stack`
-    # to rm on shutdown?
     bindpath: Path = addr.sockpath
-    if not (bs := addr.bindspace).is_dir():
-        log.info(
-            'Creating bindspace dir in file-sys\n'
-            f'>{{\n'
-            f'|_{bs!r}\n'
-        )
-        bs.mkdir()
-
-    with _reraise_as_connerr(
-        src_excs=(
-            FileNotFoundError,
-            OSError,
-        ),
-        addr=addr
-    ):
+    try:
         await sock.bind(str(bindpath))
+    except (
+        FileNotFoundError,
+    ) as fdne:
+        raise ConnectionError(
+            f'Bad UDS socket-filepath-as-address ??\n'
+            f'{addr}\n'
+            f' |_sockpath: {addr.sockpath}\n'
+        ) from fdne

     sock.listen(1)
     log.info(
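For reference, the left side's `_reraise_as_connerr()` helper (removed above) centralizes the except/`raise .. from ..` boilerplate that the right side instead repeats at each call site; its usage in the old code reads:

    with _reraise_as_connerr(
        src_excs=(
            FileNotFoundError,
            OSError,
        ),
        addr=addr,
    ):
        # any listed exc is chained into a `ConnectionError`
        await sock.bind(str(addr.sockpath))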
@@ -389,30 +356,27 @@ class MsgpackUDSStream(MsgpackTransport):
         # `.setsockopt()` call tells the OS provide it; the client
         # pid can then be read on server/listen() side via
         # `get_peer_info()` above.
-        with _reraise_as_connerr(
-            src_excs=(
-                FileNotFoundError,
-            ),
-            addr=addr
-        ):
+        try:
             stream = await open_unix_socket_w_passcred(
                 str(sockpath),
                 **kwargs
             )
+        except (
+            FileNotFoundError,
+        ) as fdne:
+            raise ConnectionError(
+                f'Bad UDS socket-filepath-as-address ??\n'
+                f'{addr}\n'
+                f' |_sockpath: {sockpath}\n'
+            ) from fdne

-        tpt_stream = MsgpackUDSStream(
+        stream = MsgpackUDSStream(
             stream,
             prefix_size=prefix_size,
             codec=codec
         )
-        # XXX assign from new addrs after peer-PID extract!
-        (
-            tpt_stream._laddr,
-            tpt_stream._raddr,
-        ) = cls.get_stream_addrs(stream)
-        return tpt_stream
+        stream._raddr = addr
+        return stream

     @classmethod
     def get_stream_addrs(


@@ -130,7 +130,6 @@ class LinkedTaskChannel(
     _trio_task: trio.Task
     _aio_task_complete: trio.Event

-    _closed_by_aio_task: bool = False
     _suppress_graceful_exits: bool = True
     _trio_err: BaseException|None = None
@@ -209,15 +208,10 @@ class LinkedTaskChannel(
     async def aclose(self) -> None:
         await self._from_aio.aclose()

-    # ?TODO? async version of this?
-    def started_nowait(
+    def started(
         self,
         val: Any = None,
     ) -> None:
-        '''
-        Synchronize aio-side with its trio-parent.
-
-        '''
         self._aio_started_val = val
         return self._to_trio.send_nowait(val)
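
Either spelling (`started_nowait()` on the `-` side, `started()` on the `+`
side) is the aio-child's one-shot "first value" handshake: it pushes `val`
into the mem-chan which the trio parent receives as `first` from
`open_channel_from()`. A hedged usage sketch (task bodies illustrative,
assuming `tractor.to_asyncio` as the module path):

from tractor import to_asyncio

async def aio_child(
    to_trio,
    from_trio,
):
    # runs on the `asyncio` side; signal readiness to the trio
    # parent, i.e. exactly what `.started[_nowait]()` wraps.
    to_trio.send_nowait('ready')
    ...

async def trio_parent():
    async with to_asyncio.open_channel_from(
        aio_child,
    ) as (first, chan):
        assert first == 'ready'
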
@@ -248,7 +242,6 @@ class LinkedTaskChannel(
             # cycle on the trio side?
             # await trio.lowlevel.checkpoint()
             return await self._from_aio.receive()
-
         except BaseException as err:
             async with translate_aio_errors(
                 chan=self,
@@ -326,7 +319,7 @@ def _run_asyncio_task(
     qsize: int = 1,
     provide_channels: bool = False,
     suppress_graceful_exits: bool = True,
-    hide_tb: bool = True,
+    hide_tb: bool = False,
     **kwargs,

 ) -> LinkedTaskChannel:
@@ -354,6 +347,18 @@ def _run_asyncio_task(
     # value otherwise it would just return ;P
     assert qsize > 1

+    if provide_channels:
+        assert 'to_trio' in args
+
+    # allow target func to accept/stream results manually by name
+    if 'to_trio' in args:
+        kwargs['to_trio'] = to_trio
+
+    if 'from_trio' in args:
+        kwargs['from_trio'] = from_trio
+
+    coro = func(**kwargs)
+
     trio_task: trio.Task = trio.lowlevel.current_task()
     trio_cs = trio.CancelScope()
     aio_task_complete = trio.Event()
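
The relocated block implements name-based channel injection: the target's
declared parameter names (`args` here) determine which cross-loop handles get
passed. Standalone sketch of the same idea (using `inspect`; the real code may
derive `args` differently):

import inspect

def inject_chan_kwargs(
    func,
    to_trio,
    from_trio,
    **kwargs,
) -> dict:
    # only hand over the channel ends the target declares by name
    params = inspect.signature(func).parameters
    if 'to_trio' in params:
        kwargs['to_trio'] = to_trio
    if 'from_trio' in params:
        kwargs['from_trio'] = from_trio
    return kwargs
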
@@ -368,25 +373,6 @@ def _run_asyncio_task(
         _suppress_graceful_exits=suppress_graceful_exits,
     )

-    # allow target func to accept/stream results manually by name
-    if 'to_trio' in args:
-        kwargs['to_trio'] = to_trio
-
-    if 'from_trio' in args:
-        kwargs['from_trio'] = from_trio
-
-    if 'chan' in args:
-        kwargs['chan'] = chan
-
-    if provide_channels:
-        assert (
-            'to_trio' in args
-            or
-            'chan' in args
-        )
-
-    coro = func(**kwargs)
-
     async def wait_on_coro_final_result(
         to_trio: trio.MemorySendChannel,
         coro: Awaitable,
@@ -459,23 +445,9 @@ def _run_asyncio_task(
                 f'Task exited with final result: {result!r}\n'
             )

-            # XXX ALWAYS close the child-`asyncio`-task-side's
-            # `to_trio` handle which will in turn relay
-            # a `trio.EndOfChannel` to the `trio`-parent.
-            # Consequently the parent `trio` task MUST ALWAYS
-            # check for any `chan._aio_err` to be raised when it
-            # receives an EoC.
-            #
-            # NOTE, there are 2 EoC cases,
-            # - normal/graceful EoC due to the aio-side actually
-            #   terminating its "streaming", but the task did not
-            #   error and is not yet complete.
-            #
-            # - the aio-task terminated and we specially mark the
-            #   closure as due to the `asyncio.Task`'s exit.
-            #
+            # only close the sender side which will relay
+            # a `trio.EndOfChannel` to the trio (consumer) side.
             to_trio.close()
-            chan._closed_by_aio_task = True

         aio_task_complete.set()
         log.runtime(
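
The `to_trio.close()` in both versions leans on a core `trio` mem-chan
guarantee: closing the send side delivers `trio.EndOfChannel` to the receiver
once any buffered msgs have drained. Minimal demo:

import trio

async def main():
    send, recv = trio.open_memory_channel(1)
    send.send_nowait('last msg')
    send.close()

    # buffered msgs are still delivered after close,
    assert await recv.receive() == 'last msg'
    try:
        await recv.receive()
    except trio.EndOfChannel:
        # then the "end of stream" signal is raised.
        print('graceful end-of-stream')

trio.run(main)
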
@@ -673,9 +645,8 @@ def _run_asyncio_task(
             not trio_cs.cancel_called
         ):
             log.cancel(
-                f'Cancelling trio-side due to aio-side src exc\n'
-                f'\n'
-                f'{curr_aio_err!r}\n'
+                f'Cancelling `trio` side due to aio-side src exc\n'
+                f'{curr_aio_err}\n'
                 f'\n'
                 f'(c>\n'
                 f' |_{trio_task}\n'
@@ -787,7 +758,6 @@ async def translate_aio_errors(
     aio_done_before_trio: bool = aio_task.done()
     assert aio_task
     trio_err: BaseException|None = None
-    eoc: trio.EndOfChannel|None = None
     try:
         yield  # back to one of the cross-loop apis
     except trio.Cancelled as taskc:
@@ -819,48 +789,12 @@ async def translate_aio_errors(
         # )
         # raise

-    # XXX EoC is a special SIGNAL from the aio-side here!
-    # There are 2 cases to handle:
-    # 1. the "EoC passthrough" case.
-    #   - the aio-task actually closed the channel "gracefully" and
-    #     the trio-task should unwind any ongoing channel
-    #     iteration/receiving,
-    #    |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
-    #      in which case we want to relay the actual "end-of-chan" for
-    #      iteration purposes.
-    #
-    # 2. relaying the "asyncio.Task termination" case.
-    #   - if the aio-task terminates, maybe with an error, AND the
-    #     `open_channel_from()` API was used, it will always signal
-    #     that termination.
-    #    |_`wait_on_coro_final_result()` always calls
-    #      `to_trio.close()` when `provide_channels=True` so we need to
-    #      always check if there is an aio-side exc which needs to be
-    #      relayed to the parent trio side!
-    #    |_in this case the special `chan._closed_by_aio_task` is
-    #      ALWAYS set.
-    #
-    except trio.EndOfChannel as _eoc:
-        eoc = _eoc
-        if (
-            chan._closed_by_aio_task
-            and
-            aio_err
-        ):
-            log.cancel(
-                f'The asyncio-child task terminated due to error\n'
-                f'{aio_err!r}\n'
-            )
-            chan._trio_to_raise = aio_err
-            trio_err = chan._trio_err = eoc
-            #
-            # ?TODO?, raise something like a,
-            # chan._trio_to_raise = AsyncioErrored()
-            # BUT, with the tb rewritten to reflect the underlying
-            # call stack?
-        else:
-            trio_err = chan._trio_err = eoc
-
+    # XXX always passthrough EoC since this translator is often
+    # called from `LinkedTaskChannel.receive()` which we want
+    # passthrough and further we have no special meaning for it in
+    # terms of relaying errors or signals from the aio side!
+    except trio.EndOfChannel as eoc:
+        trio_err = chan._trio_err = eoc
         raise eoc

     # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
@@ -1113,7 +1047,7 @@ async def translate_aio_errors(
         #
         if wait_on_aio_task:
             await chan._aio_task_complete.wait()
-            log.debug(
+            log.info(
                 'asyncio-task is done and unblocked trio-side!\n'
             )
@@ -1130,17 +1064,11 @@ async def translate_aio_errors(
         trio_to_raise: (
             AsyncioCancelled|
             AsyncioTaskExited|
-            Exception|  # relayed from aio-task
             None
         ) = chan._trio_to_raise

-        raise_from: Exception = (
-            trio_err if (aio_err is trio_to_raise)
-            else aio_err
-        )
-
         if not suppress_graceful_exits:
-            raise trio_to_raise from raise_from
+            raise trio_to_raise from (aio_err or trio_err)

         if trio_to_raise:
             match (
@@ -1173,7 +1101,7 @@ async def translate_aio_errors(
                     )
                     return

                 case _:
-                    raise trio_to_raise from raise_from
+                    raise trio_to_raise from (aio_err or trio_err)

     # Check if the asyncio-side is the cause of the trio-side
     # error.
@@ -1239,6 +1167,7 @@ async def run_task(
 @acm
 async def open_channel_from(
     target: Callable[..., Any],
+
     suppress_graceful_exits: bool = True,
     **target_kwargs,
@@ -1272,6 +1201,7 @@ async def open_channel_from(
             # deliver stream handle upward
             yield first, chan
     except trio.Cancelled as taskc:
+        # await tractor.pause(shield=True)  # ya it worx ;)
         if cs.cancel_called:
             if isinstance(chan._trio_to_raise, AsyncioCancelled):
                 log.cancel(


@@ -31,7 +31,7 @@ from ._broadcast import (
 )
 from ._beg import (
     collapse_eg as collapse_eg,
-    get_collapsed_eg as get_collapsed_eg,
+    maybe_collapse_eg as maybe_collapse_eg,
     is_multi_cancelled as is_multi_cancelled,
 )
 from ._taskc import (


@@ -15,9 +15,8 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.

 '''
-`BaseExceptionGroup` utils and helpers pertaining to
-first-class-`trio` from a "historical" perspective, like "loose
-exception group" task-nurseries.
+`BaseExceptionGroup` related utils and helpers pertaining to
+first-class-`trio` from a historical perspective B)

 '''
 from contextlib import (
@@ -25,83 +24,28 @@ from contextlib import (
 )
 from typing import (
     Literal,
-    Type,
 )

 import trio
-# from trio._core._concat_tb import (
-#     concat_tb,
-# )


-# XXX NOTE
-# taken verbatim from `trio._core._run` except,
-# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
-#   guard-check; we know we want an explicit collapse.
-# - mask out tb rewriting in collapse case, i don't think it really
-#   matters?
-#
-def collapse_exception_group(
-    excgroup: BaseExceptionGroup[BaseException],
-) -> BaseException:
-    """Recursively collapse any single-exception groups into that single contained
-    exception.
-    """
-    exceptions = list(excgroup.exceptions)
-    modified = False
-    for i, exc in enumerate(exceptions):
-        if isinstance(exc, BaseExceptionGroup):
-            new_exc = collapse_exception_group(exc)
-            if new_exc is not exc:
-                modified = True
-                exceptions[i] = new_exc
-
-    if (
-        len(exceptions) == 1
-        and isinstance(excgroup, BaseExceptionGroup)
-        # XXX trio's loose-setting condition..
-        # and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
-    ):
-        # exceptions[0].__traceback__ = concat_tb(
-        #     excgroup.__traceback__,
-        #     exceptions[0].__traceback__,
-        # )
-        return exceptions[0]
-    elif modified:
-        return excgroup.derive(exceptions)
-    else:
-        return excgroup
-
-
-def get_collapsed_eg(
+def maybe_collapse_eg(
     beg: BaseExceptionGroup,
-) -> BaseException|None:
+) -> BaseException|bool:
     '''
-    If the input beg can collapse to a single sub-exception which is
-    itself **not** an eg, return it.
+    If the input beg can collapse to a single non-eg sub-exception,
+    return it instead.

     '''
-    maybe_exc = collapse_exception_group(beg)
-    if maybe_exc is beg:
-        return None
-
-    return maybe_exc
+    if len(excs := beg.exceptions) == 1:
+        return excs[0]

+    return False


 @acm
 async def collapse_eg(
     hide_tb: bool = True,
-
-    # XXX, for ex. will always show begs containing single taskc
-    ignore: set[Type[BaseException]] = {
-        # trio.Cancelled,
-    },
-    add_notes: bool = True,
-
-    bp: bool = False,
 ):
     '''
     If `BaseExceptionGroup` raised in the body scope is
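
Note the semantic gap between the two sides above: `collapse_exception_group()`
(`-` side) recurses through nested groups whereas `maybe_collapse_eg()` (`+`
side) only unwraps one level, so a single-exc group nested inside another
single-exc group stays a group. Quick illustration (treating both helpers,
which never coexist in one revision, as importable; needs py3.11+ for the
builtin `BaseExceptionGroup`):

beg = BaseExceptionGroup(
    'outer', [
        BaseExceptionGroup('inner', [ValueError('woops')]),
    ],
)
# single-level unwrap: returns the 'inner' *group*
assert isinstance(maybe_collapse_eg(beg), BaseExceptionGroup)
# recursive collapse: returns the bare `ValueError`
assert isinstance(collapse_exception_group(beg), ValueError)
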
@@ -113,55 +57,16 @@ async def collapse_eg(
     __tracebackhide__: bool = hide_tb
     try:
         yield
-    except BaseExceptionGroup as _beg:
-        beg = _beg
-
+    except* BaseException as beg:
         if (
-            bp
-            and
-            len(beg.exceptions) > 1
-        ):
-            import tractor
-            if tractor.current_actor(
-                err_on_no_runtime=False,
-            ):
-                await tractor.pause(shield=True)
-            else:
-                breakpoint()
-
-        if (
-            (exc := get_collapsed_eg(beg))
-            and
-            type(exc) not in ignore
+            exc := maybe_collapse_eg(beg)
         ):
-            # TODO? report number of nested groups it was collapsed
-            # *from*?
-            if add_notes:
-                from_group_note: str = (
-                    '( ^^^ this exc was collapsed from a group ^^^ )\n'
-                )
-                if (
-                    from_group_note
-                    not in
-                    getattr(exc, "__notes__", ())
-                ):
-                    exc.add_note(from_group_note)
-
-            # raise exc
-            # ^^ this will leave the orig beg tb above with the
-            # "during the handling of <beg> the following.."
-            # So, instead do..
-            #
             if cause := exc.__cause__:
                 raise exc from cause
-            else:
-                # suppress "during handling of <the beg>"
-                # output in tb/console.
-                raise exc from None

-        # keep original
-        raise  # beg
+            raise exc
+
+        raise beg


 def is_multi_cancelled(
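
Hedged usage sketch for `collapse_eg()` (exported per the
`trionics/__init__.py` hunk further above): under strict(-default) exception
groups a lone error raised in a nursery arrives wrapped; per its docstring the
acm re-raises the single sub-exception bare for the caller:

import trio
from tractor.trionics import collapse_eg

async def main():
    try:
        async with (
            collapse_eg(),
            trio.open_nursery() as tn,
        ):
            tn.start_soon(trio.sleep_forever)
            raise ValueError('woops')
    except ValueError as exc:
        # without `collapse_eg()` this would surface as an
        # `ExceptionGroup` containing the `ValueError`
        print(f'caught bare {exc!r}')

trio.run(main)
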


@@ -40,10 +40,7 @@ from typing import (
 import trio

 from tractor._state import current_actor
 from tractor.log import get_logger
-# from ._beg import collapse_eg
-# from ._taskc import (
-#     maybe_raise_from_masking_exc,
-# )
+from ._beg import collapse_eg


 if TYPE_CHECKING:
@@ -109,9 +106,6 @@ async def _enter_and_wait(
 async def gather_contexts(
     mngrs: Sequence[AsyncContextManager[T]],

-    # caller can provide their own scope
-    tn: trio.Nursery|None = None,
-
 ) -> AsyncGenerator[
     tuple[
         T | None,
@@ -154,45 +148,34 @@ async def gather_contexts(
             '`.trionics.gather_contexts()` input mngrs is empty?\n'
             '\n'
             'Did try to use inline generator syntax?\n'
-            'Check that list({mngrs}) works!\n'
-            # 'or sequence-type intead!\n'
-            # 'Use a non-lazy iterator or sequence-type intead!\n'
+            'Use a non-lazy iterator or sequence-type intead!\n'
         )

-    try:
-        async with (
-            #
-            # ?TODO, does including these (eg-collapsing,
-            # taskc-unmasking) improve tb noise-reduction/legibility?
-            #
-            # collapse_eg(),
-            maybe_open_nursery(
-                nursery=tn,
-            ) as tn,
-            # maybe_raise_from_masking_exc(),
-        ):
-            for mngr in mngrs:
-                tn.start_soon(
-                    _enter_and_wait,
-                    mngr,
-                    unwrapped,
-                    all_entered,
-                    parent_exit,
-                    seed,
-                )
-
-            # deliver control to caller once all ctx-managers have
-            # started (yielded back to us).
-            await all_entered.wait()
-
-            yield tuple(unwrapped.values())
-            parent_exit.set()
-    finally:
-        # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
-        # the following wacky bug:
-        # <tractorbugurlhere>
-        parent_exit.set()
+    async with (
+        collapse_eg(),
+        trio.open_nursery() as tn,
+    ):
+        for mngr in mngrs:
+            tn.start_soon(
+                _enter_and_wait,
+                mngr,
+                unwrapped,
+                all_entered,
+                parent_exit,
+                seed,
+            )
+
+        # deliver control once all managers have started up
+        await all_entered.wait()
+
+        try:
+            yield tuple(unwrapped.values())
+        finally:
+            # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
+            # the following wacky bug:
+            # <tractorbugurlhere>
+            parent_exit.set()
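
Hedged usage sketch for `gather_contexts()`: concurrently enter a
*materialized* sequence of acms and receive their yielded values as a tuple,
per the genexpr warning in the hunk above:

from contextlib import asynccontextmanager as acm

import trio
from tractor.trionics import gather_contexts

@acm
async def open_slot(i: int):
    yield i * 10

async def main():
    mngrs = [open_slot(i) for i in range(3)]  # NOT a genexpr!
    async with gather_contexts(mngrs) as vals:
        print(vals)  # -> (0, 10, 20), in input order

trio.run(main)
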

 # Per actor task caching helpers.
 # Further potential examples of interest:
@@ -204,7 +187,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.

     '''
-    service_tn: Optional[trio.Nursery] = None
+    service_n: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -245,9 +228,6 @@ async def maybe_open_context(
     kwargs: dict = {},
     key: Hashable | Callable[..., Hashable] = None,

-    # caller can provide their own scope
-    tn: trio.Nursery|None = None,
-
 ) -> AsyncIterator[tuple[bool, T]]:
     '''
     Maybe open an async-context-manager (acm) if there is not already
@@ -280,94 +260,40 @@ async def maybe_open_context(
     # have it not be closed until all consumers have exited (which is
     # currently difficult to implement any other way besides using our
     # pre-allocated runtime instance..)
-    if tn:
-        # TODO, assert tn is eventual parent of this task!
-        task: trio.Task = trio.lowlevel.current_task()
-        task_tn: trio.Nursery = task.parent_nursery
-        if not tn._cancel_status.encloses(
-            task_tn._cancel_status
-        ):
-            raise RuntimeError(
-                f'Mis-nesting of task under provided {tn} !?\n'
-                f'Current task is NOT a child(-ish)!!\n'
-                f'\n'
-                f'task: {task}\n'
-                f'task_tn: {task_tn}\n'
-            )
-        service_tn = tn
-    else:
-        service_tn: trio.Nursery = current_actor()._service_tn
+    service_n: trio.Nursery = current_actor()._service_n

     # TODO: is there any way to allocate
     # a 'stays-open-till-last-task-finshed nursery?
-    # service_tn: trio.Nursery
-    # async with maybe_open_nursery(_Cache.service_tn) as service_tn:
-    #     _Cache.service_tn = service_tn
+    # service_n: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_n) as service_n:
+    #     _Cache.service_n = service_n

-    cache_miss_ke: KeyError|None = None
-    maybe_taskc: trio.Cancelled|None = None
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
         # may switch and by accident we create more then one resource.
         yielded = _Cache.values[ctx_key]

-    except KeyError as _ke:
-        # XXX, stay mutexed up to cache-miss yield
-        try:
-            cache_miss_ke = _ke
-            log.debug(
-                f'Allocating new @acm-func entry\n'
-                f'ctx_key={ctx_key}\n'
-                f'acm_func={acm_func}\n'
-            )
-            mngr = acm_func(**kwargs)
-            resources = _Cache.resources
-            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_tn, trio.Event())
-            yielded: Any = await service_tn.start(
-                _Cache.run_ctx,
-                mngr,
-                ctx_key,
-            )
-            _Cache.users += 1
-        finally:
-            # XXX, since this runs from an `except` it's a checkpoint
-            # whih can be `trio.Cancelled`-masked.
-            #
-            # NOTE, in that case the mutex is never released by the
-            # (first and) caching task and **we can't** simply shield
-            # bc that will inf-block on the `await
-            # no_more_users.wait()`.
-            #
-            # SO just always unlock!
-            lock.release()
-
-        try:
-            yield (
-                False,  # cache_hit = "no"
-                yielded,
-            )
-        except trio.Cancelled as taskc:
-            maybe_taskc = taskc
-            log.cancel(
-                f'Cancelled from cache-miss entry\n'
-                f'\n'
-                f'ctx_key: {ctx_key!r}\n'
-                f'mngr: {mngr!r}\n'
-            )
-            # XXX, always unset ke from cancelled context
-            # since we never consider it a masked exc case!
-            # - bc this can be called directly ty `._rpc._invoke()`?
-            #
-            if maybe_taskc.__context__ is cache_miss_ke:
-                maybe_taskc.__context__ = None
-            raise taskc
+    except KeyError:
+        log.debug(f'Allocating new {acm_func} for {ctx_key}')
+        mngr = acm_func(**kwargs)
+        resources = _Cache.resources
+        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+        resources[ctx_key] = (service_n, trio.Event())
+
+        # sync up to the mngr's yielded value
+        yielded = await service_n.start(
+            _Cache.run_ctx,
+            mngr,
+            ctx_key,
+        )
+        _Cache.users += 1
+        lock.release()
+        yield False, yielded

     else:
         _Cache.users += 1
-        log.debug(
+        log.runtime(
             f'Re-using cached resource for user {_Cache.users}\n\n'
             f'{ctx_key!r} -> {type(yielded)}\n'
@@ -377,19 +303,9 @@ async def maybe_open_context(
             # f'{ctx_key!r} -> {yielded!r}\n'
         )
         lock.release()
-        yield (
-            True,  # cache_hit = "yes"
-            yielded,
-        )
+        yield True, yielded

     finally:
-        if lock.locked():
-            stats: trio.LockStatistics = lock.statistics()
-            log.error(
-                f'Lock left locked by last owner !?\n'
-                f'{stats}\n'
-            )
         _Cache.users -= 1

         if yielded is not None:
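
And a hedged usage sketch for `maybe_open_context()` itself: the first caller
allocates via `acm_func`, concurrent callers get a cache hit and share the
same value; note per the code above it requires a running actor since the
backing nursery comes from `current_actor()`:

from contextlib import asynccontextmanager as acm

import trio
import tractor
from tractor.trionics import maybe_open_context

@acm
async def open_db_pool(dsn: str):
    # stand-in for some expensive, shareable resource
    yield {'dsn': dsn}

async def user_task(name: str):
    async with maybe_open_context(
        acm_func=open_db_pool,
        kwargs={'dsn': 'db://local'},
    ) as (cache_hit, pool):
        print(f'{name}: cache_hit={cache_hit}, pool={pool}')

async def main():
    async with tractor.open_root_actor():
        async with trio.open_nursery() as tn:
            tn.start_soon(user_task, 'first')
            tn.start_soon(user_task, 'second')

trio.run(main)
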


@@ -60,8 +60,8 @@ def find_masked_excs(
     return None


-# XXX, relevant discussion @ `trio`-core,
-# https://github.com/python-trio/trio/issues/455
+# XXX, relevant ish discussion @ `trio`-core,
+# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
 #
 @acm
 async def maybe_raise_from_masking_exc(
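
For context on the linked `trio` discussion: "masking" is when an exception
raised during unwind (e.g. inside a `finally:`) hides an in-flight
`trio.Cancelled` within its `__context__`. Toy reproduction:

import trio

async def leaky():
    try:
        await trio.sleep_forever()
    finally:
        # raising here "masks" the `trio.Cancelled` currently
        # propagating; it survives only as `exc.__context__`.
        raise RuntimeError('masked the cancellation!')

async def main():
    with trio.move_on_after(0.1):
        await leaky()

trio.run(main)
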
@@ -113,6 +113,7 @@ async def maybe_raise_from_masking_exc(
     )
     matching: list[BaseException]|None = None
     maybe_eg: ExceptionGroup|None
+    maybe_eg: ExceptionGroup|None

     if tn:
         try:  # handle egs