Compare commits

...

67 Commits

Author SHA1 Message Date
Tyler Goodlet c91373148a Comment-tag pause points in `asyncio_bp.py`
Thought i already did this but, obvi needed these to make the expect
matches pass in our test.
2025-03-27 13:24:25 -04:00
Tyler Goodlet f1af87007e Add equiv of `AsyncioCancelled` for aio side
Such that a `TrioCancelled` is raised in the aio task via
`.set_exception()` to explicitly indicate and allow that task to handle
a taskc request from the parent `trio.Task`.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 13adaa110a Drop `asyncio`-canc error from `._exceptions` 2025-03-27 13:24:25 -04:00
Tyler Goodlet 9e10064bda Continue supporting py3.11+
Apparently the only thing needing a guard was use of
`asyncio.Queue.shutdown()` and the paired `QueueShutDown` exception?

Cool.
2025-03-27 13:24:25 -04:00
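A minimal sketch of the kind of guard this implies (`drain()` is a hypothetical helper; `asyncio.Queue.shutdown()` and its paired `QueueShutDown` exc only landed in the py3.13 stdlib):

```python
import asyncio
import sys

async def drain(q: asyncio.Queue) -> None:
    if sys.version_info >= (3, 13):
        # 3.13+ only: a peer's `q.shutdown()` ends iteration.
        try:
            while True:
                print(await q.get())
        except asyncio.QueueShutDown:
            return
    else:
        # pre-3.13 fallback: use a `None` sentinel to mark the end.
        while (item := await q.get()) is not None:
            print(item)
```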
Tyler Goodlet bde355dcd5 Fix an `aio_err` ref bug 2025-03-27 13:24:25 -04:00
Tyler Goodlet b021772a1e Mask ctlc borked REPL tests
Namely the `tractor.pause_from_sync()` examples using both bg threads
and `asyncio` which seem to go into bad states where SIGINT is ignored..

Deats,
- add `maybe_expect_timeout()` cm to ensure the EOF hangs get
  `.xfail()`ed instead.
- `@pytest.mark.ctlcs_bish` `test_pause_from_sync` and don't expect the
  greenback prompt msg.
- also mark `test_sync_pause_from_aio_task`.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 03406e020c Repair/update `stackscope` test
Seems that on 3.13 it's not showing our script code in the output now?
Gotta get an example for @oremanj to see what's up but really it'd be
nice to just custom format stuff above `trio`'s runtime by def..

Anyway, update the `.devx._stackscope`,
- log formatting to be a little more "sclangy" lookin.
- change the per-actor "delimiter" lines style.
- report the `signal.getsignal(SIGINT)` which i needed in the
  `sync_bp.py` with ctl-c causing a hang..
- mask the `_tree_dumped` duplicator log report as well as the "dumped
  fine" one.
- add an example `pkill --signal SIGUSR1` cmdline.

Tweak the test to cope with,
- not showing our script lines now.. which i've commented in the
  `assert_before()` patts..
- to expect the newly formatted delimiter (ascii) lines to separate the
  root vs. hanger sub-actor sections.
2025-03-27 13:24:25 -04:00
Tyler Goodlet b0acc9ffe8 Add a mark to `pytest.xfail()` questionably conc py stuff (ur mam `.xfail()`s bish!) 2025-03-27 13:24:25 -04:00
Tyler Goodlet fc325a621b Be extra sure to re-raise EoCs from translator
That is whenever `trio.EndOfChannel` is raised (presumably from the
`._to_trio.receive()` call inside `LinkedTaskChannel.receive()`) we need
to be extra certain that we let it bubble upward transparently DESPITE
special exc-as-signal handling that is normally suppressed from the aio
side; REPEAT we want to ALWAYS bubble any `trio_err ==
trio.EndOfChannel` in the `finally:` handler of `translate_aio_errors()`
despite `chan._trio_to_raise == AsyncioTaskExited` such that the
caller's iterable machinery will operate as normal when the inter-task
stream is stopped (again, presumably by the aio side task terminating
the inter-task stream).

Main impl deats for this,
- in the EoC handler block ensure we assign both `chan._trio_err` and
  the local `trio_err` as well as continue to re-raise.
- add a case to the match block in the `finally:` handler which FOR SURE
  re-raises any `type(trio_err) is EndOfChannel`!

Additionally fix a bad bug,
- a ref bug where we were NOT using the
  `except BaseException as _trio_err` to assign to `chan._trio_err` (by
  accident was missing the leading `_`..)

Unrelated impl tweak,
- move all `maybe_raise_aio_side_err()` content back to inline with its
  parent func - makes it easier to use `tractor.pause()` mostly Bp
- go back to trying to use `aio_task.set_exception(aio_taskc)` for now
  even though i'm pretty sure we're going to move to a try-fute-first
  style helper for this in the future.

Adjust some tests to match/mk-them-green,
- break from `aio_echo_server()` recv loop on
  `to_asyncio.TrioTaskExited` much like how you'd expect to (implicitly
  with a `for`) with a `trio.EndOfChannel`.
- toss in a masked `value is None` pause point i needed for debugging
  inf looping caused by not re-raising EoCs per the main patch
  description.
- add a debug-mode sized delay to root-infected test.
2025-03-27 13:24:25 -04:00
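A self-contained toy of the re-raise-EoC-in-`finally:` logic described above; `translate()` stands in for the real `translate_aio_errors()` and the channel plumbing is simplified to a plain `trio` memory channel:

```python
from contextlib import asynccontextmanager
import trio

@asynccontextmanager
async def translate(rx: trio.MemoryReceiveChannel):
    trio_err: BaseException|None = None
    try:
        yield rx
    except BaseException as _trio_err:
        # assign from the *caught* ref; the missing leading `_`
        # was the ref bug mentioned above.
        trio_err = _trio_err
        raise
    finally:
        match trio_err:
            case trio.EndOfChannel():
                # FOR SURE re-raise so the caller's iteration
                # machinery sees a normal stream-stop.
                raise trio_err

async def main():
    tx, rx = trio.open_memory_channel(0)
    try:
        async with translate(rx) as chan:
            tx.close()  # stand-in for the aio side ending the stream
            await chan.receive()  # raises `EndOfChannel`
    except trio.EndOfChannel:
        print('EoC bubbled transparently B)')

trio.run(main)
```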
Tyler Goodlet d5ba9be3a9 More `debug_mode` test support, better nursery var names 2025-03-27 13:24:25 -04:00
Tyler Goodlet 639186aa37 Add per-side graceful-exit/cancel excs-as-signals
Such that any combination of task terminations/exits can be explicitly
handled and "dual side independent" crash cases re-raised in egs.

The main error-or-exit impl changes include,

- use of new per-side "signaling exceptions":
  - TrioTaskExited|TrioCancelled for signalling aio.
  - AsyncioTaskExited|AsyncioCancelled for signalling trio.

- NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for
  err-as-signal relay and instead add a new pair of
  `._trio/aio_to_raise` maybe-exc-attrs which allow each side's
  task to specify what it would want the other side to raise to signal
  its/a termination outcome:
  - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal,
    |_ the aio task having returned while the trio side was still reading
       from the `asyncio.Queue` or is just not `.done()`.
    |_ the aio task being self or trio-request cancelled where
       a `asyncio.CancelledError` is raised and caught but NOT relayed
       as is back to trio; instead signal a "more explicit" exc type.
  - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal,
    |_ the trio task having returned while the aio side was still reading
       from the mem chan and indicating that the trio side might not
       care any more about future streamed values (like the
       `Stop/EndOfChannel` equivs for ipc `Context`s).
    |_ when the trio task is cancelled we do
        an `asyncio.Future.set_exception(TrioTaskExited())` to indicate
        to the aio side verbosely that it should cancel due to the trio
        parent.
  - `_aio/trio_err` are now left to only capturing the **actual**
    per-side task excs for introspection / other side's handling logic.

- supporting "graceful exits" depending on API in use from
  `translate_aio_errors()` such that if either side exits but the other
  side isn't expected to consume the final `return`ed value, we just exit
  silently, which required:
  - adding a `suppress_graceful_exits: bool` flag.
  - adjusting the `maybe_raise_aio_side_err()` logic to use that flag
    and suppress only on certain combos of `._trio_to_raise/._trio_err`.
  - prefer to raise `._trio_to_raise` when the aio-side is the src and
    vice versa.

- filling out pedantic logging for cancellation cases indicating which
  side is the cause.

- add a `LinkedTaskChannel._aio_result` modelled after our
  `Context._result` and a similar `.wait_for_result()` interface which
  allows maybe accessing the aio task's final return value if desired
  when using the `open_channel_from()` API.

- rename `cancel_trio()` done handler -> `signal_trio_when_done()`

Also some fairly major test suite updates,
- add a `delay: int` producing fixture which delivers a much larger
  timeout whenever `debug_mode` is set so that the REPL can be used
  without a surrounding cancel firing.
- add a new `test_aio_exits_early_relays_AsyncioTaskExited` including
  a paired `exit_early: bool` flag to `push_from_aio_task()`.
- adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect
  a `to_asyncio.TrioTaskExited`.
2025-03-27 13:24:25 -04:00
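In sketch form the "excs-as-signals" shape reads something like the below; the base classes are guessed from other commits in this set and the real fields live on the runtime's channel type:

```python
import asyncio

class AsyncioCancelled(asyncio.CancelledError):
    '''The aio task was (self-)cancelled; raised in the trio side.'''

class AsyncioTaskExited(Exception):
    '''The aio task returned early; raised in the trio side.'''

class TrioCancelled(Exception):
    '''The trio task was cancelled; set as the aio side's exc.'''

class TrioTaskExited(AsyncioCancelled):
    '''The trio task returned early; set as the aio side's exc.'''

class LinkedTaskChannel:
    # the *actual* per-side errors, kept only for introspection
    # and the other side's handling logic,
    _trio_err: BaseException|None = None
    _aio_err: BaseException|None = None
    # what each side wants the *other* side to raise,
    _trio_to_raise: AsyncioTaskExited|AsyncioCancelled|None = None
    _aio_to_raise: TrioTaskExited|TrioCancelled|None = None
```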
Tyler Goodlet 182218a776 Another `is` fix.. 2025-03-27 13:24:25 -04:00
Tyler Goodlet 6de17a3949 Unset `$PYTHON_COLORS` for test debugger suite..
Since obvi all our `pexpect` patterns aren't going to match with
a heck-ton of terminal color escape sequences in the output XD
2025-03-27 13:24:25 -04:00
Tyler Goodlet 41a3297b9f Tweak some test asserts to better `is` style 2025-03-27 13:24:25 -04:00
Tyler Goodlet 255db4b127 Save an MIA `breakpoint()`-restore test from prior!?
It appears that during the reorg commit
a356233b47 this was intended to be moved
(presumably where i have here) to `test_tooling` but was somehow just
never pasted over XD

Good thing this was caught while going through the remaining TODO
bullets in #2 !!

Also includes fixed relative `.conftest` imports!
2025-03-27 13:24:25 -04:00
Tyler Goodlet 66a7d660f6 Draft test-doc for "out-of-band" `asyncio.Task`..
Since there's no way to activate `greenback`'s portal in such cases, we
should at least have a test verifying our very loud error about the
inability to support this usage..
2025-03-27 13:24:25 -04:00
Tyler Goodlet f199cac5e8 Raise "independent" task errors in an eg
The (rare) condition is heavily detailed in new comments in
the `cancel_trio()` callback but, more or less the idea here is to be
extra pedantic in raising an `ExceptionGroup` of errors from each task
(both `asyncio` and `trio`) whenever the 2 tasks raise "independently"
- in the sense that it's not obviously one side's task causing an error
(or cancellation) in the other. In this case we set the error for each
side on the `LinkedTaskChannel` (via new attrs described later).

As a synopsis, most of this work was refined out of supporting
`infected_aio=True` mode in the **root actor** and in particular as part
of getting that to work inside the `modden` daemon which at the time of
writing was still using the `i3ipc` lib and thus `asyncio`.

Impl deats,
- extend the `LinkedTaskChannel` field/API set (and type it),
  - `._trio_task: trio.Task` for test/user introspection.
- also "stage" some ideas for a more refined interface,
  - `.started()` to deliver the value yielded to the `trio.Task` parent.
   |_ also includes some todos for how to implement this design
      underneath.
  - `._aio_first: Any|None = None` to hold that value ^.
  - `.wait_aio_complete()` for syncing to the asyncio task.
- some detailed logging around "asyncio cancelled trio" case.
- Move `AsyncioCancelled` into this module.

Styling changes,
- generally more explicit var naming.
- some todos for getting modern and fancy with typing..

NB, Let it be known this commit msg was written on a friday with the
help of various "mr. white" solns.
2025-03-27 13:24:25 -04:00
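The "independent errors" case boils down to packing both sides' excs into one group so neither outcome is masked; a toy version:

```python
def raise_both(
    trio_err: BaseException,
    aio_err: BaseException,
) -> None:
    # neither side's exc obviously caused the other's,
    # so raise both in an eg.
    raise BaseExceptionGroup(
        'Both the trio and asyncio tasks errored independently!',
        (trio_err, aio_err),
    )

try:
    raise_both(ValueError('trio side'), RuntimeError('aio side'))
except BaseExceptionGroup as beg:
    for exc in beg.exceptions:
        print('independent error:', repr(exc))
```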
Tyler Goodlet 9b393338ca Add a `tests/test_root_infect_asyncio`
Might as well break apart the specific test set since there are some
(minor) subtleties and the orig test mod is already getting pretty big
XD

Includes both the new "independent"-event-loops test as well as the std
usage base case suite.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 4edf36a895 Impl a proto "unmasker" `@acm` alongside our test
Such that the suite verifies the wip `maybe_raise_from_masking_exc()`
will raise from a `trio.Cancelled.__context__` since I can't think of
any reason a `Cancelled` should ever be raised in-place of
a non-`Cancelled` XD

Not sure what should be raised instead (or maybe just a `log.warning()`
emitted?) but this starts a draft for refinement at the least. Use the
new `@pytest.mark.parametrize` explicit tuple-of-params form with a
`pytest.param` + `.mark.xfail()` for the default behaviour case.
2025-03-27 13:24:25 -04:00
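A hedged sketch of what the wip `maybe_raise_from_masking_exc()` is verified to do (the real sig and behaviour may differ): unmask and raise the non-cancel source error stashed on a `Cancelled.__context__`:

```python
from contextlib import asynccontextmanager
import trio

@asynccontextmanager
async def maybe_raise_from_masking_exc():
    try:
        yield
    except trio.Cancelled as taskc:
        masked: BaseException|None = taskc.__context__
        if (
            masked is not None
            and not isinstance(masked, trio.Cancelled)
        ):
            # a `Cancelled` should never be raised *in place of*
            # a non-cancel error..
            raise masked from taskc
        raise
```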
Tyler Goodlet bfd1864180 Add a "raise-from-`finally:`" example test
Since i wasted 2 days just to find an example of this inside an `@acm`,
figured I better reproduce for the purposes of maybe implementing
a warning sys (inside our wip proto `open_taskman()`) when a nursery
detects a single `Cancelled` in an eg where the `.__context__` is set to
some non-cancel error (which likely means a cancel-causing source
exception was suppressed by accident).

Left in a buncha commented code using `maybe_open_nursery()` which
i thought might be part of the issue but didn't end up being required;
will likely remove on a follow up refinement.
2025-03-27 13:24:25 -04:00
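A minimal repro of the masking pattern this test encodes: a checkpoint inside a `finally:` (or `except`) block swallows the in-flight error once the surrounding scope cancels, leaving it only on the `Cancelled.__context__`:

```python
import trio

async def main():
    with trio.move_on_after(0.1) as cs:
        try:
            raise RuntimeError('the real source error')
        finally:
            # a checkpoint in the handler: once the scope's
            # deadline hits, `Cancelled` is raised *here* and the
            # `RuntimeError` survives only as its `.__context__`..
            await trio.sleep_forever()
    assert cs.cancelled_caught

trio.run(main)
print('the source error was suppressed by accident!')
```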
Tyler Goodlet 3345962253 Yield a boxed-maybe-error from `open_crash_handler()`
Along the lines of something like `pytest.raises()` where the handled
exception can be inspected from the `pdbp` REPL using its `.value` field
B)

This is super handy in particular for understanding
`BaseException[Group]`s without manually adding surrounding handler code
to assign the `except[*] Exception as exc_var:` particularly when trying
to understand multi-cancelled eg trees.
2025-03-27 13:24:25 -04:00
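A hedged sketch of the yielded boxed-maybe-error API (the container's name here is assumed and the real cm also enters the `pdbp` REPL on crash):

```python
from contextlib import contextmanager
from dataclasses import dataclass

@dataclass
class BoxedMaybeException:
    value: BaseException|None = None

@contextmanager
def open_crash_handler():
    boxed = BoxedMaybeException()
    try:
        yield boxed
    except BaseException as err:
        boxed.value = err  # inspect as `.value` from the REPL
        raise

# usage, `pytest.raises()`-style,
try:
    with open_crash_handler() as boxed:
        raise ValueError('blah')
except ValueError:
    assert isinstance(boxed.value, ValueError)
```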
Tyler Goodlet 3c8b1aa888 Add an inter-leaved-task error test
Trying to replicate cases where errors are raised in both `trio` and
`asyncio` tasks independently (at least in `.to_asyncio` API terms) with
a new `test_trio_prestarted_task_bubbles` that generates 3 cases inside
an `@acm` call stack composing a `trio.Nursery` with
a `to_asyncio.open_channel_from()` call where a set of `trio` tasks are
started in a loop using `.start()` with various exc raising sequences,
- the aio task raising *before* the last `trio` task spawns.
- the aio task raising just after the last trio task spawns, but before
  it starts.
- after the last trio task `.start()` call returns control to the
  parent - but (for now) did not error.

TODO, still more cases to discover as i'm still fighting a `modden` bug
of this sort atm..

Other,
- tweak some other tests to have timeouts since some recent hangs were
  found..
- started mucking with py3.13 and thus adjustments for strict egs in
  some tests; full patchset to test suite likely coming soon!
2025-03-27 13:24:25 -04:00
Tyler Goodlet d4f1a02f43 Hm, `asyncio.Task._fut_waiter.set_exception()`?
Since we can't use `Task.set_exception()` (that task method never
seems to work.. XD) whereas setting the private/internal field always
seems to do the desired raising in the task? I realize it's an internal
`asyncio` runtime field but i'd rather take the risk of it breaking
than having to rely on our own equivalent hack..

Also, it seems like in the case where the task's associated (and internal)
future-waiter field is null, we won't run into the (same?) prior hanging
issues (maybe since there's nothing for `asyncio` internals to use to
wait XD ??) when `Task.cancel()` is used..??

Main deats,
- add a new signal-exception `class TrioTaskExited(AsyncioCancelled):`
  and `Future.set_exception()` it whenever the trio-task exits
  gracefully while the asyncio-side task is still doing blocking work
  (of some sort) which *seems to* be predicated by a check that
  `._fut_waiter is not None`.
- always call `asyncio.Queue.shutdown()` for the same^ as well as
  whenever we decide to call `Task.cancel()`; in that case the shutdown
  relays correctly?

Some further refinements,
- only warn about `Task.cancel()` usage when actually used ;)
- more local scope vars setting in the exit phase of
  `translate_aio_errors()`.
- also in ^ use explicit caught-exc var names for each error-type.
2025-03-27 13:24:25 -04:00
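The hack in sketch form; NB. `_fut_waiter` is a *private* `asyncio.Task` field so this can break on any cpython bump, which is exactly the tradeoff being weighed above:

```python
import asyncio

class TrioTaskExited(asyncio.CancelledError):
    '''Signal-exc set in the aio task when the trio side exits.'''

def signal_aio_task(task: asyncio.Task) -> None:
    fut: asyncio.Future|None = getattr(task, '_fut_waiter', None)
    if fut is not None and not fut.done():
        # raises inside the task at its current await-point
        fut.set_exception(TrioTaskExited())
    else:
        # fallback with all the hang-y caveats..
        task.cancel()
```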
Tyler Goodlet c5291b7f33 Much more limited `asyncio.Task.cancel()` use
Since it can not only cause the guest-mode run to abandon but also in
some edge cases prevent `trio`-errors from propagating (at least on
py3.12-13?) as discovered as part of supporting this mode officially
in the *root actor*.

As such try to avoid that method as much as possible instead opting to
pass the `trio`-side error via the inter-task channel ref.

Deats,
- add a `LinkedTaskChannel._trio_err: BaseException|None` which gets set
  whenever the `trio.Task` error is caught; ONLY set `AsyncioCancelled`
  when the `trio` task was for sure the cause, whether itself cancelled
  or errored.
- always check for this error when exiting the `asyncio` side (even when
  terminated via a call to `asyncio.Task.cancel()` or during any other
  `CancelledError` handling) such that the `asyncio`-task can expect to
  handle `AsyncioCancelled` due to the above^^ cases.
- never `cs.cancel()` the `trio` side unless that cancel scope has not
  yet been `.cancel_called` whatsoever; it's a noop anyway.
- only raise any exc from `asyncio.Task.result()` when `chan._aio_err`
  does not already match it since the existence of the pre-existing
  `task_err` means `asyncio` prolly intends (or has already) raised and
  interrupted the task elsewhere.

Various supporting tweaks,
- don't bother maybe-init-ing `greenback` from the actor entrypoint
  since we already need to (and do) bestow the portals to each `asyncio`
  task spawned using the `run_task()`/`open_channel_from()` API; further
  the init-ing should be done already by client code that enables
  infected mode (even in the root actor).
 |_we should prolly also codify it from any
   `run_daemon(infected_aio=True, debug_mode=True)` usage we offer.
- pass all the `_<field>`s to `LinkedTaskChannel` explicitly in named
  kwarg style.
- better sclang-style log reports throughout, particularly on teardowns.
- generally more/better comments and docs around (not well understood)
  edge cases.
- prep to just inline `maybe_raise_aio_side_err()` closure..
2025-03-27 13:24:25 -04:00
Tyler Goodlet 8f0ca44b79 Expose `debug_filter` from `open_root_actor()` also
Such that actor-runtime graceful cancel handling can be used throughout
any process tree.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 2fd9c0044b Drop extra nl from boxed error fmt 2025-03-27 13:24:25 -04:00
Tyler Goodlet 79f4197d26 Raise explicitly on missing `greenback` portal
When `.pause_from_sync()` is called from an `asyncio.Task` which was
never bestowed a portal we want to be mega pedantic about it; indicate
that the task was NOT spawned from our `.to_asyncio` API and likely by
some out-of-our-control code (normally using
`asyncio.ensure_future()/.create_task()`). Though `greenback` already
errors on such usage, it's not always clear why no portal exists;
explaining the situation of a 3rd-party-bg-spawned-task should avoid
dev confusion for most cases.

Impl deats,
- distinguish between an actor in infected mode versus the actual caller
  of `.pause_from_sync()` being an `asyncio.Task` with more explicit
  `asyncio_task` and `is_infected_aio` vars.
- ONLY in the case of being both an infected-mode-actor AND detecting
  that the caller is an `asyncio.Task`, check `greenback.has_portal()`
  such that when not bestowed we presume the aforementioned
  3rd-party-bg-task case above and raise a new explicit RTE with
  a detailed explanatory message.
- add some masked draft code for handling the special case of a root
  actor `asyncio.Task` caller which could (in theory) not actually
  require gb portal use since the `Lock` can be acquired directly
  without IPC.
 |_this will likely require factoring of various pause machinery funcs
   into a `_pause_from_root_task()` to mk the impl sane XD

Other,
- expose a new `debug_filter: Callable` which can be provided by the
  caller of `_maybe_enter_pm()` to predicate whether to enter the
  debugger REPL based on the caught `BaseException|BaseExceptionGroup`;
  this is handy for customizing the meaning of "graceful cancellations"
  so as to avoid crash handling on expected egs of more than
  `trio.Cancelled`.
|_ make the default as it was implemented: `not is_multi_cancelled(err)`
- pass-through a new `ignore: set[BaseException]` as
  `open_crash_handler(ignore_nested=ignore)` to allow for the same
  silent-cancellation-egs-swallowing as desired from outside the actor
  runtime.
2025-03-27 13:24:25 -04:00
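A sketch of plugging a custom predicate in via the newly exposed kwarg (see the "Expose `debug_filter` from `open_root_actor()`" entry below); the predicate body here is a simplification of the real default, `not is_multi_cancelled(err)`:

```python
import trio
import tractor

def enter_repl_on(exc: BaseException) -> bool:
    # only crash-handle non-cancel outcomes; expected
    # cancellations shouldn't drop us into the REPL.
    return not isinstance(
        exc,
        (trio.Cancelled, tractor.ContextCancelled),
    )

async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
        debug_filter=enter_repl_on,
    ):
        ...

if __name__ == '__main__':
    trio.run(main)
```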
Tyler Goodlet b71d96fdee Accept err-type override in `is_multi_cancelled()`
Such that equivalents of `trio.Cancelled` from other runtimes such as
`asyncio.CancelledError` and `subprocess.CalledProcessError` (with
a `.returncode == -2`) can be gracefully ignored as needed by the
caller.

For example this is handy if you want to avoid debug-mode REPL entry on
an exception-group full of only some subset of exception types since you
expect certain tasks to raise such errors after having been cancelled by
a request from some parent supervision sys (some "higher up"
`trio.CancelScope`, a remote triggered `ContextCancelled` or just from
an OS SIGINT).

Impl deats,
- offer a new `ignore_nested: set[BaseException]` param to which we
  add `trio.Cancelled` by default when no other types are provided.
- use `ExceptionGroup.subgroup(tuple(ignore_nested))` to filter to egs of
  the "ignored sub-errors set" and return any such match (instead of
  `True`).
- detail a comment on exclusion case.
2025-03-27 13:24:25 -04:00
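Roughly, the new filtering reads like this sketch (the real sig and defaults presumably live in `tractor._exceptions`):

```python
import trio

def is_multi_cancelled(
    exc: BaseException,
    ignore_nested: set[type[BaseException]]|None = None,
) -> BaseExceptionGroup|bool:
    ignore_nested = ignore_nested or {trio.Cancelled}  # default set
    if isinstance(exc, BaseExceptionGroup):
        # return the matched sub-eg (instead of just `True`)
        # so the caller can introspect it.
        return exc.subgroup(tuple(ignore_nested)) or False
    return False
```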
Tyler Goodlet 4a8e1f56ae Support passing pre-conf-ed `Logger`
Such that we can hook into 3rd-party-libs more easily to monkey them and
use our (prettier/hipper) console logging with something like (an
example from the client project `modden`),

```python
import logging

import i3ipc
import tractor

connection_mod = i3ipc.connection
tractor_style_i3ipc_logger: logging.LoggerAdapter = tractor.log.get_console_log(
    _root_name=connection_mod.__name__,
    logger=connection_mod.logger,
    level='info',
)
# monkey-patch the instance-ref in the 3rd-party module
connection_mod.logger = tractor_style_i3ipc_logger
```

Impl deats,
- expose as `get_console_log(logger: logging.Logger)` and add default
  failover logic.
- toss in more typing, also for mod-global instance.
2025-03-27 13:24:25 -04:00
Tyler Goodlet a283d8c05a Support and test infected-`asyncio`-mode for root
Such that you can use,

```python

    tractor.to_asyncio.run_as_asyncio_guest(
        trio_main=_trio_main,
    )
```

to bootstrap the root actor (and thus main parent process) to embed
the actor-runtime into an `asyncio` loop. Prove it all works with a
subactor-free version of the aio echo-server test suite B)
2025-03-27 13:24:25 -04:00
Tyler Goodlet c2bbb7e259 TOSQUASH: 9002f60 howtorelease.md file 2025-03-27 13:24:25 -04:00
Tyler Goodlet 2764d82c1a Draft a (pretty)`Struct.fields_diff()`
For comparing a `msgspec.Struct` against an input `dict` presumably to
be used as input for struct instantiation. The main diff with
`.__sub__()` is that non-existing fields on either are reported
(loudly).
2025-03-27 13:24:25 -04:00
Tyler Goodlet 824801d2ba Spitballing how to expose custom `msgspec` type hooks
Such that maybe we can eventually offer a nicer higher-level API which
implements much of the boilerplate required by `msgspec` (like
type-matched branching to serialization logic) via a type-table
interface or something?

Not sure if the idea is that useful so leaving it all as TODOs for now
obviously.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 0fe6f63012 Add `notes_to_self/howtorelease.md` reminder doc 2025-03-27 13:24:25 -04:00
Tyler Goodlet 8d190bb505 Add TODO for a runtime-vars passing mechanism 2025-03-27 13:24:25 -04:00
Tyler Goodlet 514fb1a4ac Change masked `.pause()` line 2025-03-27 13:24:25 -04:00
Tyler Goodlet 684253ab11 Type the inter-loop chans 2025-03-27 13:24:25 -04:00
Tyler Goodlet 9af2a4e739 Add TODO for a tb frame "filterer" sys.. 2025-03-27 13:24:25 -04:00
Tyler Goodlet 141a842d3d Set `RemoteActorError.pformat(boxer_header=self.relay_uid)` by def 2025-03-27 13:24:25 -04:00
Tyler Goodlet 61c5613943 Support custom `boxer_header: str` provided by `pformat_boxed_tb()` caller 2025-03-27 13:24:25 -04:00
Tyler Goodlet 5b29dd5d2b Expose a `_ctlc_ignore_header: str` for use in `sigint_shield()` 2025-03-27 13:24:25 -04:00
Tyler Goodlet a58c1cad91 Change `tractor.breakpoint()` to new `.pause()` in test suite 2025-03-27 13:24:25 -04:00
Tyler Goodlet e1d96099fc Wrap `asyncio_bp.py` ex into test suite
Ensuring we can at least use `breakpoint()` from an infected actor's
`asyncio.Task` spawned via a `.to_asyncio` API.

Also includes a little `tests/devx/` reorging,
- start splitting out non-`tractor.pause()` tests into a new
  `test_pause_from_non_trio.py` for all the `.pause_from_sync()`
  use in bg-threaded or `asyncio` applications.
- factor harness commonalities to the `devx/conftest` (namely
  the `do_ctlc()` masher).
- mv `test_pause_from_sync` to the new non-`trio` mod.

NOTE, the `ctlc=True` is still failing for
`test_pause_from_asyncio_task` which is a user-happiness bug but not
anything fundamentally broken - just need to handle the `asyncio` case
in `.devx._debug.sigint_shield()`!
2025-03-27 13:24:25 -04:00
Tyler Goodlet ccd60b0c6e Add `breakpoint()` hook restoration example + test 2025-03-27 13:24:25 -04:00
Tyler Goodlet c1c93e08a2 Rename `n: trio.Nursery` -> `tn` (task nursery) 2025-03-27 13:24:25 -04:00
Tyler Goodlet bb60a6d623 Messy-teardown `DebugStatus` related fixes
Mostly fixing edge cases with `asyncio` and/or bg threads where the
`.repl_release: trio.Event` needs to be used from the main `trio`
thread OW confusing-but-valid teardown tracebacks can show under various
races.

Also improve,
- log reporting for such internal bugs to make them more obvious on
  console via `log.exception()`.
- only restore the SIGINT handler when runtime is (still) active.
- reporting when `tractor.pause(shield=True)` should be used and
  unhiding the internal frames from the tb in that case.
- for `pause_from_sync()` some deep fixes..
 |_add a `allow_no_runtime: bool = False` flag to allow
   **not** requiring the actor runtime to be active.
 |_fix the `greenback` case-branch to only trigger on `not
   is_trio_thread`.
 |_add a scope-global `repl_owner: Task|Thread|None = None` to
   avoid ref errors..
2025-03-27 13:24:25 -04:00
Tyler Goodlet 6ef06be6d0 More `.pause_from_sync()` in bg-threads "polish"
Various `try`/`except` blocks around external APIs that raise when not
running inside a `tractor` runtime and/or some async framework (mostly
to avoid too-late/benign error tbs on certain classes of actor tree
teardown):
- for the `log.pdb()` prompts emitted before REPL console entry.
- inside `DebugStatus.is_main_trio_thread()`'s call to `sniffio`.
- in `_post_mortem()` by catching `NoRuntime` when called from a thread
  still active after the `.open_root_actor()` has already exited.

Also,
- create a dedicated `DebugStateError` for raising instead of `assert`s
  when we have actual debug-request inconsistencies (as seems most
  likely with bg-thread usage of `breakpoint()`).
- show the `open_crash_handler()` frame on `bdb.BdbQuit` (for now?)
2025-03-27 13:24:25 -04:00
Tyler Goodlet f8222356ce Hide `[maybe]_open_crash_handler()` frame by default 2025-03-27 13:24:25 -04:00
Tyler Goodlet 4b9d638be9 Use our `._post_mortem` from `open_crash_handler()`
Since it seems that `pdbp.xpm()` can sometimes lose the up-stack
traceback info/frames? Not sure why but ours seems to work just fine
from a `asyncio`-handler in `modden`'s use of `i3ipc` B)

Also call `DebugStatus.shield_sigint()` from `pause_from_sync()` in the
infected-`asyncio` case to get the same shielding behaviour as in all
other usage!
2025-03-27 13:24:25 -04:00
Tyler Goodlet 35ebc087dd Drop `asyncio_bp` loglevel setting by default 2025-03-27 13:24:25 -04:00
Tyler Goodlet 6b18fcd437 First draft, `asyncio`-task, sync-pausing Bo
Mostly due to magic from @oremanj where we slap in a little bit of
`.from_asyncio`-type stuff to run a `trio`-task from `asyncio.Task`
code!

I'm not gonna go into tooo too much detail but basically the primary
thing needed was a way to (blocking-ly) invoke a `trio.lowlevel.Task`
from an `asyncio` one (which we now have with a new
`run_trio_task_in_future()` thanks to draft code from the aforementioned
jefe) which we now invoke from a dedicated aio case-branch inside
`.devx._debug.pause_from_sync()`. Further include a case inside
`DebugStatus.release()` to handle using the same func to set the
`repl_release: trio.Event` from the aio side when releasing the REPL on
exit cmds.

Prolly more refinements to come ;{o
2025-03-27 13:24:25 -04:00
Tyler Goodlet 00d1c8ea29 Fix multi-daemon debug test `break` signal..
It was expecting `AssertionError` as a proceed-in-test signal (by
breaking from a continue loop), but `in_prompt_msg(raise_on_err=True)`
was changed to raise `ValueError`; so instead just use it as a
predicate for the `break`.

Also rework `in_prompt_msg()` to accept the `child: BaseSpawn` as input
instead of `before: str`, remove the casting boilerplate, and adjust all
usage to match.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 8da7a1ca36 Use "sclang"-style syntax in `to_asyncio` task logging
Just like we've started doing throughout the rest of the actor runtime
for reporting (and where "sclang" = "structured conc (s)lang", our
little supervision-focused operations syntax i've been playing with in
log msg content).

Further tweaks:
- report the `trio_done_fute` alongside the `main_outcome` value.
- add a todo list for supporting `greenback` for pause points.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 5cdfee3bcf Pass `infect_asyncio` setting via runtime-vars
The reason for this "duplication" with the `--asyncio` CLI flag (passed
to the child during spawn) is 2-fold:
- allows verifying inside `Actor._from_parent()` that the `trio` runtime was
  started via `.start_guest_run()` as well as if the
  `Actor._infected_aio` spawn-entrypoint value has been set (by the
  `._entry.<spawn-backend>_main()` whenever `--asyncio` is passed)
  such that any mismatch can be signaled via an `InternalError`.
- enables checking the `._state._runtime_vars['_is_infected_aio']` value
  directly (say from a non-actor/`trio`-thread) instead of calling
  `._state.current_actor(err_on_no_runtime=False)` in certain edge
  cases.

Impl/testing deats:
- add `._state._runtime_vars['_is_infected_aio'] = False` default.
- raise `InternalError` on any `--asyncio`-flag-passed vs.
  `_runtime_vars`-value-relayed-from-parent inside
  `Actor._from_parent()` and include a `Runner.is_guest` assert for good
  measure B)
- set and relay `infect_asyncio: bool` via runtime-vars to child in
  `ActorNursery.start_actor()`.
- verify `actor.is_infected_aio()`, `actor._infected_aio` and
  `_state._runtime_vars['_is_infected_aio']` are all set in test suite's
  `asyncio_actor()` endpoint.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 64d506970a Officially test proto-ed `stackscope` integration
By re-purposing our `pexpect`-based console matching with a new
`debugging/shield_hang_in_sub.py` example, this tests a few "hanging
actor" conditions more formally:

- that despite a hanging actor's task we can dump
  a `stackscope.extract()` tree on relay of `SIGUSR1`.
- the actor tree will terminate despite a shielded forever-sleep by our
  "T-800" zombie reaper machinery activating and hard killing the
  underlying subprocess.

Some test deats:
- simulates the expected actions of a real user by manually using
  `os.kill()` to send both signals to the actor-tree program.
- `pexpect`-matches against `log.devx()` emissions under normal
  `debug_mode == True` usage.
- ensure we get the actual "T-800 deployed" `log.error()` msg and
  that the actor tree eventually terminates!

Surrounding (re-org/impl/test-suite) changes:
- allow disabling usage via a `maybe_enable_greenback: bool` to
  `open_root_actor()` but enable by def.
- pretty up the actual `.devx()` content from `.devx._stackscope`
  including be extra pedantic about the conc-primitives for each signal
  event.
- try to avoid double handles of `SIGUSR1` even though it seems the
  original (what i thought was a) problem was actually just double
  logging in the handler..
  |_ avoid double applying the handler func via `signal.signal()`,
  |_ use a global to avoid double handle func calls and,
  |_ a `threading.RLock` around handling.
- move common fixtures and helper routines from `test_debugger` to
  `tests/devx/conftest.py` and import them for use in both test mods.
2025-03-27 13:24:25 -04:00
Tyler Goodlet de7b114303 Start a new `tests/devx/` tooling-subsuite-pkg 2025-03-27 13:24:25 -04:00
Tyler Goodlet f195c5ec47 Move `mk_cmd()` to `._testing`
Since we're going to need it more generally for `.devx` sub-sys tooling
tests.

Also, up the sync-pause ctl-c delay another 10ms..
2025-03-27 13:24:25 -04:00
Tyler Goodlet 92713af63e Get multi-threaded sync-pausing fully workin!
The final issue was making sure we do the same thing on ctl-c/SIGINT
from the user. That is, if there's already a bg-thread in REPL, we
`log.pdb()` about SIGINT shielding and re-draw the prompt; the same UX
as normal actor-runtime-task behaviour.

Reasons this wasn't workin.. and the fix:
- `.pause_from_sync()` was overriding the local `repl` var with `None`
  delivered by (transitive) calls to `_pause(debug_func=None)`.. so
  remove all that and only assign it OAOO prior to thread-type case
  branching.
- always call `DebugStatus.shield_sigint()` as needed from all requesting
  threads/tasks:
  - in `_pause_from_bg_root_thread()` BEFORE calling `._pause()` AND BEFORE
    yielding back to the bg-thread via `.started(out)` to ensure we're
    definitely overriding the handler in the `trio`-main-thread task
    before unblocking the requesting bg-thread.
  - from any requesting bg-thread in the root actor such that both its
    main-`trio`-thread scheduled task (as per above bullet) AND it are
    SIGINT shielded.
  - always call `.shield_sigint()` BEFORE any `greenback._await()` case
    (don't entirely grok why yet, but it works?)
  - for `greenback._await()` case always set `bg_task` to the current one..
- tweaks to the `SIGINT` handler, now renamed `sigint_shield()` so as
  not to name-collide with the methods when editor-searching:
  - always try to `repr()` the REPL thread/task "owner" as well as the
    active `PdbREPL` instance.
  - add `.devx()` notes around the prompt flushing deats and comments
    for any root-actor-bg-thread edge cases.

Related/supporting refinements:
- add `get_lock()`/`get_debug_req()` factory funcs since the plan is to
  eventually implement both as `@singleton` instances per actor.
- fix `acquire_debug_lock()`'s call-sig-bug for scheduling
  `request_root_stdio_lock()`..
- in `._pause()` only call `mk_pdb()` when `debug_func != None`.
- add some todo/warning notes around the `cls.repl = None` in
  `DebugStatus.release()`

`test_pause_from_sync()` tweaks:
- don't use an `attach_patts.copy()`, since we always `break` on match.
- do `pytest.fail()` on that ^ loop's fallthrough..
- pass `do_ctlc(child, patt=attach_key)` such that we always match
  the current thread's name with the ctl-c triggered `.pdb()` emission.
- oh yeah, return the last `before: str` from `do_ctlc()`.
- in the script, flip `abandon_on_cancel=True` since when `False` it
  seems to cause `trio.run()` to hang on exit from the last bg-thread
  case?!?
2025-03-27 13:24:25 -04:00
Tyler Goodlet 4a08d586cd Another tweak to REPL entry `.pdb()` headers 2025-03-27 13:24:25 -04:00
Tyler Goodlet 607e1dcf45 More failed REPL-lock-request refinements
In `lock_stdio_for_peer()` better internal-error handling/reporting:
- only `Lock._blocked.remove(ctx.cid)` if that same cid was added on
  entry to avoid needless key-errors.
- drop all `Lock.release(force: bool)` usage remnants.
- if `req_ctx.cancel()` fails mention it with `ctx_err.add_note()`.
- add more explicit internal-failed-request log messaging via a new
  `fail_reason: str`.
- use new `x)<=\n|_` annots in any failure logging.

Other cleanups/niceties:
- drop `force: bool` flag entirely from the `Lock.release()`.
- use more supervisor-op-annots in `.pdb()` logging
  with both `_pause/crash_msg: str` instead of double '|' lines when
  `.pdb()`-reported from `._set_trace()`/`._post_mortem()`.
2025-03-27 13:24:25 -04:00
Tyler Goodlet b057a1681c Todo a test for sync-pausing from non-main-root-tasks 2025-03-27 13:24:25 -04:00
Tyler Goodlet 82bee3c55b Use `delay=0` in pump loop..
Turns out it does work XD

Prior presumption was from before I had the fute poll-loop so makes
sense we needed more than one sched-tick's worth of context switch vs.
now we can just keep looping-n-pumping as fast as possible until the
guest-run's main task completes.

Also,
- minimize the preface commentary (as per todo) now that we have tests
  codifying all the edge cases :finger_crossed:
- parameter-ize the pump-loop-cycle delay and default it to 0.
2025-03-27 13:24:25 -04:00
Tyler Goodlet 4afab9ca47 Solve our abandonment issues..
To make the recent set of tests pass this (hopefully) finally solves all
`asyncio` embedded `trio` guest-run abandonment by ensuring we "pump the
event loop" until the guest-run future is fully complete.

Accomplished via simple poll loop of the form `while not
trio_done_fut.done(): await asyncio.sleep(.1)` in the `aio_main()`
task's exception teardown sequence. The loop does a naive 100ms
"pump-via-sleep & poll" for the `trio` side to complete before finally
exiting (and presumably raising) from the SIGINT cancellation.

Other related cleanups and refinements:
- use `asyncio.Task.result()` inside `cancel_trio()` since it also
  inline-raises any exception outcome and we can also log-report the
  result in non-error cases.
- comment out buncha not-sure-we-need-it stuff in `cancel_trio()`.
- remove the botched `AsyncioCancelled(CancelledError):` idea obvi XD
- comment `greenback` init for now in `aio_main()` since (pretty sure)
  we don't ever want to actually REPL in that specific func-as-task?
- always capture any `fute_err: BaseException` from the `main_outcome:
  Outcome` delivered by the `trio` side guest-run task.
- add and raise a new super noisy `AsyncioRuntimeTranslationError`
  whenever we detect that the guest-run `trio_done_fut` has not
  completed before task exit; should avoid abandonment issues ever
  happening again without knowing!
2025-03-27 13:24:25 -04:00
Tyler Goodlet 53409f2942 Demo-abandonment on shielded `trio`-side work
Finally this reproduces the issue as it (originally?) exhibited inside
`piker` where the `Actor.lifetime_stack` wasn't closed in cases where
during `infected_aio`-actor cancellation/shutdown `trio` side tasks
which are doing shielded (teardown) work are NOT being watched/waited on
from the `aio_main()` task-closure inside `run_as_asyncio_guest()`!

This is then the root cause of the guest-run being abandoned since if
our `aio_main()` task-closure doesn't know it should allow the run to
finish, it's going to call `loop.close()` eventually resulting in the
`GeneratorExit` thrown into `trio._core._run.unrolled_run()`..

So, this extends the `test_sigint_closes_lifetime_stack()` suite to
include cases for such shielded `trio`-task ops:
- add a new `trio_side_is_shielded: bool` which will toggle whether to
  add a shielded 0.5s `trio.sleep()` loop to `manage_file()` which
  should outlive the `asyncio` event-loop shutdown sequence and result
  in an abandoned guest-run and thus a leaked file.
- parametrize the existing suite with this case resulting in a total
  16-case test set B)

This patch demonstrates the problem with our `aio_main()` task-closure
impl via the now 4 failing tests, a fix is coming in a follow up commit!
2025-03-27 13:24:25 -04:00
Tyler Goodlet 7f00921be1 Lel, revert `AsyncioCancelled` inherit, module..
Turns out it somehow breaks our `to_asyncio` error relay since obvi
`asyncio`'s runtime seems to specially handle it (prolly via
`isinstance()` ?) and it caused our
`test_aio_cancelled_from_aio_causes_trio_cancelled()` to hang..
Further, obvi `unpack_error()` won't be able to find the type def if not
kept inside `._exceptions`..

So given all that, revert the change/move as well as:
- tweak the aio-from-aio cancel test to timeout.
- do `trio.sleep()` conc with any bg aio task by moving out nursery
  block.
- add a `send_sigint_to: str` parameter to
  `test_sigint_closes_lifetime_stack()` such that we test the SIGINT
  being relayed to just the parent or the child.
2025-03-27 13:24:25 -04:00
Tyler Goodlet a9b3336318 Hack `asyncio` to not abandon a guest-mode run?
Took me a while to figure out what the heck was going on but, turns out
`asyncio` changed their SIGINT handling in 3.11 as per:

https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption

I'm not entirely sure if it's the 3.11 changes or possibly wtv further
updates were made in 3.12  but more or less due to the way
our current main task was written the `trio` guest-run was getting
abandoned on SIGINTs sent from the OS to the infected child proc..

Note that much of the bug and soln cases are laid out in very detailed
comment-notes both in the new test and `run_as_asyncio_guest()`, right
above the final "fix" lines.

Add new `test_infected_aio.test_sigint_closes_lifetime_stack()` test suite
which reliably triggers all abandonment issues with multiple cases
of different parent behaviour post-sending-SIGINT-to-child:
 1. briefly sleep then raise a KBI in the parent which was originally
    demonstrating the file leak not being cleaned up by `Actor.lifetime_stack.close()`
    and simulates a ctl-c from the console (relayed in tandem by
    the OS to the parent and child processes).
 2. do `Context.wait_for_result()` on the child context which would
    hang and timeout since the actor runtime would never complete and
    thus never relay a `ContextCancelled`.
 3. both with and without running a `asyncio` task in the `manage_file`
    child actor; originally it seemed that with an aio task scheduled in
    the child actor the guest-run abandonment always was the "loud" case
    where there seemed to be some actor teardown but with tbs from
    python failing to gracefully exit the `trio` runtime..

The (seemingly working) "fix" required 2 lines of code to be run inside
a `asyncio.CancelledError` handler around the call to `await trio_done_fut`:
- `Actor.cancel_soon()` which schedules the actor runtime to cancel on
  the next `trio` runner cycle and results in a "self cancellation" of
  the actor.
- "pumping the `asyncio` event loop" with a non-0 `.sleep(0.1)` XD
 |_ seems that a "shielded" pump with some actual `delay: float >= 0`
   did the trick to get `asyncio` to allow the `trio` runner/loop to
   fully complete its guest-run without abandonment.

Other supporting changes:
- move `._exceptions.AsyncioCancelled`, our renamed
  `asyncio.CancelledError` error-sub-type-wrapper, to `.to_asyncio` and make
  it derive from `CancelledError` so as to be sure when raised by our
  `asyncio` x-> `trio` exception relay machinery that `asyncio` is
  getting the specific type it expects during cancellation.
- do "summary status" style logging in `run_as_asyncio_guest()` wherein
  we compile the eventual `startup_msg: str` emitted just before waiting
  on the `trio_done_fut`.
- shield-wait with `out: Outcome = await asyncio.shield(trio_done_fut)`
  even though it seems to do nothing in the SIGINT handling case..(I
  presume it might help avoid abandonment in a `asyncio.Task.cancel()`
  case maybe?)
2025-03-27 13:24:25 -04:00
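The 2-line fix plus pump-loop in sketch form (`aio_main_tail()` is a hypothetical stand-in for the tail of the real `aio_main()` task-closure; `Actor.cancel_soon()` is per the msg above):

```python
import asyncio

async def aio_main_tail(
    actor,  # a `tractor.Actor`
    trio_done_fut: asyncio.Future,
):
    try:
        out = await asyncio.shield(trio_done_fut)
    except asyncio.CancelledError:
        # 1. schedule a runtime "self cancellation" on the next
        #    `trio` runner cycle,
        actor.cancel_soon()
        # 2. "pump the `asyncio` event loop" with a non-0 delay
        #    until the guest-run really completes; never abandon!
        while not trio_done_fut.done():
            await asyncio.sleep(0.1)
        raise
    else:
        print(f'trio guest-run outcome: {out!r}')
```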
goodboy 978691c668 Merge pull request 'Rework low-level-runtime to enforce a `msgspec`-defined, SC-supervision-protocol for IPC `Context`s' (#7) from runtime_to_msgspec into main
Reviewed-kinda-on: #7
2025-03-27 02:14:16 +00:00
43 changed files with 4503 additions and 1103 deletions

View File

@@ -1,8 +1,16 @@
'''
Examples of using the builtin `breakpoint()` from an `asyncio.Task`
running in a subactor spawned with `infect_asyncio=True`.
'''
import asyncio
import trio
import tractor
from tractor import to_asyncio
from tractor import (
to_asyncio,
Portal,
)
async def aio_sleep_forever():
@@ -17,21 +25,21 @@ async def bp_then_error(
) -> None:
# sync with ``trio``-side (caller) task
# sync with `trio`-side (caller) task
to_trio.send_nowait('start')
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
breakpoint()
# some further, at least, meta-data about the task/actor in debug
# in terms of making it clear it's `asyncio` mucking about.
breakpoint() # asyncio-side
# short checkpoint / delay
await asyncio.sleep(0.5)
await asyncio.sleep(0.5) # asyncio-side
if raise_after_bp:
raise ValueError('blah')
raise ValueError('asyncio side error!')
# TODO: test case with this so that it gets cancelled?
else:
@@ -49,23 +57,21 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func.
async with (
to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
# raise_after_bp=not bp_before_started,
) as (first, chan),
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
assert first == 'start'
if bp_before_started:
await tractor.breakpoint()
await tractor.pause() # trio-side
await ctx.started(first)
await ctx.started(first) # trio-side
n.start_soon(
tn.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
@@ -73,39 +79,50 @@ async def trio_ctx(
async def main(
bps_all_over: bool = False,
bps_all_over: bool = True,
# TODO, WHICH OF THESE HAZ BUGZ?
cancel_from_root: bool = False,
err_from_root: bool = False,
) -> None:
async with tractor.open_nursery(
# debug_mode=True,
) as n:
p = await n.start_actor(
debug_mode=True,
maybe_enable_greenback=True,
# loglevel='devx',
) as an:
ptl: Portal = await an.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
# loglevel='cancel',
)
async with p.open_context(
async with ptl.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):
assert first == 'start'
if bps_all_over:
await tractor.breakpoint()
# pause in parent to ensure no cross-actor
# locking problems exist!
await tractor.pause() # trio-root
if cancel_from_root:
await ctx.cancel()
if err_from_root:
assert 0
else:
await trio.sleep_forever()
# await trio.sleep_forever()
await ctx.cancel()
assert 0
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()
# await ptl.cancel_actor()
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
'''
Fast fail test with a context.
Fast fail test with a `Context`.
Ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent

View File

@@ -7,7 +7,7 @@ async def breakpoint_forever():
try:
while True:
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
@@ -25,7 +25,8 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='cancel',
# loglevel='cancel',
# loglevel='devx',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@@ -10,7 +10,7 @@ async def name_error():
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await tractor.breakpoint()
await tractor.pause()
# NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the

View File

@@ -6,7 +6,7 @@ async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await trio.sleep(0.1)
await tractor.breakpoint()
await tractor.pause()
async def name_error():

View File

@@ -6,19 +6,46 @@ import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# intially unset, no entry.
orig_pybp_var: int = os.environ.get('PYTHONBREAKPOINT')
assert orig_pybp_var in {None, "0"}
async with tractor.open_nursery(
debug_mode=True,
) as an:
assert an
assert (
(pybp_var := os.environ['PYTHONBREAKPOINT'])
==
'tractor.devx._debug._sync_pause_from_builtin'
)
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
assert (
(pybp_hook := sys.breakpointhook)
is not tractor.devx._debug._set_trace
)
print(
f'$PYTHONOBREAKPOINT: {pybp_var!r}\n'
f'`sys.breakpointhook`: {pybp_hook!r}\n'
)
breakpoint()
pass # first bp, tractor hook set.
# TODO: an assert that verifies the hook is unhooked..
# XXX AFTER EXIT (of actor-runtime) verify the hook is unset..
#
# YES, this is weird but it's how stdlib docs say to do it..
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
assert os.environ.get('PYTHONBREAKPOINT') is orig_pybp_var
assert sys.breakpointhook
# now ensure a regular builtin pause still works
breakpoint()
pass # last bp, stdlib hook restored
if __name__ == '__main__':
trio.run(main)

View File

@@ -10,7 +10,7 @@ async def main():
await trio.sleep(0.1)
await tractor.breakpoint()
await tractor.pause()
await trio.sleep(0.1)

View File

@@ -11,7 +11,7 @@ async def main(
# loglevel='runtime',
):
while True:
await tractor.breakpoint()
await tractor.pause()
if __name__ == '__main__':

View File

@@ -0,0 +1,83 @@
'''
Verify we can dump a `stackscope` tree on a hang.
'''
import os
import signal
import trio
import tractor
@tractor.context
async def start_n_shield_hang(
ctx: tractor.Context,
):
# actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started(os.getpid())
print('Entering shield sleep..')
with trio.CancelScope(shield=True):
await trio.sleep_forever() # in subactor
# XXX NOTE ^^^ since this shields, we expect
# the zombie reaper (aka T800) to engage on
# SIGINT from the user and eventually hard-kill
# this subprocess!
async def main(
from_test: bool = False,
) -> None:
async with (
tractor.open_nursery(
debug_mode=True,
enable_stack_on_sig=True,
# maybe_enable_greenback=False,
loglevel='devx',
) as an,
):
ptl: tractor.Portal = await an.start_actor(
'hanger',
enable_modules=[__name__],
debug_mode=True,
)
async with ptl.open_context(
start_n_shield_hang,
) as (ctx, cpid):
_, proc, _ = an._children[ptl.chan.uid]
assert cpid == proc.pid
print(
'Yo my child hanging..?\n'
# "i'm a user who wants to see a `stackscope` tree!\n"
)
# XXX simulate the wrapping test's "user actions"
# (i.e. if a human didn't run this manually but wants to
# know what they should do to reproduce test behaviour)
if from_test:
print(
f'Sending SIGUSR1 to {cpid!r}!\n'
)
os.kill(
cpid,
signal.SIGUSR1,
)
# simulate user cancelling program
await trio.sleep(0.5)
os.kill(
os.getpid(),
signal.SIGINT,
)
else:
# actually let user send the ctl-c
await trio.sleep_forever() # in root
if __name__ == '__main__':
trio.run(main)

View File

@@ -4,9 +4,9 @@ import trio
async def gen():
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
yield 'yo'
await tractor.breakpoint()
await tractor.pause()
@tractor.context
@@ -15,7 +15,7 @@ async def just_bp(
) -> None:
await ctx.started()
await tractor.breakpoint()
await tractor.pause()
# TODO: bps and errors in this call..
async for val in gen():

View File

@@ -4,6 +4,13 @@ import time
import trio
import tractor
# TODO: only import these when not running from test harness?
# can we detect `pexpect` usage maybe?
# from tractor.devx._debug import (
# get_lock,
# get_debug_req,
# )
def sync_pause(
use_builtin: bool = False,
@@ -18,7 +25,13 @@ def sync_pause(
breakpoint(hide_tb=hide_tb)
else:
# TODO: maybe for testing some kind of cm style interface
# where the `._set_trace()` call doesn't happen until block
# exit?
# assert get_lock().ctx_in_debug is None
# assert get_debug_req().repl is None
tractor.pause_from_sync()
# assert get_debug_req().repl is None
if error:
raise RuntimeError('yoyo sync code error')
@@ -41,10 +54,11 @@ async def start_n_sync_pause(
async def main() -> None:
async with (
tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
# loglevel='cancel',
maybe_enable_greenback=True,
enable_stack_on_sig=True,
# loglevel='warning',
# loglevel='devx',
) as an,
trio.open_nursery() as tn,
):
@@ -138,7 +152,9 @@ async def main() -> None:
# the case 2. from above still exists!
use_builtin=True,
),
abandon_on_cancel=False,
# TODO: with this `False` we can hang!??!
# abandon_on_cancel=False,
abandon_on_cancel=True,
thread_name='inline_root_bg_thread',
)

View File

@@ -0,0 +1,18 @@
First generate a built dist:
```
python -m pip install --upgrade build
python -m build --sdist --outdir dist/alpha5/
```
Then try a test ``pypi`` upload:
```
python -m twine upload --repository testpypi dist/alpha5/*
```
Then push to `pypi` for realz.
```
python -m twine upload dist/alpha5/*
```

View File

@@ -150,6 +150,18 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize("start_method", [spawn_backend], scope='module')
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)

View File

View File

@@ -0,0 +1,243 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
import time
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from pexpect.spawnbase import SpawnBase
from tractor._testing import (
mk_cmd,
)
from tractor.devx._debug import (
_pause_msg as _pause_msg,
_crash_msg as _crash_msg,
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from ..conftest import (
_ci_env,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def unset_colors():
'''
Python 3.13 introduced colored tracebacks that break patt
matching,
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
'''
import os
os.environ['PYTHON_COLORS'] = '0'
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
unset_colors()
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
# preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until '
'we solve this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if mark.name == 'ctlcs_bish':
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine '
f'locally but more than likely until the cpython peeps get their sh#$ together, '
f'this test will definitely not behave like `trio` under SIGINT..\n'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# with the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
PROMPT = r"\(Pdb\+\)"
def in_prompt_msg(
child: SpawnBase,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
before: str = str(child.before.decode())
for part in parts:
if part not in before:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(before)
if err_on_false:
raise ValueError(
f'Could not find pattern in `before` output?\n'
f'part: {part!r}\n'
)
return False
return True
# TODO: support terminal color-chars stripping so we can match
# against call stack frame output from the 'll' command and the like!
# -[ ] SO answer for stripping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child: SpawnBase,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
assert in_prompt_msg(
child=child,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before
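# ex. usage sketch from a test body (a guess at the typical
# call-site; `child` is a `pexpect` spawn as per the fixture above),
#
# before: str = do_ctlc(
#     child,
#     patt='breakpoint_forever',
# )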

View File

@ -13,26 +13,25 @@ TODO:
from functools import partial
import itertools
import platform
import pathlib
import time
import pytest
import pexpect
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
from tractor._testing import (
examples_dir,
)
from tractor.devx._debug import (
from .conftest import (
do_ctlc,
PROMPT,
_pause_msg,
_crash_msg,
_repl_fail_msg,
)
from .conftest import (
_ci_env,
expect,
in_prompt_msg,
assert_before,
)
# TODO: The next great debugger audit could be done by you!
@ -52,15 +51,6 @@ if platform.system() == 'Windows':
)
def mk_cmd(ex_name: str) -> str:
'''
Generate a command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
# TODO: was trying to use this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin...
@ -79,142 +69,6 @@ has_nested_actors = pytest.mark.has_nested_actors
# )
@pytest.fixture
def spawn(
start_method,
testdir,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':
pytest.skip(
"Debugger tests are only supported on the trio backend"
)
def _spawn(cmd):
return testdir.spawn(
cmd=mk_cmd(cmd),
expect_timeout=3,
)
return _spawn
PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: support terminal color-chars stripping so we can match
# against call-stack frame output from the 'll' command and the like!
# -[ ] SO answer for stripping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until '
f'we solve this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# with the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize(
'user_in_out',
[
@ -238,14 +92,15 @@ def test_root_actor_error(
# scan for the prompt
expect(child, PROMPT)
before = str(child.before.decode())
# make sure expected logging and error arrives
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
child,
[
_crash_msg,
"('root'",
'AssertionError',
]
)
assert 'AssertionError' in before
# send user command
child.sendline(user_input)
@ -264,8 +119,10 @@ def test_root_actor_error(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_bp(spawn, user_in_out):
"""Demonstrate breakpoint from in root actor.
"""
'''
Demonstrate breakpoint from in root actor.
'''
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
@ -279,7 +136,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n')
# process should exit
child.expect(pexpect.EOF)
child.expect(EOF)
if expect_err_str is None:
assert 'Error' not in str(child.before)
@ -287,38 +144,6 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea why, but in CI this never seems to work even on
# 3.10 so needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(PROMPT)
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
@ -358,7 +183,7 @@ def test_root_actor_bp_forever(
# quit out of the loop
child.sendline('q')
child.expect(pexpect.EOF)
child.expect(EOF)
@pytest.mark.parametrize(
@ -380,10 +205,12 @@ def test_subactor_error(
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
"('name_error'",
]
)
if do_next:
@ -402,17 +229,15 @@ def test_subactor_error(
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
# root actor gets debugger engaged
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
# error is a remote error propagated from the subactor
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
# root actor gets debugger engaged
"('root'",
# error is a remote error propagated from the subactor
"('name_error'",
]
)
# another round
@ -423,7 +248,7 @@ def test_subactor_error(
child.expect('\r\n')
# process should exit
child.expect(pexpect.EOF)
child.expect(EOF)
def test_subactor_breakpoint(
@ -433,14 +258,11 @@ def test_subactor_breakpoint(
"Single subactor with an infinite breakpoint loop"
child = spawn('subactor_breakpoint')
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
child,
[_pause_msg,
"('breakpoint_forever'",]
)
# do some "next" commands to demonstrate recurrent breakpoint
@ -456,9 +278,8 @@ def test_subactor_breakpoint(
for _ in range(5):
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -471,9 +292,8 @@ def test_subactor_breakpoint(
# child process should exit but parent will capture pdb.BdbQuit
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
['RemoteActorError:',
"('breakpoint_forever'",
'bdb.BdbQuit',]
@ -486,11 +306,10 @@ def test_subactor_breakpoint(
child.sendline('c')
# process should exit
child.expect(pexpect.EOF)
child.expect(EOF)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
['RemoteActorError:',
"('breakpoint_forever'",
'bdb.BdbQuit',]
@ -514,7 +333,7 @@ def test_multi_subactors(
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -535,12 +354,14 @@ def test_multi_subactors(
# first name_error failure
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
"('name_error'",
"NameError",
]
)
assert "NameError" in before
if ctlc:
do_ctlc(child)
@ -564,9 +385,8 @@ def test_multi_subactors(
# breakpoint loop should re-engage
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -629,7 +449,7 @@ def test_multi_subactors(
# process should exit
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
# repeat of previous multierror for final output
assert_before(child, [
@ -659,25 +479,28 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forev_parts = [_pause_msg, "('bp_forever'"]
bp_forev_parts = [
_pause_msg,
"('bp_forever'",
]
bp_forev_in_msg = partial(
in_prompt_msg,
parts=bp_forev_parts,
)
name_error_msg = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg]
name_error_msg: str = "NameError: name 'doggypants' is not defined"
name_error_parts: list[str] = [name_error_msg]
before = str(child.before.decode())
if bp_forev_in_msg(prompt=before):
if bp_forev_in_msg(child=child):
next_parts = name_error_parts
elif name_error_msg in before:
next_parts = bp_forev_parts
else:
raise ValueError("Neither log msg was found !?")
raise ValueError('Neither log msg was found !?')
if ctlc:
do_ctlc(child)
@ -746,14 +569,12 @@ def test_multi_daemon_subactors(
# wait for final error in root
# where it crashes with boxed error
while True:
try:
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
bp_forev_parts
)
except AssertionError:
child.sendline('c')
child.expect(PROMPT)
if not in_prompt_msg(
child,
bp_forev_parts
):
break
assert_before(
@ -769,7 +590,7 @@ def test_multi_daemon_subactors(
)
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
@has_nested_actors
@ -845,7 +666,7 @@ def test_multi_subactors_root_errors(
])
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'",
@ -934,10 +755,13 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(PROMPT)
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before
assert_before(
child,
[
"NameError: name 'doggypants' is not defined",
"tractor._exceptions.RemoteActorError: ('name_error'",
],
)
time.sleep(0.5)
if ctlc:
@ -975,7 +799,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3):
try:
child.expect(pexpect.EOF, timeout=0.5)
child.expect(EOF, timeout=0.5)
break
except TIMEOUT:
child.sendline('c')
@ -1017,7 +841,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
def test_different_debug_mode_per_actor(
@ -1028,9 +852,8 @@ def test_different_debug_mode_per_actor(
child.expect(PROMPT)
# only one actor should enter the debugger
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_crash_msg, "('debugged_boi'", "RuntimeError"],
)
@ -1038,9 +861,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
child.expect(EOF)
# NOTE: this debugged actor error currently WON'T show up since the
# root will actually cancel and terminate the nursery before the error
@ -1059,103 +880,6 @@ def test_different_debug_mode_per_actor(
)
def test_pause_from_sync(
spawn,
ctlc: bool
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
before = str(child.before.decode())
assert not in_prompt_msg(
before,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
while attach_patts:
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts.copy():
if key in before:
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg] + expected_patts
)
break
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.items():
assert not in_prompt_msg(
before,
other_patts,
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)
def test_post_mortem_api(
spawn,
ctlc: bool,
@ -1258,7 +982,7 @@ def test_post_mortem_api(
# )
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
def test_shield_pause(
@ -1333,9 +1057,26 @@ def test_shield_pause(
]
)
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
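# A hedged sketch (NOT the eventual impl) of what this stub might
# drive; the bg-task body and names below are illustrative only:
#
# import greenback
#
# async def bg_task():
#     # every non-main `trio.Task` must explicitly prep its own
#     # `greenback` portal before sync-pausing can work from it.
#     await greenback.ensure_portal()
#     tractor.pause_from_sync()
#
# async def main():
#     async with tractor.open_root_actor(debug_mode=True):
#         async with trio.open_nursery() as tn:
#             tn.start_soon(bg_task)
#             await trio.sleep_forever()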
# TODO: needs ANSI code stripping tho, see `assert_before()` above!
def test_correct_frames_below_hidden():
'''

View File

@ -0,0 +1,381 @@
'''
That "foreign loop/thread" debug REPL support better ALSO WORK!
Same as `test_native_pause.py`.
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
'''
from contextlib import (
contextmanager as cm,
)
# from functools import partial
# import itertools
import time
# from typing import (
# Iterator,
# )
import pytest
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
from .conftest import (
# _ci_env,
do_ctlc,
PROMPT,
# expect,
in_prompt_msg,
assert_before,
_pause_msg,
_crash_msg,
_ctlc_ignore_header,
# _repl_fail_msg,
)
@cm
def maybe_expect_timeout(
ctlc: bool = False,
) -> None:
try:
yield
except TIMEOUT:
# breakpoint()
if ctlc:
pytest.xfail(
'Some kinda redic threading SIGINT bug i think?\n'
'See the notes in `examples/debugging/sync_bp.py`..\n'
)
raise
@pytest.mark.ctlcs_bish
def test_pause_from_sync(
spawn,
ctlc: bool,
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
# assert not in_prompt_msg(
# child,
# ['`greenback` portal opened!'],
# )
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficiently
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems to be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
# XXX TODO, weird threading bug it seems despite the
# `abandon_on_cancel: bool` setting to
# `trio.to_thread.run_sync()`..
with maybe_expect_timeout(
ctlc=ctlc,
):
child.expect(EOF)
def expect_any_of(
attach_patts: dict[str, list[str]],
child, # what type?
ctlc: bool = False,
prompt: str = _ctlc_ignore_header,
ctlc_delay: float = .4,
) -> list[str]:
'''
Receive any of a `list[str]` of patterns provided in
`attach_patts`.
Used to test racing prompts from multiple actors and/or
tasks using a common root process' `pdbp` REPL.
'''
assert attach_patts
child.expect(PROMPT)
before = str(child.before.decode())
for attach_key in attach_patts:
if attach_key in before:
expected_patts: str = attach_patts.pop(attach_key)
assert_before(
child,
expected_patts
)
break # from for
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=prompt,
# NOTE same as comment above
delay=ctlc_delay,
)
return expected_patts
@pytest.mark.ctlcs_bish
def test_sync_pause_from_aio_task(
spawn,
ctlc: bool
# ^TODO, fix for `asyncio`!!
):
'''
Verify we can use the `pdbp` REPL from an `asyncio.Task` spawned using
APIs in `.to_asyncio`.
`examples/debugging/asyncio_bp.py`
'''
child = spawn('asyncio_bp')
# RACE on whether trio/asyncio task bps first
attach_patts: dict[str, list[str]] = {
# first pause in guest-mode (aka "infecting")
# `trio.Task`.
'trio-side': [
_pause_msg,
"<Task 'trio_ctx'",
"('aio_daemon'",
],
# `breakpoint()` from `asyncio.Task`.
'asyncio-side': [
_pause_msg,
"<Task pending name='Task-2' coro=<greenback_shim()",
"('aio_daemon'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
# NOW in race order,
# - the asyncio-task will error
# - the root-actor parent task will pause
#
attach_patts: dict[str, list[str]] = {
# error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [
_crash_msg,
"<Task 'trio_ctx'",
"@ ('aio_daemon'",
"ValueError: asyncio side error!",
# XXX, we no longer show this frame by default!
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
],
# parent-side propagation via actor-nursery/portal
# "tractor._exceptions.RemoteActorError: remote task raised a 'ValueError'": [
"remote task raised a 'ValueError'": [
_crash_msg,
"src_uid=('aio_daemon'",
"('aio_daemon'",
],
# a final pause in root-actor
"<Task '__main__.main'": [
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
assert not attach_patts
# final boxed error propagates to root
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"remote task raised a 'ValueError'",
"ValueError: asyncio side error!",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficiently
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
child.sendline('c')
# with maybe_expect_timeout():
child.expect(EOF)
def test_sync_pause_from_non_greenbacked_aio_task():
'''
Where the `breakpoint()` caller task is NOT spawned by
`tractor.to_asyncio` and thus never activates
a `greenback.ensure_portal()` beforehand, presumably bc the task
was started by some lib/dep, as is often seen in the field.
Ensure sync pausing works when the pause is in,
- the root actor running in infected-mode?
|_ since we don't need any IPC to acquire the debug lock?
|_ is there some way to handle this like the non-main-thread case?
All other cases need to error out appropriately right?
- for any subactor we can't avoid needing the repl lock..
|_ is there a way to hook into `asyncio.ensure_future(obj)`?
'''
pass
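# A hedged sketch of the "non-greenbacked" scenario in question;
# whether this should work or error out is exactly what the
# eventual test body will decide (names are illustrative only):
#
# async def not_greenbacked():
#     # started by some 3rd-party lib, NOT via `tractor.to_asyncio`,
#     # so no `greenback.ensure_portal()` was ever run for it..
#     tractor.pause_from_sync()  # or a bare `breakpoint()`
#
# asyncio.ensure_future(not_greenbacked())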

View File

@ -0,0 +1,172 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the function of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
import time
from .conftest import (
expect,
assert_before,
in_prompt_msg,
PROMPT,
_pause_msg,
)
from pexpect.exceptions import (
# TIMEOUT,
EOF,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the `trio.Cancelled` will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
script_pid: int = child.pid
print(
f'Sending SIGUSR1 to {script_pid}\n'
f'(kill -s SIGUSR1 {script_pid})\n'
)
os.kill(
script_pid,
signal.SIGUSR1,
)
time.sleep(0.2)
expect(
child,
# end-of-tree delimiter
"end-of-\('root'",
)
assert_before(
child,
[
# 'Trying to dump `stackscope` tree..',
# 'Dumping `stackscope` tree for actor',
"('root'", # uid line
# TODO!? this used to show?
# -[ ] mk reproducible for @oremanj?
#
# parent block point (non-shielded)
# 'await trio.sleep_forever() # in root',
]
)
expect(
child,
# end-of-tree delimiter
"end-of-\('hanger'",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# TODO!? SEE ABOVE
# hanger LOC where it's shield-halted
# 'await trio.sleep_forever() # in subactor',
]
)
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)
def test_breakpoint_hook_restored(
spawn,
):
'''
Ensures our actor runtime sets a custom `breakpoint()` hook
on open then restores the stdlib's default on close.
The hook state validation is done via `assert`s inside the
invoked script with only `breakpoint()` (not `tractor.pause()`)
calls used.
'''
child = spawn('restore_builtin_breakpoint')
child.expect(PROMPT)
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
"first bp, tractor hook set",
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
"last bp, stdlib hook restored",
]
)
# since the stdlib hook was already restored there should be NO
# `tractor` `log.pdb()` content from console!
assert not in_prompt_msg(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
)
child.sendline('c')
child.expect(EOF)

View File

@ -130,7 +130,7 @@ def test_multierror(
try:
await portal2.result()
except tractor.RemoteActorError as err:
assert err.boxed_type == AssertionError
assert err.boxed_type is AssertionError
print("Look Maa that first actor failed hard, hehh")
raise
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError)
assert exc.boxed_type == AssertionError
assert exc.boxed_type is AssertionError
async def do_nothing():
@ -504,7 +504,9 @@ def test_cancel_via_SIGINT_other_task(
if is_win(): # smh
timeout += 1
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with tractor.open_nursery() as tn:
for i in range(3):
await tn.run_in_actor(

View File

@ -955,7 +955,7 @@ async def echo_back_sequence(
)
await ctx.started()
# await tractor.breakpoint()
# await tractor.pause()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,

File diff suppressed because it is too large

View File

@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
trio.run(main)
rae = excinfo.value
assert rae.boxed_type == TypeError
assert rae.boxed_type is TypeError
@tractor.context

View File

@ -0,0 +1,248 @@
'''
Special attention cases for using "infect `asyncio`" mode from a root
actor; i.e. not using a std `trio.run()` bootstrap.
'''
import asyncio
from functools import partial
import pytest
import trio
import tractor
from tractor import (
to_asyncio,
)
from tests.test_infected_asyncio import (
aio_echo_server,
)
@pytest.mark.parametrize(
'raise_error_mid_stream',
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_infected_root_actor(
raise_error_mid_stream: bool|Exception,
# conftest wide
loglevel: str,
debug_mode: bool,
):
'''
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
in the root actor.
'''
async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999):
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000):
await chan.send(i)
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if (
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError(
'aio channel never stopped?'
)
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
else:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
async def sync_and_err(
# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
ev: asyncio.Event,
):
if to_trio:
to_trio.send_nowait('start')
await ev.wait()
raise RuntimeError('asyncio-side')
@pytest.mark.parametrize(
'aio_err_trigger',
[
'before_start_point',
'after_trio_task_starts',
'after_start_point',
],
ids='aio_err_triggered={}'.format
)
def test_trio_prestarted_task_bubbles(
aio_err_trigger: str,
# conftest wide
loglevel: str,
debug_mode: bool,
):
async def pre_started_err(
raise_err: bool = False,
pre_sleep: float|None = None,
aio_trigger: asyncio.Event|None = None,
task_status=trio.TASK_STATUS_IGNORED,
):
'''
Maybe pre-started error then sleep.
'''
if pre_sleep is not None:
print(f'Sleeping from trio for {pre_sleep!r}s !')
await trio.sleep(pre_sleep)
# signal aio-task to raise JUST AFTER this task
# starts but has not yet `.started()`
if aio_trigger:
print('Signalling aio-task to raise from `trio`!!')
aio_trigger.set()
if raise_err:
print('Raising from trio!')
raise TypeError('trio-side')
task_status.started()
await trio.sleep_forever()
async def _trio_main():
# with trio.fail_after(2):
with trio.fail_after(999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
async with (
tractor.open_root_actor(
debug_mode=False,
loglevel=loglevel,
),
):
# TODO, tests for this with 3.13 egs?
# from tractor.devx import open_crash_handler
# with open_crash_handler():
async with (
# where we'll start a sub-task that errors BEFORE
# calling `.started()` such that the error should
# bubble before the guest run terminates!
trio.open_nursery() as tn,
# THEN start an infect task which should error just
# after the trio-side's task does.
to_asyncio.open_channel_from(
partial(
sync_and_err,
ev=aio_ev,
)
) as (first, chan),
):
for i in range(5):
pre_sleep: float|None = None
last_iter: bool = (i == 4)
# TODO, missing cases?
# -[ ] error as well on
# 'after_start_point' case as well for
# another case?
raise_err: bool = False
if last_iter:
raise_err: bool = True
# trigger aio task to error on next loop
# tick/checkpoint
if aio_err_trigger == 'before_start_point':
aio_ev.set()
pre_sleep: float = 0
await tn.start(
pre_started_err,
raise_err,
pre_sleep,
(aio_ev if (
aio_err_trigger == 'after_trio_task_starts'
and
last_iter
) else None
),
)
if (
aio_err_trigger == 'after_start_point'
and
last_iter
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anything but the aio-side.
else:
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]

View File

@ -271,7 +271,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
# the faster subtask was cancelled
break
# await tractor.breakpoint()
# await tractor.pause()
# await stream.receive()
print(f'final value: {value}')

View File

@ -3,6 +3,10 @@ Reminders for oddities in `trio` that we need to stay aware of and/or
want to see changed.
'''
from contextlib import (
asynccontextmanager as acm,
)
import pytest
import trio
from trio import TaskStatus
@ -80,3 +84,115 @@ def test_stashed_child_nursery(use_start_soon):
with pytest.raises(NameError):
trio.run(main)
@pytest.mark.parametrize(
('unmask_from_canc', 'canc_from_finally'),
[
(True, False),
(True, True),
pytest.param(False, True,
marks=pytest.mark.xfail(reason="never raises!")
),
],
# TODO, ask ronny how to impl this .. XD
# ids='unmask_from_canc={0}, canc_from_finally={1}',#.format,
)
def test_acm_embedded_nursery_propagates_enter_err(
canc_from_finally: bool,
unmask_from_canc: bool,
):
'''
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
`.__context__` field when a user (by accident) re-raises from a `finally:`.
'''
import tractor
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery,
unmask_from: BaseException|None = trio.Cancelled
# TODO, maybe offer a collection?
# unmask_from: set[BaseException] = {
# trio.Cancelled,
# },
):
if not unmask_from:
yield
return
try:
yield
except* unmask_from as be_eg:
# TODO, if we offer `unmask_from: set`
# for masker_exc_type in unmask_from:
matches, rest = be_eg.split(unmask_from)
if not matches:
raise
for exc_match in be_eg.exceptions:
if (
(exc_ctx := exc_match.__context__)
and
type(exc_ctx) not in {
# trio.Cancelled, # always by default?
unmask_from,
}
):
exc_ctx.add_note(
f'\n'
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
f'Are you always cancelling? Say from a `finally:` ?\n\n'
f'{tn!r}'
)
raise exc_ctx from exc_match
@acm
async def wraps_tn_that_always_cancels():
async with (
trio.open_nursery() as tn,
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
trio.Cancelled
if unmask_from_canc
else None
),
)
):
try:
yield tn
finally:
if canc_from_finally:
tn.cancel_scope.cancel()
await trio.lowlevel.checkpoint()
async def _main():
with tractor.devx.open_crash_handler() as bxerr:
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
):
assert not tn.cancel_scope.cancel_called
assert 0
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)
eg: ExceptionGroup = excinfo.value
assert_eg, rest_eg = eg.split(AssertionError)
assert len(assert_eg.exceptions) == 1

View File

@ -1703,15 +1703,28 @@ class Context:
# TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun:
log.warning(
f'Queueing OVERRUN msg on caller task:\n\n'
report: str = (
f'{flow_body}'
f'{structfmt(msg)}\n'
)
over_q: deque = self._overflow_q
self._overflow_q.append(msg)
if len(over_q) == over_q.maxlen:
report = (
'FAILED to queue OVERRUN msg, OVERRAN the OVERRUN QUEUE !!\n\n'
+ report
)
# log.error(report)
log.debug(report)
else:
report = (
'Queueing OVERRUN msg on caller task:\n\n'
+ report
)
log.debug(report)
# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False
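# ex. usage sketch: overruns are opt-in per-context from the
# caller side (a guess at typical usage; see the
# `ctx._allow_overruns` assignment in `Actor` below),
#
# async with portal.open_context(
#     some_streaming_func,
#     allow_overruns=True,
# ) as (ctx, first):
#     ...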

View File

@ -20,6 +20,7 @@ Sub-process entry points.
"""
from __future__ import annotations
from functools import partial
import multiprocessing as mp
import os
import textwrap
from typing import (
@ -64,20 +65,22 @@ def _mp_main(
'''
actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method)
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
f'Setting loglevel for {actor.uid} to {actor.loglevel}'
)
get_console_log(actor.loglevel)
assert spawn_ctx
# TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..)
log.info(
f"Started new {spawn_ctx.current_process()} for {actor.uid}")
_state._current_actor = actor
log.debug(f"parent_addr is {parent_addr}")
f'Started new {spawn_ctx.current_process()} for {actor.uid}'
# f"parent_addr is {parent_addr}"
)
_state._current_actor: Actor = actor
trio_main = partial(
async_main,
actor=actor,
@ -94,7 +97,9 @@ def _mp_main(
pass # handle it the same way trio does?
finally:
log.info(f"Subactor {actor.uid} terminated")
log.info(
f'`mp`-subactor {actor.uid} exited'
)
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually

View File

@ -82,6 +82,48 @@ class InternalError(RuntimeError):
'''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests will break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioCancelled(Exception):
'''
Trio cancelled translation (non-base) error
for use with the `to_asyncio` module
to be raised in the `asyncio.Task` to indicate
that the `trio` side raised `Cancelled` or an error.
'''
class TrioTaskExited(Exception):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
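# A hedged sketch of how these inter-loop translations might
# surface to a `trio`-side consumer; the exact raise points live
# in `.to_asyncio`'s error-translation machinery:
#
# async with to_asyncio.open_channel_from(aio_func) as (first, chan):
#     try:
#         await chan.receive()
#     except AsyncioTaskExited:
#         ...  # aio side exited early/silently
#     except AsyncioCancelled:
#         ...  # aio side was cancelled out from under us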
# NOTE: more or less should be close to these:
# 'boxed_type',
@ -127,8 +169,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
Look up an exception type by name from the set of locally known
namespaces:
- `builtins`
- `tractor._exceptions`
@ -358,6 +400,13 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str
)
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type
@property
@ -366,6 +415,9 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type.
'''
# TODO, maybe support also serializing the
# `ExceptionGroup.exceptions: list[BaseException]` set under
# certain conditions?
bt: Type[BaseException] = self.boxed_type
if bt:
return str(bt.__name__)
@ -609,6 +661,7 @@ class RemoteActorError(Exception):
# just after <Type(
# |___ ..
tb_body_indent=1,
boxer_header=self.relay_uid,
)
tail = ''
@ -651,16 +704,10 @@ class RemoteActorError(Exception):
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str)
# TODO: local reconstruction of nested inception for a given
@ -786,8 +833,11 @@ class MsgTypeError(
'''
if (
(_bad_msg := self.msgdata.get('_bad_msg'))
and
isinstance(_bad_msg, PayloadMsg)
and (
isinstance(_bad_msg, PayloadMsg)
or
isinstance(_bad_msg, msgtypes.Start)
)
):
return _bad_msg
@ -973,15 +1023,6 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
class MessagingError(Exception):
'''
IPC related msg (typing), transaction (ordering) or dialog
@ -989,7 +1030,6 @@ class MessagingError(Exception):
'''
def pack_error(
exc: BaseException|RemoteActorError,
@ -1143,19 +1183,51 @@ def unpack_error(
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup
) -> bool:
exc: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> bool|BaseExceptionGroup:
'''
Predicate to determine if a possible ``BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks).
Predicate to determine if a `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled in ignore_nested
# XXX always count-in `trio`'s native signal
):
# NOTE: rebind (vs. `.update()`) so the mutable default-arg
# set is never mutated across calls!
ignore_nested = ignore_nested | {trio.Cancelled}
if isinstance(exc, BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
) is not None
matched_exc: BaseExceptionGroup|None = exc.subgroup(
tuple(ignore_nested),
# TODO, complain about why not allowed XD
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`"; i.e. the case where
# we fallthrough and return `False` here.
return False
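# ex. usage sketch mirroring the `open_root_actor()` debug-filter
# wiring further below,
#
# if not is_multi_cancelled(
#     err,
#     ignore_nested={KeyboardInterrupt},
# ):
#     # err-tree contains something other than graceful
#     # cancel-ACKs, so probably worth a REPL/crash-report.
#     ...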
@ -1375,7 +1447,9 @@ def _mk_recv_mte(
any_pld: Any = msgpack.decode(msg.pld)
message: str = (
f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: '
f'{any_pld!r}\n\n'
f'has type {type(any_pld)!r}\n\n'
f'and does not match type-spec '
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
)
bad_msg = msg

View File

@ -80,7 +80,7 @@ async def open_root_actor(
# enables the multi-process debugger support
debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False,
# internal logging
@ -95,6 +95,17 @@ async def open_root_actor(
hide_tb: bool = True,
# XXX, proxied directly to `.devx._debug._maybe_enter_pm()`
# for REPL-entry logic.
debug_filter: Callable[
[BaseException|BaseExceptionGroup],
bool,
] = lambda err: not is_multi_cancelled(err),
# TODO, a way for actors to augment passing derived
# read-only state to sublayers?
# extra_rt_vars: dict|None = None,
) -> Actor:
'''
Runtime init entry point for ``tractor``.
@ -233,14 +244,8 @@ async def open_root_actor(
and
enable_stack_on_sig
):
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
from .devx._stackscope import enable_stack_on_sig
enable_stack_on_sig()
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []
@ -336,6 +341,10 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries.
try:
@ -377,6 +386,7 @@ async def open_root_actor(
Exception,
BaseExceptionGroup,
) as err:
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-manually case we
@ -385,11 +395,15 @@ async def open_root_actor(
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
)
if (
not entered
and
not is_multi_cancelled(err)
not is_multi_cancelled(
err,
)
):
logger.exception('Root actor crashed\n')

View File

@ -59,6 +59,7 @@ from types import ModuleType
import warnings
import trio
from trio._core import _run as trio_runtime
from trio import (
CancelScope,
Nursery,
@ -80,6 +81,7 @@ from ._context import (
from .log import get_logger
from ._exceptions import (
ContextCancelled,
InternalError,
ModuleNotExposed,
MsgTypeError,
unpack_error,
@ -98,6 +100,7 @@ from ._rpc import (
if TYPE_CHECKING:
from ._supervise import ActorNursery
from trio._channel import MemoryChannelState
log = get_logger('tractor')
@ -896,11 +899,15 @@ class Actor:
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
)
ctx._allow_overruns = allow_overruns
ctx._allow_overruns: bool = allow_overruns
# adjust buffer size if specified
state = ctx._send_chan._state # type: ignore
if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
state: MemoryChannelState = ctx._send_chan._state # type: ignore
if (
msg_buffer_size
and
state.max_buffer_size != msg_buffer_size
):
state.max_buffer_size = msg_buffer_size
except KeyError:
@ -1094,7 +1101,36 @@ class Actor:
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False
# XXX ensure the "infected `asyncio` mode" setting
# passed down from our spawning parent is consistent
# with `trio`-runtime initialization:
# - during sub-proc boot, the entrypoint func
# (`._entry.<spawn_backend>_main()`) should set
# `._infected_aio = True` before calling
# `run_as_asyncio_guest()`,
# - the value of `infect_asyncio: bool = True` as
# passed to `ActorNursery.start_actor()` must be
# the same as `_runtime_vars['_is_infected_aio']`
if (
(aio_rtv := rvs['_is_infected_aio'])
!=
(aio_attr := self._infected_aio)
):
raise InternalError(
'Parent sent runtime-vars that mismatch for the '
'"infected `asyncio` mode" settings ?!?\n\n'
f'rvs["_is_infected_aio"] = {aio_rtv}\n'
f'self._infected_aio = {aio_attr}\n'
)
if aio_rtv:
assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest
# ^TODO^ possibly add a `sniffio` or
# `trio` pub-API for `is_guest_mode()`?
rvs['_is_root'] = False # obvi XD
# update process-wide globals
_state._runtime_vars.update(rvs)
# XXX: ``msgspec`` doesn't support serializing tuples

View File

@ -44,6 +44,8 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None),
'_registry_addrs': [],
'_is_infected_aio': False,
# for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False,
}
@ -70,7 +72,8 @@ def current_actor(
'''
if (
err_on_no_runtime
and _current_actor is None
and
_current_actor is None
):
msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime

View File

@ -158,6 +158,7 @@ class ActorNursery:
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
_rtv['_is_infected_aio'] = infect_asyncio
# allow setting debug policy per actor
if debug_mode is not None:

View File

@ -54,6 +54,25 @@ def examples_dir() -> pathlib.Path:
return repodir() / 'examples'
def mk_cmd(
ex_name: str,
exs_subpath: str = 'debugging',
) -> str:
'''
Generate a shell command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = (
examples_dir()
/ exs_subpath
/ f'{ex_name}.py'
)
return ' '.join([
'python',
str(script_path)
])
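# ex. usage sketch,
# >>> mk_cmd('sync_bp')
# 'python <repodir>/examples/debugging/sync_bp.py'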
@acm
async def expect_ctxc(
yay: bool,

View File

@ -26,7 +26,7 @@ from ._debug import (
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
sigint_shield as sigint_shield,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,

File diff suppressed because it is too large

View File

@ -234,7 +234,7 @@ def find_caller_info(
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
# TODO: -[x] move all this into new `.devx._code`!
# TODO: -[x] move all this into new `.devx._frame_stack`!
# -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
@ -286,3 +286,18 @@ def api_frame(
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True
return wrapper(wrapped)
# TODO: something like this instead of the adhoc frame-unhiding
# blocks all over the runtime!! XD
# -[ ] ideally we can expect a certain error (set) and if something
# else is raised then all frames below the wrapped one will be
# un-hidden via `__tracebackhide__: bool = False`.
# |_ might need to dynamically mutate the code objs like
# `pdbp.hideframe()` does?
# -[ ] use this as a `@acm` decorator as introed in 3.10?
# @acm
# async def unhide_frame_when_not(
# error_set: set[BaseException],
# ) -> TracebackType:
# ...

View File

@ -24,19 +24,32 @@ disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp
from signal import (
signal,
getsignal,
SIGUSR1,
SIGINT,
)
# import traceback
from types import ModuleType
from typing import (
Callable,
TYPE_CHECKING,
)
import traceback
from typing import TYPE_CHECKING
import trio
from tractor import (
_state,
log as logmod,
)
from tractor.devx import _debug
log = logmod.get_logger(__name__)
@ -51,26 +64,68 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
import stackscope
from tractor.log import get_console_log
'''
Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope
tree_str: str = str(
stackscope.extract(
trio.lowlevel.current_root_task(),
recurse_child_tasks=True
)
)
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor()
thr: Thread = current_thread()
current_sigint_handler: Callable = getsignal(SIGINT)
if (
current_sigint_handler
is not
_debug.DebugStatus._trio_handler
):
sigint_handler_report: str = (
'The default `trio` SIGINT handler was replaced?!'
)
else:
sigint_handler_report: str = (
'The default `trio` SIGINT handler is in use?!'
)
# sclang symbology
# |_<object>
# |_(Task/Thread/Process/Actor
# |_{Supervisor/Scope
# |_[Storage/Memory/IPC-Stream/Data-Struct
log.devx(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'{tree_str}\n'
f'(>: {actor.uid!r}\n'
f' |_{mp.current_process()}\n'
f' |_{thr}\n'
f' |_{actor}\n'
f'\n'
f'{sigint_handler_report}\n'
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
# f'\n'
# start-of-trace-tree delimiter (mostly for testing)
# f'------ {actor.uid!r} ------\n'
f'\n'
f'------ start-of-{actor.uid!r} ------\n'
f'|\n'
f'{tree_str}'
# end-of-trace-tree delimiter (mostly for testing)
f'|\n'
f'|_____ end-of-{actor.uid!r} ______\n'
)
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging
# try:
# with open("/dev/tty", "w") as tty:
@ -80,58 +135,130 @@ def dump_task_tree() -> None:
# "task_tree"
# ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def signal_handler(
def dump_tree_on_sig(
sig: int,
frame: object,
relay_to_subs: bool = True,
) -> None:
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
except RuntimeError:
# not in async context -- print a normal traceback
traceback.print_stack()
global _tree_dumped, _handler_lock
with _handler_lock:
# if _tree_dumped:
# log.warning(
# 'Already dumped for this actor...??'
# )
# return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try:
dump_task_tree()
# await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback
# traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
# log.devx(
# 'Supposedly we dumped just fine..?'
# )
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.devx(
log.warning(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
# bc of course stdlib can't have a std API.. XD
match subproc:
case trio.Process():
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
case mp.Process():
subproc._send_signal(sig)
def enable_stack_on_sig(
sig: int = SIGUSR1
) -> None:
sig: int = SIGUSR1,
) -> ModuleType:
'''
Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-substitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>)
OR without a sub-shell,
>> pkill --signal SIGUSR1 -f <part-of-cmd: str>
'''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal(
sig,
signal_handler,
dump_tree_on_sig,
)
# NOTE: the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')
log.devx(
'Enabling trace-trees on `SIGUSR1` '
'since `stackscope` is installed @ \n'
f'{stackscope!r}\n\n'
f'With `SIGUSR1` handler\n'
f'|_{dump_tree_on_sig}\n'
)
return stackscope
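# ex. usage sketch: opt-in from app code via the matching
# `open_root_actor(enable_stack_on_sig=True)` param (see the
# root-actor patch above), then trigger a dump from a shell,
#
# async with tractor.open_root_actor(
#     enable_stack_on_sig=True,
# ):
#     await trio.sleep_forever()
#
# >> pkill --signal SIGUSR1 -f <part-of-cmd>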

View File

@ -53,6 +53,7 @@ def pformat_boxed_tb(
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
boxer_header: str = '-'
) -> str:
'''
@ -88,10 +89,10 @@ def pformat_boxed_tb(
tb_box: str = (
f'|\n'
f' ------ - ------\n'
f' ------ {boxer_header} ------\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
f' ------ {boxer_header}- ------\n'
f'_|'
)
tb_box_indent: str = (
tb_box_indent

View File

@ -258,20 +258,28 @@ class ActorContextInfo(Mapping):
def get_logger(
name: str | None = None,
name: str|None = None,
_root_name: str = _proj_name,
logger: Logger|None = None,
# TODO, using `.config.dictConfig()` api?
# -[ ] SO answer with docs links
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None,
) -> StackLevelAdapter:
'''Return the package log or a sub-logger for ``name`` if provided.
'''
log: Logger
log = rlog = logging.getLogger(_root_name)
log = rlog = logger or logging.getLogger(_root_name)
if (
name
and name != _proj_name
and
name != _proj_name
):
# NOTE: for handling for modules that use ``get_logger(__name__)``
@ -283,7 +291,7 @@ def get_logger(
# since in python the {filename} is always this same
# module-file.
sub_name: None | str = None
sub_name: None|str = None
rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
@ -306,7 +314,10 @@ def get_logger(
# add our actor-task aware adapter which will dynamically look up
# the actor and task names at each log emit
logger = StackLevelAdapter(log, ActorContextInfo())
logger = StackLevelAdapter(
log,
ActorContextInfo(),
)
# additional levels
for name, val in CUSTOM_LEVELS.items():
@ -319,15 +330,25 @@ def get_logger(
def get_console_log(
level: str | None = None,
level: str|None = None,
logger: Logger|None = None,
**kwargs,
) -> LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it.
) -> LoggerAdapter:
'''
log = get_logger(**kwargs) # our root logger
logger = log.logger
Get a `tractor`-style logging instance: a `Logger` wrapped in
a `StackLevelAdapter` which injects various concurrency-primitive
(process, thread, task) fields and enables a `StreamHandler` that
writes on stderr using `colorlog` formatting.
Yeah yeah, i know we can use `logging.config.dictConfig()`. You do it.
'''
log = get_logger(
logger=logger,
**kwargs
) # set a root logger
logger: Logger = log.logger
if not level:
return log
@ -346,9 +367,13 @@ def get_console_log(
None,
)
):
fmt = LOG_FORMAT
# if logger:
# fmt = None
handler = StreamHandler()
formatter = colorlog.ColoredFormatter(
LOG_FORMAT,
fmt=fmt,
datefmt=DATE_FORMAT,
log_colors=STD_PALETTE,
secondary_log_colors=BOLD_PALETTE,
@ -365,7 +390,7 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log = get_logger('tractor')
log: StackLevelAdapter = get_logger('tractor')
def at_least_level(

View File

@ -41,8 +41,10 @@ import textwrap
from typing import (
Any,
Callable,
Protocol,
Type,
TYPE_CHECKING,
TypeVar,
Union,
)
from types import ModuleType
@ -181,7 +183,11 @@ def mk_dec(
dec_hook: Callable|None = None,
) -> MsgDec:
'''
Create an IPC msg decoder, normally used as the
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
'''
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]`
@ -227,6 +233,13 @@ def pformat_msgspec(
join_char: str = '\n',
) -> str:
'''
Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute
for display in (console) log messages as a nice (maybe multiline)
presentation of all supported `Struct`s (subtypes) available for
typed decoding.
'''
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join(
mk_msgspec_table(
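Per the new docstring (and the `getattr(codec, 'dec', codec)` line) `pformat_msgspec()` should accept either a full codec or a bare `msgpack.Decoder`; a hedged sketch with an illustrative `Ping` struct:

    from msgspec import Struct, msgpack
    from tractor.msg._codec import pformat_msgspec  # path assumed

    class Ping(Struct):
        ts: float

    # a bare decoder also works since the codec arg is duck-typed
    dec = msgpack.Decoder(type=Ping)
    print(pformat_msgspec(dec))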
@@ -630,31 +643,57 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec
# yield ext_codec
#
# ^-TODO-^ is it impossible to make something like this orr!?
# TODO: make an auto-custom hook generator from a set of input custom
# types?
# -[ ] below is a proto design using a `TypeCodec` idea?
#
# type var for the expected interchange-lib's
# IPC-transport type when not available as a built-in
# serialization output.
WireT = TypeVar('WireT')
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[x] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual conversion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
#
# Note: this is/was primarily used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
#
# '''
# return (
# # enc_to_dict,
# dec_from_dict,
# )
# TODO: some kinda (decorator) API for built-in subtypes
# that builds this implicitly by inspecting the `mro()`?
class TypeCodec(Protocol):
'''
A protocol describing a per-custom-type serialization
translator to/from the wire-transport format.
'''
src_type: Type
wire_type: WireT
def encode(self, obj: Type) -> WireT:
...
def decode(
self,
obj_type: Type[WireT],
obj: WireT,
) -> Type:
...
class MsgpackTypeCodec(TypeCodec):
...
def mk_codec_hooks(
type_codecs: list[TypeCodec],
) -> tuple[Callable, Callable]:
'''
Deliver an `enc_hook()`/`dec_hook()` pair which handles
manual conversion for an input `Type` set such that whenever
the `TypeCodec.filter()` predicate matches, the
`TypeCodec.decode()` is called on the input wire object by
the `dec_hook()`, and whenever
`isinstance(obj, TypeCodec.src_type)` matches inside the
`enc_hook(obj=obj)`, the return value is taken from a
`TypeCodec.encode(obj)` callback.
'''
...
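To make the proto-design concrete, a hedged sketch of what one `TypeCodec` implementation and a generated `enc_hook()` might look like; nothing below exists in the codebase yet (that is the TODO), so all names are illustrative:

    from decimal import Decimal

    class DecimalCodec:
        '''
        Ship `Decimal`s over the wire as `str`s since msgpack has no
        native decimal type; structurally matches `TypeCodec` above.
        '''
        src_type: type = Decimal
        wire_type: type = str

        def encode(self, obj: Decimal) -> str:
            return str(obj)

        def decode(
            self,
            obj_type: type[str],
            obj: str,
        ) -> Decimal:
            return Decimal(obj)

    def mk_enc_hook(codec: DecimalCodec):
        # what an auto-generated `enc_hook()` might do per the
        # `mk_codec_hooks()` docstring above
        def enc_hook(obj: object) -> object:
            if isinstance(obj, codec.src_type):
                return codec.encode(obj)
            raise NotImplementedError(type(obj))
        return enc_hook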


@@ -30,9 +30,9 @@ from msgspec import (
Struct as _Struct,
structs,
)
from pprint import (
saferepr,
)
# from pprint import (
# saferepr,
# )
from tractor.log import get_logger
@@ -75,8 +75,8 @@ class DiffDump(UserList):
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
f' |_{repr(left)},\n'
f' |_{repr(right)},\n'
')\n'
)
repstr += ']\n'
@@ -144,15 +144,22 @@ def pformat(
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
else:
val_str: str = repr(v)
# XXX LOL, below just seems to be f#$%in causing
# recursion errs..
#
# the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# try:
# val_str: str = saferepr(v)
# except Exception:
# log.exception(
# 'Failed to `saferepr({type(struct)})` !?\n'
# )
# raise
# return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@@ -203,12 +210,7 @@ class Struct(
return sin_props
pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str:
try:
return pformat(self)
@@ -218,6 +220,13 @@ class Struct(
)
return _Struct.__repr__(self)
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def copy(
self,
update: dict | None = None,
@@ -267,13 +276,15 @@ class Struct(
fi.type(getattr(self, fi.name)),
)
# TODO: make a mod func instead and just point to it here for
# method impl?
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
Compare fields/items key-wise and return a `DiffDump`
for easy visual REPL comparison B)
'''
@@ -290,3 +301,42 @@ class Struct(
))
return diffs
@classmethod
def fields_diff(
cls,
other: dict|Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Very similar to `PrettyStruct.__sub__()` except it accepts an
input `other: dict` (one which presumably would otherwise be
passed as `Struct(**other)`) and returns a `DiffDump` comparing
the struct's fields against the `dict`'s entries.
'''
nullish = object()
consumed: dict = other.copy()
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(cls):
field_name: str = fi.name
# ours: Any = getattr(self, field_name)
theirs: Any = consumed.pop(field_name, nullish)
if theirs is nullish:
diffs.append((
field_name,
f'{fi.type!r}',
'NOT-DEFINED in `other: dict`',
))
# when there are lingering fields in `other` that this struct
# DOES NOT define we also append those.
if consumed:
for k, v in consumed.items():
diffs.append((
k,
f'NOT-DEFINED for `{cls.__name__}`',
f'`other: dict` has value = {v!r}',
))
return diffs
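A usage sketch for the new classmethod (alongside the pre-existing `__sub__()`); the import path and the `Point` struct are assumptions for illustration:

    from tractor.msg.pretty_struct import Struct  # path assumed

    class Point(Struct):
        x: int
        y: int

    # struct-vs-struct, key-wise value comparison via `__sub__()`
    print(Point(x=1, y=2) - Point(x=1, y=3))

    # struct-class-vs-dict field comparison via the new classmethod:
    # reports fields the dict doesn't define ('y') as well as extra
    # keys the struct doesn't know about ('z').
    print(Point.fields_diff({'x': 1, 'z': 9}))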

File diff suppressed because it is too large


@@ -382,7 +382,7 @@ class BroadcastReceiver(ReceiveChannel):
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
# await tractor.pause()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
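The updated comment points at the newer debug-REPL entrypoint; a minimal sketch of the call it now references, assuming an actor tree opened with `debug_mode=True`:

    import tractor

    async def inspect_task() -> None:
        # drop this task into the debugger REPL; supersedes the older
        # `tractor.breakpoint()` spelling from the removed comment
        await tractor.pause()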