Compare commits

...

43 Commits

Author SHA1 Message Date
Tyler Goodlet e271ebcb87 More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 6cac1fe22b Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-14 13:15:40 -04:00
Tyler Goodlet 42056f4f53 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
and a level drop to `.runtime()` for rx-msg reports.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 5790ee9254 Couple more `._root` logging tweaks.. 2025-07-14 13:15:40 -04:00
Tyler Goodlet 57e25411ee Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
 converting to multi-line code style an ' for str-report-contents. Tweak
 some imports to sub-mod level as well.
2025-07-14 13:15:40 -04:00
Tyler Goodlet ee9fa2e91d Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-07-14 13:15:40 -04:00
Tyler Goodlet cb8cb67680 Use `._supervise._shutdown_msg` in tooling test 2025-07-14 13:15:40 -04:00
Tyler Goodlet 51e95ebc3e Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict` just merge in `req_chan` input at call time.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 517ae67d0e Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-07-14 13:15:40 -04:00
Tyler Goodlet acb09ab29c Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types thoughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtim.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 5ffab73ced Add #TODO for `._context` to use `.msg.Aid` 2025-07-14 13:15:40 -04:00
Tyler Goodlet 473de28b67 Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-14 13:15:40 -04:00
Tyler Goodlet f3a5986db5 Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 21d64b91e3 More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-07-14 13:15:40 -04:00
Tyler Goodlet fbc4208439 Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-07-14 13:15:40 -04:00
Tyler Goodlet e6ff4561a7 .trionics: link in `finally`-footgun `trio` GH ish 2025-07-14 13:15:40 -04:00
Tyler Goodlet db7fd44751 .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-07-14 13:15:40 -04:00
Tyler Goodlet 16fed20856 Drop `actor_info: str` from `._entry` logs 2025-07-14 13:15:40 -04:00
Tyler Goodlet 4723809a32 Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using '<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 75cb17371d Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:
` style for meta-field info lines.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 7fc98d15b1 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-07-14 13:15:40 -04:00
Tyler Goodlet acb1f905dc Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-07-14 13:15:40 -04:00
Tyler Goodlet acc7e26f17 Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-07-14 13:15:40 -04:00
Tyler Goodlet a9f3d8d9d5 Update `_runtime/_root` call-sigs to `nest_from_op()`
To match the new signature now already cherry-picked upstream.

The original commit was split after moving the original fn-params
changes into the upstream branch: `sclang_formatting`.

The orig commit was,

> ea53d03 Bah! just refine `devx.pformat.nest_from_op()`now

but was re-written to only contain the diff to calling code.
2025-07-14 13:15:40 -04:00
Tyler Goodlet 53ae1ba271 Enforce named-args only to `.open_nursery()` 2025-07-14 13:15:40 -04:00
Tyler Goodlet d54d4e605f Hide `._rpc._errors_relayed_via_ipc()` frame by def 2025-07-14 13:15:40 -04:00
Tyler Goodlet 953abc6b7b Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 69965224f2 Just import `._runtime` ns in `._root`; be a bit more explicit 2025-07-14 13:15:33 -04:00
Tyler Goodlet 5ec20ffe68 Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-07-14 13:15:33 -04:00
Tyler Goodlet 6dc5f4c914 Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 5f6240939f Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay a RAE-boxed
`bdb.BdbQuit`.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 135e9b40b1 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seem to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 0388eead6a Use collapser in rent side of `Context` 2025-07-14 13:15:33 -04:00
Tyler Goodlet 006ed72aea Flip to `collapse_eg()` use in `.trionics.gather_contexts()` 2025-07-14 13:15:33 -04:00
Tyler Goodlet 88b55d868f Always `Cancelled`-unmask ctx endpoint excs
To resolve the recently added and failing
`test_remote_exc_relay::test_unmasked_remote_exc`: never allow
`trio.Cancelled` to mask an underlying user-code exception, ever.

Our first real-world (runtime internal) use case for the new
`.trionics.maybe_raise_from_masking_exc()` such that the failing
test now passes with an properly relayed remote RTE unmasking B)

Details,
- flip the `Context._scope_nursery` to the default strict-eg behaviour
  and instead stack its outer scope with a `.trionics.collapse_eg()`.
- wrap the inner-most scope (after `msgops.maybe_limit_plds()`) with
  a `maybe_raise_from_masking_exc()` to ensure user-code errors are
  never masked by `trio.Cancelled`s.

Some err-reporting refinement,
- always capture any `scope_err` from the entire block for debug
  purposes; report it in the `finally` block's log.
- always capture any suppressed `maybe_re`, output from
  `ctx.maybe_raise()`, and `log.cancel()` report it.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 8b1094a8d3 Adjust ep-masking-suite for the real-use-case
Namely that the more common-and-pertinent case is when
a `@context`-ep-fn contains the `finally`-footgun but without
a surrounding embedded `tn` (which currently still requires its own
scope embedded `trionics.maybe_raise_from_masking_exc()`) which can't
be compensated-for by `._rpc._invoke()` easily. Instead the test is
composed where the `._invoke()`-internal `tn` is the machinery being
addressed in terms of masking user-code excs with `trio.Cancelled`.

Deats,
- rename the test -> `test_unmasked_remote_exc` to reflect what the
  runtime should actually be addressing/solving.
- drop the embedded `tn` from `sleep_n_chkpt_in_finally()` (for now)
  since that case can't currently easily be addressed without the user
  code using its own `trionics.maybe_raise_from_masking_exc()` inside
  the nursery scope.
- as such drop all `tn` related params/logic/usage from the ep.
- add in a `Cancelled` handler block which checks for RTE masking and
  always prints the occurrence loudly.

Follow up,
- obvi this suite will currently fail until the appropriate adjustment
  is made to `._rpc._invoke()` to do the unmasking; coming next.
- we probably still need a case with an embedded user `tn` where if
  the default strict-eg mode is used then a ctxc from the parent might
  cause a non-graceful `Context.cancel()` outcome?
 |_since the embedded user-`tn` will raise
   `ExceptionGroup[trio.Cancelled]` upward despite the parent nursery's
   scope being the canceller, or will a `collapse_eg()` inside the
   `._invoke()` scope handle this as well?
