Compare commits

..

23 Commits

Author SHA1 Message Date
Tyler Goodlet 343b7c9712 Even moar bitty `Context` refinements
- set `._state._ctxvar_Context` just after `StartAck` inside
  `open_context_from_portal()` so that `current_ipc_ctx()` always
  works on the 'parent' side.
- always set `.canceller` to any `MsgTypeError.src_uid` and otherwise to
  any maybe-detected `.src_uid` (i.e. for RAEs).
- always set `.canceller` to us when we rx a ctxc which reports us as
  its canceller; this is a sanity check on definite "self cancellation".
- adjust `._is_self_cancelled()` logic to only be `True` when
  `._remote_error` is both a ctxc with a `.canceller` set to us AND
  when `Context.canceller` is also set to us (since the change above)
  as a little bit of extra rigor.
- fill-in/fix some `.repr_state` edge cases:
  - merge self-vs.-peer ctxc cases to one block and distinguish via
    nested `._is_self_cancelled()` check.
  - set 'errored' for all exception matched cases despite `.canceller`.
  - add pre-`Return` phase statuses:
   |_'pre-started' and 'syncing-to-child' depending on side and when
     `._stream` has not (yet) been set.
   |_'streaming' and 'streaming-finished' depending on side when
     `._stream` is set and whether it was stopped/closed.
- tweak drainage log-message to use "outcome" instead of "result".
- use new `.devx.pformat.pformat_cs()` inside `_maybe_cancel_and_set_remote_error()`
  but, IFF the log level is at least 'cancel'.
2024-05-08 14:02:56 -04:00
Tyler Goodlet 45f37870af Add a `.log.at_least_level()` predicate 2024-05-08 13:33:59 -04:00
Tyler Goodlet 4d528b76a0 Move `_debug.pformat_cs()` into `devx.pformat` 2024-05-08 13:30:15 -04:00
Tyler Goodlet 05b143d9ef Big debugger rework, more tolerance for internal err-hangs
Since i was running into them (internal errors) during lock request
machinery dev and was getting all sorts of difficult to understand hangs
whenever i intro-ed a bug to either side of the ipc ctx; this all while
trying to get the msg-spec working for `Lock` requesting subactors..

Deats:
- hideframes for `@acm`s and `trio.Event.wait()`, `Lock.release()`.
- better detail out the `Lock.acquire/release()` impls
- drop `Lock.remote_task_in_debug`, use new `.ctx_in_debug`.
- add a `Lock.release(force: bool)`.
- move most of what was `_acquire_debug_lock_from_root_task()` and some
  of the `lock_tty_for_child().__a[enter/exit]()` logic into
  `Lock.[acquire/release]()`  including bunch more logging.
- move `lock_tty_for_child()` up in the module to below `Lock`, with
  some rework:
  - drop `subactor_uid: tuple` arg since we can just use the `ctx`..
  - add exception handler blocks for reporting internal (impl) errors
    and always force release the lock in such cases.
- extend `DebugStatus` (prolly will rename to `DebugRequest` btw):
  - add `.req_ctx: Context` for subactor side.
  - add `.req_finished: trio.Event` to sub to signal request task exit.
  - extend `.shield_sigint()` doc-str.
  - add `.release()` to encaps all the state mgmt previously strewn
    about inside `._pause()`..
- use new `DebugStatus.release()` to replace all the duplication:
  - inside `PdbREPL.set_[continue/quit]()`.
  - inside `._pause()` for the subactor branch on internal
    repl-invocation error cases,
  - in the `_enter_repl_sync()` closure on error,
- replace `apply_debug_codec()` -> `apply_debug_pldec()` in tandem with
  the new `PldRx` sub-sys  which handles the new `__pld_spec__`.
- add a new `pformat_cs()` helper orig to help debug cs stack
  a corruption; going to move to `.devx.pformat` obvi.
- rename `wait_for_parent_stdin_hijack()` -> `request_root_stdio_lock()`
  with improvements:
  - better doc-str and add todos,
  - use `DebugStatus` more stringently to encaps all subactor req state.
  - error handling blocks for cancellation and straight up impl errors
    directly around the `.open_context()` block with the latter doing
    a `ctx.cancel()` to avoid hanging in the shielded `.req_cs` scope.
  - similar exc blocks for the func's overall body with explicit
    `log.exception()` reporting.
  - only set the new `DebugStatus.req_finished: trio.Event` in `finally`.
- rename `mk_mpdb()` -> `mk_pdb()` and don't cal `.shield_sigint()`
  implicitly since the caller usage does matter for this.
- factor out `any_connected_locker_child()` from the SIGINT handler.
- rework SIGINT handler to better handle any stale-lock/hang cases:
  - use new `Lock.ctx_in_debug: Context` to detect subactor-in-debug.
    and use it to cancel any lock request instead of the lower level
  - use `problem: str` summary approach to log emissions.
- rework `_pause()` given all of the above, stuff not yet mentioned:
  - don't take `shield: bool` input and proxy to `debug_func()` (for now).
  - drop `extra_frames_up_when_async: int` usage, expect
    `**debug_func_kwargs` to passthrough an `api_frame: Frametype` (more
    on this later).
  - lotsa asserts around the request ctx vs. task-in-debug ctx using new
    `current_ipc_ctx()`.
  - asserts around `DebugStatus` state.
- rework and simplify the `debug_func` hooks,
  `_set_trace()`/`_post_mortem()`:
  - make them accept a non-optional `repl: PdbRepl` and `api_frame:
    FrameType` which should be used to set the current frame when the
    REPL engages.
  - always hide the hook frames.
  - always accept a `tb: TracebackType` to `_post_mortem()`.
   |_ copy and re-impl what was the delegation to
     `pdbp.xpm()`/`pdbp.post_mortem()` and instead call the
     underlying `Pdb.interaction()` ourselves with a `caller_frame`
     and tb instance.
- adjust the public `.pause()` impl:
  - accept optional `hide_tb` and `api_frame` inputs.
  - mask opening a cancel-scope for now (can cause `trio` stack
    corruption, see notes) and thus don't use the `shield` input other
    then to eventually passthrough to `_post_mortem()`?
   |_ thus drop `task_status` support for now as well.
   |_ pretty sure correct soln is a debug-nursery around `._invoke()`.
- since no longer using `extra_frames_up_when_async` inside
  `debug_func()`s ensure all public apis pass a `api_frame`.
- re-impl our `tractor.post_mortem()` to directly call into `._pause()`
  instead of binding in via `partial` and mk it take similar input as
  `.pause()`.
- drop `Lock.release()` from `_maybe_enter_pm()`, expose and pass
  expected frame and tb.
- use necessary changes from all the above within
  `maybe_wait_for_debugger()` and `acquire_debug_lock()`.

Lel, sorry thought that would be shorter..
There's still a lot more re-org to do particularly with `DebugStatus`
encapsulation but it's coming in follow up.
2024-05-08 11:44:55 -04:00
Tyler Goodlet a354732a9e Allow `Stop` passthrough from `PldRx.recv_msg_w_pld()`
Since we need to allow it (at the least) inside
`drain_until_final_msg()` for handling stream-phase termination races
where we don't want to have to handle a raised error from something like
`Context.result()`. Expose the passthrough option via
a `passthrough_non_pld_msgs: bool` kwarg.

Add comprehensive comment to `current_pldrx()`.
2024-05-08 08:50:16 -04:00
Tyler Goodlet fbc21a1dec Add a "current IPC `Context`" `ContextVar`
Expose it from `._state.current_ipc_ctx()` and set it inside
`._rpc._invoke()` for child and inside `Portal.open_context()` for
parent.

Still need to write a few more tests (particularly demonstrating usage
throughout multiple nested nurseries on each side) but this suffices as
a proto for testing with some debugger request-from-subactor stuff.

Other,
- use new `.devx.pformat.add_div()` for ctxc messages.
- add a block to always traceback dump on corrupted cs stacks.
- better handle non-RAEs exception output-formatting in context
  termination summary log message.
- use a summary for `start_status` for msg logging in RPC loop.
2024-05-07 15:35:45 -04:00
Tyler Goodlet b278164f83 Mk `drain_to_final_msg()` never raise from `Error`
Since we usually want them raised from some (internal) call to
`Context.maybe_raise()` and NOT directly from the drainage call, make it
possible via a new `raise_error: bool` to both `PldRx.recv_msg_w_pld()`
and `.dec_msg()`.

