Compare commits

..

23 Commits

Author SHA1 Message Date
Tyler Goodlet 343b7c9712 Even moar bitty `Context` refinements
- set `._state._ctxvar_Context` just after `StartAck` inside
  `open_context_from_portal()` so that `current_ipc_ctx()` always
  works on the 'parent' side.
- always set `.canceller` to any `MsgTypeError.src_uid` and otherwise to
  any maybe-detected `.src_uid` (i.e. for RAEs).
- always set `.canceller` to us when we rx a ctxc which reports us as
  its canceller; this is a sanity check on definite "self cancellation".
- adjust `._is_self_cancelled()` logic to only be `True` when
  `._remote_error` is both a ctxc with a `.canceller` set to us AND
  when `Context.canceller` is also set to us (since the change above)
  as a little bit of extra rigor.
- fill-in/fix some `.repr_state` edge cases:
  - merge self-vs.-peer ctxc cases to one block and distinguish via
    nested `._is_self_cancelled()` check.
  - set 'errored' for all exception matched cases despite `.canceller`.
  - add pre-`Return` phase statuses:
   |_'pre-started' and 'syncing-to-child' depending on side and when
     `._stream` has not (yet) been set.
   |_'streaming' and 'streaming-finished' depending on side when
     `._stream` is set and whether it was stopped/closed.
- tweak drainage log-message to use "outcome" instead of "result".
- use new `.devx.pformat.pformat_cs()` inside `_maybe_cancel_and_set_remote_error()`
  but IFF the log level is at least 'cancel'.
2024-05-08 14:02:56 -04:00
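A quick usage sketch of the parent-side guarantee from the first bullet above; it mirrors the test additions in the diffs further below, and only the actor/fn names are made up:

```python
import trio
import tractor
from tractor._state import current_ipc_ctx

@tractor.context
async def child(ctx: tractor.Context) -> None:
    # child side: set inside `._rpc._invoke()`
    assert current_ipc_ctx() is ctx
    await ctx.started()

async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sub',
            enable_modules=[__name__],
        )
        async with portal.open_context(child) as (ctx, first):
            # parent side: now set just after the child's `StartAck`
            assert current_ipc_ctx() is ctx
        await portal.cancel_actor()

if __name__ == '__main__':
    trio.run(main)
```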
Tyler Goodlet 45f37870af Add a `.log.at_least_level()` predicate 2024-05-08 13:33:59 -04:00
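The predicate (used in the `Context` cancel-logging hunk below as `at_least_level(log=log, level='cancel')`) presumably reads something like this; the level-name lookup here is a stdlib approximation of tractor's custom-level table:

```python
import logging

def at_least_level(
    log: logging.Logger,
    level: int|str,
) -> bool:
    '''
    `True` iff `log` would emit records at (or above) `level`.

    '''
    if isinstance(level, str):
        # tractor maps custom names like 'cancel'/'devx' to ints;
        # the stdlib lookup here is just a stand-in.
        level = getattr(logging, level.upper(), 0)
    return log.isEnabledFor(level)
```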
Tyler Goodlet 4d528b76a0 Move `_debug.pformat_cs()` into `devx.pformat` 2024-05-08 13:30:15 -04:00
Tyler Goodlet 05b143d9ef Big debugger rework, more tolerance for internal err-hangs
Since I was running into them (internal errors) during lock-request
machinery dev and was getting all sorts of difficult-to-understand hangs
whenever I intro-ed a bug to either side of the IPC ctx; this all while
trying to get the msg-spec working for `Lock`-requesting subactors..

Deats:
- hideframes for `@acm`s and `trio.Event.wait()`, `Lock.release()`.
- better detail out the `Lock.acquire/release()` impls
- drop `Lock.remote_task_in_debug`, use new `.ctx_in_debug`.
- add a `Lock.release(force: bool)`.
- move most of what was `_acquire_debug_lock_from_root_task()` and some
  of the `lock_tty_for_child().__a[enter/exit]()` logic into
  `Lock.[acquire/release]()` including a bunch more logging.
- move `lock_tty_for_child()` up in the module to below `Lock`, with
  some rework:
  - drop `subactor_uid: tuple` arg since we can just use the `ctx`..
  - add exception handler blocks for reporting internal (impl) errors
    and always force release the lock in such cases.
- extend `DebugStatus` (prolly will rename to `DebugRequest` btw):
  - add `.req_ctx: Context` for subactor side.
  - add `.req_finished: trio.Event` to the sub to signal request task exit.
  - extend `.shield_sigint()` doc-str.
  - add `.release()` to encaps all the state mgmt previously strewn
    about inside `._pause()`..
- use new `DebugStatus.release()` to replace all the duplication:
  - inside `PdbREPL.set_[continue/quit]()`.
  - inside `._pause()` for the subactor branch on internal
    repl-invocation error cases,
  - in the `_enter_repl_sync()` closure on error,
- replace `apply_debug_codec()` -> `apply_debug_pldec()` in tandem with
  the new `PldRx` sub-sys which handles the new `__pld_spec__`.
- add a new `pformat_cs()` helper, orig to help debug a cs stack
  corruption; going to move to `.devx.pformat` obvi.
- rename `wait_for_parent_stdin_hijack()` -> `request_root_stdio_lock()`
  with improvements:
  - better doc-str and add todos,
  - use `DebugStatus` more stringently to encaps all subactor req state.
  - error handling blocks for cancellation and straight up impl errors
    directly around the `.open_context()` block with the latter doing
    a `ctx.cancel()` to avoid hanging in the shielded `.req_cs` scope.
  - similar exc blocks for the func's overall body with explicit
    `log.exception()` reporting.
  - only set the new `DebugStatus.req_finished: trio.Event` in `finally`.
- rename `mk_mpdb()` -> `mk_pdb()` and don't call `.shield_sigint()`
  implicitly since the caller usage does matter for this.
- factor out `any_connected_locker_child()` from the SIGINT handler.
- rework SIGINT handler to better handle any stale-lock/hang cases:
  - use new `Lock.ctx_in_debug: Context` to detect a subactor-in-debug
    and use it to cancel any lock request instead of the lower level..
  - use a `problem: str` summary approach to log emissions.
- rework `_pause()` given all of the above, stuff not yet mentioned:
  - don't take `shield: bool` input and proxy to `debug_func()` (for now).
  - drop `extra_frames_up_when_async: int` usage, expect
    `**debug_func_kwargs` to passthrough an `api_frame: FrameType` (more
    on this later).
  - lotsa asserts around the request ctx vs. task-in-debug ctx using new
    `current_ipc_ctx()`.
  - asserts around `DebugStatus` state.
- rework and simplify the `debug_func` hooks,
  `_set_trace()`/`_post_mortem()`:
  - make them accept a non-optional `repl: PdbREPL` and `api_frame:
    FrameType` which should be used to set the current frame when the
    REPL engages.
  - always hide the hook frames.
  - always accept a `tb: TracebackType` to `_post_mortem()`.
   |_ copy and re-impl what was the delegation to
     `pdbp.xpm()`/`pdbp.post_mortem()` and instead call the
     underlying `Pdb.interaction()` ourselves with a `caller_frame`
     and tb instance.
- adjust the public `.pause()` impl:
  - accept optional `hide_tb` and `api_frame` inputs.
  - mask opening a cancel-scope for now (can cause `trio` stack
    corruption, see notes) and thus don't use the `shield` input other
    than to eventually passthrough to `_post_mortem()`?
   |_ thus drop `task_status` support for now as well.
   |_ pretty sure correct soln is a debug-nursery around `._invoke()`.
- since no longer using `extra_frames_up_when_async` inside
  `debug_func()`s, ensure all public apis pass an `api_frame`.
- re-impl our `tractor.post_mortem()` to directly call into `._pause()`
  instead of binding in via `partial` and mk it take similar input as
  `.pause()`.
- drop `Lock.release()` from `_maybe_enter_pm()`, expose and pass
  expected frame and tb.
- use necessary changes from all the above within
  `maybe_wait_for_debugger()` and `acquire_debug_lock()`.

Lel, sorry thought that would be shorter..
There's still a lot more re-org to do particularly with `DebugStatus`
encapsulation but it's coming in follow up.
2024-05-08 11:44:55 -04:00
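As a rough sketch of the `DebugStatus.release()` encapsulation described above (field names beyond `.req_ctx`/`.req_finished` are assumptions), the idea is one idempotent reset point instead of state mgmt strewn about `._pause()`:

```python
import trio

class DebugStatus:
    # request `Context` opened from the subactor side
    req_ctx = None  # : tractor.Context|None
    # set on request-task exit so waiters can sync teardown
    req_finished: trio.Event = trio.Event()
    # (assumed name) the task currently holding the REPL
    repl_task = None

    @classmethod
    def release(cls) -> None:
        try:
            cls.repl_task = None
            cls.req_ctx = None
        finally:
            # always signal waiters that the REPL/request
            # state was reset, even on internal error
            cls.req_finished.set()
```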
Tyler Goodlet a354732a9e Allow `Stop` passthrough from `PldRx.recv_msg_w_pld()`
Since we need to allow it (at the least) inside
`drain_to_final_msg()` for handling stream-phase termination races
where we don't want to have to handle a raised error from something like
`Context.result()`. Expose the passthrough option via
a `passthrough_non_pld_msgs: bool` kwarg.

Add a comprehensive comment to `current_pldrx()`.
2024-05-08 08:50:16 -04:00
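In shape, the new kwarg gates an early return instead of error-raising; a self-contained sketch using a stand-in msg type (the real one is `tractor.msg.types.Stop`):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Stop:  # stand-in for `tractor.msg.types.Stop`
    cid: str

async def recv_msg_w_pld(
    rx,  # any async receive channel of msg structs
    passthrough_non_pld_msgs: bool = True,
) -> tuple[Any, Any]:
    msg = await rx.receive()
    if passthrough_non_pld_msgs and isinstance(msg, Stop):
        # stream-phase termination race: hand the raw msg back
        # to (eg.) `drain_to_final_msg()` instead of raising.
        return msg, None
    # normal path: decode/type-check and deliver the payload
    return msg, msg.pld
```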
Tyler Goodlet fbc21a1dec Add a "current IPC `Context`" `ContextVar`
Expose it from `._state.current_ipc_ctx()` and set it inside
`._rpc._invoke()` for child and inside `Portal.open_context()` for
parent.

Still need to write a few more tests (particularly demonstrating usage
throughout multiple nested nurseries on each side) but this suffices as
a proto for testing with some debugger request-from-subactor stuff.

Other,
- use new `.devx.pformat.add_div()` for ctxc messages.
- add a block to always traceback dump on corrupted cs stacks.
- better handle non-RAE exception output-formatting in the context
  termination summary log message.
- use a summary for `start_status` for msg logging in RPC loop.
2024-05-07 15:35:45 -04:00
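The core plumbing is small; a sketch with a stand-in `Context` type (the real var and getter live in `._state`, as the diffs below show):

```python
from contextvars import ContextVar

class Context:  # stand-in for `tractor.Context`
    ...

_ctxvar_Context: ContextVar[Context|None] = ContextVar(
    'ipc_context',
    default=None,
)

def current_ipc_ctx() -> Context|None:
    # set via `_ctxvar_Context.set(ctx)` inside `._rpc._invoke()`
    # (child side) and `open_context_from_portal()` (parent side)
    return _ctxvar_Context.get()
```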
Tyler Goodlet b278164f83 Mk `drain_to_final_msg()` never raise from `Error`
Since we usually want them raised from some (internal) call to
`Context.maybe_raise()` and NOT directly from the drainage call, make it
possible via a new `raise_error: bool` to both `PldRx.recv_msg_w_pld()`
and `.dec_msg()`.

In support,
- rename `return_msg` -> `result_msg` since we expect to return
  `Error`s.
- do a `result_msg` assign and `break` in the `case Error()`.
- add `**dec_msg_kwargs` passthrough for other `.dec_msg()` calling
  methods.

Other,
- drop/aggregate todo-notes around the main loop's
  `ctx._pld_rx.recv_msg_w_pld()` call.
- add (configurable) frame hiding to most payload receive meths.
2024-05-06 13:43:51 -04:00
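The gist of the `raise_error` switch, with stand-ins for the msg/error types: an `Error` msg is either boxed-and-raised on the spot, or handed back so `Context.maybe_raise()` can do it later:

```python
from dataclasses import dataclass

@dataclass
class Error:  # stand-in for `tractor.msg.types.Error`
    tb_str: str

class RemoteActorError(Exception):  # stand-in for the real type
    ...

def dec_msg(
    msg,
    raise_error: bool = True,
):
    if isinstance(msg, Error):
        exc = RemoteActorError(msg.tb_str)  # ~ `unpack_error()`
        if raise_error:
            raise exc
        # drainage path: the caller does `result_msg = msg; break`
        # and raises (if at all) via `Context.maybe_raise()`
        return msg
    return msg
```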
Tyler Goodlet 8ffa6a5e68 "Icons" in `._entry`'s subactor `.info()` messages
Add a little `>` or `X` supervision icon indicating the spawning or
termination of each sub-actor respectively.
2024-05-06 13:12:44 -04:00
Tyler Goodlet 7707e0e75a Woops, make `log.devx()` level 600 2024-05-06 13:07:53 -04:00
Tyler Goodlet 523c24eb72 Move pformatters into new `.devx.pformat`
Since `._code` is prolly gonna get renamed (to something "frame & stack
tools" related) and to give a bit better organization.

Also adds a new `add_div()` helper, factored out of ctxc message
creation in `._rpc._invoke()`, for adding a little "header line" divider
under a given `message: str` with a little math to center it.
2024-05-06 13:04:58 -04:00
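The centering math is visible in the removed `._rpc._invoke()` hunk further below; reassembled as a helper it'd be roughly (exact signature assumed):

```python
def add_div(
    message: str,
    div_str: str = '------ - ------',
) -> str:
    '''
    Return the addendum to append under `message`: a newline plus
    the divider padded so it sits roughly centered under the text.

    '''
    div_offset: int = (
        round(len(message)/2) + 1
        +
        round(len(div_str)/2) + 1
    )
    return (
        '\n'
        +
        ' ' * div_offset
        +
        f'{div_str}\n'
    )
```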
Tyler Goodlet 544ff5ab4c Change to `RemoteActorError.pformat()`
For more sane manual calls as needed for logging purposes. Obvi remap
the dunder methods to it.

Other:
- drop `hide_tb: bool` from `unpack_error()`, shouldn't need it since the
  frame won't ever be part of any tb raised from the returned error.
- add an `is_invalid_payload: bool` to `_raise_from_unexpected_msg()` to
  be used from `PldRx` where we don't need to decode the IPC
  msg, just the payload; make the error message reflect this case.
- drop commented `._portal._unwrap_msg()` since we've replaced it with
  `PldRx`'s delegation to newer `._raise_from_unexpected_msg()`.
- hide the `Portal.result()` frame by default, again.
2024-05-06 13:01:56 -04:00
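The remap itself is just attribute aliasing on the class body, as the diff below shows:

```python
class RemoteActorError(Exception):

    def pformat(self) -> str:
        # boxed-tb + fields when a remote tb was captured,
        # otherwise just the plain `._message`
        ...

    # any `repr()`/`str()` (and thus logging) renders via `.pformat()`
    __repr__ = pformat
    __str__ = pformat
```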
Tyler Goodlet 63c23d6b82 Add todo for rigorous struct-type spec of `SpawnSpec` fields 2024-04-30 13:01:07 -04:00
Tyler Goodlet cca3206fd6 Use `log.devx()` for `stackscope` messages 2024-04-30 13:00:03 -04:00
Tyler Goodlet 54530dcf94 Type annot the proc from `trio.lowlevel.open_process()` 2024-04-30 12:59:38 -04:00
Tyler Goodlet 338395346d Tweak `breakpoint()` usage error message 2024-04-30 12:56:29 -04:00
Tyler Goodlet 30c5896d26 Fix attr name error, use public `MsgDec.dec` 2024-04-30 12:55:46 -04:00
Tyler Goodlet 88a0e90f82 Reorg frames pformatters, add `Context.repr_state`
A better spot for the pretty-formatting of frame text (and thus tracebacks)
is in the new `.devx._code` module:
- move from `._exceptions` -> `.devx._code.pformat_boxed_tb()`.
- add new `pformat_caller_frame()` factored out of the use case in
  `._exceptions._mk_msg_type_err()` where we dump a stack trace
  for bad `.send()` side IPC msgs.

Add some new pretty-format methods to `Context`:
- explicitly implement `.pformat()` and allow an `extra_fields: dict`
  which can be used to inject additional fields (maybe eventually by
  default) such as is now used inside
  `._maybe_cancel_and_set_remote_error()` when reporting the internal
  `._scope` state in cancel logging.
- add a new `.repr_state -> str` which provides a single string status
  depending on the internal state of the IPC ctx in terms of the shuttle
  protocol's "phase"; use it from `.pformat()` for the `|_state:`.
- set `.started(complain_no_parity=False)` since we now presume decoding
  with `.pld: Raw` under the new `PldRx` design.
- use new `msgops.current_pldrx()` in `mk_context()`.
2024-04-30 12:53:55 -04:00
Tyler Goodlet 40c972f0ec Mk `process_messages()` return last msg; summary logging
Not sure it's **that** useful (yet) but in theory it would allow avoiding
certain log level usage around transient RPC requests for discovery methods
(like `.register_actor()` and friends); can't hurt to be able to
introspect that last message for other future cases I'd imagine as well.
Adjust the calling code in `._runtime` to match; other spots are using
the `trio.Nursery.start()` schedule style and are fine as is.

Improve a bunch more log messages throughout a few mods mostly by going
to a "summary" single-emission style where possible/appropriate:
- in `._runtime` more "single summary" status style log emissions:
 |_mk `Actor.load_modules()` render a single mod loaded summary.
 |_use a summary `con_status: str` for `Actor._stream_handler()` conn
   setup and an equiv (`con_teardown_status`) for connection teardowns.
 |_similar thing in `Actor.wait_for_actor()`.
- generally more usage of `.msg.pretty_struct` apis throughout `._runtime`.
2024-04-30 12:15:46 -04:00
Tyler Goodlet f139adddca Add a `log.devx()` level 2024-04-30 11:47:26 -04:00
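A stdlib-flavoured sketch of adding such a custom level/emitter; tractor's actual impl goes through its own custom-levels table, and the `600` value comes from the 'Woops' fix-up commit above:

```python
import logging

DEVX: int = 600  # per the fix-up commit above
logging.addLevelName(DEVX, 'DEVX')

def devx(self: logging.Logger, msg: str, *args, **kwargs) -> None:
    # same guard-then-emit pattern stdlib uses for `.debug()/.info()`
    if self.isEnabledFor(DEVX):
        self._log(DEVX, msg, args, **kwargs)

logging.Logger.devx = devx  # type: ignore[attr-defined]
```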
Tyler Goodlet 979af79588 First draft, package with `poetry` Bo 2024-04-30 11:46:56 -04:00
Tyler Goodlet a3429268ea First draft payload-spec limit API
Add new task-scope oriented `PldRx.pld_spec` management API similar to
`.msg._codec.limit_msg_spec()`, but obvi built to process and filter
`MsgType.pld` values.

New API related changes include:
- new per-task singleton getter `msg._ops.current_pldrx()` which
  delivers the current (global) payload receiver via a new
  `_ctxvar_PldRx: ContextVar` configured with a default
  `_def_any_pldec: MsgDec[Any]` decoder.
- a `PldRx.limit_plds()` which sets the decoder (`.type` underneath)
  for the specific payload rx instance.
- `.msg._ops.limit_plds()` which obtains the current task-scoped `PldRx`
  and applies the pld spec via a new `PldRx.limit_plds()`.
- rename `PldRx._msgdec` -> `._pldec`.
- add `.pld_dec` as pub attr for -^

Unrelated adjustments:
- use `.msg.pretty_struct.pformat()` where handy.
- always pass `expect_msg: MsgType`.
- add a `case Stop()` to `PldRx.dec_msg()` which will `log.warning()`
  when a stop is received but no stream was open on this receiving side,
  since we rarely want that to raise; it's prolly just a runtime
  race or mistake in user code.

Other:
2024-04-26 15:29:50 -04:00
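Condensed, the task-scoped machinery reads roughly like this sketch; `mk_dec()` is a simplified stand-in for the real `.msg._codec.mk_dec()` and the context-manager shape of `limit_plds()` is an assumption based on the `limit_msg_spec()` analogy:

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any

def mk_dec(spec) -> Any:  # stand-in for `._codec.mk_dec()`
    return spec

class PldRx:
    def __init__(self, pldec):
        self._pldec = pldec  # renamed from `._msgdec`

    @property
    def pld_dec(self):  # new pub attr for `._pldec`
        return self._pldec

    @contextmanager
    def limit_plds(self, spec):
        # apply a (narrower) payload-spec on this rx instance
        orig, self._pldec = self._pldec, mk_dec(spec)
        try:
            yield self._pldec
        finally:
            self._pldec = orig

_def_any_pldec = mk_dec(Any)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
    'pld_rx',
    default=PldRx(_def_any_pldec),
)

def current_pldrx() -> PldRx:
    # per-task singleton getter for the current payload receiver
    return _ctxvar_PldRx.get()

@contextmanager
def limit_plds(spec):
    # module-level variant: limit the *current* task's `PldRx`
    with current_pldrx().limit_plds(spec) as dec:
        yield dec
```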
Tyler Goodlet d285a3479a Make `.msg.types.Msg.pld: Raw` only, since `PldRx`.. 2024-04-26 13:18:06 -04:00
Tyler Goodlet 61db040702 More bitty (runtime) logging tweaks 2024-04-26 13:13:04 -04:00
20 changed files with 2198 additions and 1294 deletions

View File

@ -1,3 +1,68 @@
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
# ------ - ------
[tool.poetry]
name = "tractor"
version = "0.1.0a6dev0"
description='structured concurrent `trio`-"actors"'
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPlv3"
readme = "docs/README.rst"
# TODO: do we need this xontrib loader at all given pep420
# and xonsh's xontrib global-autoload-via-setuptools?
# https://xon.sh/tutorial_xontrib.html#authoring-xontribs
packages = [
{include = 'tractor' },
# {include = 'tractor.experimental' },
# {include = 'tractor.trionics' },
# {include = 'tractor.msg' },
# {include = 'tractor.devx' },
]
# ------ - ------
[tool.poetry.dependencies]
python = "^3.11"
# trio runtime related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
trio='^0.24'
tricycle = "^0.4.1"
trio-typing = "^0.10.0"
msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging
# .devx tooling
stackscope = "^0.2.2"
pdbp = "^1.5.0"
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2
# ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev]
optional = false
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
# only for xonsh as sh..
xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5"
# ------ - ------
[tool.towncrier]
package = "tractor"
filename = "NEWS.rst"
@ -27,6 +92,7 @@ all_bullets = true
name = "Trivial/Internal Changes"
showcontent = true
# ------ - ------
[tool.pytest.ini_options]
minversion = '6.0'
@ -46,3 +112,26 @@ log_cli = false
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
# pythonpath = "src"
# ------ - ------
[project]
keywords = [
'trio',
'async',
'concurrency',
'structured concurrency',
'actor model',
'distributed',
'multiprocessing'
]
classifiers = [
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Framework :: Trio",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Topic :: System :: Distributed Computing",
]

View File

@ -25,6 +25,7 @@ from tractor._exceptions import (
StreamOverrun, StreamOverrun,
ContextCancelled, ContextCancelled,
) )
from tractor._state import current_ipc_ctx
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
@ -144,6 +145,8 @@ async def simple_setup_teardown(
global _state global _state
_state = True _state = True
assert current_ipc_ctx() is ctx
# signal to parent that we're up # signal to parent that we're up
await ctx.started(data + 1) await ctx.started(data + 1)
@ -204,6 +207,7 @@ def test_simple_context(
block_forever=callee_blocks_forever, block_forever=callee_blocks_forever,
) as (ctx, sent), ) as (ctx, sent),
): ):
assert current_ipc_ctx() is ctx
assert sent == 11 assert sent == 11
if callee_blocks_forever: if callee_blocks_forever:

View File

@ -37,8 +37,9 @@ import inspect
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable,
AsyncGenerator, AsyncGenerator,
Callable,
Mapping,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
Union, Union,
@ -59,9 +60,11 @@ from ._exceptions import (
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
from .log import get_logger from .log import (
get_logger,
at_least_level,
)
from .msg import ( from .msg import (
_codec,
Error, Error,
MsgType, MsgType,
MsgCodec, MsgCodec,
@ -84,6 +87,7 @@ from ._streaming import MsgStream
from ._state import ( from ._state import (
current_actor, current_actor,
debug_mode, debug_mode,
_ctxvar_Context,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -103,7 +107,6 @@ class Unresolved:
a final return value or raised error is resolved. a final return value or raised error is resolved.
''' '''
...
# TODO: make this a .msg.types.Struct! # TODO: make this a .msg.types.Struct!
@ -116,19 +119,19 @@ class Context:
NB: This class should **never be instatiated directly**, it is allocated NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways: by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary - by entering `Portal.open_context()` which is the primary
public API for any "caller" task or, public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function. to a remotely scheduled "child" function.
AND is always constructed using the below ``mk_context()``. AND is always constructed using the below `mk_context()`.
Allows maintaining task or protocol specific state between Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing 2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task `trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._rpc._invoke()``. always allocated inside `._rpc._invoke()`.
TODO: more detailed writeup on cancellation, error and TODO: more detailed writeup on cancellation, error and
streaming semantics.. streaming semantics..
@ -206,7 +209,7 @@ class Context:
# cancelled that the other side is as well, so maybe we should # cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the # instead just have a `.canceller` pulled from the
# `ContextCancelled`? # `ContextCancelled`?
_canceller: tuple[str, str] | None = None _canceller: tuple[str, str]|None = None
# NOTE: we try to ensure assignment of a "cancel msg" since # NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any # there's always going to be an "underlying reason" that any
@ -262,7 +265,13 @@ class Context:
_strict_started: bool = False _strict_started: bool = False
_cancel_on_msgerr: bool = True _cancel_on_msgerr: bool = True
def __str__(self) -> str: def pformat(
self,
extra_fields: dict[str, Any]|None = None,
# ^-TODO-^ some built-in extra state fields
# we'll want in some devx specific cases?
) -> str:
ds: str = '=' ds: str = '='
# ds: str = ': ' # ds: str = ': '
@ -279,11 +288,7 @@ class Context:
outcome_str: str = self.repr_outcome( outcome_str: str = self.repr_outcome(
show_error_fields=True show_error_fields=True
) )
outcome_typ_str: str = self.repr_outcome( fmtstr: str = (
type_only=True
)
return (
f'<Context(\n' f'<Context(\n'
# f'\n' # f'\n'
# f' ---\n' # f' ---\n'
@ -304,12 +309,12 @@ class Context:
# f' -----\n' # f' -----\n'
# #
# TODO: better state `str`ids? # TODO: better state `str`ids?
# -[ ] maybe map err-types to strs like 'cancelled', # -[x] maybe map err-types to strs like 'cancelled',
# 'errored', 'streaming', 'started', .. etc. # 'errored', 'streaming', 'started', .. etc.
# -[ ] as well as a final result wrapper like # -[ ] as well as a final result wrapper like
# `outcome.Value`? # `outcome.Value`?
# #
f' |_state: {outcome_typ_str}\n' f' |_state: {self.repr_state!r}\n'
f' outcome{ds}{outcome_str}\n' f' outcome{ds}{outcome_str}\n'
f' result{ds}{self._result}\n' f' result{ds}{self._result}\n'
@ -324,6 +329,16 @@ class Context:
# -[ ] remove this ^ right? # -[ ] remove this ^ right?
# f' _remote_error={self._remote_error} # f' _remote_error={self._remote_error}
)
if extra_fields:
for key, val in extra_fields.items():
fmtstr += (
f' {key}{ds}{val!r}\n'
)
return (
fmtstr
+
')>\n' ')>\n'
) )
# NOTE: making this return a value that can be passed to # NOTE: making this return a value that can be passed to
@ -335,7 +350,8 @@ class Context:
# logging perspective over `eval()`-ability since we do NOT # logging perspective over `eval()`-ability since we do NOT
# target serializing non-struct instances! # target serializing non-struct instances!
# def __repr__(self) -> str: # def __repr__(self) -> str:
__repr__ = __str__ __str__ = pformat
__repr__ = pformat
@property @property
def cancel_called(self) -> bool: def cancel_called(self) -> bool:
@ -373,8 +389,12 @@ class Context:
re: BaseException|None = ( re: BaseException|None = (
remote_error remote_error
or self._remote_error or
self._remote_error
) )
# XXX we only report "this context" as self-cancelled
# once we've received a ctxc from our direct-peer task
# (aka we're `.cancel_acked`).
if not re: if not re:
return False return False
@ -385,10 +405,10 @@ class Context:
our_canceller = self.canceller our_canceller = self.canceller
return bool( return bool(
isinstance(re, ContextCancelled) isinstance((ctxc := re), ContextCancelled)
and from_uid == self.chan.uid and from_uid == self.chan.uid
and re.canceller == our_uid and ctxc.canceller == our_uid
and our_canceller == from_uid and our_canceller == our_uid
) )
@property @property
@ -608,52 +628,61 @@ class Context:
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
msgerr: bool = False
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
msgerr: bool = False
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
# NOTE in the case error is a ctxc the canceller will
# either be another peer or us. in the case where it's us
# we mark ourself as the canceller of ourselves (a ctx
# "self cancel" from this side's perspective), if instead
# the far end was cancelled by some other (inter-) peer,
# we want to mark our canceller as the actor that was
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc
else:
whom = 'a remote peer (not us)'
self._canceller = error.src_uid
whom: str = (
'us' if error.canceller == self._actor.uid
else 'peer'
)
log.cancel( log.cancel(
f'IPC context cancelled by {whom}!\n\n' f'IPC context was cancelled by {whom}!\n\n'
f'{error}' f'{error}'
) )
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgerr = True msgerr = True
self._canceller = error.src_uid
log.error( log.error(
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n' f'{error}\n'
f'{pformat(self)}\n' f'{pformat(self)}\n'
) )
else: else:
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src_uid: tuple = getattr(
error,
'src_uid',
None,
)
# we mark the source actor as our canceller
self._canceller = maybe_error_src_uid
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
# f'{pformat(self)}\n' # f'{pformat(self)}\n'
f'{error}\n' f'{error}\n'
) )
# always record the cancelling actor's uid since its if self._canceller is None:
# cancellation state is linked and we want to know log.error('Ctx has no canceller set!?')
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
@ -696,29 +725,35 @@ class Context:
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
scope_info: str = 'No `self._scope: CancelScope` was set/used ?' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
if cs: if (
scope_info: str = ( cs
f'self._scope: {cs}\n' and
f'|_ .cancel_called: {cs.cancel_called}\n' at_least_level(log=log, level='cancel')
f'|_ .cancelled_caught: {cs.cancelled_caught}\n' ):
f'|_ ._cancel_status: {cs._cancel_status}\n\n' fmt_str: str = self.pformat(
extra_fields={
f'{self}\n' '._is_self_cancelled()': self._is_self_cancelled(),
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n' '._cancel_on_msgerr': self._cancel_on_msgerr,
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n' }
)
f'msgerr: {msgerr}\n' from .devx.pformat import pformat_cs
cs_fmt: str = pformat_cs(
cs,
var_name='Context._scope',
)
fmt_str += (
'\n'
+
cs_fmt
) )
log.cancel( log.cancel(
message message
+ +
f'{scope_info}' fmt_str
) )
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# TODO: add to `Channel`? # TODO: also add to `Channel`?
@property @property
def dst_maddr(self) -> str: def dst_maddr(self) -> str:
chan: Channel = self.chan chan: Channel = self.chan
@ -748,7 +783,7 @@ class Context:
) )
return ( return (
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
f'{self._nsf}() -> {outcome_str}:' f'{self._nsf}() -> {outcome_str}'
) )
@property @property
@ -836,7 +871,7 @@ class Context:
if not self._portal: if not self._portal:
raise InternalError( raise InternalError(
'No portal found!?\n' 'No portal found!?\n'
'Why is this supposed caller context missing it?' 'Why is this supposed {self.side!r}-side ctx task missing it?!?'
) )
cid: str = self.cid cid: str = self.cid
@ -1091,7 +1126,8 @@ class Context:
f'ctx id: {self.cid}' f'ctx id: {self.cid}'
) )
# TODO: replace all the instances of this!! XD # TODO: replace all the `._maybe_raise_remote_err()` usage
# with instances of this!!
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True, hide_tb: bool = True,
@ -1102,6 +1138,7 @@ class Context:
if re := self._remote_error: if re := self._remote_error:
return self._maybe_raise_remote_err( return self._maybe_raise_remote_err(
re, re,
hide_tb=hide_tb,
**kwargs, **kwargs,
) )
@ -1203,7 +1240,6 @@ class Context:
# runtime frames from the tb explicitly? # runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607 # https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise remote_error # from None raise remote_error # from None
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
@ -1254,8 +1290,15 @@ class Context:
# wait for a final context result/error by "draining" # wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield" # (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end. # msgs still in transit from the far end.
#
# XXX NOTE XXX: this call shouldn't really ever raise
# (other then internal error), instead delivering an
# `Error`-msg and that being `.maybe_raise()`-ed below
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
( (
return_msg, outcome_msg,
drained_msgs, drained_msgs,
) = await msgops.drain_to_final_msg( ) = await msgops.drain_to_final_msg(
ctx=self, ctx=self,
@ -1273,13 +1316,18 @@ class Context:
f'{msg}\n' f'{msg}\n'
) )
log.cancel( drained_status: str = (
'Ctx drained pre-result msgs:\n' 'Ctx drained to final outcome msg\n\n'
f'{pformat(drained_msgs)}\n\n' f'{outcome_msg}\n'
f'Final return msg:\n'
f'{return_msg}\n'
) )
if drained_msgs:
drained_status += (
'\n'
f'The pre-drained msgs are\n'
f'{pformat(drained_msgs)}\n'
)
log.cancel(drained_status)
self.maybe_raise( self.maybe_raise(
# NOTE: obvi we don't care if we # NOTE: obvi we don't care if we
@ -1310,7 +1358,7 @@ class Context:
@property @property
def maybe_error(self) -> BaseException|None: def maybe_error(self) -> BaseException|None:
le: Exception|None = self._local_error le: BaseException|None = self._local_error
re: RemoteActorError|ContextCancelled|None = self._remote_error re: RemoteActorError|ContextCancelled|None = self._remote_error
match (le, re): match (le, re):
@ -1338,7 +1386,7 @@ class Context:
# ContextCancelled(canceller=), # ContextCancelled(canceller=),
# ): # ):
error: Exception|None = le or re error: BaseException|None = le or re
if error: if error:
return error return error
@ -1443,6 +1491,76 @@ class Context:
repr(self._result) repr(self._result)
) )
@property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
inter-actor IPC context in terms of the current "phase" state
of the SC shuttling dialog protocol.
'''
merr: Exception|None = self.maybe_error
outcome: Unresolved|Exception|Any = self.outcome
status: str|None = None
match (
outcome,
merr,
):
# "graceful" ctx cancellation
case (
Unresolved,
ContextCancelled(),
):
if self._is_self_cancelled():
status = 'self-cancelled'
elif (
self.canceller
and not self._cancel_called
):
status = 'peer-cancelled'
# (remote) error condition
case (
Unresolved,
BaseException(), # any error-type
):
status = 'errored'
# result already returned
case (
_, # any non-unresolved value
None,
) if self._final_result_is_set():
status = 'returned'
# normal operation but still in a pre-`Return`-result
# dialog phase
case (
Unresolved, # noqa (ruff, you so weird..)
None, # no (remote) error set
):
if stream := self._stream:
if stream.closed:
status = 'streaming-finished'
else:
status = 'streaming'
elif self._started_called:
status = 'started'
else:
if self.side == 'child':
status = 'pre-started'
else:
status = 'syncing-to-child'
if status is None:
status = '??unknown??'
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
return status
async def started( async def started(
self, self,
@ -1451,7 +1569,11 @@ class Context:
value: PayloadT|None = None, value: PayloadT|None = None,
strict_parity: bool = False, strict_parity: bool = False,
complain_no_parity: bool = True,
# TODO: this will always emit now that we do `.pld: Raw`
# passthrough.. so maybe just only complain when above strict
# flag is set?
complain_no_parity: bool = False,
) -> None: ) -> None:
''' '''
@ -1511,18 +1633,19 @@ class Context:
) )
raise RuntimeError( raise RuntimeError(
'Failed to roundtrip `Started` msg?\n' 'Failed to roundtrip `Started` msg?\n'
f'{pformat(rt_started)}\n' f'{pretty_struct.pformat(rt_started)}\n'
) )
if rt_started != started_msg: if rt_started != started_msg:
# TODO: break these methods out from the struct subtype? # TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__( diff = pretty_struct.Struct.__sub__(
rt_started, rt_started,
started_msg, started_msg,
) )
complaint: str = ( complaint: str = (
'Started value does not match after codec rountrip?\n\n' 'Started value does not match after roundtrip?\n\n'
f'{diff}' f'{diff}'
) )
@ -1538,8 +1661,6 @@ class Context:
else: else:
log.warning(complaint) log.warning(complaint)
# started_msg = rt_started
await self.chan.send(started_msg) await self.chan.send(started_msg)
# raise any msg type error NO MATTER WHAT! # raise any msg type error NO MATTER WHAT!
@ -1667,7 +1788,6 @@ class Context:
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}' f'{flow_body}'
f'{pformat(re)}\n' f'{pformat(re)}\n'
) )
self._cancel_msg: dict = msg self._cancel_msg: dict = msg
@ -1932,6 +2052,7 @@ async def open_context_from_portal(
) )
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
assert ctx._caller_info assert ctx._caller_info
_ctxvar_Context.set(ctx)
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered # `Started`-msg any cancellation triggered
@ -2085,7 +2206,7 @@ async def open_context_from_portal(
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
# exit silently. # finish silently.
if ( if (
ctx._cancel_called ctx._cancel_called
and and
@ -2210,6 +2331,11 @@ async def open_context_from_portal(
try: try:
result_or_err: Exception|Any = await ctx.result() result_or_err: Exception|Any = await ctx.result()
except BaseException as berr: except BaseException as berr:
# cancelled before (or maybe during?) final result capture
# if isinstance(trio.Cancelled, berr):
# from .devx import mk_pdb
# mk_pdb.set_trace()
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.result()` we still want to # raised in `Context.result()` we still want to
# save that error on the ctx's state to # save that error on the ctx's state to
@ -2354,7 +2480,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime( log.runtime(
'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n' f'uid: {uid}\n'
f'cid: {ctx.cid}\n' f'cid: {ctx.cid}\n'
) )
@ -2390,10 +2516,8 @@ def mk_context(
from .devx._code import find_caller_info from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info() caller_info: CallerInfo|None = find_caller_info()
pld_rx = msgops.PldRx( # TODO: when/how do we apply `.limit_plds()` from here?
# _rx_mc=recv_chan, pld_rx: msgops.PldRx = msgops.current_pldrx()
_msgdec=_codec.mk_dec(spec=pld_spec)
)
ctx = Context( ctx = Context(
chan=chan, chan=chan,
@ -2407,12 +2531,12 @@ def mk_context(
_caller_info=caller_info, _caller_info=caller_info,
**kwargs, **kwargs,
) )
# TODO: we can drop the old placeholder yah?
# ctx._result: int | Any = id(ctx)
ctx._result = Unresolved ctx._result = Unresolved
return ctx return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable: def context(func: Callable) -> Callable:
''' '''
Mark an (async) function as an SC-supervised, inter-`Actor`, Mark an (async) function as an SC-supervised, inter-`Actor`,
@ -2426,8 +2550,8 @@ def context(func: Callable) -> Callable:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912 # https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore func._tractor_context_function = True # type: ignore
sig = inspect.signature(func) sig: inspect.Signature = inspect.signature(func)
params = sig.parameters params: Mapping = sig.parameters
if 'ctx' not in params: if 'ctx' not in params:
raise TypeError( raise TypeError(
"The first argument to the context function " "The first argument to the context function "

View File

@ -20,6 +20,7 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
# import textwrap
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -91,7 +92,7 @@ def _mp_main(
pass # handle it the same way trio does? pass # handle it the same way trio does?
finally: finally:
log.info(f"Actor {actor.uid} terminated") log.info(f"Subactor {actor.uid} terminated")
def _trio_main( def _trio_main(
@ -125,9 +126,11 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n' f' loglevel: {actor.loglevel}\n'
) )
log.info( log.info(
'Started new trio process:\n' 'Started new trio subactor:\n'
+ +
actor_info '>\n' # like a "started/play"-icon from super perspective
+
actor_info,
) )
try: try:
@ -146,7 +149,9 @@ def _trio_main(
finally: finally:
log.info( log.info(
'Actor terminated\n' 'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+ +
actor_info actor_info
) )

View File

@ -46,7 +46,6 @@ from tractor.msg import (
Error, Error,
MsgType, MsgType,
Stop, Stop,
Yield,
types as msgtypes, types as msgtypes,
MsgCodec, MsgCodec,
MsgDec, MsgDec,
@ -140,71 +139,6 @@ def get_err_type(type_name: str) -> BaseException|None:
return type_ref return type_ref
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pack_from_raise( def pack_from_raise(
local_err: ( local_err: (
ContextCancelled ContextCancelled
@ -277,6 +211,8 @@ class RemoteActorError(Exception):
) -> None: ) -> None:
super().__init__(message) super().__init__(message)
# for manual display without having to muck with `Exception.args`
self._message: str = message
# TODO: maybe a better name? # TODO: maybe a better name?
# - .errtype # - .errtype
# - .retype # - .retype
@ -504,43 +440,61 @@ class RemoteActorError(Exception):
reprol_str: str = ( reprol_str: str = (
f'{type(self).__name__}' # type name f'{type(self).__name__}' # type name
f'[{self.boxed_type_str}]' # parameterized by boxed type f'[{self.boxed_type_str}]' # parameterized by boxed type
'(' # init-style look
) )
_repr: str = self._mk_fields_str( _repr: str = self._mk_fields_str(
self.reprol_fields, self.reprol_fields,
end_char=' ', end_char=' ',
) )
if _repr:
reprol_str += '(' # init-style call
return ( return (
reprol_str reprol_str
+ +
_repr _repr
) )
def __repr__(self) -> str: def pformat(self) -> str:
''' '''
Nicely formatted boxed error meta data + traceback. Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`).
''' '''
fields: str = self._mk_fields_str( tb_str: str = self.tb_str
_body_fields if tb_str:
+ fields: str = self._mk_fields_str(
self.extra_body_fields, _body_fields
) +
body: str = pformat_boxed_tb( self.extra_body_fields,
tb_str=self.tb_str, )
fields_str=fields, from tractor.devx import (
field_prefix=' |_', pformat_boxed_tb,
# ^- is so that it's placed like so, )
# just after <Type( body: str = pformat_boxed_tb(
# |___ .. tb_str=tb_str,
tb_body_indent=1, fields_str=fields,
) field_prefix=' |_',
# ^- is so that it's placed like so,
# just after <Type(
# |___ ..
tb_body_indent=1,
)
else:
body: str = textwrap.indent(
self._message,
prefix=' ',
) + '\n'
return ( return (
f'<{type(self).__name__}(\n' f'<{type(self).__name__}(\n'
f'{body}' f'{body}'
')>' ')>'
) )
__repr__ = pformat
__str__ = pformat
def unwrap( def unwrap(
self, self,
) -> BaseException: ) -> BaseException:
@ -870,12 +824,9 @@ def pack_error(
def unpack_error( def unpack_error(
msg: Error, msg: Error,
chan: Channel,
chan: Channel|None = None,
box_type: RemoteActorError = RemoteActorError, box_type: RemoteActorError = RemoteActorError,
hide_tb: bool = True,
) -> None|Exception: ) -> None|Exception:
''' '''
Unpack an 'error' message from the wire Unpack an 'error' message from the wire
@ -885,12 +836,10 @@ def unpack_error(
which is the responsibilitiy of the caller. which is the responsibilitiy of the caller.
''' '''
__tracebackhide__: bool = hide_tb
if not isinstance(msg, Error): if not isinstance(msg, Error):
return None return None
# retrieve the remote error's encoded details from fields # retrieve the remote error's msg-encoded details
tb_str: str = msg.tb_str tb_str: str = msg.tb_str
message: str = ( message: str = (
f'{chan.uid}\n' f'{chan.uid}\n'
@ -919,7 +868,6 @@ def unpack_error(
# original source error. # original source error.
elif boxed_type_str == 'RemoteActorError': elif boxed_type_str == 'RemoteActorError':
assert boxed_type is RemoteActorError assert boxed_type is RemoteActorError
# assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1 assert len(msg.relay_path) >= 1
exc = box_type( exc = box_type(
@ -1004,8 +952,6 @@ def _raise_from_unexpected_msg(
raise unpack_error( raise unpack_error(
msg, msg,
ctx.chan, ctx.chan,
hide_tb=hide_tb,
) from src_err ) from src_err
# `MsgStream` termination msg. # `MsgStream` termination msg.
@ -1075,6 +1021,7 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None, src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None, src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
) -> MsgTypeError: ) -> MsgTypeError:
''' '''
@ -1089,17 +1036,13 @@ def _mk_msg_type_err(
'`codec` must be a `MsgCodec` for send-side errors?' '`codec` must be a `MsgCodec` for send-side errors?'
) )
from tractor.devx import (
pformat_caller_frame,
)
# no src error from `msgspec.msgpack.Decoder.decode()` so # no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part. # prolly a manual type-check on our part.
if message is None: if message is None:
fmt_stack: str = ( tb_fmt: str = pformat_caller_frame(stack_limit=3)
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
field_prefix=' ',
indent='',
)
message: str = ( message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n' f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n' f'{tb_fmt}\n'
@ -1136,47 +1079,57 @@ def _mk_msg_type_err(
# `Channel.recv()` case # `Channel.recv()` case
else: else:
# decode the msg-bytes using the std msgpack if is_invalid_payload:
# interchange-prot (i.e. without any msg_type: str = type(msg)
# `msgspec.Struct` handling) so that we can message: str = (
# determine what `.msg.types.Msg` is the culprit f'invalid `{msg_type.__qualname__}` payload\n\n'
# by reporting the received value. f'<{type(msg).__qualname__}(\n'
msg_dict: dict = msgpack.decode(msg) f' |_pld: {codec.pld_spec_str} = {msg.pld!r}'
msg_type_name: str = msg_dict['msg_type'] f')>\n'
msg_type = getattr(msgtypes, msg_type_name) )
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n' else:
) # decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
# XXX be "fancy" and see if we can determine the exact
# invalid field such that we can comprehensively report
# the specific field's type problem.
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
if verb_header: if verb_header:
message = f'{verb_header} ' + message message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode( msgtyperr = MsgTypeError.from_decode(
message=message, message=message,
msgdict=msg_dict, msgdict=msg_dict,

View File

@ -68,40 +68,6 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
# TODO: remove and/or rework?
# -[ ] rename to `unwrap_result()` and use
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
# Channel` arg) in key block??
# -[ ] pretty sure this is entirely covered by
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
# def _unwrap_msg(
# msg: Return|Error,
# ctx: Context,
# hide_tb: bool = True,
# ) -> Any:
# '''
# Unwrap a final result from a `{return: <Any>}` IPC msg.
# '''
# __tracebackhide__: bool = hide_tb
# try:
# return msg.pld
# except AttributeError as err:
# # internal error should never get here
# # assert msg.get('cid'), (
# assert msg.cid, (
# "Received internal error at portal?"
# )
# raise unpack_error(
# msg,
# ctx.chan,
# ) from err
class Portal: class Portal:
''' '''
A 'portal' to a memory-domain-separated `Actor`. A 'portal' to a memory-domain-separated `Actor`.
@ -173,12 +139,13 @@ class Portal:
portal=self, portal=self,
) )
# @api_frame
async def result(self) -> Any: async def result(self) -> Any:
''' '''
Return the result(s) from the remote actor's "main" task. Return the result(s) from the remote actor's "main" task.
''' '''
# __tracebackhide__ = True __tracebackhide__ = True
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -254,11 +221,11 @@ class Portal:
return False return False
reminfo: str = ( reminfo: str = (
f'`Portal.cancel_actor()` => {self.channel.uid}\n' f'Portal.cancel_actor() => {self.channel.uid}\n'
f' |_{chan}\n' f'|_{chan}\n'
) )
log.cancel( log.cancel(
f'Sending runtime `.cancel()` request to peer\n\n' f'Requesting runtime cancel for peer\n\n'
f'{reminfo}' f'{reminfo}'
) )
@ -435,7 +402,6 @@ class Portal:
yield stream yield stream
finally: finally:
# cancel the far end task on consumer close # cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using # NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one # this ``.open_fream_from()`` api, the stream is one a one
@ -496,7 +462,7 @@ class LocalPortal:
async def open_portal( async def open_portal(
channel: Channel, channel: Channel,
nursery: trio.Nursery|None = None, tn: trio.Nursery|None = None,
start_msg_loop: bool = True, start_msg_loop: bool = True,
shield: bool = False, shield: bool = False,
@ -504,15 +470,19 @@ async def open_portal(
''' '''
Open a ``Portal`` through the provided ``channel``. Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing (normally Spawns a background task to handle RPC processing, normally
done by the actor-runtime implicitly). done by the actor-runtime implicitly via a call to
`._rpc.process_messages()`. just after connection establishment.
''' '''
actor = current_actor() actor = current_actor()
assert actor assert actor
was_connected: bool = False was_connected: bool = False
async with maybe_open_nursery(nursery, shield=shield) as nursery: async with maybe_open_nursery(
tn,
shield=shield,
) as tn:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()
@ -524,7 +494,7 @@ async def open_portal(
msg_loop_cs: trio.CancelScope|None = None msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop: if start_msg_loop:
from ._runtime import process_messages from ._runtime import process_messages
msg_loop_cs = await nursery.start( msg_loop_cs = await tn.start(
partial( partial(
process_messages, process_messages,
actor, actor,
@ -544,7 +514,7 @@ async def open_portal(
await channel.aclose() await channel.aclose()
# cancel background msg loop task # cancel background msg loop task
if msg_loop_cs: if msg_loop_cs is not None:
msg_loop_cs.cancel() msg_loop_cs.cancel()
nursery.cancel_scope.cancel() tn.cancel_scope.cancel()

View File

@ -124,8 +124,9 @@ async def open_root_actor(
# usage by a clobbered TTY's stdstreams! # usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs): def block_bps(*args, **kwargs):
raise RuntimeError( raise RuntimeError(
'`tractor` blocks built-in `breakpoint()` calls by default!\n' 'Trying to use `breakpoint()` eh?\n'
'If you need to us it please install `greenback` and set ' 'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime ' '`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n' '(either via `.open_nursery()` or `open_root_actor()`)\n'
) )

View File

@ -57,6 +57,7 @@ from ._exceptions import (
from .devx import ( from .devx import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
_debug, _debug,
add_div,
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
@ -64,11 +65,13 @@ from .msg import (
current_codec, current_codec,
MsgCodec, MsgCodec,
NamespacePath, NamespacePath,
pretty_struct,
) )
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck, CancelAck,
Error, Error,
Msg, Msg,
MsgType,
Return, Return,
Start, Start,
StartAck, StartAck,
@ -248,6 +251,9 @@ async def _errors_relayed_via_ipc(
    ) -> None:
        __tracebackhide__: bool = hide_tb
+       # TODO: a debug nursery when in debug mode!
+       # async with maybe_open_debugger_nursery() as debug_tn:
+       # => see matching comment in side `._debug._pause()`
        try:
            yield  # run RPC invoke body

@@ -271,6 +277,8 @@ async def _errors_relayed_via_ipc(
        # TODO: maybe we'll want different "levels" of debugging
        # eventually such as ('app', 'supervisory', 'runtime') ?
+       #
+       # -[ ] this if check is duplicate with `._maybe_enter_pm()`..
        if not is_multi_cancelled(err):
            entered_debug: bool = False
            if (

@@ -294,7 +302,6 @@ async def _errors_relayed_via_ipc(
                    )
                )
            ):
-               # await _debug.pause()
                # XXX QUESTION XXX: is there any case where we'll
                # want to debug IPC disconnects as a default?
                # => I can't think of a reason that inspecting this

@@ -302,7 +309,14 @@ async def _errors_relayed_via_ipc(
                # recovery logic - the only case is some kind of
                # strange bug in our transport layer itself? Going
                # to keep this open ended for now.
-               entered_debug = await _debug._maybe_enter_pm(err)
+               log.debug(
+                   'RPC task crashed, attempting to enter debugger\n'
+                   f'|_{ctx}'
+               )
+               entered_debug = await _debug._maybe_enter_pm(
+                   err,
+                   api_frame=inspect.currentframe(),
+               )
                if not entered_debug:
                    log.exception(
                        'RPC task crashed\n'

@@ -432,6 +446,8 @@ async def _invoke(
    )
    context: bool = False

+   assert not _state._ctxvar_Context.get()
+
    # TODO: deprecate this style..
    if getattr(func, '_tractor_stream_function', False):
        # handle decorated ``@tractor.stream`` async functions

@@ -555,6 +571,7 @@ async def _invoke(
        async with trio.open_nursery() as tn:
            ctx._scope_nursery = tn
            ctx._scope = tn.cancel_scope
+           _state._ctxvar_Context.set(ctx)
            task_status.started(ctx)

            # TODO: it would be nice to have our

@@ -590,7 +607,6 @@ async def _invoke(
            cs: CancelScope = ctx._scope
            if cs.cancel_called:
                canceller: tuple = ctx.canceller
-
                explain: str = f'{ctx.side!r}-side task was cancelled by '

@@ -619,23 +635,9 @@ async def _invoke(
                else:
                    explain += 'a remote peer'

-               # TODO: move this "div centering" into
-               # a helper for use elsewhere!
-               div_chars: str = '------ - ------'
-               div_offset: int = (
-                   round(len(explain)/2)+1
-                   +
-                   round(len(div_chars)/2)+1
-               )
-               div_str: str = (
-                   '\n'
-                   +
-                   ' '*div_offset
-                   +
-                   f'{div_chars}\n'
-               )
-
                explain += (
-                   div_str +
+                   add_div(message=explain)
+                   +
                    f'<= canceller: {canceller}\n'
                    f'=> cancellee: {our_uid}\n'
                    # TODO: better repr for ctx tasks..

@@ -662,10 +664,10 @@ async def _invoke(
                    boxed_type=trio.Cancelled,
                    canceller=canceller,
                )
-               # assign local error so that the `.outcome`
-               # resolves to an error for both reporting and
-               # state checks.
-               ctx._local_error = ctxc
+               # does this matter other than for
+               # consistency/testing? |_ no user code should be
+               # in this scope at this point..
+               # ctx._local_error = ctxc
                raise ctxc

        # XXX: do we ever trigger this block any more?

@@ -675,6 +677,13 @@ async def _invoke(
            BaseException,
        ) as scope_error:
+           if (
+               isinstance(scope_error, RuntimeError)
+               and scope_error.args
+               and 'Cancel scope stack corrupted' in scope_error.args[0]
+           ):
+               log.exception('Cancel scope stack corrupted!?\n')
+               # _debug.mk_pdb().set_trace()

            # always set this (child) side's exception as the
            # local error on the context

@@ -708,17 +717,32 @@ async def _invoke(
            res_type_str,
            res_str,
        ) = (
-           ('error', f'{type(merr)}',)
-           if merr
+           ('error', f'{type(merr)}',) if merr
            else (
                'result',
                f'`{repr(ctx.outcome)}`',
            )
        )
-       log.runtime(
+       message: str = (
            f'IPC context terminated with a final {res_type_str}\n\n'
            f'{ctx}'
        )
+       if merr:
+           from tractor import RemoteActorError
+           if not isinstance(merr, RemoteActorError):
+               fmt_merr: str = (
+                   f'\n{merr!r}\n'
+                   # f'{merr.args[0]!r}\n'
+               )
+           else:
+               fmt_merr = f'\n{merr!r}'
+           log.error(
+               message
+               +
+               fmt_merr
+           )
+       else:
+           log.runtime(message)


async def try_ship_error_to_remote(

@@ -774,7 +798,10 @@ async def process_messages(
    shield: bool = False,
    task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,

-) -> bool:
+) -> (
+   bool,  # chan disconnected
+   MsgType,  # last msg
+):
    '''
    This is the low-level, per-IPC-channel, RPC task scheduler loop.

@@ -816,11 +843,6 @@ async def process_messages(
    # |_ for ex, from `aioquic` which exposed "stream ids":
    #  - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
    #  - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
-   log.runtime(
-       'Entering RPC msg loop:\n'
-       f'peer: {chan.uid}\n'
-       f'|_{chan}\n'
-   )
    nursery_cancelled_before_task: bool = False
    msg: Msg|None = None
    try:

@@ -834,12 +856,15 @@ async def process_messages(
        async for msg in chan:
            log.transport(   # type: ignore
-               f'<= IPC msg from peer: {chan.uid}\n\n'
+               f'IPC msg from peer\n'
+               f'<= {chan.uid}\n\n'

                # TODO: avoid fmting depending on loglevel for perf?
-               # -[ ] specifically `pformat()` sub-call..?
+               # -[ ] specifically `pretty_struct.pformat()` sub-call..?
+               #  - how to only log-level-aware actually call this?
                # -[ ] use `.msg.pretty_struct` here now instead!
-               f'{pformat(msg)}\n'
+               # f'{pretty_struct.pformat(msg)}\n'
+               f'{msg}\n'
            )

            match msg:

@@ -952,11 +977,19 @@ async def process_messages(
                    kwargs=kwargs,  # type-spec this? see `msg.types`
                    uid=actorid,
                ):
-                   log.runtime(
-                       'Handling RPC `Start` request from\n'
-                       f'peer: {actorid}\n'
-                       '\n'
-                       f'=> {ns}.{funcname}({kwargs})\n'
+                   start_status: str = (
+                       'Handling RPC `Start` request\n'
+                       f'<= peer: {actorid}\n\n'
+                       f'  |_{chan}\n'
+                       f'  |_cid: {cid}\n\n'
+                       # f'  |_{ns}.{funcname}({kwargs})\n'
+                       f'>> {actor.uid}\n'
+                       f'  |_{actor}\n'
+                       f'   -> nsp: `{ns}.{funcname}({kwargs})`\n'
+                       # f'  |_{ns}.{funcname}({kwargs})\n\n'
+                       # f'{pretty_struct.pformat(msg)}\n'
                    )

                    # runtime-internal endpoint: `Actor.<funcname>`

@@ -985,6 +1018,10 @@ async def process_messages(
                        await chan.send(err_msg)
                        continue

+                   start_status += (
+                       f'   -> func: {func}\n'
+                   )
+
                    # schedule a task for the requested RPC function
                    # in the actor's main "service nursery".
                    #

@@ -992,18 +1029,8 @@ async def process_messages(
                    # supervision isolation? would avoid having to
                    # manage RPC tasks individually in `._rpc_tasks`
                    # table?
-                   log.runtime(
-                       f'Spawning task for RPC request\n'
-                       f'<= caller: {chan.uid}\n'
-                       f'  |_{chan}\n\n'
-                       # ^-TODO-^ maddr style repr?
-                       # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
-                       # f'cid="{cid[-16:]} .."\n\n'
-                       f'=> {actor}\n'
-                       f'  |_cid: {cid}\n'
-                       f'  |>> {func}()\n'
-                   )
+                   start_status += '   -> scheduling new task..\n'
+                   log.runtime(start_status)
                    try:
                        ctx: Context = await actor._service_n.start(
                            partial(

@@ -1031,8 +1058,9 @@ async def process_messages(
                        # scoped exception from ``_invoke()`` itself.
                        if isinstance(err := ctx, Exception):
                            log.warning(
-                               'Task for RPC failed?'
-                               f'|_ {func}()\n\n'
+                               start_status
+                               +
+                               ' -> task for RPC failed?\n\n'
                                f'{err}'
                            )
                            continue

@@ -1097,25 +1125,24 @@ async def process_messages(
                        parent_chan=chan,
                    )

-   except (
-       TransportClosed,
-   ):
+   except TransportClosed:
        # channels "breaking" (for TCP streams by EOF or 104
        # connection-reset) is ok since we don't have a teardown
        # handshake for them (yet) and instead we simply bail out of
        # the message loop and expect the teardown sequence to clean
        # up..
-       # TODO: add a teardown handshake? and,
+       #
+       # TODO: maybe add a teardown handshake? and,
        # -[ ] don't show this msg if it's an ephemeral discovery ep call?
        # -[ ] figure out how this will break with other transports?
        log.runtime(
-           f'channel closed abruptly with\n'
-           f'peer: {chan.uid}\n'
-           f'|_{chan.raddr}\n'
+           f'IPC channel closed abruptly\n'
+           f'<=x peer: {chan.uid}\n'
+           f'  |_{chan.raddr}\n'
        )

        # transport **WAS** disconnected
-       return True
+       return (True, msg)

    except (
        Exception,

@@ -1152,12 +1179,17 @@ async def process_messages(
    finally:
        # msg debugging for when the machinery is brokey
-       log.runtime(
-           'Exiting IPC msg loop with final msg\n\n'
-           f'<= peer: {chan.uid}\n'
-           f'|_{chan}\n\n'
-           f'{pformat(msg)}\n\n'
-       )
+       if msg is None:
+           message: str = 'Exiting IPC msg loop without receiving a msg?'
+       else:
+           message: str = (
+               'Exiting IPC msg loop with final msg\n\n'
+               f'<= peer: {chan.uid}\n'
+               f'  |_{chan}\n\n'
+               f'{pretty_struct.pformat(msg)}'
+           )
+       log.runtime(message)

    # transport **WAS NOT** disconnected
-   return False
+   return (False, msg)


@@ -49,6 +49,7 @@ from pprint import pformat
import signal
import sys
from typing import (
+   Any,
    Callable,
    TYPE_CHECKING,
)

@@ -68,7 +69,7 @@ from tractor.msg import (
    pretty_struct,
    NamespacePath,
    types as msgtypes,
-   Msg,
+   MsgType,
)
from ._ipc import Channel
from ._context import (

@@ -96,19 +97,6 @@ from ._rpc import (
    process_messages,
    try_ship_error_to_remote,
)
-# from tractor.msg.types import (
-#     Aid,
-#     SpawnSpec,
-#     Start,
-#     StartAck,
-#     Started,
-#     Yield,
-#     Stop,
-#     Return,
-#     Error,
-# )

if TYPE_CHECKING:

@@ -315,29 +303,32 @@ class Actor:
        self._reg_addrs = addrs

    async def wait_for_peer(
-       self, uid: tuple[str, str]
+       self,
+       uid: tuple[str, str],
    ) -> tuple[trio.Event, Channel]:
        '''
-       Wait for a connection back from a spawned actor with a `uid`
-       using a `trio.Event` for sync.
+       Wait for a connection back from a (spawned sub-)actor with
+       a `uid` using a `trio.Event` for sync.

        '''
-       log.runtime(f"Waiting for peer {uid} to connect")
+       log.debug(f'Waiting for peer {uid!r} to connect')
        event = self._peer_connected.setdefault(uid, trio.Event())
        await event.wait()
-       log.runtime(f"{uid} successfully connected back to us")
+       log.debug(f'{uid!r} successfully connected back to us')
        return event, self._peers[uid][-1]

    def load_modules(
        self,
-       debug_mode: bool = False,
+       # debug_mode: bool = False,
    ) -> None:
        '''
-       Load enabled RPC py-modules locally (after process fork/spawn).
+       Load explicitly enabled python modules from local fs after
+       process spawn.

        Since this actor may be spawned on a different machine from
        the original nursery we need to try and load the local module
-       code (presuming it exists).
+       code manually (presuming it exists).

        '''
        try:

@@ -350,16 +341,21 @@ class Actor:
                _mp_fixup_main._fixup_main_from_path(
                    parent_data['init_main_from_path'])

+           status: str = 'Attempting to import enabled modules:\n'
            for modpath, filepath in self.enable_modules.items():
                # XXX append the allowed module to the python path which
                # should allow for relative (at least downward) imports.
                sys.path.append(os.path.dirname(filepath))
-               log.runtime(f"Attempting to import {modpath}@{filepath}")
-               mod = importlib.import_module(modpath)
+               status += (
+                   f'|_{modpath!r} -> {filepath!r}\n'
+               )
+               mod: ModuleType = importlib.import_module(modpath)
                self._mods[modpath] = mod
                if modpath == '__main__':
                    self._mods['__mp_main__'] = mod
+           log.runtime(status)

        except ModuleNotFoundError:
            # it is expected the corresponding `ModuleNotExposed` error
            # will be raised later
@@ -413,21 +409,23 @@ class Actor:
        chan = Channel.from_stream(stream)
        their_uid: tuple[str, str]|None = chan.uid

-       con_msg: str = ''
+       con_status: str = ''
+
+       # TODO: remove this branch since can never happen?
+       # NOTE: `.uid` is only set after first contact
        if their_uid:
-           # NOTE: `.uid` is only set after first contact
-           con_msg = (
-               'IPC Re-connection from already known peer? '
+           con_status = (
+               'IPC Re-connection from already known peer?\n'
            )
        else:
-           con_msg = (
-               'New IPC connection to us '
+           con_status = (
+               'New inbound IPC connection <=\n'
            )

-       con_msg += (
-           f'<= @{chan.raddr}\n'
+       con_status += (
            f'|_{chan}\n'
            # f' |_@{chan.raddr}\n\n'
+           # ^-TODO-^ remove since already in chan.__repr__()?
        )
        # send/receive initial handshake response
        try:

@@ -447,13 +445,13 @@ class Actor:
            # a bound listener on the "arbiter" addr. the reset will be
            # because the handshake was never meant to take place.
            log.warning(
-               con_msg
+               con_status
                +
                ' -> But failed to handshake? Ignoring..\n'
            )
            return

-       con_msg += (
+       con_status += (
            f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
        )

        # IPC connection tracking for both peers and new children:

@@ -466,7 +464,7 @@ class Actor:
            None,
        )
        if event:
-           con_msg += (
+           con_status += (
                ' -> Waking subactor spawn waiters: '
                f'{event.statistics().tasks_waiting}\n'
                f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'

@@ -477,7 +475,7 @@ class Actor:
            event.set()

        else:
-           con_msg += (
+           con_status += (
                f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
            )  # type: ignore

@@ -491,13 +489,18 @@ class Actor:
        # TODO: can we just use list-ref directly?
        chans.append(chan)

-       log.runtime(con_msg)
+       con_status += ' -> Entering RPC msg loop..\n'
+       log.runtime(con_status)

        # Begin channel management - respond to remote requests and
        # process received responses.
        disconnected: bool = False
+       last_msg: MsgType
        try:
-           disconnected: bool = await process_messages(
+           (
+               disconnected,
+               last_msg,
+           ) = await process_messages(
                self,
                chan,
            )
@@ -598,16 +601,24 @@ class Actor:
                    # that the IPC layer may have failed
                    # unexpectedly since it may be the cause of
                    # other downstream errors.
-                   entry = local_nursery._children.get(uid)
+                   entry: tuple|None = local_nursery._children.get(uid)
                    if entry:
                        proc: trio.Process
                        _, proc, _ = entry

                        if (
                            (poll := getattr(proc, 'poll', None))
-                           and poll() is None
+                           and
+                           poll() is None  # proc still alive
                        ):
-                           log.cancel(
+                           # TODO: change log level based on
+                           # detecting whether chan was created for
+                           # ephemeral `.register_actor()` request!
+                           # -[ ] also, that should be avoidable by
+                           #   re-using any existing chan from the
+                           #   `._discovery.get_registry()` call as
+                           #   well..
+                           log.runtime(
                                f'Peer IPC broke but subproc is alive?\n\n'

                                f'<=x {chan.uid}@{chan.raddr}\n'

@@ -616,17 +627,17 @@ class Actor:
        # ``Channel`` teardown and closure sequence
        # drop ref to channel so it can be gc-ed and disconnected
-       log.runtime(
-           f'Disconnected IPC channel:\n'
-           f'uid: {chan.uid}\n'
-           f'|_{pformat(chan)}\n'
+       con_teardown_status: str = (
+           f'IPC channel disconnected:\n'
+           f'<=x uid: {chan.uid}\n'
+           f' |_{pformat(chan)}\n\n'
        )
        chans.remove(chan)

        # TODO: do we need to be this pedantic?
        if not chans:
-           log.runtime(
-               f'No more channels with {chan.uid}'
+           con_teardown_status += (
+               f'-> No more channels with {chan.uid}'
            )
            self._peers.pop(uid, None)

@@ -640,15 +651,16 @@ class Actor:
                f' |_[{i}] {pformat(chan)}\n'
            )

-       log.runtime(
-           f'Remaining IPC {len(self._peers)} peers:\n'
-           + peers_str
+       con_teardown_status += (
+           f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
        )

        # No more channels to other actors (at all) registered
        # as connected.
        if not self._peers:
-           log.runtime("Signalling no more peer channel connections")
+           con_teardown_status += (
+               'Signalling no more peer channel connections'
+           )
            self._no_more_peers.set()

            # NOTE: block this actor from acquiring the

@@ -723,13 +735,16 @@ class Actor:
                    # TODO: figure out why this breaks tests..
                    db_cs.cancel()

+       log.runtime(con_teardown_status)
+       # finally block closure

    # TODO: rename to `._deliver_payload()` since this handles
    # more than just `result` msgs now obvi XD
    async def _deliver_ctx_payload(
        self,
        chan: Channel,
        cid: str,
-       msg: Msg|MsgTypeError,
+       msg: MsgType|MsgTypeError,

    ) -> None|bool:
        '''

@@ -754,7 +769,7 @@ class Actor:
                # XXX don't need right since it's always in msg?
                # f'=> cid: {cid}\n\n'

-               f'{pretty_struct.Struct.pformat(msg)}\n'
+               f'{pretty_struct.pformat(msg)}\n'
            )
            return

@@ -896,9 +911,11 @@ class Actor:
            cid=cid,
        )
        log.runtime(
-           'Sending RPC start msg\n\n'
+           'Sending RPC `Start`\n\n'
            f'=> peer: {chan.uid}\n'
-           f'  |_ {ns}.{func}({kwargs})\n'
+           f'  |_ {ns}.{func}({kwargs})\n\n'
+
+           f'{pretty_struct.pformat(msg)}'
        )
        await chan.send(msg)

@@ -955,31 +972,29 @@ class Actor:
        if self._spawn_method == "trio":

-           # Receive runtime state from our parent
-           # parent_data: dict[str, Any]
-           # parent_data = await chan.recv()
-
-           # TODO: maybe we should just wrap this directly
-           # in a `Actor.spawn_info: SpawnInfo` struct?
+           # Receive post-spawn runtime state from our parent.
            spawnspec: msgtypes.SpawnSpec = await chan.recv()
            self._spawn_spec = spawnspec

-           # TODO: eventually all these msgs as
-           # `msgspec.Struct` with a special mode that
-           # pformats them in multi-line mode, BUT only
-           # if "trace"/"util" mode is enabled?
            log.runtime(
                'Received runtime spec from parent:\n\n'
-               f'{pformat(spawnspec)}\n'
+
+               # TODO: eventually all these msgs as
+               # `msgspec.Struct` with a special mode that
+               # pformats them in multi-line mode, BUT only
+               # if "trace"/"util" mode is enabled?
+               f'{pretty_struct.pformat(spawnspec)}\n'
            )
-           # accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
            accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
-           # rvs = parent_data.pop('_runtime_vars')
-           rvs = spawnspec._runtime_vars
+
+           # TODO: another `Struct` for rtvs..
+           rvs: dict[str, Any] = spawnspec._runtime_vars
            if rvs['_debug_mode']:
                try:
-                   log.info(
+                   # TODO: maybe return some status msgs upward
+                   # so that we can emit them in `con_status`
+                   # instead?
+                   log.devx(
                        'Enabling `stackscope` traces on SIGUSR1'
                    )
                    from .devx import enable_stack_on_sig

@@ -989,7 +1004,6 @@ class Actor:
                        '`stackscope` not installed for use in debug mode!'
                    )

-           log.runtime(f'Runtime vars are: {rvs}')
            rvs['_is_root'] = False
            _state._runtime_vars.update(rvs)

@@ -1006,18 +1020,12 @@ class Actor:
                for val in spawnspec.reg_addrs
            ]

-           # for attr, value in parent_data.items():
+           # TODO: better than monkey patching..
+           # -[ ] maybe read the actual f#$-in `._spawn_spec` XD
            for _, attr, value in pretty_struct.iter_fields(
                spawnspec,
            ):
                setattr(self, attr, value)
-               # if (
-               #     attr == 'reg_addrs'
-               #     and value
-               # ):
-               #     self.reg_addrs = [tuple(val) for val in value]
-               # else:
-               #     setattr(self, attr, value)

            return (
                chan,

@@ -1026,12 +1034,11 @@ class Actor:
        except OSError:  # failed to connect
            log.warning(
-               f'Failed to connect to parent!?\n\n'
-               'Closing IPC [TCP] transport server to\n'
-               f'{parent_addr}\n'
+               f'Failed to connect to spawning parent actor!?\n'
+               f'x=> {parent_addr}\n'
                f'|_{self}\n\n'
            )
-           await self.cancel(chan=None)  # self cancel
+           await self.cancel(req_chan=None)  # self cancel
            raise

    async def _serve_forever(

@@ -1109,8 +1116,7 @@ class Actor:
        # chan whose lifetime limits the lifetime of its remotely
        # requested and locally spawned RPC tasks - similar to the
        # supervision semantics of a nursery wherein the actual
-       # implementation does start all such tasks in
-       # a sub-nursery.
+       # implementation does start all such tasks in a sub-nursery.
        req_chan: Channel|None,

    ) -> bool:

@@ -1151,7 +1157,7 @@ class Actor:
        # other) repr fields instead of doing this all manual..
        msg: str = (
            f'Runtime cancel request from {requester_type}:\n\n'
-           f'<= .cancel(): {requesting_uid}\n'
+           f'<= .cancel(): {requesting_uid}\n\n'
        )

        # TODO: what happens here when we self-cancel tho?

@@ -1166,8 +1172,8 @@ class Actor:
            dbcs = _debug.DebugStatus.req_cs
            if dbcs is not None:
                msg += (
-                   '>> Cancelling active debugger request..\n'
-                   f'|_{_debug.Lock}\n'
+                   '-> Cancelling active debugger request..\n'
+                   f'|_{_debug.Lock.pformat()}'
                )
                dbcs.cancel()

@@ -1418,7 +1424,12 @@ class Actor:
        '''
        if self._server_n:
-           log.runtime("Shutting down channel server")
+           # TODO: obvi a different server type when we eventually
+           # support some others XD
+           server_prot: str = 'TCP'
+           log.runtime(
+               f'Cancelling {server_prot} server'
+           )
            self._server_n.cancel_scope.cancel()
            return True

@@ -1602,6 +1613,7 @@ async def async_main(
        assert accept_addrs

        try:
+           # TODO: why is this not with the root nursery?
            actor._server_n = await service_nursery.start(
                partial(
                    actor._serve_forever,

@@ -1886,13 +1898,13 @@ class Arbiter(Actor):
        sockaddrs: list[tuple[str, int]] = []
        sockaddr: tuple[str, int]

-       for (aname, _), sockaddr in self._registry.items():
-           log.runtime(
-               f'Actor mailbox info:\n'
-               f'aname: {aname}\n'
-               f'sockaddr: {sockaddr}\n'
+       mailbox_info: str = 'Actor registry contact infos:\n'
+       for uid, sockaddr in self._registry.items():
+           mailbox_info += (
+               f'|_uid: {uid}\n'
+               f'|_sockaddr: {sockaddr}\n\n'
            )
-           if name == aname:
+           if name == uid[0]:
                sockaddrs.append(sockaddr)

        if not sockaddrs:

@@ -1904,6 +1916,7 @@ class Arbiter(Actor):
                if not isinstance(uid, trio.Event):
                    sockaddrs.append(self._registry[uid])

+       log.runtime(mailbox_info)
        return sockaddrs

    async def register_actor(


@@ -451,10 +451,9 @@ async def trio_proc(
    proc: trio.Process|None = None
    try:
        try:
-           # TODO: needs ``trio_typing`` patch?
-           proc = await trio.lowlevel.open_process(spawn_cmd)
+           proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd)
            log.runtime(
-               'Started new sub-proc\n'
+               'Started new child\n'
                f'|_{proc}\n'
            )


@@ -19,13 +19,19 @@ Per process state

"""
from __future__ import annotations
+from contextvars import (
+   ContextVar,
+)
from typing import (
    Any,
    TYPE_CHECKING,
)

+from trio.lowlevel import current_task
+
if TYPE_CHECKING:
    from ._runtime import Actor
+   from ._context import Context


_current_actor: Actor|None = None  # type: ignore # noqa

@@ -110,3 +116,20 @@ def debug_mode() -> bool:

def is_root_process() -> bool:
    return _runtime_vars['_is_root']


+_ctxvar_Context: ContextVar[Context] = ContextVar(
+   'ipc_context',
+   default=None,
+)
+
+
+def current_ipc_ctx() -> Context:
+   ctx: Context = _ctxvar_Context.get()
+   if not ctx:
+       from ._exceptions import InternalError
+       raise InternalError(
+           'No IPC context has been allocated for this task yet?\n'
+           f'|_{current_task()}\n'
+       )
+   return ctx
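
Since the runtime now sets `_ctxvar_Context` on both sides of a ctx open, any helper buried under an RPC task can recover its `Context` without threading it through every signature. A minimal usage sketch (the endpoint name and the assertion are illustrative, not from this changeset):

```python
import tractor
from tractor._state import current_ipc_ctx

@tractor.context
async def my_ep(ctx: tractor.Context) -> None:
    await ctx.started()
    # deep inside any helper called from this task the same
    # `Context` is recoverable without passing it explicitly:
    assert current_ipc_ctx() is ctx
```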


@@ -364,14 +364,10 @@ class MsgStream(trio.abc.Channel):

        if not self._eoc:
            message: str = (
-               f'Context stream closed by {self._ctx.side!r}\n'
+               f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
                f'|_{self}\n'
            )
-           log.cancel(
-               'Stream self-closed before receiving EoC\n\n'
-               +
-               message
-           )
+           log.cancel(message)
            self._eoc = trio.EndOfChannel(message)

        # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?


@@ -30,7 +30,13 @@ from ._debug import (
    open_crash_handler as open_crash_handler,
    maybe_open_crash_handler as maybe_open_crash_handler,
    post_mortem as post_mortem,
+   mk_pdb as mk_pdb,
)
from ._stackscope import (
    enable_stack_on_sig as enable_stack_on_sig,
)
+from .pformat import (
+   add_div as add_div,
+   pformat_caller_frame as pformat_caller_frame,
+   pformat_boxed_tb as pformat_boxed_tb,
+)


@@ -23,6 +23,8 @@ from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
+import textwrap
+import traceback
from types import (
    FrameType,
    FunctionType,

@@ -175,3 +177,103 @@ def find_caller_info(
        )

    return None


+def pformat_boxed_tb(
+   tb_str: str,
+   fields_str: str|None = None,
+   field_prefix: str = ' |_',
+   tb_box_indent: int|None = None,
+   tb_body_indent: int = 1,
+) -> str:
+   '''
+   Create a "boxed" looking traceback string.
+
+   Useful for emphasizing traceback text content as being an
+   embedded attribute of some other object (like
+   a `RemoteActorError` or other boxing remote error shuttle
+   container).
+
+   Any other parent/container "fields" can be passed in the
+   `fields_str` input along with other prefix/indent settings.
+
+   '''
+   if (
+       fields_str
+       and
+       field_prefix
+   ):
+       fields: str = textwrap.indent(
+           fields_str,
+           prefix=field_prefix,
+       )
+   else:
+       fields = fields_str or ''
+
+   tb_body = tb_str
+   if tb_body_indent:
+       tb_body: str = textwrap.indent(
+           tb_str,
+           prefix=tb_body_indent * ' ',
+       )
+
+   tb_box: str = (
+       # orig
+       # f'  |\n'
+       # f' ------ - ------\n\n'
+       # f'{tb_str}\n'
+       # f' ------ - ------\n'
+       # f' _|\n'
+
+       f'|\n'
+       f' ------ - ------\n\n'
+       # f'{tb_str}\n'
+       f'{tb_body}'
+       f' ------ - ------\n'
+       f'_|\n'
+   )
+   tb_box_indent: int = (
+       tb_box_indent
+       or
+       1
+
+       # (len(field_prefix))
+       # ? ^-TODO-^ ? if you wanted another indent level
+   )
+   if tb_box_indent > 0:
+       tb_box: str = textwrap.indent(
+           tb_box,
+           prefix=tb_box_indent * ' ',
+       )
+
+   return (
+       fields
+       +
+       tb_box
+   )
+
+
+def pformat_caller_frame(
+   stack_limit: int = 1,
+   box_tb: bool = True,
+) -> str:
+   '''
+   Capture and return the traceback text content from
+   `stack_limit` call frames up.
+
+   '''
+   tb_str: str = (
+       '\n'.join(
+           traceback.format_stack(limit=stack_limit)
+       )
+   )
+   if box_tb:
+       tb_str: str = pformat_boxed_tb(
+           tb_str=tb_str,
+           field_prefix='  ',
+       )
+   return tb_str

File diff suppressed because it is too large.


@@ -65,7 +65,7 @@ def dump_task_tree() -> None:
        level='cancel',
    )
    actor: Actor = _state.current_actor()
-   log.pdb(
+   log.devx(
        f'Dumping `stackscope` tree for actor\n'
        f'{actor.name}: {actor}\n'
        f' |_{mp.current_process()}\n\n'

@@ -104,7 +104,7 @@ def signal_handler(
    subproc: ProcessType
    subactor: Actor
    for subactor, subproc, _ in an._children.values():
-       log.pdb(
+       log.devx(
            f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
            f'{subactor}\n'
            f' |_{subproc}\n'


@@ -0,0 +1,168 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.

'''
import textwrap
import traceback

from trio import CancelScope


def add_div(
    message: str,
    div_str: str = '------ - ------',

) -> str:
    '''
    Add a "divider string" to the input `message` with
    a little math to center it underneath.

    '''
    div_offset: int = (
        round(len(message)/2)+1
        -
        round(len(div_str)/2)+1
    )
    div_str: str = (
        '\n' + ' '*div_offset + f'{div_str}\n'
    )
    return div_str
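
A quick usage sketch matching the `_invoke()` call site in the `_rpc` diff above; the message text is made up and the output spacing is approximate:

```python
explain = "'child'-side task was cancelled by a remote peer"
report = (
    explain
    + add_div(message=explain)  # centered divider, newline-wrapped
    + '<= canceller: (...uid...)\n'
)
print(report)
# 'child'-side task was cancelled by a remote peer
#                  ------ - ------
# <= canceller: (...uid...)
```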
def pformat_boxed_tb(
    tb_str: str,
    fields_str: str|None = None,
    field_prefix: str = ' |_',
    tb_box_indent: int|None = None,
    tb_body_indent: int = 1,

) -> str:
    '''
    Create a "boxed" looking traceback string.

    Useful for emphasizing traceback text content as being an
    embedded attribute of some other object (like
    a `RemoteActorError` or other boxing remote error shuttle
    container).

    Any other parent/container "fields" can be passed in the
    `fields_str` input along with other prefix/indent settings.

    '''
    if (
        fields_str
        and
        field_prefix
    ):
        fields: str = textwrap.indent(
            fields_str,
            prefix=field_prefix,
        )
    else:
        fields = fields_str or ''

    tb_body = tb_str
    if tb_body_indent:
        tb_body: str = textwrap.indent(
            tb_str,
            prefix=tb_body_indent * ' ',
        )

    tb_box: str = (
        f'|\n'
        f' ------ - ------\n'
        f'{tb_body}'
        f' ------ - ------\n'
        f'_|\n'
    )
    tb_box_indent: int = (
        tb_box_indent
        or
        1

        # (len(field_prefix))
        # ? ^-TODO-^ ? if you wanted another indent level
    )
    if tb_box_indent > 0:
        tb_box: str = textwrap.indent(
            tb_box,
            prefix=tb_box_indent * ' ',
        )

    return (
        fields
        +
        tb_box
    )
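
For reference, a hypothetical call showing the boxed shape; the `fields_str` content here is invented for illustration:

```python
import traceback

try:
    1 / 0
except ZeroDivisionError:
    print(
        pformat_boxed_tb(
            # the traceback body gets indented and wrapped in
            # the `| ------ - ------ ... _|` box below the fields
            tb_str=traceback.format_exc(),
            fields_str='src_uid: (name, uuid)\n',  # made-up parent fields
        )
    )
```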
def pformat_caller_frame(
    stack_limit: int = 1,
    box_tb: bool = True,
) -> str:
    '''
    Capture and return the traceback text content from
    `stack_limit` call frames up.

    '''
    tb_str: str = (
        '\n'.join(
            traceback.format_stack(limit=stack_limit)
        )
    )
    if box_tb:
        tb_str: str = pformat_boxed_tb(
            tb_str=tb_str,
            field_prefix='  ',
        )
    return tb_str
def pformat_cs(
    cs: CancelScope,
    var_name: str = 'cs',
    field_prefix: str = ' |_',
) -> str:
    '''
    Pretty format info about a `trio.CancelScope` including most
    of its public state and `._cancel_status`.

    The output can be modified to show a "var name" for the
    instance as a field prefix, just a simple str before each
    line more or less.

    '''
    fields: str = textwrap.indent(
        (
            f'cancel_called = {cs.cancel_called}\n'
            f'cancelled_caught = {cs.cancelled_caught}\n'
            f'_cancel_status = {cs._cancel_status}\n'
            f'shield = {cs.shield}\n'
        ),
        prefix=field_prefix,
    )
    return (
        f'{var_name}: {cs}\n'
        +
        fields
    )
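
A small sketch of how this might be used for post-cancellation reporting (per the commit notes it is aimed at `Context._maybe_cancel_and_set_remote_error()`); the scope setup here is illustrative only:

```python
import trio

async def main() -> None:
    with trio.CancelScope() as cs:
        cs.cancel()
        await trio.lowlevel.checkpoint()  # raises Cancelled, caught by scope
    # render the scope's terminal state for a log message:
    print(pformat_cs(cs, var_name='ctx._scope'))

trio.run(main)
```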


@@ -21,6 +21,11 @@ Log like a forester!
from collections.abc import Mapping
import sys
import logging
+from logging import (
+   LoggerAdapter,
+   Logger,
+   StreamHandler,
+)

import colorlog  # type: ignore
import trio

@@ -53,6 +58,7 @@ LEVELS: dict[str, int] = {
    'RUNTIME': 15,
    'CANCEL': 16,
    'PDB': 500,
+   'DEVX': 600,
}
# _custom_levels: set[str] = {
#     lvlname.lower for lvlname in LEVELS.keys()

@@ -62,6 +68,7 @@ STD_PALETTE = {
    'CRITICAL': 'red',
    'ERROR': 'red',
    'PDB': 'white',
+   'DEVX': 'cyan',
    'WARNING': 'yellow',
    'INFO': 'green',
    'CANCEL': 'yellow',

@@ -78,7 +85,7 @@ BOLD_PALETTE = {
# TODO: this isn't showing the correct '{filename}'
# as it did before..
-class StackLevelAdapter(logging.LoggerAdapter):
+class StackLevelAdapter(LoggerAdapter):

    def transport(
        self,

@@ -86,7 +93,8 @@ class StackLevelAdapter(logging.LoggerAdapter):
    ) -> None:
        '''
-       IPC level msg-ing.
+       IPC transport level msg IO; generally anything below
+       `._ipc.Channel` and friends.

        '''
        return self.log(5, msg)

@@ -102,7 +110,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
        msg: str,
    ) -> None:
        '''
-       Cancellation logging, mostly for runtime reporting.
+       Cancellation sequencing, mostly for runtime reporting.

        '''
        return self.log(

@@ -116,11 +124,21 @@ class StackLevelAdapter(logging.LoggerAdapter):
        msg: str,
    ) -> None:
        '''
-       Debugger logging.
+       `pdb`-REPL (debugger) related statuses.

        '''
        return self.log(500, msg)

+   def devx(
+       self,
+       msg: str,
+   ) -> None:
+       '''
+       "Developer experience" sub-sys statuses.
+
+       '''
+       return self.log(600, msg)
+
    def log(
        self,
        level,

@@ -224,6 +242,7 @@ def get_logger(
    '''Return the package log or a sub-logger for ``name`` if provided.

    '''
+   log: Logger
    log = rlog = logging.getLogger(_root_name)

    if (

@@ -278,7 +297,7 @@ def get_logger(
def get_console_log(
    level: str | None = None,
    **kwargs,
-) -> logging.LoggerAdapter:
+) -> LoggerAdapter:
    '''Get the package logger and enable a handler which writes to stderr.

    Yeah yeah, i know we can use ``DictConfig``. You do it.

@@ -303,7 +322,7 @@ def get_console_log(
            None,
        )
    ):
-       handler = logging.StreamHandler()
+       handler = StreamHandler()
        formatter = colorlog.ColoredFormatter(
            LOG_FORMAT,
            datefmt=DATE_FORMAT,

@@ -323,3 +342,19 @@ def get_loglevel() -> str:

# global module logger for tractor itself
log = get_logger('tractor')


+def at_least_level(
+   log: Logger|LoggerAdapter,
+   level: int|str,
+) -> bool:
+   '''
+   Predicate to test if a given level is active.
+
+   '''
+   if isinstance(level, str):
+       level: int = LEVELS[level.upper()]
+
+   if log.getEffectiveLevel() <= level:
+       return True
+   return False
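
A sketch of the intended gating pattern (mirroring the commit note about only rendering `pformat_cs()` output when the level is actually active); the helper below is hypothetical, standing in for any costly render:

```python
from tractor.log import get_logger, at_least_level

log = get_logger('tractor')

def expensive_repr() -> str:  # hypothetical multi-line render
    return 'big\nmulti-line\ntree'

# skip building the repr entirely unless 'devx' logs will emit:
if at_least_level(log, 'devx'):
    log.devx(f'tree:\n{expensive_repr()}')
```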


@@ -25,12 +25,12 @@ from contextlib import (
    # asynccontextmanager as acm,
    contextmanager as cm,
)
-from pprint import pformat
+from contextvars import ContextVar
from typing import (
    Any,
    Type,
    TYPE_CHECKING,
-   # Union,
+   Union,
)
# ------ - ------
from msgspec import (

@@ -63,7 +63,7 @@ from .types import (
    Started,
    Stop,
    Yield,
-   # pretty_struct,
+   pretty_struct,
)

@@ -75,6 +75,9 @@ if TYPE_CHECKING:
log = get_logger(__name__)


+_def_any_pldec: MsgDec = mk_dec()
+
+
class PldRx(Struct):
    '''
    A "msg payload receiver".

@@ -101,10 +104,13 @@ class PldRx(Struct):
    '''
    # TODO: better to bind it here?
    # _rx_mc: trio.MemoryReceiveChannel
-   _msgdec: MsgDec = mk_dec(spec=Any)
+   _pldec: MsgDec
    _ipc: Context|MsgStream|None = None

+   @property
+   def pld_dec(self) -> MsgDec:
+       return self._pldec
+
    @cm
    def apply_to_ipc(
        self,

@@ -122,9 +128,29 @@ class PldRx(Struct):
        finally:
            self._ipc = None

+   @cm
+   def limit_plds(
+       self,
+       spec: Union[Type[Struct]],
+   ) -> MsgDec:
+       '''
+       Type-limit the loadable msg payloads via an applied
+       `MsgDec` given an input spec, revert to prior decoder on
+       exit.
+
+       '''
+       orig_dec: MsgDec = self._pldec
+       limit_dec: MsgDec = mk_dec(spec=spec)
+       try:
+           self._pldec = limit_dec
+           yield limit_dec
+       finally:
+           self._pldec = orig_dec
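
A usage sketch of the instance-level `limit_plds()` above; the `PointMsg` struct is hypothetical:

```python
from msgspec import Struct

class PointMsg(Struct):  # hypothetical payload spec
    x: int
    y: int

pld_rx: PldRx = current_pldrx()  # task-global rx, defined further down
with pld_rx.limit_plds(spec=PointMsg) as pldec:
    # within this block any `.recv_pld()`/`.dec_msg()` call
    # validates payloads against `PointMsg` only
    ...
```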
    @property
    def dec(self) -> msgpack.Decoder:
-       return self._msgdec.dec
+       return self._pldec.dec

    def recv_pld_nowait(
        self,

@@ -135,9 +161,10 @@ class PldRx(Struct):
        ipc_msg: MsgType|None = None,
        expect_msg: Type[MsgType]|None = None,

-       **kwargs,
+       **dec_msg_kwargs,

    ) -> Any|Raw:
+       __tracebackhide__: bool = True

        msg: MsgType = (
            ipc_msg

@@ -150,6 +177,7 @@ class PldRx(Struct):
            msg,
            ctx=ctx,
            expect_msg=expect_msg,
+           **dec_msg_kwargs,
        )

    async def recv_pld(

@@ -157,14 +185,16 @@ class PldRx(Struct):
        ctx: Context,
        ipc_msg: MsgType|None = None,
        expect_msg: Type[MsgType]|None = None,
+       hide_tb: bool = True,

-       **kwargs
+       **dec_msg_kwargs,

    ) -> Any|Raw:
        '''
        Receive a `MsgType`, then decode and return its `.pld` field.

        '''
+       __tracebackhide__: bool = hide_tb
        msg: MsgType = (
            ipc_msg
            or

@@ -173,16 +203,20 @@ class PldRx(Struct):
            await ctx._rx_chan.receive()
        )
        return self.dec_msg(
-           msg,
+           msg=msg,
            ctx=ctx,
            expect_msg=expect_msg,
+           **dec_msg_kwargs,
        )

    def dec_msg(
        self,
        msg: MsgType,
        ctx: Context,
-       expect_msg: Type[MsgType]|None = None,
+       expect_msg: Type[MsgType]|None,
+
+       raise_error: bool = True,
+       hide_tb: bool = True,

    ) -> PayloadT|Raw:
        '''

@@ -190,6 +224,7 @@ class PldRx(Struct):
        return the value or raise an appropriate error.

        '''
+       __tracebackhide__: bool = hide_tb
        match msg:
            # payload-data shuttle msg; deliver the `.pld` value
            # directly to IPC (primitive) client-consumer code.

@@ -199,11 +234,12 @@ class PldRx(Struct):
                |Return(pld=pld)  # termination phase
            ):
                try:
-                   pld: PayloadT = self._msgdec.decode(pld)
+                   pld: PayloadT = self._pldec.decode(pld)
                    log.runtime(
-                       'Decode msg payload\n\n'
+                       'Decoded msg payload\n\n'
                        f'{msg}\n\n'
-                       f'{pld}\n'
+                       f'where payload is\n'
+                       f'|_pld={pld!r}\n'
                    )
                    return pld

@@ -211,8 +247,9 @@ class PldRx(Struct):
                except ValidationError as src_err:
                    msgterr: MsgTypeError = _mk_msg_type_err(
                        msg=msg,
-                       codec=self._dec,
+                       codec=self.pld_dec,
                        src_validation_error=src_err,
+                       is_invalid_payload=True,
                    )
                    msg: Error = pack_from_raise(
                        local_err=msgterr,

@@ -237,8 +274,62 @@ class PldRx(Struct):
            case Error():
                src_err = MessagingError(
-                   'IPC dialog termination by msg'
+                   'IPC ctx dialog terminated without `Return`-ing a result\n'
+                   f'Instead it raised {msg.boxed_type_str!r}!'
                )
+               # XXX NOTE XXX another super subtle runtime-y thing..
+               #
+               # - when user code (transitively) calls into this
+               #   func (usually via a `Context/MsgStream` API) we
+               #   generally want errors to propagate immediately
+               #   and directly so that the user can define how it
+               #   wants to handle them.
+               #
+               #  HOWEVER,
+               #
+               # - for certain runtime calling cases, we don't want to
+               #   directly raise since the calling code might have
+               #   special logic around whether to raise the error
+               #   or suppress it silently (eg. a `ContextCancelled`
+               #   received from the far end which was requested by
+               #   this side, aka a self-cancel).
+               #
+               # SO, we offer a flag to control this.
+               if not raise_error:
+                   return src_err
+
+           case Stop(cid=cid):
+               message: str = (
+                   f'{ctx.side!r}-side of ctx received stream-`Stop` from '
+                   f'{ctx.peer_side!r} peer ?\n'
+                   f'|_cid: {cid}\n\n'
+                   f'{pretty_struct.pformat(msg)}\n'
+               )
+               if ctx._stream is None:
+                   explain: str = (
+                       f'BUT, no `MsgStream` (was) open(ed) on this '
+                       f'{ctx.side!r}-side of the IPC ctx?\n'
+                       f'Maybe check your code for streaming phase race conditions?\n'
+                   )
+                   log.warning(
+                       message
+                       +
+                       explain
+                   )
+                   # let caller decide what to do when only one
+                   # side opened a stream, don't raise.
+                   return msg
+
+               else:
+                   explain: str = (
+                       'Received a `Stop` when it should NEVER be possible!?!?\n'
+                   )
+                   # TODO: this is constructed inside
+                   # `_raise_from_unexpected_msg()` but maybe we
+                   # should pass it in?
+                   # src_err = trio.EndOfChannel(explain)
+                   src_err = None

            case _:
                src_err = InternalError(

@@ -246,6 +337,9 @@ class PldRx(Struct):
                    f'{msg}\n'
                )

+       # TODO: maybe use the new `.add_note()` from 3.11?
+       # |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
+       #
        # fallthrough and raise from `src_err`
        _raise_from_unexpected_msg(
            ctx=ctx,

@@ -253,12 +347,18 @@ class PldRx(Struct):
            src_err=src_err,
            log=log,
            expect_msg=expect_msg,
-           hide_tb=False,
+           hide_tb=hide_tb,
        )

    async def recv_msg_w_pld(
        self,
        ipc: Context|MsgStream,
+       expect_msg: MsgType,
+
+       # NOTE: generally speaking only for handling `Stop`-msgs that
+       # arrive during a call to `drain_to_final_msg()` above!
+       passthrough_non_pld_msgs: bool = True,
+       **kwargs,

    ) -> tuple[MsgType, PayloadT]:
        '''

@@ -268,16 +368,102 @@ class PldRx(Struct):
        '''
        msg: MsgType = await ipc._rx_chan.receive()

+       if passthrough_non_pld_msgs:
+           match msg:
+               case Stop():
+                   return msg, None
+
        # TODO: is there some way we can inject the decoded
        # payload into an existing output buffer for the original
        # msg instance?
        pld: PayloadT = self.dec_msg(
            msg,
            ctx=ipc,
+           expect_msg=expect_msg,
+           **kwargs,
        )
        return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
    _pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
    'pld_rx',
    default=_def_pld_rx,
)


def current_pldrx() -> PldRx:
    '''
    Return the current `trio.Task.context`'s msg-payload-receiver.

    A payload receiver is the IPC-msg processing sub-sys which
    filters inter-actor-task communicated payload data, i.e. the
    `PayloadMsg.pld: PayloadT` field value, AFTER its container
    shuttle msg (eg. `Started`/`Yield`/`Return`) has been delivered
    up from `tractor`'s transport layer but BEFORE the data is
    yielded to application code, normally via an IPC primitive API
    like, for ex., `pld_data: PayloadT = MsgStream.receive()`.

    Modification of the current payload spec via `limit_plds()`
    allows a `tractor` application to contextually filter IPC
    payload content with a type specification as supported by
    the interchange backend.

    - for `msgspec` see <PUTLINKHERE>.

    NOTE that the `PldRx` itself is a per-`Context` global sub-system
    that normally does not change other than the applied pld-spec
    for the current `trio.Task`.

    '''
    # ctx: Context = current_ipc_ctx()
    # return ctx._pld_rx
    return _ctxvar_PldRx.get()


@cm
def limit_plds(
    spec: Union[Type[Struct]],
    **kwargs,
) -> MsgDec:
    '''
    Apply a `MsgCodec` that will natively decode the SC-msg set's
    `Msg.pld: Union[Type[Struct]]` payload fields using
    tagged-unions of `msgspec.Struct`s from the `payload_types`
    for all IPC contexts in use by the current `trio.Task`.

    '''
    __tracebackhide__: bool = True
    try:
        # sanity on orig settings
        orig_pldrx: PldRx = current_pldrx()
        orig_pldec: MsgDec = orig_pldrx.pld_dec

        with orig_pldrx.limit_plds(
            spec=spec,
            **kwargs,
        ) as pldec:
            log.info(
                'Applying payload-decoder\n\n'
                f'{pldec}\n'
            )
            yield pldec
    finally:
        log.info(
            'Reverted to previous payload-decoder\n\n'
            f'{orig_pldec}\n'
        )
        assert (
            (pldrx := current_pldrx()) is orig_pldrx
            and
            pldrx.pld_dec is orig_pldec
        )
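
And the module-level wrapper from an application task's point of view; a sketch re-using the same hypothetical `PointMsg` spec from above:

```python
import tractor

@tractor.context
async def echo_points(ctx: tractor.Context) -> None:
    await ctx.started()
    with limit_plds(spec=PointMsg):
        async with ctx.open_stream() as stream:
            # each received `.pld` is now decode-checked as `PointMsg`
            async for pt in stream:
                await stream.send(pt)
```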
async def drain_to_final_msg(
    ctx: Context,

@@ -308,67 +494,33 @@ async def drain_to_final_msg(
    # basically ignoring) any bi-dir-stream msgs still in transit
    # from the far end.
    pre_result_drained: list[MsgType] = []
-   return_msg: Return|None = None
+   result_msg: Return|Error|None = None
    while not (
        ctx.maybe_error
        and not ctx._final_result_is_set()
    ):
        try:
-           # TODO: can remove?
-           # await trio.lowlevel.checkpoint()
-
-           # NOTE: this REPL usage actually works here dawg! Bo
-           # from .devx._debug import pause
-           # await pause()
-
-           # TODO: bad idea?
-           # -[ ] wrap final outcome channel wait in a scope so
-           #   it can be cancelled out of band if needed?
-           #
-           # with trio.CancelScope() as res_cs:
-           #     ctx._res_scope = res_cs
-           #     msg: dict = await ctx._rx_chan.receive()
-           # if res_cs.cancelled_caught:
-
-           # TODO: ensure there's no more hangs, debugging the
-           # runtime pretty preaase!
-           # from .devx._debug import pause
-           # await pause()
-
-           # TODO: can remove this finally?
-           # we have no more need for the sync draining right
-           # since we can kinda guarantee the async
-           # `.receive()` below will never block yah?
-           #
-           # if (
-           #     ctx._cancel_called and (
-           #         ctx.cancel_acked
-           #         # or ctx.chan._cancel_called
-           #     )
-           #     # or not ctx._final_result_is_set()
-           #     # ctx.outcome is not
-           #     # or ctx.chan._closed
-           # ):
-           #     try:
-           #         msg: dict = await ctx._rx_chan.receive_nowait()()
-           #     except trio.WouldBlock:
-           #         log.warning(
-           #             'When draining already `.cancel_called` ctx!\n'
-           #             'No final msg arrived..\n'
-           #         )
-           #         break
-           #     else:
-           #         msg: dict = await ctx._rx_chan.receive()
-
-           # TODO: don't need it right jefe?
-           # with trio.move_on_after(1) as cs:
-           # if cs.cancelled_caught:
-           #     from .devx._debug import pause
-           #     await pause()
-
-           # pray to the `trio` gawds that we're correct with this
-           # msg: dict = await ctx._rx_chan.receive()
-           msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
+           # receive all msgs, scanning for either a final result
+           # or error; the underlying call should never raise any
+           # remote error directly!
+           msg, pld = await ctx._pld_rx.recv_msg_w_pld(
+               ipc=ctx,
+               expect_msg=Return,
+               raise_error=False,
+           )
+           # ^-TODO-^ some bad ideas?
+           # -[ ] wrap final outcome .receive() in a scope so
+           #   it can be cancelled out of band if needed?
+           # |_with trio.CancelScope() as res_cs:
+           #       ctx._res_scope = res_cs
+           #       msg: dict = await ctx._rx_chan.receive()
+           #   if res_cs.cancelled_caught:
+           #
+           # -[ ] make sure pause points work here for REPLing
+           #   the runtime itself; i.e. ensure there's no hangs!
+           # |_from tractor.devx._debug import pause
+           #   await pause()

        # NOTE: we get here if the far end was
        # `ContextCancelled` in 2 cases:

@@ -376,7 +528,7 @@ async def drain_to_final_msg(
        #    SHOULD NOT raise that far end error,
        # 2. WE DID NOT REQUEST that cancel and thus
        #    SHOULD RAISE HERE!
-       except trio.Cancelled:
+       except trio.Cancelled as taskc:

            # CASE 2: mask the local cancelled-error(s)
            # only when we are sure the remote error is

@@ -386,7 +538,7 @@ async def drain_to_final_msg(

            # CASE 1: we DID request the cancel we simply
            # continue to bubble up as normal.
-           raise
+           raise taskc

        match msg:

@@ -399,14 +551,14 @@ async def drain_to_final_msg(
                ctx._result: Any = pld
                log.runtime(
                    'Context delivered final draining msg:\n'
-                   f'{pformat(msg)}'
+                   f'{pretty_struct.pformat(msg)}'
                )
                # XXX: only close the rx mem chan AFTER
                # a final result is retrieved.
                # if ctx._rx_chan:
                #     await ctx._rx_chan.aclose()
                # TODO: ^ we don't need it right?
-               return_msg = msg
+               result_msg = msg
                break

            # far end task is still streaming to us so discard

@@ -435,12 +587,9 @@ async def drain_to_final_msg(
                        f'=> {ctx._task}\n'
                        f'  |_{ctx._stream}\n\n'

-                       f'{pformat(msg)}\n'
+                       f'{pretty_struct.pformat(msg)}\n'
                    )
-                   return (
-                       return_msg,
-                       pre_result_drained,
-                   )
+                   break

                # drain up to the `msg_limit` hoping to get
                # a final result or error/ctxc.

@@ -452,7 +601,7 @@ async def drain_to_final_msg(
                    f'=> {ctx._task}\n'
                    f'  |_{ctx._stream}\n\n'

-                   f'{pformat(msg)}\n'
+                   f'{pretty_struct.pformat(msg)}\n'
                )
                continue

@@ -467,7 +616,7 @@ async def drain_to_final_msg(
                pre_result_drained.append(msg)
                log.cancel(
                    'Remote stream terminated due to "stop" msg:\n\n'
-                   f'{pformat(msg)}\n'
+                   f'{pretty_struct.pformat(msg)}\n'
                )
                continue

@@ -476,9 +625,9 @@ async def drain_to_final_msg(
            case Error():
                # TODO: can we replace this with `ctx.maybe_raise()`?
                # -[ ] would this be handier for this case maybe?
-               # async with maybe_raise_on_exit() as raises:
-               #     if raises:
-               #         log.error('some msg about raising..')
+               # |_async with maybe_raise_on_exit() as raises:
+               #       if raises:
+               #           log.error('some msg about raising..')
                #
                re: Exception|None = ctx._remote_error
                if re:

@@ -512,7 +661,7 @@ async def drain_to_final_msg(
                    # raise_overrun_from_self=False,
                    raise_overrun_from_self=raise_overrun,
                )
-
+               result_msg = msg
                break  # OOOOOF, yeah obvi we need this..

        # XXX we should never really get here

@@ -558,6 +707,6 @@ async def drain_to_final_msg(
    )

    return (
-       return_msg,
+       result_msg,
        pre_result_drained,
    )


@@ -56,6 +56,7 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT')


+# TODO: PayloadMsg
class Msg(
    Struct,
    Generic[PayloadT],

@@ -81,7 +82,7 @@ class Msg(
    tree.

    '''
-   cid: str|None  # call/context-id
+   cid: str  # call/context-id
    # ^-TODO-^: more explicit type?
    # -[ ] use UNSET here?
    #  https://jcristharif.com/msgspec/supported-types.html#unset

@@ -106,7 +107,7 @@ class Msg(
    # TODO: could also be set to `msgspec.Raw` if the sub-decoders
    # approach is preferred over the generic parameterization
    # approach as take by `mk_msg_spec()` below.
-   pld: PayloadT|Raw
+   pld: Raw


class Aid(

@@ -143,6 +144,8 @@ class SpawnSpec(
    `Aid` msg.

    '''
+   # TODO: similar to the `Start` kwargs spec needed below, we need
+   # a hard `Struct` def for all of these fields!
    _parent_main_data: dict
    _runtime_vars: dict[str, Any]
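
Given the TODO above, a minimal sketch of what such a "hard" spec could look like, using only the `_runtime_vars` keys visible elsewhere in this changeset; any further fields would be assumptions:

```python
class RuntimeVars(Struct):  # hypothetical, per the TODO above
    # only the keys referenced in this changeset; the real
    # `_runtime_vars` dict carries more entries.
    _debug_mode: bool
    _is_root: bool
```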