2025-07-14 13:15:33 -04:00
Tyler Goodlet eddbedb97d Extend `._taskc.maybe_raise_from_masking_exc()`
To handle captured non-egs (when the now optional `tn` isn't provided)
as well as yield up a `BoxedMaybeException` which contains any detected
and un-masked `exc_ctx` as its `.value`.

Also add some additional tooling,
- a `raise_unmasked: bool` toggle for when the caller just wants to
  report the masked exc and not raise-it-in-place of the masker.
- `extra_note: str` which by default is tuned to the default
  `unmask_from = (trio.Cancelled,)` but which can be used to deliver
  custom exception msg content.
- `always_warn_on: tuple[BaseException]` which will always emit
  a warning log of what would have been the raised-in-place-of
  `ctx_exc`'s msg for special cases where you want to report
  a masking case that might not be otherwise noticed by the runtime
  (cough like a `Cancelled` masking another `Cancelled) but which
  you'd still like to warn the caller about.
- factor out the masked-`ext_ctx` predicate logic into
  a `find_masked_excs()` and also use it for non-eg cases.

Still maybe todo?
- rewrapping multiple masked sub-excs in an eg back into an eg? left in
  #TODOs and a pause-point where applicable.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 9a587c6edb Mv `maybe_raise_from_masking_exc()` to `.trionics`
Factor the `@acm`-closure it out of the
`test_trioisms::test_acm_embedded_nursery_propagates_enter_err` suite
for real use internally.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 537b77d74e Add ctx-ep suite for `trio`'s *finally-footgun*
Deats are documented within, but basically a subtlety we already track
with `trio`'s masking of excs by a checkpoint-in-`finally` can cause
compounded issues with our `@context` endpoints, mostly in terms of
remote error and cancel-ack relay semantics.
2025-07-14 13:15:33 -04:00
Tyler Goodlet faec496686 Add some tooling params to `collapse_eg()` 2025-07-14 13:15:33 -04:00
Tyler Goodlet d262926773 Move `.is_multi_cancelled()` to `.trioniics._beg`
Since it's for beg filtering, the current impl should be renamed anyway;
it's not just for filtering cancelled excs.

Deats,
- added a real doc string, links to official eg docs and fixed the
  return typing.
- adjust all internal imports to match.
2025-07-14 13:15:33 -04:00
Tyler Goodlet 93802486bb Drop stale comment from inter-peer suite 2025-07-14 13:15:27 -04:00
Tyler Goodlet 41877c476e Use `nest_from_op()` in some runtime logs for actor-state-repring 2025-07-14 13:15:27 -04:00
30 changed files with 1596 additions and 625 deletions

View File

@ -317,7 +317,6 @@ def test_subactor_breakpoint(
assert in_prompt_msg( assert in_prompt_msg(
child, [ child, [
'MessagingError:',
'RemoteActorError:', 'RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit', 'bdb.BdbQuit',

View File

@ -116,9 +116,11 @@ def test_shield_pause(
child.pid, child.pid,
signal.SIGINT, signal.SIGINT,
) )
from tractor._supervise import _shutdown_msg
expect( expect(
child, child,
'Shutting down actor runtime', # 'Shutting down actor runtime',
_shutdown_msg,
timeout=6, timeout=6,
) )
assert_before( assert_before(

View File

@ -252,7 +252,7 @@ def test_simple_context(
pass pass
except BaseExceptionGroup as beg: except BaseExceptionGroup as beg:
# XXX: on windows it seems we may have to expect the group error # XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled from tractor.trionics import is_multi_cancelled
assert is_multi_cancelled(beg) assert is_multi_cancelled(beg)
else: else:
trio.run(main) trio.run(main)

View File

@ -410,7 +410,6 @@ def test_peer_canceller(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(

View File

@ -0,0 +1,237 @@
'''
Special case testing for issues not (dis)covered in the primary
`Context` related functional/scenario suites.
**NOTE: this mod is a WIP** space for handling
odd/rare/undiscovered/not-yet-revealed faults which either
loudly (ideal case) breakl our supervision protocol
or (worst case) result in distributed sys hangs.
Suites here further try to clarify (if [partially] ill-defined) and
verify our edge case semantics for inter-actor-relayed-exceptions
including,
- lowlevel: what remote obj-data is interchanged for IPC and what is
native-obj form is expected from unpacking in the the new
mem-domain.
- which kinds of `RemoteActorError` (and its derivs) are expected by which
(types of) peers (parent, child, sibling, etc) with what
particular meta-data set such as,
- `.src_uid`: the original (maybe) peer who raised.
- `.relay_uid`: the next-hop-peer who sent it.
- `.relay_path`: the sequence of peer actor hops.
- `.is_inception`: a predicate that denotes multi-hop remote errors.
- when should `ExceptionGroup`s be relayed from a particular
remote endpoint, they should never be caused by implicit `._rpc`
nursery machinery!
- various special `trio` edge cases around its cancellation semantics
and how we (currently) leverage `trio.Cancelled` as a signal for
whether a `Context` task should raise `ContextCancelled` (ctx).
'''
import pytest
import trio
import tractor
from tractor import ( # typing
ActorNursery,
Portal,
Context,
ContextCancelled,
)
@tractor.context
async def sleep_n_chkpt_in_finally(
ctx: Context,
sleep_n_raise: bool,
chld_raise_delay: float,
chld_finally_delay: float,
rent_cancels: bool,
rent_ctxc_delay: float,
expect_exc: str|None = None,
) -> None:
'''
Sync, open a tn, then wait for cancel, run a chkpt inside
the user's `finally:` teardown.
This covers a footgun case that `trio` core doesn't seem to care about
wherein an exc can be masked by a `trio.Cancelled` raised inside a tn emedded
`finally:`.
Also see `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
for the down and gritty details.
Since a `@context` endpoint fn can also contain code like this,
**and** bc we currently have no easy way other then
`trio.Cancelled` to signal cancellation on each side of an IPC `Context`,
the footgun issue can compound itself as demonstrated in this suite..
Here are some edge cases codified with our WIP "sclang" syntax
(note the parent(rent)/child(chld) naming here is just
pragmatism, generally these most of these cases can occurr
regardless of the distributed-task's supervision hiearchy),
- rent c)=> chld.raises-then-taskc-in-finally
|_ chld's body raises an `exc: BaseException`.
_ in its `finally:` block it runs a chkpoint
which raises a taskc (`trio.Cancelled`) which
masks `exc` instead raising taskc up to the first tn.
_ the embedded/chld tn captures the masking taskc and then
raises it up to the ._rpc-ep-tn instead of `exc`.
_ the rent thinks the child ctxc-ed instead of errored..
'''
await ctx.started()
if expect_exc:
expect_exc: BaseException = tractor._exceptions.get_err_type(
type_name=expect_exc,
)
berr: BaseException|None = None
try:
if not sleep_n_raise:
await trio.sleep_forever()
elif sleep_n_raise:
# XXX this sleep is less then the sleep the parent
# does before calling `ctx.cancel()`
await trio.sleep(chld_raise_delay)
# XXX this will be masked by a taskc raised in
# the `finally:` if this fn doesn't terminate
# before any ctxc-req arrives AND a checkpoint is hit
# in that `finally:`.
raise RuntimeError('my app krurshed..')
except BaseException as _berr:
berr = _berr
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_exc:
if not isinstance(
berr,
expect_exc,
):
raise ValueError(
f'Unexpected exc type ??\n'
f'{berr!r}\n'
f'\n'
f'Expected a {expect_exc!r}\n'
)
raise berr
# simulate what user code might try even though
# it's a known boo-boo..
finally:
# maybe wait for rent ctxc to arrive
with trio.CancelScope(shield=True):
await trio.sleep(chld_finally_delay)
# !!XXX this will raise `trio.Cancelled` which
# will mask the RTE from above!!!
#
# YES, it's the same case as our extant
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
try:
await trio.lowlevel.checkpoint()
except trio.Cancelled as taskc:
if (scope_err := taskc.__context__):
print(
f'XXX MASKED REMOTE ERROR XXX\n'
f'ENDPOINT exception -> {scope_err!r}\n'
f'will be masked by -> {taskc!r}\n'
)
# await tractor.pause(shield=True)
raise taskc
@pytest.mark.parametrize(
'chld_callspec',
[
dict(
sleep_n_raise=None,
chld_raise_delay=0.1,
chld_finally_delay=0.1,
expect_exc='Cancelled',
rent_cancels=True,
rent_ctxc_delay=0.1,
),
dict(
sleep_n_raise='RuntimeError',
chld_raise_delay=0.1,
chld_finally_delay=1,
expect_exc='RuntimeError',
rent_cancels=False,
rent_ctxc_delay=0.1,
),
],
ids=lambda item: f'chld_callspec={item!r}'
)
def test_unmasked_remote_exc(
debug_mode: bool,
chld_callspec: dict,
tpt_proto: str,
):
expect_exc_str: str|None = chld_callspec['sleep_n_raise']
rent_ctxc_delay: float|None = chld_callspec['rent_ctxc_delay']
async def main():
an: ActorNursery
async with tractor.open_nursery(
debug_mode=debug_mode,
enable_transports=[tpt_proto],
) as an:
ptl: Portal = await an.start_actor(
'cancellee',
enable_modules=[__name__],
)
ctx: Context
async with (
ptl.open_context(
sleep_n_chkpt_in_finally,
**chld_callspec,
) as (ctx, sent),
):
assert not sent
await trio.sleep(rent_ctxc_delay)
await ctx.cancel()
# recv error or result from chld
ctxc: ContextCancelled = await ctx.wait_for_result()
assert (
ctxc is ctx.outcome
and
isinstance(ctxc, ContextCancelled)
)
# always graceful terminate the sub in non-error cases
await an.cancel()
if expect_exc_str:
expect_exc: BaseException = tractor._exceptions.get_err_type(
type_name=expect_exc_str,
)
with pytest.raises(
expected_exception=tractor.RemoteActorError,
) as excinfo:
trio.run(main)
rae = excinfo.value
assert expect_exc == rae.boxed_type
else:
trio.run(main)

View File

@ -112,55 +112,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
''' '''
import tractor import tractor
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery,
unmask_from: BaseException|None = trio.Cancelled
# TODO, maybe offer a collection?
# unmask_from: set[BaseException] = {
# trio.Cancelled,
# },
):
if not unmask_from:
yield
return
try:
yield
except* unmask_from as be_eg:
# TODO, if we offer `unmask_from: set`
# for masker_exc_type in unmask_from:
matches, rest = be_eg.split(unmask_from)
if not matches:
raise
for exc_match in be_eg.exceptions:
if (
(exc_ctx := exc_match.__context__)
and
type(exc_ctx) not in {
# trio.Cancelled, # always by default?
unmask_from,
}
):
exc_ctx.add_note(
f'\n'
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
f'Are you always cancelling? Say from a `finally:` ?\n\n'
f'{tn!r}'
)
raise exc_ctx from exc_match
@acm @acm
async def wraps_tn_that_always_cancels(): async def wraps_tn_that_always_cancels():
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
maybe_raise_from_masking_exc( tractor.trionics.maybe_raise_from_masking_exc(
tn=tn, tn=tn,
unmask_from=( unmask_from=(
trio.Cancelled trio.Cancelled
@ -202,3 +158,60 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert_eg, rest_eg = eg.split(AssertionError) assert_eg, rest_eg = eg.split(AssertionError)
assert len(assert_eg.exceptions) == 1 assert len(assert_eg.exceptions) == 1
def test_gatherctxs_with_memchan_breaks_multicancelled(
debug_mode: bool,
):
'''
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
will break a strict-eg-tn's multi-cancelled absorption..
'''
from tractor import (
trionics,
)
@acm
async def open_memchan() -> trio.abc.ReceiveChannel:
task: trio.Task = trio.lowlevel.current_task()
print(
f'Opening {task!r}\n'
)
# 1 to force eager sending
send, recv = trio.open_memory_channel(16)
try:
async with send:
yield recv
finally:
print(
f'Closed {task!r}\n'
)
async def main():
async with (
# XXX should ensure ONLY the KBI
# is relayed upward
trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
), # as tn,
trionics.gather_contexts([
open_memchan(),
open_memchan(),
]) as recv_chans,
):
assert len(recv_chans) == 2
await trio.sleep(1)
raise KeyboardInterrupt
# tn.cancel_scope.cancel()
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -101,6 +101,9 @@ from ._state import (
debug_mode, debug_mode,
_ctxvar_Context, _ctxvar_Context,
) )
from .trionics import (
collapse_eg,
)
# ------ - ------ # ------ - ------
if TYPE_CHECKING: if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
@ -740,6 +743,8 @@ class Context:
# cancelled, NOT their reported canceller. IOW in the # cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting # latter case we're cancelled by someone else getting
# cancelled. # cancelled.
#
# !TODO, switching to `Actor.aid` here!
if (canc := error.canceller) == self._actor.uid: if (canc := error.canceller) == self._actor.uid:
whom: str = 'us' whom: str = 'us'
self._canceller = canc self._canceller = canc
@ -940,7 +945,7 @@ class Context:
self.cancel_called = True self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side.upper()}-side\n' f'Cancelling ctx from {side!r}-side\n'
) )
reminfo: str = ( reminfo: str = (
# ' =>\n' # ' =>\n'
@ -2023,10 +2028,8 @@ async def open_context_from_portal(
ctxc_from_callee: ContextCancelled|None = None ctxc_from_callee: ContextCancelled|None = None
try: try:
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
spec=ctx_meta.get('pld_spec'), spec=ctx_meta.get('pld_spec'),

View File

@ -28,7 +28,10 @@ from typing import (
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from tractor.log import get_logger from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import (
gather_contexts,
collapse_eg,
)
from .ipc import _connect_chan, Channel from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
@ -87,7 +90,6 @@ async def get_registry(
yield regstr_ptl yield regstr_ptl
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -253,9 +255,12 @@ async def find_actor(
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal] portals: list[Portal]
async with gather_contexts( async with (
collapse_eg(),
gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals,
):
# log.runtime( # log.runtime(
# 'Gathered portals:\n' # 'Gathered portals:\n'
# f'{portals}' # f'{portals}'

View File

@ -21,7 +21,7 @@ Sub-process entry points.
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import multiprocessing as mp import multiprocessing as mp
import os # import os
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -38,6 +38,7 @@ from .devx import (
_frame_stack, _frame_stack,
pformat, pformat,
) )
# from .msg import pretty_struct
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress from ._addr import UnwrappedAddress
from ._runtime import ( from ._runtime import (
@ -127,20 +128,13 @@ def _trio_main(
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
actor_info: str = (
f'|_{actor}\n'
f' uid: {actor.uid}\n'
f' pid: {os.getpid()}\n'
f' parent_addr: {parent_addr}\n'
f' loglevel: {actor.loglevel}\n'
)
log.info( log.info(
'Starting new `trio` subactor\n' f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n'
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='>(', # see syntax ideas above input_op='>(', # see syntax ideas above
text=actor_info, text=f'{actor}',
nest_indent=2, # since "complete"
) )
) )
logmeth = log.info logmeth = log.info
@ -149,7 +143,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective input_op=')>', # like a "closed-to-play"-icon from super perspective
text=actor_info, text=f'{actor}',
nest_indent=1, nest_indent=1,
) )
) )
@ -167,7 +161,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='c)>', # closed due to cancel (see above) input_op='c)>', # closed due to cancel (see above)
text=actor_info, text=f'{actor}',
) )
) )
except BaseException as err: except BaseException as err:
@ -177,7 +171,7 @@ def _trio_main(
+ +
pformat.nest_from_op( pformat.nest_from_op(
input_op='x)>', # closed by error input_op='x)>', # closed by error
text=actor_info, text=f'{actor}',
) )
) )
# NOTE since we raise a tb will already be shown on the # NOTE since we raise a tb will already be shown on the

View File

@ -1246,55 +1246,6 @@ def unpack_error(
return exc return exc
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> bool|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(exc, BaseExceptionGroup):
matched_exc: BaseExceptionGroup|None = exc.subgroup(
tuple(ignore_nested),
# TODO, complain about why not allowed XD
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False
def _raise_from_unexpected_msg( def _raise_from_unexpected_msg(
ctx: Context, ctx: Context,
msg: MsgType, msg: MsgType,

View File

@ -39,7 +39,10 @@ import warnings
import trio import trio
from .trionics import maybe_open_nursery from .trionics import (
maybe_open_nursery,
collapse_eg,
)
from ._state import ( from ._state import (
current_actor, current_actor,
) )
@ -115,6 +118,10 @@ class Portal:
@property @property
def chan(self) -> Channel: def chan(self) -> Channel:
'''
Ref to this ctx's underlying `tractor.ipc.Channel`.
'''
return self._chan return self._chan
@property @property
@ -174,10 +181,17 @@ class Portal:
# not expecting a "main" result # not expecting a "main" result
if self._expect_result_ctx is None: if self._expect_result_ctx is None:
peer_id: str = f'{self.channel.aid.reprol()!r}'
log.warning( log.warning(
f"Portal for {self.channel.aid} not expecting a final" f'Portal to peer {peer_id} will not deliver a final result?\n'
" result?\nresult() should only be called if subactor" f'\n'
" was spawned with `ActorNursery.run_in_actor()`") f'Context.result() can only be called by the parent of '
f'a sub-actor when it was spawned with '
f'`ActorNursery.run_in_actor()`'
f'\n'
f'Further this `ActorNursery`-method-API will deprecated in the'
f'near fututre!\n'
)
return NoResult return NoResult
# expecting a "main" result # expecting a "main" result
@ -210,6 +224,7 @@ class Portal:
typname: str = type(self).__name__ typname: str = type(self).__name__
log.warning( log.warning(
f'`{typname}.result()` is DEPRECATED!\n' f'`{typname}.result()` is DEPRECATED!\n'
f'\n'
f'Use `{typname}.wait_for_result()` instead!\n' f'Use `{typname}.wait_for_result()` instead!\n'
) )
return await self.wait_for_result( return await self.wait_for_result(
@ -221,8 +236,10 @@ class Portal:
# terminate all locally running async generator # terminate all locally running async generator
# IPC calls # IPC calls
if self._streams: if self._streams:
log.cancel( peer_id: str = f'{self.channel.aid.reprol()!r}'
f"Cancelling all streams with {self.channel.aid}") report: str = (
f'Cancelling all msg-streams with {peer_id}\n'
)
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
await stream.aclose() await stream.aclose()
@ -231,10 +248,18 @@ class Portal:
# (unless of course at some point down the road we # (unless of course at some point down the road we
# won't expect this to always be the case or need to # won't expect this to always be the case or need to
# detect it for respawning purposes?) # detect it for respawning purposes?)
log.debug(f"{stream} was already closed.") report += (
f'->) {stream!r} already closed\n'
)
log.cancel(report)
async def aclose(self): async def aclose(self):
log.debug(f"Closing {self}") log.debug(
f'Closing portal\n'
f'>}}\n'
f'|_{self}\n'
)
# TODO: once we move to implementing our own `ReceiveChannel` # TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`) # (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here # we'll need to .aclose all those channels here
@ -260,19 +285,18 @@ class Portal:
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
chan: Channel = self.channel chan: Channel = self.channel
peer_id: str = f'{self.channel.aid.reprol()!r}'
if not chan.connected(): if not chan.connected():
log.runtime( log.runtime(
'This channel is already closed, skipping cancel request..' 'Peer {peer_id} is already disconnected\n'
'-> skipping cancel request..\n'
) )
return False return False
reminfo: str = (
f'c)=> {self.channel.aid}\n'
f' |_{chan}\n'
)
log.cancel( log.cancel(
f'Requesting actor-runtime cancel for peer\n\n' f'Sending actor-runtime-cancel-req to peer\n'
f'{reminfo}' f'\n'
f'c)=> {peer_id}\n'
) )
# XXX the one spot we set it? # XXX the one spot we set it?
@ -297,8 +321,9 @@ class Portal:
# may timeout and we never get an ack (obvi racy) # may timeout and we never get an ack (obvi racy)
# but that doesn't mean it wasn't cancelled. # but that doesn't mean it wasn't cancelled.
log.debug( log.debug(
'May have failed to cancel peer?\n' f'May have failed to cancel peer?\n'
f'{reminfo}' f'\n'
f'c)=?> {peer_id}\n'
) )
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
@ -316,22 +341,22 @@ class Portal:
TransportClosed, TransportClosed,
) as tpt_err: ) as tpt_err:
report: str = ( ipc_borked_report: str = (
f'IPC chan for actor already closed or broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'{self.channel.aid}\n' f'\n'
f' |_{self.channel}\n' f'c)=x> {peer_id}\n'
) )
match tpt_err: match tpt_err:
case TransportClosed(): case TransportClosed():
log.debug(report) log.debug(ipc_borked_report)
case _: case _:
report += ( ipc_borked_report += (
f'\n' f'\n'
f'Unhandled low-level transport-closed/error during\n' f'Unhandled low-level transport-closed/error during\n'
f'Portal.cancel_actor()` request?\n' f'Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n' f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
) )
log.warning(report) log.warning(ipc_borked_report)
return False return False
@ -488,10 +513,13 @@ class Portal:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await ctx.cancel() await ctx.cancel()
except trio.ClosedResourceError: except trio.ClosedResourceError as cre:
# if the far end terminates before we send a cancel the # if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed. # underlying transport-channel may already be closed.
log.cancel(f'Context {ctx} was already closed?') log.cancel(
f'Context.cancel() -> {cre!r}\n'
f'cid: {ctx.cid!r} already closed?\n'
)
# XXX: should this always be done? # XXX: should this always be done?
# await recv_chan.aclose() # await recv_chan.aclose()
@ -558,14 +586,13 @@ async def open_portal(
assert actor assert actor
was_connected: bool = False was_connected: bool = False
async with maybe_open_nursery( async with (
collapse_eg(),
maybe_open_nursery(
tn, tn,
shield=shield, shield=shield,
strict_exception_groups=False, ) as tn,
# ^XXX^ TODO? soo roll our own then ?? ):
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()

View File

@ -37,16 +37,11 @@ import warnings
import trio import trio
from ._runtime import ( from . import _runtime
Actor,
Arbiter,
# TODO: rename and make a non-actor subtype?
# Arbiter as Registry,
async_main,
)
from .devx import ( from .devx import (
debug, debug,
_frame_stack, _frame_stack,
pformat as _pformat,
) )
from . import _spawn from . import _spawn
from . import _state from . import _state
@ -61,9 +56,12 @@ from ._addr import (
mk_uuid, mk_uuid,
wrap_address, wrap_address,
) )
from .trionics import (
is_multi_cancelled,
collapse_eg,
)
from ._exceptions import ( from ._exceptions import (
RuntimeFailure, RuntimeFailure,
is_multi_cancelled,
) )
@ -99,7 +97,7 @@ async def maybe_block_bp(
): ):
logger.info( logger.info(
f'Found `greenback` installed @ {maybe_mod}\n' f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n' f'Enabling `tractor.pause_from_sync()` support!\n'
) )
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx.debug._sync_pause_from_builtin' 'tractor.devx.debug._sync_pause_from_builtin'
@ -194,13 +192,19 @@ async def open_root_actor(
# read-only state to sublayers? # read-only state to sublayers?
# extra_rt_vars: dict|None = None, # extra_rt_vars: dict|None = None,
) -> Actor: ) -> _runtime.Actor:
''' '''
Runtime init entry point for ``tractor``. Initialize the `tractor` runtime by starting a "root actor" in
a parent-most Python process.
All (disjoint) actor-process-trees-as-programs are created via
this entrypoint.
''' '''
# XXX NEVER allow nested actor-trees! # XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False): if already_actor := _state.current_actor(
err_on_no_runtime=False,
):
rtvs: dict[str, Any] = _state._runtime_vars rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox'] root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs'] registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
@ -270,14 +274,20 @@ async def open_root_actor(
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
registry_addrs = [arbiter_addr] uw_reg_addrs = [arbiter_addr]
if not registry_addrs: uw_reg_addrs = registry_addrs
registry_addrs: list[UnwrappedAddress] = default_lo_addrs( if not uw_reg_addrs:
uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports enable_transports
) )
assert registry_addrs # must exist by now since all below code is dependent
assert uw_reg_addrs
registry_addrs: list[Address] = [
wrap_address(uw_addr)
for uw_addr in uw_reg_addrs
]
loglevel = ( loglevel = (
loglevel loglevel
@ -326,10 +336,10 @@ async def open_root_actor(
enable_stack_on_sig() enable_stack_on_sig()
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = [] ponged_addrs: list[Address] = []
async def ping_tpt_socket( async def ping_tpt_socket(
addr: UnwrappedAddress, addr: Address,
timeout: float = 1, timeout: float = 1,
) -> None: ) -> None:
''' '''
@ -349,17 +359,22 @@ async def open_root_actor(
# be better to eventually have a "discovery" protocol # be better to eventually have a "discovery" protocol
# with basic handshake instead? # with basic handshake instead?
with trio.move_on_after(timeout): with trio.move_on_after(timeout):
async with _connect_chan(addr): async with _connect_chan(addr.unwrap()):
ponged_addrs.append(addr) ponged_addrs.append(addr)
except OSError: except OSError:
# TODO: make this a "discovery" log level? # ?TODO, make this a "discovery" log level?
logger.info( logger.info(
f'No actor registry found @ {addr}\n' f'No root-actor registry found @ {addr!r}\n'
) )
# !TODO, this is basically just another (abstract)
# happy-eyeballs, so we should try for formalize it somewhere
# in a `.[_]discovery` ya?
#
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
for addr in registry_addrs: for uw_addr in uw_reg_addrs:
addr: Address = wrap_address(uw_addr)
tn.start_soon( tn.start_soon(
ping_tpt_socket, ping_tpt_socket,
addr, addr,
@ -381,31 +396,35 @@ async def open_root_actor(
f'Registry(s) seem(s) to exist @ {ponged_addrs}' f'Registry(s) seem(s) to exist @ {ponged_addrs}'
) )
actor = Actor( actor = _runtime.Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# DO NOT use the registry_addrs as the transport server # **DO NOT** use the registry_addrs as the
# addrs for this new non-registar, root-actor. # ipc-transport-server's bind-addrs as this is
# a new NON-registrar, ROOT-actor.
#
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
for addr in ponged_addrs: for addr in ponged_addrs:
waddr: Address = wrap_address(addr)
trans_bind_addrs.append( trans_bind_addrs.append(
waddr.get_random(bindspace=waddr.bindspace) addr.get_random(
bindspace=addr.bindspace,
)
) )
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of # actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors. # other process-tree-local sub-actors.
else: else:
# NOTE that if the current actor IS THE REGISTAR, the # NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken: # following init steps are taken:
# - the tranport layer server is bound to each addr # - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default. # pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs trans_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up # - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub) # indefinitely until either all registered (child/sub)
@ -416,7 +435,8 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348 # https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( # TODO: rename as `RootActor` or is that even necessary?
actor = _runtime.Arbiter(
name=name or 'registrar', name=name or 'registrar',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
@ -428,6 +448,16 @@ async def open_root_actor(
# `.trio.run()`. # `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio'] actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:
# assign process-local actor # assign process-local actor
@ -435,21 +465,27 @@ async def open_root_actor(
# start local channel-server and fake the portal API # start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery # NOTE: this won't block since we provide the nursery
ml_addrs_str: str = '\n'.join( report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n'
f'@{addr}' for addr in trans_bind_addrs if reg_addrs := actor.registry_addrs:
report += (
'-> Opening new registry @ '
+
'\n'.join(
f'{addr}' for addr in reg_addrs
) )
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
) )
logger.info(f'{report}\n')
# start the actor runtime in a new task # start runtime in a bg sub-task, yield to caller.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as root_tn,
) as nursery:
# ``_runtime.async_main()`` creates an internal nursery # XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process) # and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called # tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an # "end-to-end" structured concurrency throughout an
@ -457,9 +493,9 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
await nursery.start( await root_tn.start(
partial( partial(
async_main, _runtime.async_main,
actor, actor,
accept_addrs=trans_bind_addrs, accept_addrs=trans_bind_addrs,
parent_addr=None parent_addr=None
@ -507,7 +543,7 @@ async def open_root_actor(
raise raise
finally: finally:
# NOTE: not sure if we'll ever need this but it's # NOTE/TODO?, not sure if we'll ever need this but it's
# possibly better for even more determinism? # possibly better for even more determinism?
# logger.cancel( # logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..') # f'Waiting on {len(nurseries)} nurseries in root..')
@ -516,11 +552,20 @@ async def open_root_actor(
# for an in nurseries: # for an in nurseries:
# tempn.start_soon(an.exited.wait) # tempn.start_soon(an.exited.wait)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op='>) ',
text=actor.pformat(),
nest_prefix='|_',
)
logger.info( logger.info(
f'Closing down root actor\n' f'Closing down root actor\n'
f'>)\n' f'{op_nested_actor_repr}'
f'|_{actor}\n'
) )
# XXX, THIS IS A *finally-footgun*!
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?
with trio.CancelScope(shield=True):
await actor.cancel(None) # self cancel await actor.cancel(None) # self cancel
finally: finally:
# revert all process-global runtime state # revert all process-global runtime state
@ -534,10 +579,16 @@ async def open_root_actor(
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
logger.runtime( sclang_repr: str = _pformat.nest_from_op(
input_op=')>',
text=actor.pformat(),
nest_prefix='|_',
nest_indent=1,
)
logger.info(
f'Root actor terminated\n' f'Root actor terminated\n'
f')>\n' f'{sclang_repr}'
f' |_{actor}\n'
) )

View File

@ -37,6 +37,7 @@ import warnings
import trio import trio
from trio import ( from trio import (
Cancelled,
CancelScope, CancelScope,
Nursery, Nursery,
TaskStatus, TaskStatus,
@ -52,13 +53,18 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
TransportClosed, TransportClosed,
is_multi_cancelled,
pack_error, pack_error,
unpack_error, unpack_error,
) )
from .trionics import (
collapse_eg,
is_multi_cancelled,
maybe_raise_from_masking_exc,
)
from .devx import ( from .devx import (
debug, debug,
add_div, add_div,
pformat as _pformat,
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
@ -67,7 +73,7 @@ from .msg import (
MsgCodec, MsgCodec,
PayloadT, PayloadT,
NamespacePath, NamespacePath,
# pretty_struct, pretty_struct,
_ops as msgops, _ops as msgops,
) )
from tractor.msg.types import ( from tractor.msg.types import (
@ -215,11 +221,18 @@ async def _invoke_non_context(
task_status.started(ctx) task_status.started(ctx)
result = await coro result = await coro
fname: str = func.__name__ fname: str = func.__name__
op_nested_task: str = _pformat.nest_from_op(
input_op=f')> cid: {ctx.cid!r}',
text=f'{ctx._task}',
nest_indent=1, # under >
)
log.runtime( log.runtime(
'RPC complete:\n' f'RPC task complete\n'
f'task: {ctx._task}\n' f'\n'
f'|_cid={ctx.cid}\n' f'{op_nested_task}\n'
f'|_{fname}() -> {pformat(result)}\n' f'\n'
f')> {fname}() -> {pformat(result)}\n'
) )
# NOTE: only send result if we know IPC isn't down # NOTE: only send result if we know IPC isn't down
@ -250,7 +263,7 @@ async def _errors_relayed_via_ipc(
ctx: Context, ctx: Context,
is_rpc: bool, is_rpc: bool,
hide_tb: bool = False, hide_tb: bool = True,
debug_kbis: bool = False, debug_kbis: bool = False,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -375,9 +388,9 @@ async def _errors_relayed_via_ipc(
# they can be individually ccancelled. # they can be individually ccancelled.
finally: finally:
# if the error is not from user code and instead a failure # if the error is not from user code and instead a failure of
# of a runtime RPC or transport failure we do prolly want to # an internal-runtime-RPC or IPC-connection, we do (prolly) want
# show this frame # to show this frame!
if ( if (
rpc_err rpc_err
and ( and (
@ -616,32 +629,40 @@ async def _invoke(
# -> the below scope is never exposed to the # -> the below scope is never exposed to the
# `@context` marked RPC function. # `@context` marked RPC function.
# - `._portal` is never set. # - `._portal` is never set.
scope_err: BaseException|None = None
try: try:
tn: trio.Nursery # TODO: better `trionics` primitive/tooling usage here!
rpc_ctx_cs: CancelScope
async with (
trio.open_nursery(
strict_exception_groups=False,
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
) as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=ctx_meta.get('pld_spec'),
dec_hook=ctx_meta.get('dec_hook'),
),
):
ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
# -[ ] should would be nice to have our `TaskMngr` # -[ ] should would be nice to have our `TaskMngr`
# nursery here! # nursery here!
# -[ ] payload value checking like we do with # -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage # `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the # here in the child task instead of waiting for the
# parent to crash with it's own MTE.. # parent to crash with it's own MTE..
#
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=ctx_meta.get('pld_spec'),
dec_hook=ctx_meta.get('dec_hook'),
),
# XXX NOTE, this being the "most embedded"
# scope ensures unasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx)
# invoke user endpoint fn.
res: Any|PayloadT = await coro res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type( return_msg: Return|CancelAck = return_msg_type(
cid=cid, cid=cid,
@ -651,7 +672,8 @@ async def _invoke(
ctx._result = res ctx._result = res
log.runtime( log.runtime(
f'Sending result msg and exiting {ctx.side!r}\n' f'Sending result msg and exiting {ctx.side!r}\n'
f'{return_msg}\n' f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
) )
await chan.send(return_msg) await chan.send(return_msg)
@ -743,39 +765,48 @@ async def _invoke(
BaseExceptionGroup, BaseExceptionGroup,
BaseException, BaseException,
trio.Cancelled, trio.Cancelled,
) as _scope_err:
) as scope_error: scope_err = _scope_err
if ( if (
isinstance(scope_error, RuntimeError) isinstance(scope_err, RuntimeError)
and scope_error.args and
and 'Cancel scope stack corrupted' in scope_error.args[0] scope_err.args
and
'Cancel scope stack corrupted' in scope_err.args[0]
): ):
log.exception('Cancel scope stack corrupted!?\n') log.exception('Cancel scope stack corrupted!?\n')
# debug.mk_pdb().set_trace() # debug.mk_pdb().set_trace()
# always set this (child) side's exception as the # always set this (child) side's exception as the
# local error on the context # local error on the context
ctx._local_error: BaseException = scope_error ctx._local_error: BaseException = scope_err
# ^-TODO-^ question, # ^-TODO-^ question,
# does this matter other then for # does this matter other then for
# consistentcy/testing? # consistentcy/testing?
# |_ no user code should be in this scope at this point # |_ no user code should be in this scope at this point
# AND we already set this in the block below? # AND we already set this in the block below?
# if a remote error was set then likely the # XXX if a remote error was set then likely the
# exception group was raised due to that, so # exc group was raised due to that, so
# and we instead raise that error immediately! # and we instead raise that error immediately!
ctx.maybe_raise() maybe_re: (
ContextCancelled|RemoteActorError
) = ctx.maybe_raise()
if maybe_re:
log.cancel(
f'Suppressing remote-exc from peer,\n'
f'{maybe_re!r}\n'
)
# maybe TODO: pack in come kinda # maybe TODO: pack in come kinda
# `trio.Cancelled.__traceback__` here so they can be # `trio.Cancelled.__traceback__` here so they can be
# unwrapped and displayed on the caller side? no se.. # unwrapped and displayed on the caller side? no se..
raise raise scope_err
# `@context` entrypoint task bookeeping. # `@context` entrypoint task bookeeping.
# i.e. only pop the context tracking if used ;) # i.e. only pop the context tracking if used ;)
finally: finally:
assert chan.uid assert chan.aid
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
@ -802,16 +833,19 @@ async def _invoke(
descr_str += ( descr_str += (
f'\n{merr!r}\n' # needed? f'\n{merr!r}\n' # needed?
f'{tb_str}\n' f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
) )
else: else:
descr_str += f'\n{merr!r}\n' descr_str += f'\n{merr!r}\n'
else: else:
descr_str += f'\nand final result {ctx.outcome!r}\n' descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth( logmeth(
message f'{message}\n'
+ f'\n'
descr_str f'{descr_str}\n'
) )
@ -978,8 +1012,6 @@ async def process_messages(
cid=cid, cid=cid,
kwargs=kwargs, kwargs=kwargs,
): ):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor # XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is # runtime cancellation if this actor is
# currently in debug mode! # currently in debug mode!
@ -998,14 +1030,14 @@ async def process_messages(
cid, cid,
chan, chan,
actor.cancel, actor.cancel,
kwargs, kwargs | {'req_chan': chan},
is_rpc=False, is_rpc=False,
return_msg_type=CancelAck, return_msg_type=CancelAck,
) )
log.runtime( log.runtime(
'Cancelling IPC transport msg-loop with peer:\n' 'Cancelling RPC-msg-loop with peer\n'
f'|_{chan}\n' f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n'
) )
loop_cs.cancel() loop_cs.cancel()
break break
@ -1018,7 +1050,7 @@ async def process_messages(
): ):
target_cid: str = kwargs['cid'] target_cid: str = kwargs['cid']
kwargs |= { kwargs |= {
'requesting_uid': chan.uid, 'requesting_aid': chan.aid,
'ipc_msg': msg, 'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning # XXX NOTE! ONLY the rpc-task-owning
@ -1054,21 +1086,34 @@ async def process_messages(
ns=ns, ns=ns,
func=funcname, func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types` kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid, uid=actor_uuid,
): ):
if actor_uuid != chan.aid.uid:
raise RuntimeError(
f'IPC <Start> msg <-> chan.aid mismatch!?\n'
f'Channel.aid = {chan.aid!r}\n'
f'Start.uid = {actor_uuid!r}\n'
)
# await debug.pause()
op_repr: str = 'Start <=) '
req_repr: str = _pformat.nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=f'{chan}',
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
# ^XXX, subtract -1 to account for
# <Channel
# ^_chevron to be stripped
)
start_status: str = ( start_status: str = (
'Handling RPC `Start` request\n' 'Handling RPC request\n'
f'<= peer: {actorid}\n\n' f'{req_repr}\n'
f' |_{chan}\n' f'\n'
f' |_cid: {cid}\n\n' f'->{{ ipc-context-id: {cid!r}\n'
# f' |_{ns}.{funcname}({kwargs})\n' f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
) )
# runtime-internal endpoint: `Actor.<funcname>` # runtime-internal endpoint: `Actor.<funcname>`
@ -1097,10 +1142,6 @@ async def process_messages(
await chan.send(err_msg) await chan.send(err_msg)
continue continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function # schedule a task for the requested RPC function
# in the actor's main "service nursery". # in the actor's main "service nursery".
# #
@ -1108,7 +1149,7 @@ async def process_messages(
# supervision isolation? would avoid having to # supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks` # manage RPC tasks individually in `._rpc_tasks`
# table? # table?
start_status += ' -> scheduling new task..\n' start_status += '->( scheduling new task..\n'
log.runtime(start_status) log.runtime(start_status)
try: try:
ctx: Context = await actor._service_n.start( ctx: Context = await actor._service_n.start(
@ -1192,12 +1233,24 @@ async def process_messages(
# END-OF `async for`: # END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely # IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`. # due to a (graceful) `Channel.aclose()`.
chan_op_repr: str = '<=x] '
chan_repr: str = _pformat.nest_from_op(
input_op=chan_op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(chan_op_repr)-1,
rm_from_first_ln='<',
)
log.runtime( log.runtime(
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' f'IPC channel disconnected\n'
f'|_{chan}\n' f'{chan_repr}\n'
f'\n'
f'->c) cancelling RPC tasks.\n'
) )
await actor.cancel_rpc_tasks( await actor.cancel_rpc_tasks(
req_uid=actor.uid, req_aid=actor.aid,
# a "self cancel" in terms of the lifetime of the # a "self cancel" in terms of the lifetime of the
# IPC connection which is presumed to be the # IPC connection which is presumed to be the
# source of any requests for spawned tasks. # source of any requests for spawned tasks.
@ -1269,13 +1322,37 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
if msg is None: if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?' message: str = 'Exiting RPC-loop without receiving a msg?'
else: else:
task_op_repr: str = ')>'
task: trio.Task = trio.lowlevel.current_task()
# maybe add cancelled opt prefix
if task._cancel_status.effectively_cancelled:
task_op_repr = 'c' + task_op_repr
task_repr: str = _pformat.nest_from_op(
input_op=task_op_repr,
text=f'{task!r}',
nest_indent=1,
)
# chan_op_repr: str = '<=} '
# chan_repr: str = _pformat.nest_from_op(
# input_op=chan_op_repr,
# op_suffix='',
# nest_prefix='',
# text=chan.pformat(),
# nest_indent=len(chan_op_repr)-1,
# rm_from_first_ln='<',
# )
message: str = ( message: str = (
'Exiting IPC msg loop with final msg\n\n' f'Exiting RPC-loop with final msg\n'
f'<= peer: {chan.uid}\n' f'\n'
f' |_{chan}\n\n' # f'{chan_repr}\n'
# f'{pretty_struct.pformat(msg)}' f'{task_repr}\n'
f'\n'
f'{pretty_struct.pformat(msg)}'
f'\n'
) )
log.runtime(message) log.runtime(message)

View File

@ -55,6 +55,7 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
import uuid import uuid
import textwrap
from types import ModuleType from types import ModuleType
import warnings import warnings
@ -73,6 +74,9 @@ from tractor.msg import (
pretty_struct, pretty_struct,
types as msgtypes, types as msgtypes,
) )
from .trionics import (
collapse_eg,
)
from .ipc import ( from .ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm.. # IPCServer, # causes cycles atm..
@ -97,7 +101,10 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
) )
from .devx import debug from .devx import (
debug,
pformat as _pformat
)
from ._discovery import get_registry from ._discovery import get_registry
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
@ -206,7 +213,7 @@ class Actor:
*, *,
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: str|None = None, loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[Address]|None = None,
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
@ -227,7 +234,7 @@ class Actor:
# state # state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -249,11 +256,12 @@ class Actor:
if arbiter_addr is not None: if arbiter_addr is not None:
warnings.warn( warnings.warn(
'`Actor(arbiter_addr=<blah>)` is now deprecated.\n' '`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
'Use `registry_addrs: list[tuple]` instead.', 'Use `registry_addrs: list[Address]` instead.',
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -292,8 +300,10 @@ class Actor:
# input via the validator. # input via the validator.
self._reg_addrs: list[UnwrappedAddress] = [] self._reg_addrs: list[UnwrappedAddress] = []
if registry_addrs: if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs _state._runtime_vars['_registry_addrs'] = self.reg_addrs = [
_state._runtime_vars['_registry_addrs'] = registry_addrs addr.unwrap()
for addr in registry_addrs
]
@property @property
def aid(self) -> msgtypes.Aid: def aid(self) -> msgtypes.Aid:
@ -339,46 +349,125 @@ class Actor:
def pid(self) -> int: def pid(self) -> int:
return self._aid.pid return self._aid.pid
def pformat(self) -> str: @property
ds: str = '=' def repr_state(self) -> str:
parent_uid: tuple|None = None if self.cancel_complete:
if rent_chan := self._parent_chan: return 'cancelled'
parent_uid = rent_chan.uid
elif canceller := self.cancel_caller:
return f' and cancel-called by {canceller}'
else:
return 'running'
def pformat(
self,
ds: str = ': ',
indent: int = 0,
privates: bool = False,
) -> str:
fmtstr: str = f'|_id: {self.aid.reprol()!r}\n'
if privates:
aid_nest_prefix: str = '|_aid='
aid_field_repr: str = _pformat.nest_from_op(
input_op='',
text=pretty_struct.pformat(
struct=self.aid,
field_indent=2,
),
op_suffix='',
nest_prefix=aid_nest_prefix,
nest_indent=0,
)
fmtstr: str = f'{aid_field_repr}'
if rent_chan := self._parent_chan:
fmtstr += (
f"|_parent{ds}{rent_chan.aid.reprol()}\n"
)
peers: list = []
server: _server.IPCServer = self.ipc_server server: _server.IPCServer = self.ipc_server
if server: if server:
peers: list[tuple] = list(server._peer_connected) if privates:
server_repr: str = self._ipc_server.pformat(
privates=privates,
)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = _pformat.nest_from_op(
input_op='', # nest as sub-obj
op_suffix='',
text=server_repr,
)
fmtstr += (
f"{server_repr}"
)
else:
fmtstr += (
f'|_ipc: {server.repr_state!r}\n'
)
fmtstr: str = ( fmtstr += (
f' |_id: {self.aid!r}\n' f'|_rpc: {len(self._rpc_tasks)} active tasks\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" ipc_server{ds}{self._ipc_server}\n"
f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
) )
return (
'<Actor(\n' # TODO, actually fix the .repr_state impl/output?
+ # append ipc-ctx state summary
fmtstr # ctxs: dict = self._contexts
+ # if ctxs:
')>\n' # ctx_states: dict[str, int] = {}
# for ctx in self._contexts.values():
# ctx_state: str = ctx.repr_state
# cnt = ctx_states.setdefault(ctx_state, 0)
# ctx_states[ctx_state] = cnt + 1
# fmtstr += (
# f" ctxs{ds}{ctx_states}\n"
# )
# runtime-state
task_name: str = '<dne>'
if task := self._task:
task_name: str = task.name
fmtstr += (
# TODO, this just like ctx?
f'|_state: {self.repr_state!r}\n'
f' task: {task_name}\n'
f' loglevel: {self.loglevel!r}\n'
f' subactors_spawned: {len(self._actoruid2nursery)}\n'
) )
if not _state.is_root_process():
fmtstr += f' spawn_method: {self._spawn_method!r}\n'
if privates:
fmtstr += (
# f' actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' cancel_complete{ds}{self._cancel_complete}\n'
f' cancel_called_by_remote{ds}{self._cancel_called_by}\n'
f' cancel_called{ds}{self._cancel_called}\n'
)
if fmtstr:
fmtstr: str = textwrap.indent(
text=fmtstr,
prefix=' '*(1 + indent),
)
_repr: str = (
f'<{type(self).__name__}(\n'
f'{fmtstr}'
f')>\n'
)
if indent:
_repr: str = textwrap.indent(
text=_repr,
prefix=' '*indent,
)
return _repr
__repr__ = pformat __repr__ = pformat
@ -386,7 +475,11 @@ class Actor:
def reg_addrs(self) -> list[UnwrappedAddress]: def reg_addrs(self) -> list[UnwrappedAddress]:
''' '''
List of (socket) addresses for all known (and contactable) List of (socket) addresses for all known (and contactable)
registry actors. registry-service actors in "unwrapped" (i.e. IPC interchange
wire-compat) form.
If you are looking for the "wrapped" address form, use
`.registry_addrs` instead.
''' '''
return self._reg_addrs return self._reg_addrs
@ -405,8 +498,14 @@ class Actor:
self._reg_addrs = addrs self._reg_addrs = addrs
@property
def registry_addrs(self) -> list[Address]:
return [wrap_address(uw_addr)
for uw_addr in self.reg_addrs]
def load_modules( def load_modules(
self, self,
) -> None: ) -> None:
''' '''
Load explicitly enabled python modules from local fs after Load explicitly enabled python modules from local fs after
@ -453,6 +552,14 @@ class Actor:
) )
raise raise
# ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
# - _get_rpc_func(),
# - _deliver_ctx_payload(),
# - get_context(),
# - start_remote_task(),
# - cancel_rpc_tasks(),
# - _cancel_task(),
#
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
''' '''
Try to lookup and return a target RPC func from the Try to lookup and return a target RPC func from the
@ -496,11 +603,11 @@ class Actor:
queue. queue.
''' '''
uid: tuple[str, str] = chan.uid aid: msgtypes.Aid = chan.aid
assert uid, f"`chan.uid` can't be {uid}" assert aid, f"`chan.aid` can't be {aid}"
try: try:
ctx: Context = self._contexts[( ctx: Context = self._contexts[(
uid, aid.uid,
cid, cid,
# TODO: how to determine this tho? # TODO: how to determine this tho?
@ -511,7 +618,7 @@ class Actor:
'Ignoring invalid IPC msg!?\n' 'Ignoring invalid IPC msg!?\n'
f'Ctx seems to not/no-longer exist??\n' f'Ctx seems to not/no-longer exist??\n'
f'\n' f'\n'
f'<=? {uid}\n' f'<=? {aid.reprol()!r}\n'
f' |_{pretty_struct.pformat(msg)}\n' f' |_{pretty_struct.pformat(msg)}\n'
) )
match msg: match msg:
@ -560,6 +667,7 @@ class Actor:
msging session's lifetime. msging session's lifetime.
''' '''
# ?TODO, use Aid here as well?
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
@ -908,6 +1016,22 @@ class Actor:
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@property
def cancel_called(self) -> bool:
'''
Was this actor requested to cancel by a remote peer actor.
'''
return self._cancel_called_by is not None
@property
def cancel_caller(self) -> msgtypes.Aid|None:
return self._cancel_called_by
async def cancel( async def cancel(
self, self,
@ -932,20 +1056,18 @@ class Actor:
''' '''
( (
requesting_uid, requesting_aid, # Aid
requester_type, requester_type, # str
req_chan, req_chan,
log_meth, log_meth,
) = ( ) = (
req_chan.uid, req_chan.aid,
'peer', 'peer',
req_chan, req_chan,
log.cancel, log.cancel,
) if req_chan else ( ) if req_chan else (
# a self cancel of ALL rpc tasks # a self cancel of ALL rpc tasks
self.uid, self.aid,
'self', 'self',
self, self,
log.runtime, log.runtime,
@ -953,14 +1075,14 @@ class Actor:
# TODO: just use the new `Context.repr_rpc: str` (and # TODO: just use the new `Context.repr_rpc: str` (and
# other) repr fields instead of doing this all manual.. # other) repr fields instead of doing this all manual..
msg: str = ( msg: str = (
f'Actor-runtime cancel request from {requester_type}\n\n' f'Actor-runtime cancel request from {requester_type!r}\n'
f'<=c) {requesting_uid}\n'
f' |_{self}\n'
f'\n' f'\n'
f'<=c)\n'
f'{self}'
) )
# TODO: what happens here when we self-cancel tho? # TODO: what happens here when we self-cancel tho?
self._cancel_called_by_remote: tuple = requesting_uid self._cancel_called_by: tuple = requesting_aid
self._cancel_called = True self._cancel_called = True
# cancel all ongoing rpc tasks # cancel all ongoing rpc tasks
@ -988,7 +1110,7 @@ class Actor:
# self-cancel **all** ongoing RPC tasks # self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks( await self.cancel_rpc_tasks(
req_uid=requesting_uid, req_aid=requesting_aid,
parent_chan=None, parent_chan=None,
) )
@ -1005,19 +1127,11 @@ class Actor:
self._cancel_complete.set() self._cancel_complete.set()
return True return True
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def _cancel_task( async def _cancel_task(
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str]|None, requesting_aid: msgtypes.Aid|None,
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1055,7 +1169,7 @@ class Actor:
log.runtime( log.runtime(
'Cancel request for invalid RPC task.\n' 'Cancel request for invalid RPC task.\n'
'The task likely already completed or was never started!\n\n' 'The task likely already completed or was never started!\n\n'
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> {cid}@{parent_chan.uid}\n' f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n' f' |_{parent_chan}\n'
) )
@ -1063,9 +1177,12 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n' f'{ctx._task!r} <=c) {requesting_aid}\n'
f' |_{ctx._task}\n' f'|_>> {ctx.repr_rpc}\n'
f' >> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n' # f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n' # f' |_ {ctx._task}\n\n'
@ -1086,9 +1203,9 @@ class Actor:
) )
if ( if (
ctx._canceller is None ctx._canceller is None
and requesting_uid and requesting_aid
): ):
ctx._canceller: tuple = requesting_uid ctx._canceller: tuple = requesting_aid.uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here? # then raise and pack it here?
@ -1114,7 +1231,7 @@ class Actor:
# wait for _invoke to mark the task complete # wait for _invoke to mark the task complete
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_aid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f'|_{ctx}\n' f'|_{ctx}\n'
) )
@ -1131,7 +1248,7 @@ class Actor:
async def cancel_rpc_tasks( async def cancel_rpc_tasks(
self, self,
req_uid: tuple[str, str], req_aid: msgtypes.Aid,
# NOTE: when None is passed we cancel **all** rpc # NOTE: when None is passed we cancel **all** rpc
# tasks running in this actor! # tasks running in this actor!
@ -1148,7 +1265,7 @@ class Actor:
if not tasks: if not tasks:
log.runtime( log.runtime(
'Actor has no cancellable RPC tasks?\n' 'Actor has no cancellable RPC tasks?\n'
f'<= canceller: {req_uid}\n' f'<= canceller: {req_aid.reprol()}\n'
) )
return return
@ -1188,7 +1305,7 @@ class Actor:
) )
log.cancel( log.cancel(
f'Cancelling {descr} RPC tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<=c) {req_uid} [canceller]\n' f'<=c) {req_aid} [canceller]\n'
f'{rent_chan_repr}' f'{rent_chan_repr}'
f'c)=> {self.uid} [cancellee]\n' f'c)=> {self.uid} [cancellee]\n'
f' |_{self} [with {len(tasks)} tasks]\n' f' |_{self} [with {len(tasks)} tasks]\n'
@ -1216,7 +1333,7 @@ class Actor:
await self._cancel_task( await self._cancel_task(
cid, cid,
task_caller_chan, task_caller_chan,
requesting_uid=req_uid, requesting_aid=req_aid,
) )
if tasks: if tasks:
@ -1244,25 +1361,13 @@ class Actor:
''' '''
return self.accept_addrs[0] return self.accept_addrs[0]
def get_parent(self) -> Portal: # TODO, this should delegate ONLY to the
''' # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
Return a `Portal` to our parent. #
# XXX, AH RIGHT that's why..
''' # it's bc we pass this as a CLI flag to the child.py precisely
assert self._parent_chan, "No parent channel for this actor?" # bc we need the bootstrapping pre `async_main()`.. but maybe
return Portal(self._parent_chan) # keep this as an impl deat and not part of the pub iface impl?
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
'''
return self._peers[uid]
def is_infected_aio(self) -> bool: def is_infected_aio(self) -> bool:
''' '''
If `True`, this actor is running `trio` in guest mode on If `True`, this actor is running `trio` in guest mode on
@ -1273,6 +1378,23 @@ class Actor:
''' '''
return self._infected_aio return self._infected_aio
# ?TODO, is this the right type for this method?
def get_parent(self) -> Portal:
'''
Return a `Portal` to our parent.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def async_main( async def async_main(
actor: Actor, actor: Actor,
@ -1316,6 +1438,8 @@ async def async_main(
# establish primary connection with immediate parent # establish primary connection with immediate parent
actor._parent_chan: Channel|None = None actor._parent_chan: Channel|None = None
# is this a sub-actor?
# get runtime info from parent.
if parent_addr is not None: if parent_addr is not None:
( (
actor._parent_chan, actor._parent_chan,
@ -1350,18 +1474,18 @@ async def async_main(
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in # cancellation steps have (mostly) occurred in
# a deterministic way. # a deterministic way.
async with trio.open_nursery( root_tn: trio.Nursery
strict_exception_groups=False, async with (
) as root_nursery: collapse_eg(),
actor._root_n = root_nursery trio.open_nursery() as root_tn,
):
actor._root_n = root_tn
assert actor._root_n assert actor._root_n
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as service_nursery,
) as service_nursery,
_server.open_ipc_server( _server.open_ipc_server(
parent_tn=service_nursery, parent_tn=service_nursery,
stream_handler_tn=service_nursery, stream_handler_tn=service_nursery,
@ -1412,9 +1536,6 @@ async def async_main(
# TODO: why is this not with the root nursery? # TODO: why is this not with the root nursery?
try: try:
log.runtime(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on( eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs, accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery, stream_handler_nursery=service_nursery,
@ -1446,18 +1567,6 @@ async def async_main(
# TODO, just read direct from ipc_server? # TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
# all sub-actors should be able to speak to
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
for addr in accept_addrs:
waddr: Address = wrap_address(addr)
raddrs.append(addr)
else:
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Register with the arbiter if we're told its addr # Register with the arbiter if we're told its addr
log.runtime( log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
@ -1475,6 +1584,7 @@ async def async_main(
except AssertionError: except AssertionError:
await debug.pause() await debug.pause()
# !TODO, get rid of the local-portal crap XD
async with get_registry(addr) as reg_portal: async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs: for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr) accept_addr = wrap_address(accept_addr)
@ -1499,7 +1609,7 @@ async def async_main(
# start processing parent requests until our channel # start processing parent requests until our channel
# server is 100% up and running. # server is 100% up and running.
if actor._parent_chan: if actor._parent_chan:
await root_nursery.start( await root_tn.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
chan=actor._parent_chan, chan=actor._parent_chan,
@ -1511,8 +1621,9 @@ async def async_main(
# 'Blocking on service nursery to exit..\n' # 'Blocking on service nursery to exit..\n'
) )
log.runtime( log.runtime(
"Service nursery complete\n" 'Service nursery complete\n'
"Waiting on root nursery to complete" '\n'
'->} waiting on root nursery to complete..\n'
) )
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
@ -1567,6 +1678,7 @@ async def async_main(
finally: finally:
teardown_report: str = ( teardown_report: str = (
'Main actor-runtime task completed\n' 'Main actor-runtime task completed\n'
'\n'
) )
# ?TODO? should this be in `._entry`/`._root` mods instead? # ?TODO? should this be in `._entry`/`._root` mods instead?
@ -1608,7 +1720,8 @@ async def async_main(
# Unregister actor from the registry-sys / registrar. # Unregister actor from the registry-sys / registrar.
if ( if (
is_registered is_registered
and not actor.is_registrar and
not actor.is_registrar
): ):
failed: bool = False failed: bool = False
for addr in actor.reg_addrs: for addr in actor.reg_addrs:
@ -1643,7 +1756,8 @@ async def async_main(
ipc_server.has_peers(check_chans=True) ipc_server.has_peers(check_chans=True)
): ):
teardown_report += ( teardown_report += (
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n' f'-> Waiting for remaining peers to clear..\n'
f' {pformat(ipc_server._peers)}'
) )
log.runtime(teardown_report) log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers( await ipc_server.wait_for_no_more_peers(
@ -1651,15 +1765,23 @@ async def async_main(
) )
teardown_report += ( teardown_report += (
'-> All peer channels are complete\n' '-]> all peer channels are complete.\n'
) )
# op_nested_actor_repr: str = _pformat.nest_from_op(
# input_op=')>',
# text=actor.pformat(),
# nest_prefix='|_',
# nest_indent=1, # under >
# )
teardown_report += ( teardown_report += (
'Actor runtime exiting\n' '-)> actor runtime main task exit.\n'
f'>)\n' # f'{op_nested_actor_repr}'
f'|_{actor}\n'
) )
log.info(teardown_report) # if _state._runtime_vars['_is_root']:
# log.info(teardown_report)
# else:
log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`! # TODO: rename to `Registry` and move to `.discovery._registry`!

View File

@ -34,9 +34,9 @@ from typing import (
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from .devx.debug import ( from .devx import (
maybe_wait_for_debugger, debug,
acquire_debug_lock, pformat as _pformat
) )
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
@ -51,14 +51,17 @@ from tractor._portal import Portal
from tractor._runtime import Actor from tractor._runtime import Actor
from tractor._entry import _mp_main from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure from tractor._exceptions import ActorFailure
from tractor.msg.types import ( from tractor.msg import (
Aid, types as msgtypes,
SpawnSpec, pretty_struct,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ipc import IPCServer from ipc import (
_server,
Channel,
)
from ._supervise import ActorNursery from ._supervise import ActorNursery
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
@ -328,12 +331,13 @@ async def soft_kill(
see `.hard_kill()`). see `.hard_kill()`).
''' '''
peer_aid: Aid = portal.channel.aid chan: Channel = portal.channel
peer_aid: msgtypes.Aid = chan.aid
try: try:
log.cancel( log.cancel(
f'Soft killing sub-actor via portal request\n' f'Soft killing sub-actor via portal request\n'
f'\n' f'\n'
f'(c=> {peer_aid}\n' f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
# wait on sub-proc to signal termination # wait on sub-proc to signal termination
@ -341,7 +345,7 @@ async def soft_kill(
except trio.Cancelled: except trio.Cancelled:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
@ -465,7 +469,7 @@ async def trio_proc(
"--uid", "--uid",
# TODO, how to pass this over "wire" encodings like # TODO, how to pass this over "wire" encodings like
# cmdline args? # cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ? # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
str(subactor.uid), str(subactor.uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
@ -483,12 +487,13 @@ async def trio_proc(
cancelled_during_spawn: bool = False cancelled_during_spawn: bool = False
proc: trio.Process|None = None proc: trio.Process|None = None
ipc_server: IPCServer = actor_nursery._actor.ipc_server ipc_server: _server.Server = actor_nursery._actor.ipc_server
try: try:
try: try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
log.runtime( log.runtime(
'Started new child\n' f'Started new child subproc\n'
f'(>\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
@ -507,10 +512,10 @@ async def trio_proc(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger() await debug.maybe_wait_for_debugger()
elif proc is not None: elif proc is not None:
async with acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(subactor.uid):
# soft wait on the proc to terminate # soft wait on the proc to terminate
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
@ -528,14 +533,19 @@ async def trio_proc(
# send a "spawning specification" which configures the # send a "spawning specification" which configures the
# initial runtime state of the child. # initial runtime state of the child.
sspec = SpawnSpec( sspec = msgtypes.SpawnSpec(
_parent_main_data=subactor._parent_main_data, _parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules, enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs, reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs, bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
) )
log.runtime(f'Sending spawn spec: {str(sspec)}') log.runtime(
f'Sending spawn spec to child\n'
f'{{}}=> {chan.aid.reprol()!r}\n'
f'\n'
f'{pretty_struct.pformat(sspec)}\n'
)
await chan.send(sspec) await chan.send(sspec)
# track subactor in current nursery # track subactor in current nursery
@ -563,7 +573,7 @@ async def trio_proc(
# condition. # condition.
await soft_kill( await soft_kill(
proc, proc,
trio.Process.wait, trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal portal
) )
@ -571,8 +581,7 @@ async def trio_proc(
# tandem if not done already # tandem if not done already
log.cancel( log.cancel(
'Cancelling portal result reaper task\n' 'Cancelling portal result reaper task\n'
f'>c)\n' f'c)> {subactor.aid.reprol()!r}\n'
f' |_{subactor.uid}\n'
) )
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
@ -581,21 +590,24 @@ async def trio_proc(
# allowed! Do this **after** cancellation/teardown to avoid # allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early. # killing the process too early.
if proc: if proc:
reap_repr: str = _pformat.nest_from_op(
input_op='>x)',
text=subactor.pformat(),
)
log.cancel( log.cancel(
f'Hard reap sequence starting for subactor\n' f'Hard reap sequence starting for subactor\n'
f'>x)\n' f'{reap_repr}'
f' |_{subactor}@{subactor.uid}\n'
) )
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if cancelled_during_spawn:
# Try again to avoid TTY clobbering. # Try again to avoid TTY clobbering.
async with acquire_debug_lock(subactor.uid): async with debug.acquire_debug_lock(subactor.uid):
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
@ -624,7 +636,7 @@ async def trio_proc(
# acquire the lock and get notified of who has it, # acquire the lock and get notified of who has it,
# check that uid against our known children? # check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid # this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid) # await debug.acquire_debug_lock(this_uid)
if proc.poll() is None: if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")
@ -727,7 +739,7 @@ async def mp_proc(
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
ipc_server: IPCServer = actor_nursery._actor.ipc_server ipc_server: _server.Server = actor_nursery._actor.ipc_server
try: try:
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the

View File

@ -21,7 +21,6 @@
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -31,7 +30,10 @@ import warnings
import trio import trio
from .devx.debug import maybe_wait_for_debugger from .devx import (
debug,
pformat as _pformat,
)
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
mk_uuid, mk_uuid,
@ -40,8 +42,11 @@ from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._runtime import Actor from ._runtime import Actor
from ._portal import Portal from ._portal import Portal
from ._exceptions import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
collapse_eg,
)
from ._exceptions import (
ContextCancelled, ContextCancelled,
) )
from ._root import ( from ._root import (
@ -197,7 +202,7 @@ class ActorNursery:
loglevel=loglevel, loglevel=loglevel,
# verbatim relay this actor's registrar addresses # verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs, registry_addrs=current_actor().registry_addrs,
) )
parent_addr: UnwrappedAddress = self._actor.accept_addr parent_addr: UnwrappedAddress = self._actor.accept_addr
assert parent_addr assert parent_addr
@ -322,9 +327,10 @@ class ActorNursery:
server: IPCServer = self._actor.ipc_server server: IPCServer = self._actor.ipc_server
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
) as tn: trio.open_nursery() as tn,
):
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process
@ -417,10 +423,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`). # `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller # errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as da_nursery,
) as da_nursery: ):
try: try:
# This is the inner level "run in actor" nursery. It is # This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using # awaited first since actors spawned in this way (using
@ -430,11 +436,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# immediately raised for handling by a supervisor strategy. # immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards # As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as ria_nursery,
) as ria_nursery: ):
an = ActorNursery( an = ActorNursery(
actor, actor,
ria_nursery, ria_nursery,
@ -451,7 +456,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# the "hard join phase". # the "hard join phase".
log.runtime( log.runtime(
'Waiting on subactors to complete:\n' 'Waiting on subactors to complete:\n'
f'{pformat(an._children)}\n' f'>}} {len(an._children)}\n'
) )
an._join_procs.set() an._join_procs.set()
@ -465,7 +470,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# will make the pdb repl unusable. # will make the pdb repl unusable.
# Instead try to wait for pdb to be released before # Instead try to wait for pdb to be released before
# tearing down. # tearing down.
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug child_in_debug=an._at_least_one_child_in_debug
) )
@ -541,7 +546,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: yet another guard before allowing the cancel # XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug. # sequence in case a (single) child is in debug.
await maybe_wait_for_debugger( await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug child_in_debug=an._at_least_one_child_in_debug
) )
@ -590,9 +595,14 @@ async def _open_and_supervise_one_cancels_all_nursery(
# final exit # final exit
@acm _shutdown_msg: str = (
'Actor-runtime-shutdown'
)
# @api_frame # @api_frame
@acm
async def open_nursery( async def open_nursery(
*, # named params only!
hide_tb: bool = True, hide_tb: bool = True,
**kwargs, **kwargs,
# ^TODO, paramspec for `open_root_actor()` # ^TODO, paramspec for `open_root_actor()`
@ -677,17 +687,26 @@ async def open_nursery(
): ):
__tracebackhide__: bool = False __tracebackhide__: bool = False
msg: str = (
'Actor-nursery exited\n' op_nested_an_repr: str = _pformat.nest_from_op(
f'|_{an}\n' input_op=')>',
text=f'{an}',
# nest_prefix='|_',
nest_indent=1, # under >
) )
an_msg: str = (
f'Actor-nursery exited\n'
f'{op_nested_an_repr}\n'
)
# keep noise low during std operation.
log.runtime(an_msg)
if implicit_runtime: if implicit_runtime:
# shutdown runtime if it was started and report noisly # shutdown runtime if it was started and report noisly
# that we're did so. # that we're did so.
msg += '=> Shutting down actor runtime <=\n' msg: str = (
'\n'
'\n'
f'{_shutdown_msg} )>\n'
)
log.info(msg) log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)

View File

@ -59,7 +59,7 @@ from tractor._state import (
debug_mode, debug_mode,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._exceptions import ( from tractor.trionics import (
is_multi_cancelled, is_multi_cancelled,
) )
from ._trace import ( from ._trace import (

View File

@ -171,10 +171,22 @@ class Channel:
) )
assert transport.raddr == addr assert transport.raddr == addr
chan = Channel(transport=transport) chan = Channel(transport=transport)
# ?TODO, compact this into adapter level-methods?
# -[ ] would avoid extra repr-calcs if level not active?
# |_ how would the `calc_if_level` look though? func?
if log.at_least_level('runtime'):
from tractor.devx import (
pformat as _pformat,
)
chan_repr: str = _pformat.nest_from_op(
input_op='[>',
text=chan.pformat(),
nest_indent=1,
)
log.runtime( log.runtime(
f'Connected channel IPC transport\n' f'Connected channel IPC transport\n'
f'[>\n' f'{chan_repr}'
f' |_{chan}\n'
) )
return chan return chan
@ -196,9 +208,12 @@ class Channel:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs? # TODO: do a .src/.dst: str for maddrs?
def pformat(self) -> str: def pformat(
self,
privates: bool = False,
) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel( with inactive transport? )>'
tpt: MsgTransport = self._transport tpt: MsgTransport = self._transport
tpt_name: str = type(tpt).__name__ tpt_name: str = type(tpt).__name__
@ -206,26 +221,35 @@ class Channel:
'connected' if self.connected() 'connected' if self.connected()
else 'closed' else 'closed'
) )
return ( repr_str: str = (
f'<Channel(\n' f'<Channel(\n'
f' |_status: {tpt_status!r}\n' f' |_status: {tpt_status!r}\n'
) + (
f' _closed={self._closed}\n' f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n' f' _cancel_called={self._cancel_called}\n'
f'\n' if privates else ''
f' |_peer: {self.aid}\n' ) + ( # peer-actor (processs) section
f'\n' f' |_peer: {self.aid.reprol()!r}\n'
if self.aid else ' |_peer: <unknown>\n'
) + (
f' |_msgstream: {tpt_name}\n' f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n' f' maddr: {tpt.maddr!r}\n'
f' layer={tpt.layer_key!r}\n' f' proto: {tpt.laddr.proto_key!r}\n'
f' laddr={tpt.laddr}\n' f' layer: {tpt.layer_key!r}\n'
f' raddr={tpt.raddr}\n' f' codec: {tpt.codec_key!r}\n'
f' codec={tpt.codec_key!r}\n' f' .laddr={tpt.laddr}\n'
f' stream={tpt.stream}\n' f' .raddr={tpt.raddr}\n'
f' maddr={tpt.maddr!r}\n' ) + (
f' drained={tpt.drained}\n' f' ._transport.stream={tpt.stream}\n'
f' ._transport.drained={tpt.drained}\n'
if privates else ''
) + (
f' _send_lock={tpt._send_lock.statistics()}\n' f' _send_lock={tpt._send_lock.statistics()}\n'
f')>\n' if privates else ''
) + (
')>\n'
) )
return repr_str
# NOTE: making this return a value that can be passed to # NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI! # `eval()` is entirely **optional** FYI!
@ -247,6 +271,10 @@ class Channel:
def raddr(self) -> Address|None: def raddr(self) -> Address|None:
return self._transport.raddr if self._transport else None return self._transport.raddr if self._transport else None
@property
def maddr(self) -> str:
return self._transport.maddr if self._transport else '<no-tpt>'
# TODO: something like, # TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])` # `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn.. # instead of the `try/except` hack we have rn..
@ -434,8 +462,8 @@ class Channel:
await self.send(aid) await self.send(aid)
peer_aid: Aid = await self.recv() peer_aid: Aid = await self.recv()
log.runtime( log.runtime(
f'Received hanshake with peer actor,\n' f'Received hanshake with peer\n'
f'{peer_aid}\n' f'<= {peer_aid.reprol(sin_uuid=False)}\n'
) )
# NOTE, we always are referencing the remote peer! # NOTE, we always are referencing the remote peer!
self.aid = peer_aid self.aid = peer_aid

View File

@ -17,9 +17,16 @@
Utils to tame mp non-SC madeness Utils to tame mp non-SC madeness
''' '''
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker(): def disable_mantracker():
''' '''
Disable all ``multiprocessing``` "resource tracking" machinery since Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness. it's an absolute multi-threaded mess of non-SC madness.
''' '''

View File

@ -26,7 +26,7 @@ from contextlib import (
from functools import partial from functools import partial
from itertools import chain from itertools import chain
import inspect import inspect
from pprint import pformat import textwrap
from types import ( from types import (
ModuleType, ModuleType,
) )
@ -43,7 +43,10 @@ from trio import (
SocketListener, SocketListener,
) )
# from ..devx import debug from ..devx.pformat import (
ppfmt,
nest_from_op,
)
from .._exceptions import ( from .._exceptions import (
TransportClosed, TransportClosed,
) )
@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
): ):
log.cancel( log.cancel(
'Waiting on cancel request to peer..\n' 'Waiting on cancel request to peer\n'
f'c)=>\n' f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
f' |_{chan.aid}\n'
) )
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
log.warning( log.warning(
'Draining msg from disconnected peer\n' 'Draining msg from disconnected peer\n'
f'{chan_info}' f'{chan_info}'
f'{pformat(msg)}\n' f'{ppfmt(msg)}\n'
) )
# cid: str|None = msg.get('cid') # cid: str|None = msg.get('cid')
cid: str|None = msg.cid cid: str|None = msg.cid
@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
if children := local_nursery._children: if children := local_nursery._children:
# indent from above local-nurse repr # indent from above local-nurse repr
report += ( report += (
f' |_{pformat(children)}\n' f' |_{ppfmt(children)}\n'
) )
log.warning(report) log.warning(report)
@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
log.runtime( log.runtime(
f'Peer IPC broke but subproc is alive?\n\n' f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.aid}@{chan.raddr}\n' f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
f' |_{proc}\n' f'\n'
f'{proc}\n'
) )
return local_nursery return local_nursery
@ -324,9 +327,10 @@ async def handle_stream_from_peer(
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
con_status: str = ( con_status: str = (
'New inbound IPC connection <=\n' f'New inbound IPC transport connection\n'
f'|_{chan}\n' f'<=( {stream!r}\n'
) )
con_status_steps: str = ''
# initial handshake with peer phase # initial handshake with peer phase
try: try:
@ -372,7 +376,7 @@ async def handle_stream_from_peer(
if _pre_chan := server._peers.get(uid): if _pre_chan := server._peers.get(uid):
familiar: str = 'pre-existing-peer' familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += ( con_status_steps += (
f' -> Handshake with {familiar} `{uid_short}` complete\n' f' -> Handshake with {familiar} `{uid_short}` complete\n'
) )
@ -397,7 +401,7 @@ async def handle_stream_from_peer(
None, None,
) )
if event: if event:
con_status += ( con_status_steps += (
' -> Waking subactor spawn waiters: ' ' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n' f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -408,7 +412,7 @@ async def handle_stream_from_peer(
event.set() event.set()
else: else:
con_status += ( con_status_steps += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore ) # type: ignore
@ -422,8 +426,15 @@ async def handle_stream_from_peer(
# TODO: can we just use list-ref directly? # TODO: can we just use list-ref directly?
chans.append(chan) chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n' con_status_steps += ' -> Entering RPC msg loop..\n'
log.runtime(con_status) log.runtime(
con_status
+
textwrap.indent(
con_status_steps,
prefix=' '*3, # align to first-ln
)
)
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
@ -456,41 +467,67 @@ async def handle_stream_from_peer(
disconnected=disconnected, disconnected=disconnected,
) )
# ``Channel`` teardown and closure sequence # `Channel` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected # drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = ( #
f'IPC channel disconnected:\n' # -[x]TODO mk this be like
f'<=x uid: {chan.aid}\n' # <=x Channel(
f' |_{pformat(chan)}\n\n' # |_field: blah
# )>
op_repr: str = '<=x '
chan_repr: str = nest_from_op(
input_op=op_repr,
op_suffix='',
nest_prefix='',
text=chan.pformat(),
nest_indent=len(op_repr)-1,
rm_from_first_ln='<',
) )
con_teardown_status: str = (
f'IPC channel disconnect\n'
f'\n'
f'{chan_repr}\n'
f'\n'
)
chans.remove(chan) chans.remove(chan)
# TODO: do we need to be this pedantic? # TODO: do we need to be this pedantic?
if not chans: if not chans:
con_teardown_status += ( con_teardown_status += (
f'-> No more channels with {chan.aid}' f'-> No more channels with {chan.aid.reprol()!r}\n'
) )
server._peers.pop(uid, None) server._peers.pop(uid, None)
peers_str: str = '' if peers := list(server._peers.values()):
for uid, chans in server._peers.items(): peer_cnt: int = len(peers)
peers_str += ( if (
f'uid: {uid}\n' (first := peers[0][0]) is not chan
) and
for i, chan in enumerate(chans): not disconnected
peers_str += ( and
f' |_[{i}] {pformat(chan)}\n' peer_cnt > 1
) ):
con_teardown_status += ( con_teardown_status += (
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n' f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
)
for chans in server._peers.values():
first: Channel = chans[0]
if not (
first is chan
and
disconnected
):
con_teardown_status += (
f' |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
) )
# No more channels to other actors (at all) registered # No more channels to other actors (at all) registered
# as connected. # as connected.
if not server._peers: if not server._peers:
con_teardown_status += ( con_teardown_status += (
'Signalling no more peer channel connections' '-> Signalling no more peer connections!\n'
) )
server._no_more_peers.set() server._no_more_peers.set()
@ -579,10 +616,10 @@ async def handle_stream_from_peer(
class Endpoint(Struct): class Endpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of an
"ability to accept connections" (from clients) and then handle "ability to accept connections" and handle the subsequent
those inbound sessions or sequences-of-packets is determined by sequence-of-packets (maybe oriented as sessions) is determined by
a (maybe pair of) nurser(y/ies). the underlying nursery scope(s).
''' '''
addr: Address addr: Address
@ -600,6 +637,24 @@ class Endpoint(Struct):
MsgTransport, # handle to encoded-msg transport stream MsgTransport, # handle to encoded-msg transport stream
] = {} ] = {}
def pformat(
self,
indent: int = 0,
privates: bool = False,
) -> str:
type_repr: str = type(self).__name__
fmtstr: str = (
# !TODO, always be ns aware!
# f'|_netns: {netns}\n'
f' |.addr: {self.addr!r}\n'
f' |_peers: {len(self.peer_tpts)}\n'
)
return (
f'<{type_repr}(\n'
f'{fmtstr}'
f')>'
)
async def start_listener(self) -> SocketListener: async def start_listener(self) -> SocketListener:
tpt_mod: ModuleType = inspect.getmodule(self.addr) tpt_mod: ModuleType = inspect.getmodule(self.addr)
lstnr: SocketListener = await tpt_mod.start_listener( lstnr: SocketListener = await tpt_mod.start_listener(
@ -639,11 +694,13 @@ class Endpoint(Struct):
class Server(Struct): class Server(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently # level-triggered sig for whether "no peers are currently
# connected"; field is **always** set to an instance but # connected"; field is **always** set to an instance but
# initialized with `.is_set() == True`. # initialized with `.is_set() == True`.
_no_more_peers: trio.Event _no_more_peers: trio.Event
# active eps as allocated by `.listen_on()`
_endpoints: list[Endpoint] = [] _endpoints: list[Endpoint] = []
# connection tracking & mgmt # connection tracking & mgmt
@ -651,12 +708,19 @@ class Server(Struct):
str, # uaid str, # uaid
list[Channel], # IPC conns from peer list[Channel], # IPC conns from peer
] = defaultdict(list) ] = defaultdict(list)
# events-table with entries registered unset while the local
# actor is waiting on a new actor to inbound connect, often
# a parent waiting on its child just after spawn.
_peer_connected: dict[ _peer_connected: dict[
tuple[str, str], tuple[str, str],
trio.Event, trio.Event,
] = {} ] = {}
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
# - null when not yet booted,
# - unset when active,
# - set when fully shutdown with 0 eps active.
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[Endpoint]` and # TODO, maybe just make `._endpoints: list[Endpoint]` and
@ -664,7 +728,6 @@ class Server(Struct):
# @property # @property
# def addrs2eps(self) -> dict[Address, Endpoint]: # def addrs2eps(self) -> dict[Address, Endpoint]:
# ... # ...
@property @property
def proto_keys(self) -> list[str]: def proto_keys(self) -> list[str]:
return [ return [
@ -690,7 +753,7 @@ class Server(Struct):
# TODO: obvi a different server type when we eventually # TODO: obvi a different server type when we eventually
# support some others XD # support some others XD
log.runtime( log.runtime(
f'Cancelling server(s) for\n' f'Cancelling server(s) for tpt-protos\n'
f'{self.proto_keys!r}\n' f'{self.proto_keys!r}\n'
) )
self._parent_tn.cancel_scope.cancel() self._parent_tn.cancel_scope.cancel()
@ -717,6 +780,14 @@ class Server(Struct):
f'protos: {tpt_protos!r}\n' f'protos: {tpt_protos!r}\n'
) )
def len_peers(
self,
) -> int:
return len([
chan.connected()
for chan in chain(*self._peers.values())
])
def has_peers( def has_peers(
self, self,
check_chans: bool = False, check_chans: bool = False,
@ -730,13 +801,11 @@ class Server(Struct):
has_peers has_peers
and and
check_chans check_chans
and
(peer_cnt := self.len_peers())
): ):
has_peers: bool = ( has_peers: bool = (
any(chan.connected() peer_cnt > 0
for chan in chain(
*self._peers.values()
)
)
and and
has_peers has_peers
) )
@ -803,30 +872,66 @@ class Server(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: @property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
IPC server in terms of the current operating "phase".
'''
status = 'server is active'
if self.has_peers():
peer_cnt: int = self.len_peers()
status: str = (
f'{peer_cnt!r} peer chans'
)
else:
status: str = 'No peer chans'
if self.is_shutdown():
status: str = 'server-shutdown'
return status
def pformat(
self,
privates: bool = False,
) -> str:
eps: list[Endpoint] = self._endpoints eps: list[Endpoint] = self._endpoints
state_repr: str = ( # state_repr: str = (
f'{len(eps)!r} IPC-endpoints active' # f'{len(eps)!r} endpoints active'
) # )
fmtstr = ( fmtstr = (
f' |_state: {state_repr}\n' f' |_state: {self.repr_state!r}\n'
f' no_more_peers: {self.has_peers()}\n'
) )
if privates:
fmtstr += f' no_more_peers: {self.has_peers()}\n'
if self._shutdown is not None: if self._shutdown is not None:
shutdown_stats: EventStatistics = self._shutdown.statistics() shutdown_stats: EventStatistics = self._shutdown.statistics()
fmtstr += ( fmtstr += (
f' task_waiting_on_shutdown: {shutdown_stats}\n' f' task_waiting_on_shutdown: {shutdown_stats}\n'
) )
if eps := self._endpoints:
addrs: list[tuple] = [
ep.addr for ep in eps
]
repr_eps: str = ppfmt(addrs)
fmtstr += ( fmtstr += (
# TODO, use the `ppfmt()` helper from `modden`! f' |_endpoints: {repr_eps}\n'
f' |_endpoints: {pformat(self._endpoints)}\n' # ^TODO? how to indent closing ']'..
f' |_peers: {len(self._peers)} connected\n' )
if peers := self._peers:
fmtstr += (
f' |_peers: {len(peers)} connected\n'
) )
return ( return (
f'<IPCServer(\n' f'<Server(\n'
f'{fmtstr}' f'{fmtstr}'
f')>\n' f')>\n'
) )
@ -885,8 +990,8 @@ class Server(Struct):
) )
log.runtime( log.runtime(
f'Binding to endpoints for,\n' f'Binding endpoints\n'
f'{accept_addrs}\n' f'{ppfmt(accept_addrs)}\n'
) )
eps: list[Endpoint] = await self._parent_tn.start( eps: list[Endpoint] = await self._parent_tn.start(
partial( partial(
@ -896,13 +1001,19 @@ class Server(Struct):
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
self._endpoints.extend(eps)
serv_repr: str = nest_from_op(
input_op='(>',
text=self.pformat(),
nest_indent=1,
)
log.runtime( log.runtime(
f'Started IPC endpoints\n' f'Started IPC server\n'
f'{eps}\n' f'{serv_repr}'
) )
self._endpoints.extend(eps) # XXX, a little sanity on new ep allocations
# XXX, just a little bit of sanity
group_tn: Nursery|None = None group_tn: Nursery|None = None
ep: Endpoint ep: Endpoint
for ep in eps: for ep in eps:
@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
) )
try: try:
ep_sclang: str = nest_from_op(
input_op='>[',
text=f'{ep.pformat()}',
)
log.runtime( log.runtime(
f'Starting new endpoint listener\n' f'Starting new endpoint listener\n'
f'{ep}\n' f'{ep_sclang}\n'
) )
listener: trio.abc.Listener = await ep.start_listener() listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener assert listener is ep._listener
@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
handler_nursery=stream_handler_tn handler_nursery=stream_handler_tn
) )
) )
# TODO, wow make this message better! XD
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
log.runtime(
f'Started IPC endpoints\n'
f'{eps}\n'
)
task_status.started( task_status.started(
eps, eps,
) )
@ -1049,8 +1153,7 @@ async def open_ipc_server(
try: try:
yield ipc_server yield ipc_server
log.runtime( log.runtime(
f'Waiting on server to shutdown or be cancelled..\n' 'Server-tn running until terminated\n'
f'{ipc_server}'
) )
# TODO? when if ever would we want/need this? # TODO? when if ever would we want/need this?
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):