In support,
- rename `return_msg` -> `result_msg` since we expect to return
  `Error`s.
- do a `result_msg` assign and `break` in the `case Error()`.
- add `**dec_msg_kwargs` passthrough for other `.dec_msg()` calling
  methods.

Other,
- drop/aggregate todo-notes around the main loop's
  `ctx._pld_rx.recv_msg_w_pld()` call.
- add (configurable) frame hiding to most payload receive meths.
2024-05-06 13:43:51 -04:00
Tyler Goodlet 8ffa6a5e68 "Icons" in `._entry`'s subactor `.info()` messages
Add a little `>` or `X` supervision icon indicating the spawning or
termination of each sub-actor respectively.
2024-05-06 13:12:44 -04:00
Tyler Goodlet 7707e0e75a Woops, make `log.devx()` level 600 2024-05-06 13:07:53 -04:00
Tyler Goodlet 523c24eb72 Move pformatters into new `.devx.pformat`
Since `._code` is prolly gonna get renamed (to something "frame & stack
tools" related) and to give a bit better organization.

Also adds a new `add_div()` helper, factored out of ctxc message
creation in `._rpc._invoke()`, for adding a little "header line" divider
under a given `message: str` with a little math to center it.
2024-05-06 13:04:58 -04:00
Tyler Goodlet 544ff5ab4c Change to `RemoteActorError.pformat()`
For more sane manual calls as needed in logging purposes. Obvi remap
the dunder methods to it.

Other:
- drop `hide_tb: bool` from `unpack_error()`, shouldn't need it since
  frame won't ever be part of any tb raised from returned error.
- add a `is_invalid_payload: bool` to `_raise_from_unexpected_msg()` to
  be used from `PldRx` where we don't need to decode the IPC
  msg, just the payload; make the error message reflect this case.
- drop commented `._portal._unwrap_msg()` since we've replaced it with
  `PldRx`'s delegation to newer `._raise_from_unexpected_msg()`.
- hide the `Portal.result()` frame by default, again.
2024-05-06 13:01:56 -04:00
Tyler Goodlet 63c23d6b82 Add todo for rigorous struct-type spec of `SpawnSpec` fields 2024-04-30 13:01:07 -04:00
Tyler Goodlet cca3206fd6 Use `log.devx()` for `stackscope` messages 2024-04-30 13:00:03 -04:00
Tyler Goodlet 54530dcf94 Type annot the proc from `trio.lowlevel.open_process()` 2024-04-30 12:59:38 -04:00
Tyler Goodlet 338395346d Tweak `breakpoint()` usage error message 2024-04-30 12:56:29 -04:00
Tyler Goodlet 30c5896d26 Fix attr name error, use public `MsgDec.dec` 2024-04-30 12:55:46 -04:00
Tyler Goodlet 88a0e90f82 Reorg frames pformatters, add `Context.repr_state`
A better spot for the pretty-formatting of frame text (and thus tracebacks)
is in the new `.devx._code` module:
- move from `._exceptions` -> `.devx._code.pformat_boxed_tb()`.
- add new `pformat_caller_frame()` factored out the use case in
  `._exceptions._mk_msg_type_err()` where we dump a stack trace
  for bad `.send()` side IPC msgs.

Add some new pretty-format methods to `Context`:
- explicitly implement `.pformat()` and allow an `extra_fields: dict`
  which can be used to inject additional fields (maybe eventually by
  default) such as is now used inside
  `._maybe_cancel_and_set_remote_error()` when reporting the internal
  `._scope` state in cancel logging.
- add a new `.repr_state -> str` which provides a single string status
  depending on the internal state of the IPC ctx in terms of the shuttle
  protocol's "phase"; use it from `.pformat()` for the `|_state:`.
- set `.started(complain_no_parity=False)` now since we presume decoding
  with `.pld: Raw` now with the new `PldRx` design.
- use new `msgops.current_pldrx()` in `mk_context()`.
2024-04-30 12:53:55 -04:00
Tyler Goodlet 40c972f0ec Mk `process_messages()` return last msg; summary logging
Not sure it's **that** useful (yet) but in theory would allow avoiding
certain log level usage around transient RPC requests for discovery methods
(like `.register_actor()` and friends); can't hurt to be able to
introspect that last message for other future cases I'd imagine as well.
Adjust the calling code in `._runtime` to match; other spots are using
the `trio.Nursery.start()` schedule style and are fine as is.

Improve a bunch more log messages throughout a few mods mostly by going
to a "summary" single-emission style where possible/appropriate:
- in `._runtime` more "single summary" status style log emissions:
 |_mk `Actor.load_modules()` render a single mod loaded summary.
 |_use a summary `con_status: str` for `Actor._stream_handler()` conn
   setup and an equiv (`con_teardown_status`) for connection teardowns.
 |_similar thing in `Actor.wait_for_actor()`.
- generally more usage of `.msg.pretty_struct` apis throughout `._runtime`.
2024-04-30 12:15:46 -04:00
Tyler Goodlet f139adddca Add a `log.devx()` level 2024-04-30 11:47:26 -04:00
Tyler Goodlet 979af79588 First draft, package with `poetry` Bo 2024-04-30 11:46:56 -04:00
Tyler Goodlet a3429268ea First draft payload-spec limit API
Add new task-scope oriented `PldRx.pld_spec` management API similar to
`.msg._codec.limit_msg_spec()`, but obvi built to process and filter
`MsgType.pld` values.

New API related changes include:
- new per-task singleton getter `msg._ops.current_pldrx()` which
  delivers the current (global) payload receiver via a new
  `_ctxvar_PldRx: ContextVar` configured with a default
  `_def_any_pldec: MsgDec[Any]` decoder.
- a `PldRx.limit_plds()` which sets the decoder (`.type` underneath)
  for the specific payload rx instance.
- `.msg._ops.limit_plds()` which obtains the current task-scoped `PldRx`
  and applies the pld spec via a new `PldRx.limit_plds()`.
- rename `PldRx._msgdec` -> `._pldec`.
- add `.pld_dec` as pub attr for -^

Unrelated adjustments:
- use `.msg.pretty_struct.pformat()` where handy.
- always pass `expect_msg: MsgType`.
- add a `case Stop()` to `PldRx.dec_msg()` which will `log.warning()`
  when a stop is received by no stream was open on this receiving side
  since we rarely want that to raise since it's prolly just a runtime
  race or mistake in user code.

Other:
2024-04-26 15:29:50 -04:00
Tyler Goodlet d285a3479a Make `.msg.types.Msg.pld: Raw` only, since `PldRx`.. 2024-04-26 13:18:06 -04:00
Tyler Goodlet 61db040702 More bitty (runtime) logging tweaks 2024-04-26 13:13:04 -04:00
20 changed files with 2198 additions and 1294 deletions

View File

@ -1,3 +1,68 @@
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
# ------ - ------
[tool.poetry]
name = "tractor"
version = "0.1.0a6dev0"
description='structured concurrent `trio`-"actors"'
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPlv3"
readme = "docs/README.rst"
# TODO: do we need this xontrib loader at all given pep420
# and xonsh's xontrib global-autoload-via-setuptools?
# https://xon.sh/tutorial_xontrib.html#authoring-xontribs
packages = [
{include = 'tractor' },
# {include = 'tractor.experimental' },
# {include = 'tractor.trionics' },
# {include = 'tractor.msg' },
# {include = 'tractor.devx' },
]
# ------ - ------
[tool.poetry.dependencies]
python = "^3.11"
# trio runtime related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
trio='^0.24'
tricycle = "^0.4.1"
trio-typing = "^0.10.0"
msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging
# .devx tooling
stackscope = "^0.2.2"
pdbp = "^1.5.0"
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2
# ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev]
optional = false
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
# only for xonsh as sh..
xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5"
# ------ - ------
[tool.towncrier]
package = "tractor"
filename = "NEWS.rst"
@ -27,6 +92,7 @@ all_bullets = true
name = "Trivial/Internal Changes"
showcontent = true
# ------ - ------
[tool.pytest.ini_options]
minversion = '6.0'
@ -46,3 +112,26 @@ log_cli = false
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
# pythonpath = "src"
# ------ - ------
[project]
keywords = [
'trio',
'async',
'concurrency',
'structured concurrency',
'actor model',
'distributed',
'multiprocessing'
]
classifiers = [
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Framework :: Trio",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Topic :: System :: Distributed Computing",
]

View File

@ -25,6 +25,7 @@ from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
)
from tractor._state import current_ipc_ctx
from tractor._testing import (
tractor_test,
@ -144,6 +145,8 @@ async def simple_setup_teardown(
global _state
_state = True
assert current_ipc_ctx() is ctx
# signal to parent that we're up
await ctx.started(data + 1)
@ -204,6 +207,7 @@ def test_simple_context(
block_forever=callee_blocks_forever,
) as (ctx, sent),
):
assert current_ipc_ctx() is ctx
assert sent == 11
if callee_blocks_forever:

View File

@ -37,8 +37,9 @@ import inspect
from pprint import pformat
from typing import (
Any,
Callable,
AsyncGenerator,
Callable,
Mapping,
Type,
TYPE_CHECKING,
Union,
@ -59,9 +60,11 @@ from ._exceptions import (
pack_from_raise,
unpack_error,
)
from .log import get_logger
from .log import (
get_logger,
at_least_level,
)
from .msg import (
_codec,
Error,
MsgType,
MsgCodec,
@ -84,6 +87,7 @@ from ._streaming import MsgStream
from ._state import (
current_actor,
debug_mode,
_ctxvar_Context,
)
if TYPE_CHECKING:
@ -103,7 +107,6 @@ class Unresolved:
a final return value or raised error is resolved.
'''
...
# TODO: make this a .msg.types.Struct!
@ -116,19 +119,19 @@ class Context:
NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or,
- by entering `Portal.open_context()` which is the primary
public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function.
to a remotely scheduled "child" function.
AND is always constructed using the below ``mk_context()``.
AND is always constructed using the below `mk_context()`.
Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._rpc._invoke()``.
always allocated inside `._rpc._invoke()`.
TODO: more detailed writeup on cancellation, error and
streaming semantics..
@ -262,7 +265,13 @@ class Context:
_strict_started: bool = False
_cancel_on_msgerr: bool = True
def __str__(self) -> str:
def pformat(
self,
extra_fields: dict[str, Any]|None = None,
# ^-TODO-^ some built-in extra state fields
# we'll want in some devx specific cases?
) -> str:
ds: str = '='
# ds: str = ': '
@ -279,11 +288,7 @@ class Context:
outcome_str: str = self.repr_outcome(
show_error_fields=True
)
outcome_typ_str: str = self.repr_outcome(
type_only=True
)
return (
fmtstr: str = (
f'<Context(\n'
# f'\n'
# f' ---\n'
@ -304,12 +309,12 @@ class Context:
# f' -----\n'
#
# TODO: better state `str`ids?
# -[ ] maybe map err-types to strs like 'cancelled',
# -[x] maybe map err-types to strs like 'cancelled',
# 'errored', 'streaming', 'started', .. etc.
# -[ ] as well as a final result wrapper like
# `outcome.Value`?
#
f' |_state: {outcome_typ_str}\n'
f' |_state: {self.repr_state!r}\n'
f' outcome{ds}{outcome_str}\n'
f' result{ds}{self._result}\n'
@ -324,6 +329,16 @@ class Context:
# -[ ] remove this ^ right?
# f' _remote_error={self._remote_error}
)
if extra_fields:
for key, val in extra_fields.items():
fmtstr += (
f' {key}{ds}{val!r}\n'
)
return (
fmtstr
+
')>\n'
)
# NOTE: making this return a value that can be passed to
@ -335,7 +350,8 @@ class Context:
# logging perspective over `eval()`-ability since we do NOT
# target serializing non-struct instances!
# def __repr__(self) -> str:
__repr__ = __str__
__str__ = pformat
__repr__ = pformat
@property
def cancel_called(self) -> bool:
@ -373,8 +389,12 @@ class Context:
re: BaseException|None = (
remote_error
or self._remote_error
or
self._remote_error
)
# XXX we only report "this context" as self-cancelled
# once we've received a ctxc from our direct-peer task
# (aka we're `.cancel_acked`).
if not re:
return False
@ -385,10 +405,10 @@ class Context:
our_canceller = self.canceller
return bool(
isinstance(re, ContextCancelled)
isinstance((ctxc := re), ContextCancelled)
and from_uid == self.chan.uid
and re.canceller == our_uid
and our_canceller == from_uid
and ctxc.canceller == our_uid
and our_canceller == our_uid
)
@property
@ -608,52 +628,61 @@ class Context:
)
self._remote_error: BaseException = error
msgerr: bool = False
# self-cancel (ack) or,
# peer propagated remote cancellation.
msgerr: bool = False
if isinstance(error, ContextCancelled):
# NOTE in the case error is a ctxc the canceller will
# either be another peer or us. in the case where it's us
# we mark ourself as the canceller of ourselves (a ctx
# "self cancel" from this side's perspective), if instead
# the far end was cancelled by some other (inter-) peer,
# we want to mark our canceller as the actor that was
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc
else:
whom = 'a remote peer (not us)'
self._canceller = error.src_uid
whom: str = (
'us' if error.canceller == self._actor.uid
else 'peer'
)
log.cancel(
f'IPC context cancelled by {whom}!\n\n'
f'IPC context was cancelled by {whom}!\n\n'
f'{error}'
)
elif isinstance(error, MsgTypeError):
msgerr = True
self._canceller = error.src_uid
log.error(
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n'
f'{pformat(self)}\n'
)
else:
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src_uid: tuple = getattr(
error,
'src_uid',
None,
)
# we mark the source actor as our canceller
self._canceller = maybe_error_src_uid
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error}\n'
)
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
if self._canceller is None:
log.error('Ctx has no canceller set!?')
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
@ -696,29 +725,35 @@ class Context:
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
if cs:
scope_info: str = (
f'self._scope: {cs}\n'
f'|_ .cancel_called: {cs.cancel_called}\n'
f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
f'|_ ._cancel_status: {cs._cancel_status}\n\n'
f'{self}\n'
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
f'msgerr: {msgerr}\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
if (
cs
and
at_least_level(log=log, level='cancel')
):
fmt_str: str = self.pformat(
extra_fields={
'._is_self_cancelled()': self._is_self_cancelled(),
'._cancel_on_msgerr': self._cancel_on_msgerr,
}
)
from .devx.pformat import pformat_cs
cs_fmt: str = pformat_cs(
cs,
var_name='Context._scope',
)
fmt_str += (
'\n'
+
cs_fmt
)
log.cancel(
message
+
f'{scope_info}'
fmt_str
)
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# TODO: add to `Channel`?
# TODO: also add to `Channel`?
@property
def dst_maddr(self) -> str:
chan: Channel = self.chan
@ -748,7 +783,7 @@ class Context:
)
return (
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
f'{self._nsf}() -> {outcome_str}:'
f'{self._nsf}() -> {outcome_str}'
)
@property
@ -836,7 +871,7 @@ class Context:
if not self._portal:
raise InternalError(
'No portal found!?\n'
'Why is this supposed caller context missing it?'
'Why is this supposed {self.side!r}-side ctx task missing it?!?'
)
cid: str = self.cid
@ -1091,7 +1126,8 @@ class Context:
f'ctx id: {self.cid}'
)
# TODO: replace all the instances of this!! XD
# TODO: replace all the `._maybe_raise_remote_err()` usage
# with instances of this!!
def maybe_raise(
self,
hide_tb: bool = True,
@ -1102,6 +1138,7 @@ class Context:
if re := self._remote_error:
return self._maybe_raise_remote_err(
re,
hide_tb=hide_tb,
**kwargs,
)
@ -1203,7 +1240,6 @@ class Context:
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise remote_error # from None
# TODO: change to `.wait_for_result()`?
@ -1254,8 +1290,15 @@ class Context:
# wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end.
#
# XXX NOTE XXX: this call shouldn't really ever raise
# (other then internal error), instead delivering an
# `Error`-msg and that being `.maybe_raise()`-ed below
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
(
return_msg,
outcome_msg,
drained_msgs,
) = await msgops.drain_to_final_msg(
ctx=self,
@ -1273,13 +1316,18 @@ class Context:
f'{msg}\n'
)
log.cancel(
'Ctx drained pre-result msgs:\n'
f'{pformat(drained_msgs)}\n\n'
f'Final return msg:\n'
f'{return_msg}\n'
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n'
)
if drained_msgs:
drained_status += (
'\n'
f'The pre-drained msgs are\n'
f'{pformat(drained_msgs)}\n'
)
log.cancel(drained_status)
self.maybe_raise(
# NOTE: obvi we don't care if we
@ -1310,7 +1358,7 @@ class Context:
@property
def maybe_error(self) -> BaseException|None:
le: Exception|None = self._local_error
le: BaseException|None = self._local_error
re: RemoteActorError|ContextCancelled|None = self._remote_error
match (le, re):
@ -1338,7 +1386,7 @@ class Context:
# ContextCancelled(canceller=),
# ):
error: Exception|None = le or re
error: BaseException|None = le or re
if error:
return error
@ -1443,6 +1491,76 @@ class Context:
repr(self._result)
)
@property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
inter-actor IPC context in terms of the current "phase" state
of the SC shuttling dialog protocol.
'''
merr: Exception|None = self.maybe_error
outcome: Unresolved|Exception|Any = self.outcome
status: str|None = None
match (
outcome,
merr,
):
# "graceful" ctx cancellation
case (
Unresolved,
ContextCancelled(),
):
if self._is_self_cancelled():
status = 'self-cancelled'
elif (
self.canceller
and not self._cancel_called
):
status = 'peer-cancelled'
# (remote) error condition
case (
Unresolved,
BaseException(), # any error-type
):
status = 'errored'
# result already returned
case (
_, # any non-unresolved value
None,
) if self._final_result_is_set():
status = 'returned'
# normal operation but still in a pre-`Return`-result
# dialog phase
case (
Unresolved, # noqa (ruff, you so weird..)
None, # no (remote) error set
):
if stream := self._stream:
if stream.closed:
status = 'streaming-finished'
else:
status = 'streaming'
elif self._started_called:
status = 'started'
else:
if self.side == 'child':
status = 'pre-started'
else:
status = 'syncing-to-child'
if status is None:
status = '??unknown??'
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
return status
async def started(
self,
@ -1451,7 +1569,11 @@ class Context:
value: PayloadT|None = None,
strict_parity: bool = False,
complain_no_parity: bool = True,
# TODO: this will always emit now that we do `.pld: Raw`
# passthrough.. so maybe just only complain when above strict
# flag is set?
complain_no_parity: bool = False,
) -> None:
'''
@ -1511,18 +1633,19 @@ class Context:
)
raise RuntimeError(
'Failed to roundtrip `Started` msg?\n'
f'{pformat(rt_started)}\n'
f'{pretty_struct.pformat(rt_started)}\n'
)
if rt_started != started_msg:
# TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
rt_started,
started_msg,
)
complaint: str = (
'Started value does not match after codec rountrip?\n\n'
'Started value does not match after roundtrip?\n\n'
f'{diff}'
)
@ -1538,8 +1661,6 @@ class Context:
else:
log.warning(complaint)
# started_msg = rt_started
await self.chan.send(started_msg)
# raise any msg type error NO MATTER WHAT!
@ -1667,7 +1788,6 @@ class Context:
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}'
f'{pformat(re)}\n'
)
self._cancel_msg: dict = msg
@ -1932,6 +2052,7 @@ async def open_context_from_portal(
)
assert ctx._remote_func_type == 'context'
assert ctx._caller_info
_ctxvar_Context.set(ctx)
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
@ -2085,7 +2206,7 @@ async def open_context_from_portal(
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
# finish silently.
if (
ctx._cancel_called
and
@ -2210,6 +2331,11 @@ async def open_context_from_portal(
try:
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# cancelled before (or maybe during?) final result capture
# if isinstance(trio.Cancelled, berr):
# from .devx import mk_pdb
# mk_pdb.set_trace()
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
@ -2354,7 +2480,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n'
f'cid: {ctx.cid}\n'
)
@ -2390,10 +2516,8 @@ def mk_context(
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
pld_rx = msgops.PldRx(
# _rx_mc=recv_chan,
_msgdec=_codec.mk_dec(spec=pld_spec)
)
# TODO: when/how do we apply `.limit_plds()` from here?
pld_rx: msgops.PldRx = msgops.current_pldrx()
ctx = Context(
chan=chan,
@ -2407,12 +2531,12 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
# TODO: we can drop the old placeholder yah?
# ctx._result: int | Any = id(ctx)
ctx._result = Unresolved
return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable:
'''
Mark an (async) function as an SC-supervised, inter-`Actor`,
@ -2426,8 +2550,8 @@ def context(func: Callable) -> Callable:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
sig: inspect.Signature = inspect.signature(func)
params: Mapping = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "

View File

@ -20,6 +20,7 @@ Sub-process entry points.
"""
from __future__ import annotations
from functools import partial
# import textwrap
from typing import (
Any,
TYPE_CHECKING,
@ -91,7 +92,7 @@ def _mp_main(
pass # handle it the same way trio does?
finally:
log.info(f"Actor {actor.uid} terminated")
log.info(f"Subactor {actor.uid} terminated")
def _trio_main(
@ -125,9 +126,11 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n'
)
log.info(
'Started new trio process:\n'
'Started new trio subactor:\n'
+
actor_info
'>\n' # like a "started/play"-icon from super perspective
+
actor_info,
)
try:
@ -146,7 +149,9 @@ def _trio_main(
finally:
log.info(
'Actor terminated\n'
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+
actor_info
)

View File

@ -46,7 +46,6 @@ from tractor.msg import (
Error,
MsgType,
Stop,
Yield,
types as msgtypes,
MsgCodec,
MsgDec,
@ -140,71 +139,6 @@ def get_err_type(type_name: str) -> BaseException|None:
return type_ref
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pack_from_raise(
local_err: (
ContextCancelled
@ -277,6 +211,8 @@ class RemoteActorError(Exception):
) -> None:
super().__init__(message)
# for manual display without having to muck with `Exception.args`
self._message: str = message
# TODO: maybe a better name?
# - .errtype
# - .retype
@ -504,30 +440,40 @@ class RemoteActorError(Exception):
reprol_str: str = (
f'{type(self).__name__}' # type name
f'[{self.boxed_type_str}]' # parameterized by boxed type
'(' # init-style look
)
_repr: str = self._mk_fields_str(
self.reprol_fields,
end_char=' ',
)
if _repr:
reprol_str += '(' # init-style call
return (
reprol_str
+
_repr
)
def __repr__(self) -> str:
def pformat(self) -> str:
'''
Nicely formatted boxed error meta data + traceback.
Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`).
'''
tb_str: str = self.tb_str
if tb_str:
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
from tractor.devx import (
pformat_boxed_tb,
)
body: str = pformat_boxed_tb(
tb_str=self.tb_str,
tb_str=tb_str,
fields_str=fields,
field_prefix=' |_',
# ^- is so that it's placed like so,
@ -535,12 +481,20 @@ class RemoteActorError(Exception):
# |___ ..
tb_body_indent=1,
)
else:
body: str = textwrap.indent(
self._message,
prefix=' ',
) + '\n'
return (
f'<{type(self).__name__}(\n'
f'{body}'
')>'
)
__repr__ = pformat
__str__ = pformat
def unwrap(
self,
) -> BaseException:
@ -870,12 +824,9 @@ def pack_error(
def unpack_error(
msg: Error,
chan: Channel|None = None,
chan: Channel,
box_type: RemoteActorError = RemoteActorError,
hide_tb: bool = True,
) -> None|Exception:
'''
Unpack an 'error' message from the wire
@ -885,12 +836,10 @@ def unpack_error(
which is the responsibilitiy of the caller.
'''
__tracebackhide__: bool = hide_tb
if not isinstance(msg, Error):
return None
# retrieve the remote error's encoded details from fields
# retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
@ -919,7 +868,6 @@ def unpack_error(
# original source error.
elif boxed_type_str == 'RemoteActorError':
assert boxed_type is RemoteActorError
# assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1
exc = box_type(
@ -1004,8 +952,6 @@ def _raise_from_unexpected_msg(
raise unpack_error(
msg,
ctx.chan,
hide_tb=hide_tb,
) from src_err
# `MsgStream` termination msg.
@ -1075,6 +1021,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
) -> MsgTypeError:
'''
@ -1089,17 +1036,13 @@ def _mk_msg_type_err(
'`codec` must be a `MsgCodec` for send-side errors?'
)
from tractor.devx import (
pformat_caller_frame,
)
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
field_prefix=' ',
indent='',
)
tb_fmt: str = pformat_caller_frame(stack_limit=3)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
@ -1135,6 +1078,16 @@ def _mk_msg_type_err(
return msgtyperr
# `Channel.recv()` case
else:
if is_invalid_payload:
msg_type: str = type(msg)
message: str = (
f'invalid `{msg_type.__qualname__}` payload\n\n'
f'<{type(msg).__qualname__}(\n'
f' |_pld: {codec.pld_spec_str} = {msg.pld!r}'
f')>\n'
)
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
@ -1147,12 +1100,9 @@ def _mk_msg_type_err(
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
# XXX be "fancy" and see if we can determine the exact
# invalid field such that we can comprehensively report
# the specific field's type problem.
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
@ -1177,6 +1127,9 @@ def _mk_msg_type_err(
f')>'
)
if verb_header:
message = f'{verb_header} ' + message
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,

View File

@ -68,40 +68,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
# TODO: remove and/or rework?
# -[ ] rename to `unwrap_result()` and use
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
# Channel` arg) in key block??
# -[ ] pretty sure this is entirely covered by
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
# def _unwrap_msg(
# msg: Return|Error,
# ctx: Context,
# hide_tb: bool = True,
# ) -> Any:
# '''
# Unwrap a final result from a `{return: <Any>}` IPC msg.
# '''
# __tracebackhide__: bool = hide_tb
# try:
# return msg.pld
# except AttributeError as err:
# # internal error should never get here
# # assert msg.get('cid'), (
# assert msg.cid, (
# "Received internal error at portal?"
# )
# raise unpack_error(
# msg,
# ctx.chan,
# ) from err
class Portal:
'''
A 'portal' to a memory-domain-separated `Actor`.
@ -173,12 +139,13 @@ class Portal:
portal=self,
)
# @api_frame
async def result(self) -> Any:
'''
Return the result(s) from the remote actor's "main" task.
'''
# __tracebackhide__ = True
__tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -254,11 +221,11 @@ class Portal:
return False
reminfo: str = (
f'`Portal.cancel_actor()` => {self.channel.uid}\n'
f'Portal.cancel_actor() => {self.channel.uid}\n'
f'|_{chan}\n'
)
log.cancel(
f'Sending runtime `.cancel()` request to peer\n\n'
f'Requesting runtime cancel for peer\n\n'
f'{reminfo}'
)
@ -435,7 +402,6 @@ class Portal:
yield stream
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
@ -496,7 +462,7 @@ class LocalPortal:
async def open_portal(
channel: Channel,
nursery: trio.Nursery|None = None,
tn: trio.Nursery|None = None,
start_msg_loop: bool = True,
shield: bool = False,
@ -504,15 +470,19 @@ async def open_portal(
'''
Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing (normally
done by the actor-runtime implicitly).
Spawns a background task to handle RPC processing, normally
done by the actor-runtime implicitly via a call to
`._rpc.process_messages()`. just after connection establishment.
'''
actor = current_actor()
assert actor
was_connected: bool = False
async with maybe_open_nursery(nursery, shield=shield) as nursery:
async with maybe_open_nursery(
tn,
shield=shield,
) as tn:
if not channel.connected():
await channel.connect()
@ -524,7 +494,7 @@ async def open_portal(
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await nursery.start(
msg_loop_cs = await tn.start(
partial(
process_messages,
actor,
@ -544,7 +514,7 @@ async def open_portal(
await channel.aclose()
# cancel background msg loop task
if msg_loop_cs:
if msg_loop_cs is not None:
msg_loop_cs.cancel()
nursery.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -124,8 +124,9 @@ async def open_root_actor(
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'`tractor` blocks built-in `breakpoint()` calls by default!\n'
'If you need to us it please install `greenback` and set '
'Trying to use `breakpoint()` eh?\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)

View File

@ -57,6 +57,7 @@ from ._exceptions import (
from .devx import (
maybe_wait_for_debugger,
_debug,
add_div,
)
from . import _state
from .log import get_logger
@ -64,11 +65,13 @@ from .msg import (
current_codec,
MsgCodec,
NamespacePath,
pretty_struct,
)
from tractor.msg.types import (
CancelAck,
Error,
Msg,
MsgType,
Return,
Start,
StartAck,
@ -248,6 +251,9 @@ async def _errors_relayed_via_ipc(
) -> None:
__tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
try:
yield # run RPC invoke body
@ -271,6 +277,8 @@ async def _errors_relayed_via_ipc(
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
#
# -[ ] this if check is duplicate with `._maybe_enter_pm()`..
if not is_multi_cancelled(err):
entered_debug: bool = False
if (
@ -294,7 +302,6 @@ async def _errors_relayed_via_ipc(
)
)
):
# await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
@ -302,7 +309,14 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
entered_debug = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if not entered_debug:
log.exception(
'RPC task crashed\n'
@ -432,6 +446,8 @@ async def _invoke(
)
context: bool = False
assert not _state._ctxvar_Context.get()
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
@ -555,6 +571,7 @@ async def _invoke(
async with trio.open_nursery() as tn:
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx)
# TODO: should would be nice to have our
@ -590,7 +607,6 @@ async def _invoke(
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -619,23 +635,9 @@ async def _invoke(
else:
explain += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------'
div_offset: int = (
round(len(explain)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
explain += (
div_str +
add_div(message=explain)
+
f'<= canceller: {canceller}\n'
f'=> cancellee: {our_uid}\n'
# TODO: better repr for ctx tasks..
@ -662,10 +664,10 @@ async def _invoke(
boxed_type=trio.Cancelled,
canceller=canceller,
)
# assign local error so that the `.outcome`
# resolves to an error for both reporting and
# state checks.
ctx._local_error = ctxc
# does this matter other then for
# consistentcy/testing? |_ no user code should be
# in this scope at this point..
# ctx._local_error = ctxc
raise ctxc
# XXX: do we ever trigger this block any more?
@ -675,6 +677,13 @@ async def _invoke(
BaseException,
) as scope_error:
if (
isinstance(scope_error, RuntimeError)
and scope_error.args
and 'Cancel scope stack corrupted' in scope_error.args[0]
):
log.exception('Cancel scope stack corrupted!?\n')
# _debug.mk_pdb().set_trace()
# always set this (child) side's exception as the
# local error on the context
@ -708,17 +717,32 @@ async def _invoke(
res_type_str,
res_str,
) = (
('error', f'{type(merr)}',)
if merr
('error', f'{type(merr)}',) if merr
else (
'result',
f'`{repr(ctx.outcome)}`',
)
)
log.runtime(
message: str = (
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}'
)
if merr:
from tractor import RemoteActorError
if not isinstance(merr, RemoteActorError):
fmt_merr: str = (
f'\n{merr!r}\n'
# f'{merr.args[0]!r}\n'
)
else:
fmt_merr = f'\n{merr!r}'
log.error(
message
+
fmt_merr
)
else:
log.runtime(message)
async def try_ship_error_to_remote(
@ -774,7 +798,10 @@ async def process_messages(
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
) -> (
bool, # chan diconnected
MsgType, # last msg
):
'''
This is the low-level, per-IPC-channel, RPC task scheduler loop.
@ -816,11 +843,6 @@ async def process_messages(
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime(
'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: Msg|None = None
try:
@ -834,12 +856,15 @@ async def process_messages(
async for msg in chan:
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pformat()` sub-call..?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead!
f'{pformat(msg)}\n'
# f'{pretty_struct.pformat(msg)}\n'
f'{msg}\n'
)
match msg:
@ -952,11 +977,19 @@ async def process_messages(
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
start_status: str = (
'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n\n'
f' |_{chan}\n'
f' |_cid: {cid}\n\n'
# f' |_{ns}.{funcname}({kwargs})\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
@ -985,6 +1018,10 @@ async def process_messages(
await chan.send(err_msg)
continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
@ -992,18 +1029,8 @@ async def process_messages(
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
log.runtime(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
start_status += ' -> scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_n.start(
partial(
@ -1031,8 +1058,9 @@ async def process_messages(
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
'Task for RPC failed?'
f'|_ {func}()\n\n'
start_status
+
' -> task for RPC failed?\n\n'
f'{err}'
)
continue
@ -1097,25 +1125,24 @@ async def process_messages(
parent_chan=chan,
)
except (
TransportClosed,
):
except TransportClosed:
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up..
# TODO: add a teardown handshake? and,
#
# TODO: maybe add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime(
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'IPC channel closed abruptly\n'
f'<=x peer: {chan.uid}\n'
f' |_{chan.raddr}\n'
)
# transport **WAS** disconnected
return True
return (True, msg)
except (
Exception,
@ -1152,12 +1179,17 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?'
else:
message: str = (
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
f'{pformat(msg)}\n\n'
f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)
# transport **WAS NOT** disconnected
return False
return (False, msg)

View File

@ -49,6 +49,7 @@ from pprint import pformat
import signal
import sys
from typing import (
Any,
Callable,
TYPE_CHECKING,
)
@ -68,7 +69,7 @@ from tractor.msg import (
pretty_struct,
NamespacePath,
types as msgtypes,
Msg,
MsgType,
)
from ._ipc import Channel
from ._context import (
@ -96,19 +97,6 @@ from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
# from tractor.msg.types import (
# Aid,
# SpawnSpec,
# Start,
# StartAck,
# Started,
# Yield,
# Stop,
# Return,
# Error,
# )
if TYPE_CHECKING:
@ -315,29 +303,32 @@ class Actor:
self._reg_addrs = addrs
async def wait_for_peer(
self, uid: tuple[str, str]
self,
uid: tuple[str, str],
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a spawned actor with a `uid`
using a `trio.Event` for sync.
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event` for sync.
'''
log.runtime(f"Waiting for peer {uid} to connect")
log.debug(f'Waiting for peer {uid!r} to connect')
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.runtime(f"{uid} successfully connected back to us")
log.debug(f'{uid!r} successfully connected back to us')
return event, self._peers[uid][-1]
def load_modules(
self,
debug_mode: bool = False,
# debug_mode: bool = False,
) -> None:
'''
Load enabled RPC py-modules locally (after process fork/spawn).
Load explicitly enabled python modules from local fs after
process spawn.
Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module
code (presuming it exists).
code manually (presuming it exists).
'''
try:
@ -350,16 +341,21 @@ class Actor:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
status: str = 'Attempting to import enabled modules:\n'
for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
log.runtime(f"Attempting to import {modpath}@{filepath}")
mod = importlib.import_module(modpath)
status += (
f'|_{modpath!r} -> {filepath!r}\n'
)
mod: ModuleType = importlib.import_module(modpath)
self._mods[modpath] = mod
if modpath == '__main__':
self._mods['__mp_main__'] = mod
log.runtime(status)
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
@ -413,21 +409,23 @@ class Actor:
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
con_msg: str = ''
if their_uid:
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
con_msg = (
'IPC Re-connection from already known peer? '
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_msg = (
'New IPC connection to us '
con_status = (
'New inbound IPC connection <=\n'
)
con_msg += (
f'<= @{chan.raddr}\n'
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response
try:
@ -447,13 +445,13 @@ class Actor:
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(
con_msg
con_status
+
' -> But failed to handshake? Ignoring..\n'
)
return
con_msg += (
con_status += (
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
)
# IPC connection tracking for both peers and new children:
@ -466,7 +464,7 @@ class Actor:
None,
)
if event:
con_msg += (
con_status += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -477,7 +475,7 @@ class Actor:
event.set()
else:
con_msg += (
con_status += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
@ -491,13 +489,18 @@ class Actor:
# TODO: can we just use list-ref directly?
chans.append(chan)
log.runtime(con_msg)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
last_msg: MsgType
try:
disconnected: bool = await process_messages(
(
disconnected,
last_msg,
) = await process_messages(
self,
chan,
)
@ -598,16 +601,24 @@ class Actor:
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry = local_nursery._children.get(uid)
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
and
poll() is None # proc still alive
):
log.cancel(
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
@ -616,17 +627,17 @@ class Actor:
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
log.runtime(
f'No more channels with {chan.uid}'
con_teardown_status += (
f'-> No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
@ -640,15 +651,16 @@ class Actor:
f' |_[{i}] {pformat(chan)}\n'
)
log.runtime(
f'Remaining IPC {len(self._peers)} peers:\n'
+ peers_str
con_teardown_status += (
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
con_teardown_status += (
'Signalling no more peer channel connections'
)
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
@ -723,13 +735,16 @@ class Actor:
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
# TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD
async def _deliver_ctx_payload(
self,
chan: Channel,
cid: str,
msg: Msg|MsgTypeError,
msg: MsgType|MsgTypeError,
) -> None|bool:
'''
@ -754,7 +769,7 @@ class Actor:
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
f'{pretty_struct.Struct.pformat(msg)}\n'
f'{pretty_struct.pformat(msg)}\n'
)
return
@ -896,9 +911,11 @@ class Actor:
cid=cid,
)
log.runtime(
'Sending RPC start msg\n\n'
'Sending RPC `Start`\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
f' |_ {ns}.{func}({kwargs})\n\n'
f'{pretty_struct.pformat(msg)}'
)
await chan.send(msg)
@ -955,31 +972,29 @@ class Actor:
if self._spawn_method == "trio":
# Receive runtime state from our parent
# parent_data: dict[str, Any]
# parent_data = await chan.recv()
# TODO: maybe we should just wrap this directly
# in a `Actor.spawn_info: SpawnInfo` struct?
# Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv()
self._spawn_spec = spawnspec
log.runtime(
'Received runtime spec from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
log.runtime(
'Received runtime spec from parent:\n\n'
f'{pformat(spawnspec)}\n'
f'{pretty_struct.pformat(spawnspec)}\n'
)
# accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
# rvs = parent_data.pop('_runtime_vars')
rvs = spawnspec._runtime_vars
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']:
try:
log.info(
# TODO: maybe return some status msgs upward
# to that we can emit them in `con_status`
# instead?
log.devx(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
@ -989,7 +1004,6 @@ class Actor:
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f'Runtime vars are: {rvs}')
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1006,18 +1020,12 @@ class Actor:
for val in spawnspec.reg_addrs
]
# for attr, value in parent_data.items():
# TODO: better then monkey patching..
# -[ ] maybe read the actual f#$-in `._spawn_spec` XD
for _, attr, value in pretty_struct.iter_fields(
spawnspec,
):
setattr(self, attr, value)
# if (
# attr == 'reg_addrs'
# and value
# ):
# self.reg_addrs = [tuple(val) for val in value]
# else:
# setattr(self, attr, value)
return (
chan,
@ -1026,12 +1034,11 @@ class Actor:
except OSError: # failed to connect
log.warning(
f'Failed to connect to parent!?\n\n'
'Closing IPC [TCP] transport server to\n'
f'{parent_addr}\n'
f'Failed to connect to spawning parent actor!?\n'
f'x=> {parent_addr}\n'
f'|_{self}\n\n'
)
await self.cancel(chan=None) # self cancel
await self.cancel(req_chan=None) # self cancel
raise
async def _serve_forever(
@ -1109,8 +1116,7 @@ class Actor:
# chan whose lifetime limits the lifetime of its remotely
# requested and locally spawned RPC tasks - similar to the
# supervision semantics of a nursery wherein the actual
# implementation does start all such tasks in
# a sub-nursery.
# implementation does start all such tasks in a sub-nursery.
req_chan: Channel|None,
) -> bool:
@ -1151,7 +1157,7 @@ class Actor:
# other) repr fields instead of doing this all manual..
msg: str = (
f'Runtime cancel request from {requester_type}:\n\n'
f'<= .cancel(): {requesting_uid}\n'
f'<= .cancel(): {requesting_uid}\n\n'
)
# TODO: what happens here when we self-cancel tho?
@ -1166,8 +1172,8 @@ class Actor:
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'>> Cancelling active debugger request..\n'
f'|_{_debug.Lock}\n'
'-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.pformat()}'
)
dbcs.cancel()
@ -1418,7 +1424,12 @@ class Actor:
'''
if self._server_n:
log.runtime("Shutting down channel server")
# TODO: obvi a different server type when we eventually
# support some others XD
server_prot: str = 'TCP'
log.runtime(
f'Cancelling {server_prot} server'
)
self._server_n.cancel_scope.cancel()
return True
@ -1602,6 +1613,7 @@ async def async_main(
assert accept_addrs
try:
# TODO: why is this not with the root nursery?
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
@ -1886,13 +1898,13 @@ class Arbiter(Actor):
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
for (aname, _), sockaddr in self._registry.items():
log.runtime(
f'Actor mailbox info:\n'
f'aname: {aname}\n'
f'sockaddr: {sockaddr}\n'
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, sockaddr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_sockaddr: {sockaddr}\n\n'
)
if name == aname:
if name == uid[0]:
sockaddrs.append(sockaddr)
if not sockaddrs:
@ -1904,6 +1916,7 @@ class Arbiter(Actor):
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid])
log.runtime(mailbox_info)
return sockaddrs
async def register_actor(

View File

@ -451,10 +451,9 @@ async def trio_proc(
proc: trio.Process|None = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd)
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd)
log.runtime(
'Started new sub-proc\n'
'Started new child\n'
f'|_{proc}\n'
)

View File

@ -19,13 +19,19 @@ Per process state
"""
from __future__ import annotations
from contextvars import (
ContextVar,
)
from typing import (
Any,
TYPE_CHECKING,
)
from trio.lowlevel import current_task
if TYPE_CHECKING:
from ._runtime import Actor
from ._context import Context
_current_actor: Actor|None = None # type: ignore # noqa
@ -110,3 +116,20 @@ def debug_mode() -> bool:
def is_root_process() -> bool:
return _runtime_vars['_is_root']
_ctxvar_Context: ContextVar[Context] = ContextVar(
'ipc_context',
default=None,
)
def current_ipc_ctx() -> Context:
ctx: Context = _ctxvar_Context.get()
if not ctx:
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'
f'|_{current_task()}\n'
)
return ctx

View File

@ -364,14 +364,10 @@ class MsgStream(trio.abc.Channel):
if not self._eoc:
message: str = (
f'Context stream closed by {self._ctx.side!r}\n'
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
f'|_{self}\n'
)
log.cancel(
'Stream self-closed before receiving EoC\n\n'
+
message
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?

View File

@ -30,7 +30,13 @@ from ._debug import (
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,
mk_pdb as mk_pdb,
)
from ._stackscope import (
enable_stack_on_sig as enable_stack_on_sig,
)
from .pformat import (
add_div as add_div,
pformat_caller_frame as pformat_caller_frame,
pformat_boxed_tb as pformat_boxed_tb,
)

View File

@ -23,6 +23,8 @@ from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import (
FrameType,
FunctionType,
@ -175,3 +177,103 @@ def find_caller_info(
)
return None
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
'''
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str

File diff suppressed because it is too large Load Diff

View File

@ -65,7 +65,7 @@ def dump_task_tree() -> None:
level='cancel',
)
actor: Actor = _state.current_actor()
log.pdb(
log.devx(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
@ -104,7 +104,7 @@ def signal_handler(
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.pdb(
log.devx(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'

View File

@ -0,0 +1,168 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
'''
import textwrap
import traceback
from trio import CancelScope
def add_div(
message: str,
div_str: str = '------ - ------',
) -> str:
'''
Add a "divider string" to the input `message` with
a little math to center it underneath.
'''
div_offset: int = (
round(len(message)/2)+1
-
round(len(div_str)/2)+1
)
div_str: str = (
'\n' + ' '*div_offset + f'{div_str}\n'
)
return div_str
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
'''
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
f'|\n'
f' ------ - ------\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str
def pformat_cs(
cs: CancelScope,
var_name: str = 'cs',
field_prefix: str = ' |_',
) -> str:
'''
Pretty format info about a `trio.CancelScope` including most
of its public state and `._cancel_status`.
The output can be modified to show a "var name" for the
instance as a field prefix, just a simple str before each
line more or less.
'''
fields: str = textwrap.indent(
(
f'cancel_called = {cs.cancel_called}\n'
f'cancelled_caught = {cs.cancelled_caught}\n'
f'_cancel_status = {cs._cancel_status}\n'
f'shield = {cs.shield}\n'
),
prefix=field_prefix,
)
return (
f'{var_name}: {cs}\n'
+
fields
)

View File

@ -21,6 +21,11 @@ Log like a forester!
from collections.abc import Mapping
import sys
import logging
from logging import (
LoggerAdapter,
Logger,
StreamHandler,
)
import colorlog # type: ignore
import trio
@ -53,6 +58,7 @@ LEVELS: dict[str, int] = {
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
'DEVX': 600,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
@ -62,6 +68,7 @@ STD_PALETTE = {
'CRITICAL': 'red',
'ERROR': 'red',
'PDB': 'white',
'DEVX': 'cyan',
'WARNING': 'yellow',
'INFO': 'green',
'CANCEL': 'yellow',
@ -78,7 +85,7 @@ BOLD_PALETTE = {
# TODO: this isn't showing the correct '{filename}'
# as it did before..
class StackLevelAdapter(logging.LoggerAdapter):
class StackLevelAdapter(LoggerAdapter):
def transport(
self,
@ -86,7 +93,8 @@ class StackLevelAdapter(logging.LoggerAdapter):
) -> None:
'''
IPC level msg-ing.
IPC transport level msg IO; generally anything below
`._ipc.Channel` and friends.
'''
return self.log(5, msg)
@ -102,7 +110,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str,
) -> None:
'''
Cancellation logging, mostly for runtime reporting.
Cancellation sequencing, mostly for runtime reporting.
'''
return self.log(
@ -116,11 +124,21 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str,
) -> None:
'''
Debugger logging.
`pdb`-REPL (debugger) related statuses.
'''
return self.log(500, msg)
def devx(
self,
msg: str,
) -> None:
'''
"Developer experience" sub-sys statuses.
'''
return self.log(600, msg)
def log(
self,
level,
@ -224,6 +242,7 @@ def get_logger(
'''Return the package log or a sub-logger for ``name`` if provided.
'''
log: Logger
log = rlog = logging.getLogger(_root_name)
if (
@ -278,7 +297,7 @@ def get_logger(
def get_console_log(
level: str | None = None,
**kwargs,
) -> logging.LoggerAdapter:
) -> LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it.
@ -303,7 +322,7 @@ def get_console_log(
None,
)
):
handler = logging.StreamHandler()
handler = StreamHandler()
formatter = colorlog.ColoredFormatter(
LOG_FORMAT,
datefmt=DATE_FORMAT,
@ -323,3 +342,19 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -25,12 +25,12 @@ from contextlib import (
# asynccontextmanager as acm,
contextmanager as cm,
)
from pprint import pformat
from contextvars import ContextVar
from typing import (
Any,
Type,
TYPE_CHECKING,
# Union,
Union,
)
# ------ - ------
from msgspec import (
@ -63,7 +63,7 @@ from .types import (
Started,
Stop,
Yield,
# pretty_struct,
pretty_struct,
)
@ -75,6 +75,9 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_def_any_pldec: MsgDec = mk_dec()
class PldRx(Struct):
'''
A "msg payload receiver".
@ -101,10 +104,13 @@ class PldRx(Struct):
'''
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_msgdec: MsgDec = mk_dec(spec=Any)
_pldec: MsgDec
_ipc: Context|MsgStream|None = None
@property
def pld_dec(self) -> MsgDec:
return self._pldec
@cm
def apply_to_ipc(
self,
@ -122,9 +128,29 @@ class PldRx(Struct):
finally:
self._ipc = None
@cm
def limit_plds(
self,
spec: Union[Type[Struct]],
) -> MsgDec:
'''
Type-limit the loadable msg payloads via an applied
`MsgDec` given an input spec, revert to prior decoder on
exit.
'''
orig_dec: MsgDec = self._pldec
limit_dec: MsgDec = mk_dec(spec=spec)
try:
self._pldec = limit_dec
yield limit_dec
finally:
self._pldec = orig_dec
@property
def dec(self) -> msgpack.Decoder:
return self._msgdec.dec
return self._pldec.dec
def recv_pld_nowait(
self,
@ -135,9 +161,10 @@ class PldRx(Struct):
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
**kwargs,
**dec_msg_kwargs,
) -> Any|Raw:
__tracebackhide__: bool = True
msg: MsgType = (
ipc_msg
@ -150,6 +177,7 @@ class PldRx(Struct):
msg,
ctx=ctx,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
async def recv_pld(
@ -157,14 +185,16 @@ class PldRx(Struct):
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**kwargs
**dec_msg_kwargs,
) -> Any|Raw:
'''
Receive a `MsgType`, then decode and return its `.pld` field.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = (
ipc_msg
or
@ -173,16 +203,20 @@ class PldRx(Struct):
await ctx._rx_chan.receive()
)
return self.dec_msg(
msg,
msg=msg,
ctx=ctx,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
def dec_msg(
self,
msg: MsgType,
ctx: Context,
expect_msg: Type[MsgType]|None = None,
expect_msg: Type[MsgType]|None,
raise_error: bool = True,
hide_tb: bool = True,
) -> PayloadT|Raw:
'''
@ -190,6 +224,7 @@ class PldRx(Struct):
return the value or raise an appropriate error.
'''
__tracebackhide__: bool = hide_tb
match msg:
# payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code.
@ -199,11 +234,12 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase
):
try:
pld: PayloadT = self._msgdec.decode(pld)
pld: PayloadT = self._pldec.decode(pld)
log.runtime(
'Decode msg payload\n\n'
'Decoded msg payload\n\n'
f'{msg}\n\n'
f'{pld}\n'
f'where payload is\n'
f'|_pld={pld!r}\n'
)
return pld
@ -211,8 +247,9 @@ class PldRx(Struct):
except ValidationError as src_err:
msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self._dec,
codec=self.pld_dec,
src_validation_error=src_err,
is_invalid_payload=True,
)
msg: Error = pack_from_raise(
local_err=msgterr,
@ -237,8 +274,62 @@ class PldRx(Struct):
case Error():
src_err = MessagingError(
'IPC dialog termination by msg'
'IPC ctx dialog terminated without `Return`-ing a result\n'
f'Instead it raised {msg.boxed_type_str!r}!'
)
# XXX NOTE XXX another super subtle runtime-y thing..
#
# - when user code (transitively) calls into this
# func (usually via a `Context/MsgStream` API) we
# generally want errors to propagate immediately
# and directly so that the user can define how it
# wants to handle them.
#
# HOWEVER,
#
# - for certain runtime calling cases, we don't want to
# directly raise since the calling code might have
# special logic around whether to raise the error
# or supress it silently (eg. a `ContextCancelled`
# received from the far end which was requested by
# this side, aka a self-cancel).
#
# SO, we offer a flag to control this.
if not raise_error:
return src_err
case Stop(cid=cid):
message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n'
f'|_cid: {cid}\n\n'
f'{pretty_struct.pformat(msg)}\n'
)
if ctx._stream is None:
explain: str = (
f'BUT, no `MsgStream` (was) open(ed) on this '
f'{ctx.side!r}-side of the IPC ctx?\n'
f'Maybe check your code for streaming phase race conditions?\n'
)
log.warning(
message
+
explain
)
# let caller decide what to do when only one
# side opened a stream, don't raise.
return msg
else:
explain: str = (
'Received a `Stop` when it should NEVER be possible!?!?\n'
)
# TODO: this is constructed inside
# `_raise_from_unexpected_msg()` but maybe we
# should pass it in?
# src_err = trio.EndOfChannel(explain)
src_err = None
case _:
src_err = InternalError(
@ -246,6 +337,9 @@ class PldRx(Struct):
f'{msg}\n'
)
# TODO: maybe use the new `.add_note()` from 3.11?
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
#
# fallthrough and raise from `src_err`
_raise_from_unexpected_msg(
ctx=ctx,
@ -253,12 +347,18 @@ class PldRx(Struct):
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=False,
hide_tb=hide_tb,
)
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
'''
@ -268,16 +368,102 @@ class PldRx(Struct):
'''
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.dec_msg(
msg,
ctx=ipc,
expect_msg=expect_msg,
**kwargs,
)
return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other then the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
@cm
def limit_plds(
spec: Union[Type[Struct]],
**kwargs,
) -> MsgDec:
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.
'''
__tracebackhide__: bool = True
try:
# sanity on orig settings
orig_pldrx: PldRx = current_pldrx()
orig_pldec: MsgDec = orig_pldrx.pld_dec
with orig_pldrx.limit_plds(
spec=spec,
**kwargs,
) as pldec:
log.info(
'Applying payload-decoder\n\n'
f'{pldec}\n'
)
yield pldec
finally:
log.info(
'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n'
)
assert (
(pldrx := current_pldrx()) is orig_pldrx
and
pldrx.pld_dec is orig_pldec
)
async def drain_to_final_msg(
ctx: Context,
@ -308,67 +494,33 @@ async def drain_to_final_msg(
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[MsgType] = []
return_msg: Return|None = None
result_msg: Return|Error|None = None
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
):
try:
# TODO: can remove?
# await trio.lowlevel.checkpoint()
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# TODO: bad idea?
# -[ ] wrap final outcome channel wait in a scope so
# receive all msgs, scanning for either a final result
# or error; the underlying call should never raise any
# remote error directly!
msg, pld = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Return,
raise_error=False,
)
# ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so
# it can be cancelled out of band if needed?
#
# with trio.CancelScope() as res_cs:
# |_with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
# TODO: ensure there's no more hangs, debugging the
# runtime pretty preaase!
# from .devx._debug import pause
# await pause()
# TODO: can remove this finally?
# we have no more need for the sync draining right
# since we're can kinda guarantee the async
# `.receive()` below will never block yah?
#
# if (
# ctx._cancel_called and (
# ctx.cancel_acked
# # or ctx.chan._cancel_called
# )
# # or not ctx._final_result_is_set()
# # ctx.outcome is not
# # or ctx.chan._closed
# ):
# try:
# msg: dict = await ctx._rx_chan.receive_nowait()()
# except trio.WouldBlock:
# log.warning(
# 'When draining already `.cancel_called` ctx!\n'
# 'No final msg arrived..\n'
# )
# break
# else:
# msg: dict = await ctx._rx_chan.receive()
# TODO: don't need it right jefe?
# with trio.move_on_after(1) as cs:
# if cs.cancelled_caught:
# from .devx._debug import pause
# -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_from tractor.devx._debug import pause
# await pause()
# pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._rx_chan.receive()
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -376,7 +528,7 @@ async def drain_to_final_msg(
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
except trio.Cancelled as taskc:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
@ -386,7 +538,7 @@ async def drain_to_final_msg(
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
raise taskc
match msg:
@ -399,14 +551,14 @@ async def drain_to_final_msg(
ctx._result: Any = pld
log.runtime(
'Context delivered final draining msg:\n'
f'{pformat(msg)}'
f'{pretty_struct.pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
return_msg = msg
result_msg = msg
break
# far end task is still streaming to us so discard
@ -435,12 +587,9 @@ async def drain_to_final_msg(
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
)
return (
return_msg,
pre_result_drained,
f'{pretty_struct.pformat(msg)}\n'
)
break
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
@ -452,7 +601,7 @@ async def drain_to_final_msg(
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
f'{pretty_struct.pformat(msg)}\n'
)
continue
@ -467,7 +616,7 @@ async def drain_to_final_msg(
pre_result_drained.append(msg)
log.cancel(
'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n'
f'{pretty_struct.pformat(msg)}\n'
)
continue
@ -476,7 +625,7 @@ async def drain_to_final_msg(
case Error():
# TODO: can we replace this with `ctx.maybe_raise()`?
# -[ ] would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# |_async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
#
@ -512,7 +661,7 @@ async def drain_to_final_msg(
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
result_msg = msg
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
@ -558,6 +707,6 @@ async def drain_to_final_msg(
)
return (
return_msg,
result_msg,
pre_result_drained,
)

View File

@ -56,6 +56,7 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT')
# TODO: PayloadMsg
class Msg(
Struct,
Generic[PayloadT],
@ -81,7 +82,7 @@ class Msg(
tree.
'''
cid: str|None # call/context-id
cid: str # call/context-id
# ^-TODO-^: more explicit type?
# -[ ] use UNSET here?
# https://jcristharif.com/msgspec/supported-types.html#unset
@ -106,7 +107,7 @@ class Msg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below.
pld: PayloadT|Raw
pld: Raw
class Aid(
@ -143,6 +144,8 @@ class SpawnSpec(
`Aid` msg.
'''
# TODO: similar to the `Start` kwargs spec needed below, we need
# a hard `Struct` def for all of these fields!
_parent_main_data: dict
_runtime_vars: dict[str, Any]