View File

@ -789,6 +789,11 @@ def open_shm_list(
readonly=readonly, readonly=readonly,
) )
# TODO, factor into a @actor_fixture acm-API?
# -[ ] also `@maybe_actor_fixture()` which inludes
# the .current_actor() convenience check?
# |_ orr can that just be in the sin-maybe-version?
#
# "close" attached shm on actor teardown # "close" attached shm on actor teardown
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()

View File

@ -160,10 +160,9 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`. Start a TCP socket listener on the given `TCPAddress`.
''' '''
log.info( log.runtime(
f'Attempting to bind TCP socket\n' f'Trying socket bind\n'
f'>[\n' f'>[ {addr}\n'
f'|_{addr}\n'
) )
# ?TODO, maybe we should just change the lower-level call this is # ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener? # using internall per-listener?
@ -178,11 +177,10 @@ async def start_listener(
assert len(listeners) == 1 assert len(listeners) == 1
listener = listeners[0] listener = listeners[0]
host, port = listener.socket.getsockname()[:2] host, port = listener.socket.getsockname()[:2]
bound_addr: TCPAddress = type(addr).from_addr((host, port))
log.info( log.info(
f'Listening on TCP socket\n' f'Listening on TCP socket\n'
f'[>\n' f'[> {bound_addr}\n'
f' |_{addr}\n'
) )
return listener return listener

View File

@ -81,10 +81,35 @@ BOLD_PALETTE = {
} }
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False
# TODO: this isn't showing the correct '{filename}' # TODO: this isn't showing the correct '{filename}'
# as it did before.. # as it did before..
class StackLevelAdapter(LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
def at_least_level(
self,
level: str,
) -> bool:
return at_least_level(
log=self,
level=level,
)
def transport( def transport(
self, self,
msg: str, msg: str,
@ -401,19 +426,3 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor') log: StackLevelAdapter = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -210,12 +210,14 @@ class PldRx(Struct):
match msg: match msg:
case Return()|Error(): case Return()|Error():
log.runtime( log.runtime(
f'Rxed final outcome msg\n' f'Rxed final-outcome msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
case Stop(): case Stop():
log.runtime( log.runtime(
f'Rxed stream stopped msg\n' f'Rxed stream stopped msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
@ -261,8 +263,9 @@ class PldRx(Struct):
if ( if (
type(msg) is Return type(msg) is Return
): ):
log.info( log.runtime(
f'Rxed final result msg\n' f'Rxed final result msg\n'
f'\n'
f'{msg}\n' f'{msg}\n'
) )
return self.decode_pld( return self.decode_pld(
@ -304,10 +307,13 @@ class PldRx(Struct):
try: try:
pld: PayloadT = self._pld_dec.decode(pld) pld: PayloadT = self._pld_dec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' f'Decoded payload for\n'
# f'\n'
f'{msg}\n' f'{msg}\n'
f'where payload decoded as\n' # ^TODO?, ideally just render with `,
f'|_pld={pld!r}\n' # pld={decode}` in the `msg.pformat()`??
f'where, '
f'{type(msg).__name__}.pld={pld!r}\n'
) )
return pld return pld
except TypeError as typerr: except TypeError as typerr:
@ -494,7 +500,8 @@ def limit_plds(
finally: finally:
log.runtime( log.runtime(
'Reverted to previous payload-decoder\n\n' f'Reverted to previous payload-decoder\n'
f'\n'
f'{orig_pldec}\n' f'{orig_pldec}\n'
) )
# sanity on orig settings # sanity on orig settings
@ -629,7 +636,8 @@ async def drain_to_final_msg(
(local_cs := rent_n.cancel_scope).cancel_called (local_cs := rent_n.cancel_scope).cancel_called
): ):
log.cancel( log.cancel(
'RPC-ctx cancelled by local-parent scope during drain!\n\n' f'RPC-ctx cancelled by local-parent scope during drain!\n'
f'\n'
f'c}}>\n' f'c}}>\n'
f' |_{rent_n}\n' f' |_{rent_n}\n'
f' |_.cancel_scope = {local_cs}\n' f' |_.cancel_scope = {local_cs}\n'
@ -663,7 +671,8 @@ async def drain_to_final_msg(
# final result arrived! # final result arrived!
case Return(): case Return():
log.runtime( log.runtime(
'Context delivered final draining msg:\n' f'Context delivered final draining msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
) )
ctx._result: Any = pld ctx._result: Any = pld
@ -697,12 +706,14 @@ async def drain_to_final_msg(
): ):
log.cancel( log.cancel(
'Cancelling `MsgStream` drain since ' 'Cancelling `MsgStream` drain since '
f'{reason}\n\n' f'{reason}\n'
f'\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n'
f'\n'
f'=> {ctx._task}\n' f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n' f' |_{ctx._stream}\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
break break
@ -739,7 +750,8 @@ async def drain_to_final_msg(
case Stop(): case Stop():
pre_result_drained.append(msg) pre_result_drained.append(msg)
log.runtime( # normal/expected shutdown transaction log.runtime( # normal/expected shutdown transaction
'Remote stream terminated due to "stop" msg:\n\n' f'Remote stream terminated due to "stop" msg\n'
f'\n'
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
continue continue
@ -814,7 +826,8 @@ async def drain_to_final_msg(
else: else:
log.cancel( log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n' f'Skipping `MsgStream` drain since final outcome is set\n'
f'\n'
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )

View File

@ -154,6 +154,39 @@ class Aid(
# should also include at least `.pid` (equiv to port for tcp) # should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always? # and/or host-part always?
@property
def uid(self) -> tuple[str, str]:
'''
Legacy actor "unique-id" pair format.
'''
return (
self.name,
self.uuid,
)
def reprol(
self,
sin_uuid: bool = True,
) -> str:
if not sin_uuid:
return (
f'{self.name}[{self.uuid[:6]}]@{self.pid!r}'
)
return (
f'{self.name}@{self.pid!r}'
)
# mk hashable via `.uuid`
def __hash__(self) -> int:
return hash(self.uuid)
def __eq__(self, other: Aid) -> bool:
return self.uuid == other.uuid
# use pretty fmt since often repr-ed for console/log
__repr__ = pretty_struct.Struct.__repr__
class SpawnSpec( class SpawnSpec(
pretty_struct.Struct, pretty_struct.Struct,

View File

@ -38,7 +38,6 @@ from typing import (
import tractor import tractor
from tractor._exceptions import ( from tractor._exceptions import (
InternalError, InternalError,
is_multi_cancelled,
TrioTaskExited, TrioTaskExited,
TrioCancelled, TrioCancelled,
AsyncioTaskExited, AsyncioTaskExited,
@ -59,6 +58,9 @@ from tractor.log import (
# from tractor.msg import ( # from tractor.msg import (
# pretty_struct, # pretty_struct,
# ) # )
from tractor.trionics import (
is_multi_cancelled,
)
from tractor.trionics._broadcast import ( from tractor.trionics._broadcast import (
broadcast_receiver, broadcast_receiver,
BroadcastReceiver, BroadcastReceiver,

View File

@ -32,4 +32,8 @@ from ._broadcast import (
from ._beg import ( from ._beg import (
collapse_eg as collapse_eg, collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg, maybe_collapse_eg as maybe_collapse_eg,
is_multi_cancelled as is_multi_cancelled,
)
from ._taskc import (
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
) )

View File

@ -22,11 +22,16 @@ first-class-`trio` from a historical perspective B)
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from typing import (
Literal,
)
import trio
def maybe_collapse_eg( def maybe_collapse_eg(
beg: BaseExceptionGroup, beg: BaseExceptionGroup,
) -> BaseException: ) -> BaseException|bool:
''' '''
If the input beg can collapse to a single non-eg sub-exception, If the input beg can collapse to a single non-eg sub-exception,
return it instead. return it instead.
@ -35,11 +40,13 @@ def maybe_collapse_eg(
if len(excs := beg.exceptions) == 1: if len(excs := beg.exceptions) == 1:
return excs[0] return excs[0]
return beg return False
@acm @acm
async def collapse_eg(): async def collapse_eg(
hide_tb: bool = True,
):
''' '''
If `BaseExceptionGroup` raised in the body scope is If `BaseExceptionGroup` raised in the body scope is
"collapse-able" (in the same way that "collapse-able" (in the same way that
@ -47,12 +54,75 @@ async def collapse_eg():
only raise the lone emedded non-eg in in place. only raise the lone emedded non-eg in in place.
''' '''
__tracebackhide__: bool = hide_tb
try: try:
yield yield
except* BaseException as beg: except* BaseException as beg:
if ( if (
exc := maybe_collapse_eg(beg) exc := maybe_collapse_eg(beg)
) is not beg: ):
if cause := exc.__cause__:
raise exc from cause
raise exc raise exc
raise beg raise beg
def is_multi_cancelled(
beg: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> Literal[False]|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled not in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(beg, BaseExceptionGroup):
# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
# |_ "The condition can be an exception type or tuple of
# exception types, in which case each exception is checked
# for a match using the same check that is used in an
# except clause. The condition can also be a callable
# (other than a type object) that accepts an exception as
# its single argument and returns true for the exceptions
# that should be in the subgroup."
matched_exc: BaseExceptionGroup|None = beg.subgroup(
tuple(ignore_nested),
# ??TODO, complain about why not allowed to use
# named arg style calling???
# XD .. wtf?
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False

View File

@ -40,6 +40,8 @@ from typing import (
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
from ._beg import collapse_eg
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor import ActorNursery from tractor import ActorNursery
@ -112,17 +114,19 @@ async def gather_contexts(
None, None,
]: ]:
''' '''
Concurrently enter a sequence of async context managers (acms), Concurrently enter a sequence of async context managers (`acm`s),
each from a separate `trio` task and deliver the unwrapped each scheduled in a separate `trio.Task` and deliver their
`yield`-ed values in the same order once all managers have entered. unwrapped `yield`-ed values in the same order once all `@acm`s
in every task have entered.
On exit, all acms are subsequently and concurrently exited. On exit, all `acm`s are subsequently and concurrently exited with
**no order guarantees**.
This function is somewhat similar to a batch of non-blocking This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()` calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the (inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both `.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R). concurrently entered and exited and *cancellation-just-works*.
''' '''
seed: int = id(mngrs) seed: int = id(mngrs)
@ -142,16 +146,15 @@ async def gather_contexts(
if not mngrs: if not mngrs:
raise ValueError( raise ValueError(
'`.trionics.gather_contexts()` input mngrs is empty?\n' '`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did try to use inline generator syntax?\n' 'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence type intead!' 'Use a non-lazy iterator or sequence-type intead!\n'
) )
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? soo roll our own then ?? trio.open_nursery() as tn,
# -> since we kinda want the "if only one `.exception` then ):
# just raise that" interface?
) as tn:
for mngr in mngrs: for mngr in mngrs:
tn.start_soon( tn.start_soon(
_enter_and_wait, _enter_and_wait,
@ -168,7 +171,7 @@ async def gather_contexts(
try: try:
yield tuple(unwrapped.values()) yield tuple(unwrapped.values())
finally: finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug: # the following wacky bug:
# <tractorbugurlhere> # <tractorbugurlhere>
parent_exit.set() parent_exit.set()

View File

@ -0,0 +1,185 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`trio.Task` cancellation helpers, extensions and "holsters".
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
import trio
from tractor.log import get_logger
log = get_logger(__name__)
if TYPE_CHECKING:
from tractor.devx.debug import BoxedMaybeException
def find_masked_excs(
maybe_masker: BaseException,
unmask_from: set[BaseException],
) -> BaseException|None:
''''
Deliver any `maybe_masker.__context__` provided
it a declared masking exc-type entry in `unmask_from`.
'''
if (
type(maybe_masker) in unmask_from
and
(exc_ctx := maybe_masker.__context__)
# TODO? what about any cases where
# they could be the same type but not same instance?
# |_i.e. a cancel masking a cancel ??
# or (
# exc_ctx is not maybe_masker
# )
):
return exc_ctx
return None
# XXX, relevant ish discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
) = (trio.Cancelled,),
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[BaseException] = (
trio.Cancelled,
),
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
error-used-as-signal type (cough namely `trio.Cancelled`).
Though this unmasker targets cancelleds, it can be used more
generally to capture and unwrap masked excs detected as
`.__context__` values which were suppressed by any error type
passed in `unmask_from`.
-------------
STILL-TODO ??
-------------
-[ ] support for egs which have multiple masked entries in
`maybe_eg.exceptions`, in which case we should unmask the
individual sub-excs but maintain the eg-parent's form right?
'''
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from={unmask_from},
):
masked.append((exc_ctx, exc_match))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
if type(exc_match) in always_warn_on:
log.warning(note)
# await tractor.pause(shield=True)
if raise_unmasked:
if len(masked) < 2:
raise exc_ctx from exc_match
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
else:
raise