Compare commits

..

No commits in common. "343b7c971249b25480c6a59d8974856107e736b7" and "a5a0e6854b57875bcd6820ed7c58106f7bab55a1" have entirely different histories.

20 changed files with 1296 additions and 2200 deletions

View File

@ -1,68 +1,3 @@
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
# ------ - ------
[tool.poetry]
name = "tractor"
version = "0.1.0a6dev0"
description='structured concurrent `trio`-"actors"'
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPlv3"
readme = "docs/README.rst"
# TODO: do we need this xontrib loader at all given pep420
# and xonsh's xontrib global-autoload-via-setuptools?
# https://xon.sh/tutorial_xontrib.html#authoring-xontribs
packages = [
{include = 'tractor' },
# {include = 'tractor.experimental' },
# {include = 'tractor.trionics' },
# {include = 'tractor.msg' },
# {include = 'tractor.devx' },
]
# ------ - ------
[tool.poetry.dependencies]
python = "^3.11"
# trio runtime related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
trio='^0.24'
tricycle = "^0.4.1"
trio-typing = "^0.10.0"
msgspec='^0.18.5' # interchange
wrapt = "^1.16.0" # decorators
colorlog = "^6.8.2" # logging
# .devx tooling
stackscope = "^0.2.2"
pdbp = "^1.5.0"
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2
# ------ - ------
xontrib-vox = "^0.0.1"
[tool.poetry.group.dev]
optional = false
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
# only for xonsh as sh..
xontrib-vox = "^0.0.1"
prompt-toolkit = "^3.0.43"
xonsh-vox-tabcomplete = "^0.5"
# ------ - ------
[tool.towncrier]
package = "tractor"
filename = "NEWS.rst"
@ -92,7 +27,6 @@ all_bullets = true
name = "Trivial/Internal Changes"
showcontent = true
# ------ - ------
[tool.pytest.ini_options]
minversion = '6.0'
@ -112,26 +46,3 @@ log_cli = false
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
# pythonpath = "src"
# ------ - ------
[project]
keywords = [
'trio',
'async',
'concurrency',
'structured concurrency',
'actor model',
'distributed',
'multiprocessing'
]
classifiers = [
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Framework :: Trio",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Topic :: System :: Distributed Computing",
]

View File

@ -25,7 +25,6 @@ from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
)
from tractor._state import current_ipc_ctx
from tractor._testing import (
tractor_test,
@ -145,8 +144,6 @@ async def simple_setup_teardown(
global _state
_state = True
assert current_ipc_ctx() is ctx
# signal to parent that we're up
await ctx.started(data + 1)
@ -207,7 +204,6 @@ def test_simple_context(
block_forever=callee_blocks_forever,
) as (ctx, sent),
):
assert current_ipc_ctx() is ctx
assert sent == 11
if callee_blocks_forever:

View File

@ -37,9 +37,8 @@ import inspect
from pprint import pformat
from typing import (
Any,
AsyncGenerator,
Callable,
Mapping,
AsyncGenerator,
Type,
TYPE_CHECKING,
Union,
@ -60,11 +59,9 @@ from ._exceptions import (
pack_from_raise,
unpack_error,
)
from .log import (
get_logger,
at_least_level,
)
from .log import get_logger
from .msg import (
_codec,
Error,
MsgType,
MsgCodec,
@ -87,7 +84,6 @@ from ._streaming import MsgStream
from ._state import (
current_actor,
debug_mode,
_ctxvar_Context,
)
if TYPE_CHECKING:
@ -107,6 +103,7 @@ class Unresolved:
a final return value or raised error is resolved.
'''
...
# TODO: make this a .msg.types.Struct!
@ -119,19 +116,19 @@ class Context:
NB: This class should **never be instatiated directly**, it is allocated
by the runtime in 2 ways:
- by entering `Portal.open_context()` which is the primary
public API for any "parent" task or,
- by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
to a remotely scheduled "child" function.
to a remotely scheduled "callee" function.
AND is always constructed using the below `mk_context()`.
AND is always constructed using the below ``mk_context()``.
Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
always allocated inside `._rpc._invoke()`.
always allocated inside ``._rpc._invoke()``.
TODO: more detailed writeup on cancellation, error and
streaming semantics..
@ -209,7 +206,7 @@ class Context:
# cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the
# `ContextCancelled`?
_canceller: tuple[str, str]|None = None
_canceller: tuple[str, str] | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any
@ -265,13 +262,7 @@ class Context:
_strict_started: bool = False
_cancel_on_msgerr: bool = True
def pformat(
self,
extra_fields: dict[str, Any]|None = None,
# ^-TODO-^ some built-in extra state fields
# we'll want in some devx specific cases?
) -> str:
def __str__(self) -> str:
ds: str = '='
# ds: str = ': '
@ -288,7 +279,11 @@ class Context:
outcome_str: str = self.repr_outcome(
show_error_fields=True
)
fmtstr: str = (
outcome_typ_str: str = self.repr_outcome(
type_only=True
)
return (
f'<Context(\n'
# f'\n'
# f' ---\n'
@ -309,12 +304,12 @@ class Context:
# f' -----\n'
#
# TODO: better state `str`ids?
# -[x] maybe map err-types to strs like 'cancelled',
# -[ ] maybe map err-types to strs like 'cancelled',
# 'errored', 'streaming', 'started', .. etc.
# -[ ] as well as a final result wrapper like
# `outcome.Value`?
#
f' |_state: {self.repr_state!r}\n'
f' |_state: {outcome_typ_str}\n'
f' outcome{ds}{outcome_str}\n'
f' result{ds}{self._result}\n'
@ -329,16 +324,6 @@ class Context:
# -[ ] remove this ^ right?
# f' _remote_error={self._remote_error}
)
if extra_fields:
for key, val in extra_fields.items():
fmtstr += (
f' {key}{ds}{val!r}\n'
)
return (
fmtstr
+
')>\n'
)
# NOTE: making this return a value that can be passed to
@ -350,8 +335,7 @@ class Context:
# logging perspective over `eval()`-ability since we do NOT
# target serializing non-struct instances!
# def __repr__(self) -> str:
__str__ = pformat
__repr__ = pformat
__repr__ = __str__
@property
def cancel_called(self) -> bool:
@ -389,12 +373,8 @@ class Context:
re: BaseException|None = (
remote_error
or
self._remote_error
or self._remote_error
)
# XXX we only report "this context" as self-cancelled
# once we've received a ctxc from our direct-peer task
# (aka we're `.cancel_acked`).
if not re:
return False
@ -405,10 +385,10 @@ class Context:
our_canceller = self.canceller
return bool(
isinstance((ctxc := re), ContextCancelled)
isinstance(re, ContextCancelled)
and from_uid == self.chan.uid
and ctxc.canceller == our_uid
and our_canceller == our_uid
and re.canceller == our_uid
and our_canceller == from_uid
)
@property
@ -628,61 +608,52 @@ class Context:
)
self._remote_error: BaseException = error
msgerr: bool = False
# self-cancel (ack) or,
# peer propagated remote cancellation.
msgerr: bool = False
if isinstance(error, ContextCancelled):
# NOTE in the case error is a ctxc the canceller will
# either be another peer or us. in the case where it's us
# we mark ourself as the canceller of ourselves (a ctx
# "self cancel" from this side's perspective), if instead
# the far end was cancelled by some other (inter-) peer,
# we want to mark our canceller as the actor that was
# cancelled, NOT their reported canceller. IOW in the
# latter case we're cancelled by someone else getting
# cancelled.
if (canc := error.canceller) == self._actor.uid:
whom: str = 'us'
self._canceller = canc
else:
whom = 'a remote peer (not us)'
self._canceller = error.src_uid
whom: str = (
'us' if error.canceller == self._actor.uid
else 'peer'
)
log.cancel(
f'IPC context was cancelled by {whom}!\n\n'
f'IPC context cancelled by {whom}!\n\n'
f'{error}'
)
elif isinstance(error, MsgTypeError):
msgerr = True
self._canceller = error.src_uid
log.error(
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n'
f'{pformat(self)}\n'
)
else:
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src_uid: tuple = getattr(
error,
'src_uid',
None,
)
# we mark the source actor as our canceller
self._canceller = maybe_error_src_uid
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error}\n'
)
if self._canceller is None:
log.error('Ctx has no canceller set!?')
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
@ -725,35 +696,29 @@ class Context:
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
if (
cs
and
at_least_level(log=log, level='cancel')
):
fmt_str: str = self.pformat(
extra_fields={
'._is_self_cancelled()': self._is_self_cancelled(),
'._cancel_on_msgerr': self._cancel_on_msgerr,
}
)
from .devx.pformat import pformat_cs
cs_fmt: str = pformat_cs(
cs,
var_name='Context._scope',
)
fmt_str += (
'\n'
+
cs_fmt
scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
if cs:
scope_info: str = (
f'self._scope: {cs}\n'
f'|_ .cancel_called: {cs.cancel_called}\n'
f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
f'|_ ._cancel_status: {cs._cancel_status}\n\n'
f'{self}\n'
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
f'msgerr: {msgerr}\n'
)
log.cancel(
message
+
fmt_str
f'{scope_info}'
)
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# TODO: also add to `Channel`?
# TODO: add to `Channel`?
@property
def dst_maddr(self) -> str:
chan: Channel = self.chan
@ -783,7 +748,7 @@ class Context:
)
return (
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
f'{self._nsf}() -> {outcome_str}'
f'{self._nsf}() -> {outcome_str}:'
)
@property
@ -871,7 +836,7 @@ class Context:
if not self._portal:
raise InternalError(
'No portal found!?\n'
'Why is this supposed {self.side!r}-side ctx task missing it?!?'
'Why is this supposed caller context missing it?'
)
cid: str = self.cid
@ -1126,8 +1091,7 @@ class Context:
f'ctx id: {self.cid}'
)
# TODO: replace all the `._maybe_raise_remote_err()` usage
# with instances of this!!
# TODO: replace all the instances of this!! XD
def maybe_raise(
self,
hide_tb: bool = True,
@ -1138,7 +1102,6 @@ class Context:
if re := self._remote_error:
return self._maybe_raise_remote_err(
re,
hide_tb=hide_tb,
**kwargs,
)
@ -1240,6 +1203,7 @@ class Context:
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise remote_error # from None
# TODO: change to `.wait_for_result()`?
@ -1290,15 +1254,8 @@ class Context:
# wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end.
#
# XXX NOTE XXX: this call shouldn't really ever raise
# (other then internal error), instead delivering an
# `Error`-msg and that being `.maybe_raise()`-ed below
# since every message should be delivered via the normal
# `._deliver_msg()` route which will appropriately set
# any `.maybe_error`.
(
outcome_msg,
return_msg,
drained_msgs,
) = await msgops.drain_to_final_msg(
ctx=self,
@ -1316,18 +1273,13 @@ class Context:
f'{msg}\n'
)
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
f'{outcome_msg}\n'
)
if drained_msgs:
drained_status += (
'\n'
f'The pre-drained msgs are\n'
f'{pformat(drained_msgs)}\n'
)
log.cancel(
'Ctx drained pre-result msgs:\n'
f'{pformat(drained_msgs)}\n\n'
log.cancel(drained_status)
f'Final return msg:\n'
f'{return_msg}\n'
)
self.maybe_raise(
# NOTE: obvi we don't care if we
@ -1358,7 +1310,7 @@ class Context:
@property
def maybe_error(self) -> BaseException|None:
le: BaseException|None = self._local_error
le: Exception|None = self._local_error
re: RemoteActorError|ContextCancelled|None = self._remote_error
match (le, re):
@ -1386,7 +1338,7 @@ class Context:
# ContextCancelled(canceller=),
# ):
error: BaseException|None = le or re
error: Exception|None = le or re
if error:
return error
@ -1491,76 +1443,6 @@ class Context:
repr(self._result)
)
@property
def repr_state(self) -> str:
'''
A `str`-status describing the current state of this
inter-actor IPC context in terms of the current "phase" state
of the SC shuttling dialog protocol.
'''
merr: Exception|None = self.maybe_error
outcome: Unresolved|Exception|Any = self.outcome
status: str|None = None
match (
outcome,
merr,
):
# "graceful" ctx cancellation
case (
Unresolved,
ContextCancelled(),
):
if self._is_self_cancelled():
status = 'self-cancelled'
elif (
self.canceller
and not self._cancel_called
):
status = 'peer-cancelled'
# (remote) error condition
case (
Unresolved,
BaseException(), # any error-type
):
status = 'errored'
# result already returned
case (
_, # any non-unresolved value
None,
) if self._final_result_is_set():
status = 'returned'
# normal operation but still in a pre-`Return`-result
# dialog phase
case (
Unresolved, # noqa (ruff, you so weird..)
None, # no (remote) error set
):
if stream := self._stream:
if stream.closed:
status = 'streaming-finished'
else:
status = 'streaming'
elif self._started_called:
status = 'started'
else:
if self.side == 'child':
status = 'pre-started'
else:
status = 'syncing-to-child'
if status is None:
status = '??unknown??'
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
return status
async def started(
self,
@ -1569,11 +1451,7 @@ class Context:
value: PayloadT|None = None,
strict_parity: bool = False,
# TODO: this will always emit now that we do `.pld: Raw`
# passthrough.. so maybe just only complain when above strict
# flag is set?
complain_no_parity: bool = False,
complain_no_parity: bool = True,
) -> None:
'''
@ -1633,19 +1511,18 @@ class Context:
)
raise RuntimeError(
'Failed to roundtrip `Started` msg?\n'
f'{pretty_struct.pformat(rt_started)}\n'
f'{pformat(rt_started)}\n'
)
if rt_started != started_msg:
# TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
rt_started,
started_msg,
)
complaint: str = (
'Started value does not match after roundtrip?\n\n'
'Started value does not match after codec rountrip?\n\n'
f'{diff}'
)
@ -1661,6 +1538,8 @@ class Context:
else:
log.warning(complaint)
# started_msg = rt_started
await self.chan.send(started_msg)
# raise any msg type error NO MATTER WHAT!
@ -1788,6 +1667,7 @@ class Context:
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}'
f'{pformat(re)}\n'
)
self._cancel_msg: dict = msg
@ -2052,7 +1932,6 @@ async def open_context_from_portal(
)
assert ctx._remote_func_type == 'context'
assert ctx._caller_info
_ctxvar_Context.set(ctx)
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
@ -2206,7 +2085,7 @@ async def open_context_from_portal(
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# finish silently.
# exit silently.
if (
ctx._cancel_called
and
@ -2331,11 +2210,6 @@ async def open_context_from_portal(
try:
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# cancelled before (or maybe during?) final result capture
# if isinstance(trio.Cancelled, berr):
# from .devx import mk_pdb
# mk_pdb.set_trace()
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
@ -2480,7 +2354,7 @@ async def open_context_from_portal(
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
f'uid: {uid}\n'
f'cid: {ctx.cid}\n'
)
@ -2516,8 +2390,10 @@ def mk_context(
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
# TODO: when/how do we apply `.limit_plds()` from here?
pld_rx: msgops.PldRx = msgops.current_pldrx()
pld_rx = msgops.PldRx(
# _rx_mc=recv_chan,
_msgdec=_codec.mk_dec(spec=pld_spec)
)
ctx = Context(
chan=chan,
@ -2531,12 +2407,12 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
# TODO: we can drop the old placeholder yah?
# ctx._result: int | Any = id(ctx)
ctx._result = Unresolved
return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable:
'''
Mark an (async) function as an SC-supervised, inter-`Actor`,
@ -2550,8 +2426,8 @@ def context(func: Callable) -> Callable:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig: inspect.Signature = inspect.signature(func)
params: Mapping = sig.parameters
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "

View File

@ -20,7 +20,6 @@ Sub-process entry points.
"""
from __future__ import annotations
from functools import partial
# import textwrap
from typing import (
Any,
TYPE_CHECKING,
@ -92,7 +91,7 @@ def _mp_main(
pass # handle it the same way trio does?
finally:
log.info(f"Subactor {actor.uid} terminated")
log.info(f"Actor {actor.uid} terminated")
def _trio_main(
@ -126,11 +125,9 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n'
)
log.info(
'Started new trio subactor:\n'
'Started new trio process:\n'
+
'>\n' # like a "started/play"-icon from super perspective
+
actor_info,
actor_info
)
try:
@ -149,9 +146,7 @@ def _trio_main(
finally:
log.info(
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
'Actor terminated\n'
+
actor_info
)

View File

@ -46,6 +46,7 @@ from tractor.msg import (
Error,
MsgType,
Stop,
Yield,
types as msgtypes,
MsgCodec,
MsgDec,
@ -139,6 +140,71 @@ def get_err_type(type_name: str) -> BaseException|None:
return type_ref
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pack_from_raise(
local_err: (
ContextCancelled
@ -211,8 +277,6 @@ class RemoteActorError(Exception):
) -> None:
super().__init__(message)
# for manual display without having to muck with `Exception.args`
self._message: str = message
# TODO: maybe a better name?
# - .errtype
# - .retype
@ -440,61 +504,43 @@ class RemoteActorError(Exception):
reprol_str: str = (
f'{type(self).__name__}' # type name
f'[{self.boxed_type_str}]' # parameterized by boxed type
'(' # init-style look
)
_repr: str = self._mk_fields_str(
self.reprol_fields,
end_char=' ',
)
if _repr:
reprol_str += '(' # init-style call
return (
reprol_str
+
_repr
)
def pformat(self) -> str:
def __repr__(self) -> str:
'''
Nicely formatted boxed error meta data + traceback, OR just
the normal message from `.args` (for eg. as you'd want shown
by a locally raised `ContextCancelled`).
Nicely formatted boxed error meta data + traceback.
'''
tb_str: str = self.tb_str
if tb_str:
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
from tractor.devx import (
pformat_boxed_tb,
)
body: str = pformat_boxed_tb(
tb_str=tb_str,
fields_str=fields,
field_prefix=' |_',
# ^- is so that it's placed like so,
# just after <Type(
# |___ ..
tb_body_indent=1,
)
else:
body: str = textwrap.indent(
self._message,
prefix=' ',
) + '\n'
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
body: str = pformat_boxed_tb(
tb_str=self.tb_str,
fields_str=fields,
field_prefix=' |_',
# ^- is so that it's placed like so,
# just after <Type(
# |___ ..
tb_body_indent=1,
)
return (
f'<{type(self).__name__}(\n'
f'{body}'
')>'
)
__repr__ = pformat
__str__ = pformat
def unwrap(
self,
) -> BaseException:
@ -824,9 +870,12 @@ def pack_error(
def unpack_error(
msg: Error,
chan: Channel,
chan: Channel|None = None,
box_type: RemoteActorError = RemoteActorError,
hide_tb: bool = True,
) -> None|Exception:
'''
Unpack an 'error' message from the wire
@ -836,10 +885,12 @@ def unpack_error(
which is the responsibilitiy of the caller.
'''
__tracebackhide__: bool = hide_tb
if not isinstance(msg, Error):
return None
# retrieve the remote error's msg-encoded details
# retrieve the remote error's encoded details from fields
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
@ -868,6 +919,7 @@ def unpack_error(
# original source error.
elif boxed_type_str == 'RemoteActorError':
assert boxed_type is RemoteActorError
# assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1
exc = box_type(
@ -952,6 +1004,8 @@ def _raise_from_unexpected_msg(
raise unpack_error(
msg,
ctx.chan,
hide_tb=hide_tb,
) from src_err
# `MsgStream` termination msg.
@ -1021,7 +1075,6 @@ def _mk_msg_type_err(
src_validation_error: ValidationError|None = None,
src_type_error: TypeError|None = None,
is_invalid_payload: bool = False,
) -> MsgTypeError:
'''
@ -1036,13 +1089,17 @@ def _mk_msg_type_err(
'`codec` must be a `MsgCodec` for send-side errors?'
)
from tractor.devx import (
pformat_caller_frame,
)
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
tb_fmt: str = pformat_caller_frame(stack_limit=3)
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
field_prefix=' ',
indent='',
)
message: str = (
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
@ -1079,57 +1136,47 @@ def _mk_msg_type_err(
# `Channel.recv()` case
else:
if is_invalid_payload:
msg_type: str = type(msg)
message: str = (
f'invalid `{msg_type.__qualname__}` payload\n\n'
f'<{type(msg).__qualname__}(\n'
f' |_pld: {codec.pld_spec_str} = {msg.pld!r}'
f')>\n'
)
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
# XXX be "fancy" and see if we can determine the exact
# invalid field such that we can comprehensively report
# the specific field's type problem.
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
message: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
message = f'{verb_header} ' + message
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += (
f'{msg.rstrip("`")}\n\n'
f'<{msg_type.__qualname__}(\n'
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
)
msgtyperr = MsgTypeError.from_decode(
message=message,
msgdict=msg_dict,

View File

@ -68,6 +68,40 @@ if TYPE_CHECKING:
log = get_logger(__name__)
# TODO: remove and/or rework?
# -[ ] rename to `unwrap_result()` and use
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
# Channel` arg) in key block??
# -[ ] pretty sure this is entirely covered by
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
# def _unwrap_msg(
# msg: Return|Error,
# ctx: Context,
# hide_tb: bool = True,
# ) -> Any:
# '''
# Unwrap a final result from a `{return: <Any>}` IPC msg.
# '''
# __tracebackhide__: bool = hide_tb
# try:
# return msg.pld
# except AttributeError as err:
# # internal error should never get here
# # assert msg.get('cid'), (
# assert msg.cid, (
# "Received internal error at portal?"
# )
# raise unpack_error(
# msg,
# ctx.chan,
# ) from err
class Portal:
'''
A 'portal' to a memory-domain-separated `Actor`.
@ -139,13 +173,12 @@ class Portal:
portal=self,
)
# @api_frame
async def result(self) -> Any:
'''
Return the result(s) from the remote actor's "main" task.
'''
__tracebackhide__ = True
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -221,11 +254,11 @@ class Portal:
return False
reminfo: str = (
f'Portal.cancel_actor() => {self.channel.uid}\n'
f'|_{chan}\n'
f'`Portal.cancel_actor()` => {self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel(
f'Requesting runtime cancel for peer\n\n'
f'Sending runtime `.cancel()` request to peer\n\n'
f'{reminfo}'
)
@ -402,6 +435,7 @@ class Portal:
yield stream
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
@ -462,7 +496,7 @@ class LocalPortal:
async def open_portal(
channel: Channel,
tn: trio.Nursery|None = None,
nursery: trio.Nursery|None = None,
start_msg_loop: bool = True,
shield: bool = False,
@ -470,19 +504,15 @@ async def open_portal(
'''
Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle RPC processing, normally
done by the actor-runtime implicitly via a call to
`._rpc.process_messages()`. just after connection establishment.
Spawns a background task to handle message processing (normally
done by the actor-runtime implicitly).
'''
actor = current_actor()
assert actor
was_connected: bool = False
async with maybe_open_nursery(
tn,
shield=shield,
) as tn:
async with maybe_open_nursery(nursery, shield=shield) as nursery:
if not channel.connected():
await channel.connect()
@ -494,7 +524,7 @@ async def open_portal(
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await tn.start(
msg_loop_cs = await nursery.start(
partial(
process_messages,
actor,
@ -514,7 +544,7 @@ async def open_portal(
await channel.aclose()
# cancel background msg loop task
if msg_loop_cs is not None:
if msg_loop_cs:
msg_loop_cs.cancel()
tn.cancel_scope.cancel()
nursery.cancel_scope.cancel()

View File

@ -124,9 +124,8 @@ async def open_root_actor(
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`tractor` blocks built-in `breakpoint()` calls by default!\n'
'If you need to us it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)

View File

@ -57,7 +57,6 @@ from ._exceptions import (
from .devx import (
maybe_wait_for_debugger,
_debug,
add_div,
)
from . import _state
from .log import get_logger
@ -65,13 +64,11 @@ from .msg import (
current_codec,
MsgCodec,
NamespacePath,
pretty_struct,
)
from tractor.msg.types import (
CancelAck,
Error,
Msg,
MsgType,
Return,
Start,
StartAck,
@ -251,9 +248,6 @@ async def _errors_relayed_via_ipc(
) -> None:
__tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
try:
yield # run RPC invoke body
@ -277,8 +271,6 @@ async def _errors_relayed_via_ipc(
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
#
# -[ ] this if check is duplicate with `._maybe_enter_pm()`..
if not is_multi_cancelled(err):
entered_debug: bool = False
if (
@ -302,6 +294,7 @@ async def _errors_relayed_via_ipc(
)
)
):
# await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
@ -309,14 +302,7 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
entered_debug = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception(
'RPC task crashed\n'
@ -446,8 +432,6 @@ async def _invoke(
)
context: bool = False
assert not _state._ctxvar_Context.get()
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
@ -571,7 +555,6 @@ async def _invoke(
async with trio.open_nursery() as tn:
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx)
# TODO: should would be nice to have our
@ -607,6 +590,7 @@ async def _invoke(
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -635,9 +619,23 @@ async def _invoke(
else:
explain += 'a remote peer'
explain += (
add_div(message=explain)
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------'
div_offset: int = (
round(len(explain)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
explain += (
div_str +
f'<= canceller: {canceller}\n'
f'=> cancellee: {our_uid}\n'
# TODO: better repr for ctx tasks..
@ -664,10 +662,10 @@ async def _invoke(
boxed_type=trio.Cancelled,
canceller=canceller,
)
# does this matter other then for
# consistentcy/testing? |_ no user code should be
# in this scope at this point..
# ctx._local_error = ctxc
# assign local error so that the `.outcome`
# resolves to an error for both reporting and
# state checks.
ctx._local_error = ctxc
raise ctxc
# XXX: do we ever trigger this block any more?
@ -677,13 +675,6 @@ async def _invoke(
BaseException,
) as scope_error:
if (
isinstance(scope_error, RuntimeError)
and scope_error.args
and 'Cancel scope stack corrupted' in scope_error.args[0]
):
log.exception('Cancel scope stack corrupted!?\n')
# _debug.mk_pdb().set_trace()
# always set this (child) side's exception as the
# local error on the context
@ -717,32 +708,17 @@ async def _invoke(
res_type_str,
res_str,
) = (
('error', f'{type(merr)}',) if merr
('error', f'{type(merr)}',)
if merr
else (
'result',
f'`{repr(ctx.outcome)}`',
)
)
message: str = (
log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}'
)
if merr:
from tractor import RemoteActorError
if not isinstance(merr, RemoteActorError):
fmt_merr: str = (
f'\n{merr!r}\n'
# f'{merr.args[0]!r}\n'
)
else:
fmt_merr = f'\n{merr!r}'
log.error(
message
+
fmt_merr
)
else:
log.runtime(message)
async def try_ship_error_to_remote(
@ -798,10 +774,7 @@ async def process_messages(
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> (
bool, # chan diconnected
MsgType, # last msg
):
) -> bool:
'''
This is the low-level, per-IPC-channel, RPC task scheduler loop.
@ -843,6 +816,11 @@ async def process_messages(
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime(
'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: Msg|None = None
try:
@ -856,15 +834,12 @@ async def process_messages(
async for msg in chan:
log.transport( # type: ignore
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] specifically `pformat()` sub-call..?
# -[ ] use `.msg.pretty_struct` here now instead!
# f'{pretty_struct.pformat(msg)}\n'
f'{msg}\n'
f'{pformat(msg)}\n'
)
match msg:
@ -977,19 +952,11 @@ async def process_messages(
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
start_status: str = (
'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n\n'
f' |_{chan}\n'
f' |_cid: {cid}\n\n'
# f' |_{ns}.{funcname}({kwargs})\n'
f'>> {actor.uid}\n'
f' |_{actor}\n'
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
# f' |_{ns}.{funcname}({kwargs})\n\n'
# f'{pretty_struct.pformat(msg)}\n'
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
@ -1018,10 +985,6 @@ async def process_messages(
await chan.send(err_msg)
continue
start_status += (
f' -> func: {func}\n'
)
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
@ -1029,8 +992,18 @@ async def process_messages(
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
# table?
start_status += ' -> scheduling new task..\n'
log.runtime(start_status)
log.runtime(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
f'=> {actor}\n'
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
try:
ctx: Context = await actor._service_n.start(
partial(
@ -1058,9 +1031,8 @@ async def process_messages(
# scoped exception from ``_invoke()`` itself.
if isinstance(err := ctx, Exception):
log.warning(
start_status
+
' -> task for RPC failed?\n\n'
'Task for RPC failed?'
f'|_ {func}()\n\n'
f'{err}'
)
continue
@ -1125,24 +1097,25 @@ async def process_messages(
parent_chan=chan,
)
except TransportClosed:
except (
TransportClosed,
):
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up..
#
# TODO: maybe add a teardown handshake? and,
# TODO: add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime(
f'IPC channel closed abruptly\n'
f'<=x peer: {chan.uid}\n'
f' |_{chan.raddr}\n'
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'|_{chan.raddr}\n'
)
# transport **WAS** disconnected
return (True, msg)
return True
except (
Exception,
@ -1179,17 +1152,12 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
if msg is None:
message: str = 'Exiting IPC msg loop without receiving a msg?'
else:
message: str = (
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)
log.runtime(
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f'|_{chan}\n\n'
f'{pformat(msg)}\n\n'
)
# transport **WAS NOT** disconnected
return (False, msg)
return False

View File

@ -49,7 +49,6 @@ from pprint import pformat
import signal
import sys
from typing import (
Any,
Callable,
TYPE_CHECKING,
)
@ -69,7 +68,7 @@ from tractor.msg import (
pretty_struct,
NamespacePath,
types as msgtypes,
MsgType,
Msg,
)
from ._ipc import Channel
from ._context import (
@ -97,6 +96,19 @@ from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
# from tractor.msg.types import (
# Aid,
# SpawnSpec,
# Start,
# StartAck,
# Started,
# Yield,
# Stop,
# Return,
# Error,
# )
if TYPE_CHECKING:
@ -303,32 +315,29 @@ class Actor:
self._reg_addrs = addrs
async def wait_for_peer(
self,
uid: tuple[str, str],
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event` for sync.
Wait for a connection back from a spawned actor with a `uid`
using a `trio.Event` for sync.
'''
log.debug(f'Waiting for peer {uid!r} to connect')
log.runtime(f"Waiting for peer {uid} to connect")
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f'{uid!r} successfully connected back to us')
log.runtime(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_modules(
self,
# debug_mode: bool = False,
debug_mode: bool = False,
) -> None:
'''
Load explicitly enabled python modules from local fs after
process spawn.
Load enabled RPC py-modules locally (after process fork/spawn).
Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module
code manually (presuming it exists).
code (presuming it exists).
'''
try:
@ -341,21 +350,16 @@ class Actor:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
status: str = 'Attempting to import enabled modules:\n'
for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
status += (
f'|_{modpath!r} -> {filepath!r}\n'
)
mod: ModuleType = importlib.import_module(modpath)
log.runtime(f"Attempting to import {modpath}@{filepath}")
mod = importlib.import_module(modpath)
self._mods[modpath] = mod
if modpath == '__main__':
self._mods['__mp_main__'] = mod
log.runtime(status)
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
@ -409,23 +413,21 @@ class Actor:
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
con_msg: str = ''
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
# NOTE: `.uid` is only set after first contact
con_msg = (
'IPC Re-connection from already known peer? '
)
else:
con_status = (
'New inbound IPC connection <=\n'
con_msg = (
'New IPC connection to us '
)
con_status += (
con_msg += (
f'<= @{chan.raddr}\n'
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response
try:
@ -445,13 +447,13 @@ class Actor:
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(
con_status
con_msg
+
' -> But failed to handshake? Ignoring..\n'
)
return
con_status += (
con_msg += (
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
)
# IPC connection tracking for both peers and new children:
@ -464,7 +466,7 @@ class Actor:
None,
)
if event:
con_status += (
con_msg += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -475,7 +477,7 @@ class Actor:
event.set()
else:
con_status += (
con_msg += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
@ -489,18 +491,13 @@ class Actor:
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
log.runtime(con_msg)
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
last_msg: MsgType
try:
(
disconnected,
last_msg,
) = await process_messages(
disconnected: bool = await process_messages(
self,
chan,
)
@ -601,24 +598,16 @@ class Actor:
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
and poll() is None
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
log.cancel(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
@ -627,17 +616,17 @@ class Actor:
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.uid}'
log.runtime(
f'No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
@ -651,16 +640,15 @@ class Actor:
f' |_[{i}] {pformat(chan)}\n'
)
con_teardown_status += (
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
log.runtime(
f'Remaining IPC {len(self._peers)} peers:\n'
+ peers_str
)
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
)
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
@ -735,16 +723,13 @@ class Actor:
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
# TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD
async def _deliver_ctx_payload(
self,
chan: Channel,
cid: str,
msg: MsgType|MsgTypeError,
msg: Msg|MsgTypeError,
) -> None|bool:
'''
@ -769,7 +754,7 @@ class Actor:
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
f'{pretty_struct.pformat(msg)}\n'
f'{pretty_struct.Struct.pformat(msg)}\n'
)
return
@ -911,11 +896,9 @@ class Actor:
cid=cid,
)
log.runtime(
'Sending RPC `Start`\n\n'
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n\n'
f'{pretty_struct.pformat(msg)}'
f' |_ {ns}.{func}({kwargs})\n'
)
await chan.send(msg)
@ -972,29 +955,31 @@ class Actor:
if self._spawn_method == "trio":
# Receive post-spawn runtime state from our parent.
# Receive runtime state from our parent
# parent_data: dict[str, Any]
# parent_data = await chan.recv()
# TODO: maybe we should just wrap this directly
# in a `Actor.spawn_info: SpawnInfo` struct?
spawnspec: msgtypes.SpawnSpec = await chan.recv()
self._spawn_spec = spawnspec
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
log.runtime(
'Received runtime spec from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
f'{pformat(spawnspec)}\n'
)
# accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
# rvs = parent_data.pop('_runtime_vars')
rvs = spawnspec._runtime_vars
if rvs['_debug_mode']:
try:
# TODO: maybe return some status msgs upward
# to that we can emit them in `con_status`
# instead?
log.devx(
log.info(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
@ -1004,6 +989,7 @@ class Actor:
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f'Runtime vars are: {rvs}')
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1020,12 +1006,18 @@ class Actor:
for val in spawnspec.reg_addrs
]
# TODO: better then monkey patching..
# -[ ] maybe read the actual f#$-in `._spawn_spec` XD
# for attr, value in parent_data.items():
for _, attr, value in pretty_struct.iter_fields(
spawnspec,
):
setattr(self, attr, value)
# if (
# attr == 'reg_addrs'
# and value
# ):
# self.reg_addrs = [tuple(val) for val in value]
# else:
# setattr(self, attr, value)
return (
chan,
@ -1034,11 +1026,12 @@ class Actor:
except OSError: # failed to connect
log.warning(
f'Failed to connect to spawning parent actor!?\n'
f'x=> {parent_addr}\n'
f'Failed to connect to parent!?\n\n'
'Closing IPC [TCP] transport server to\n'
f'{parent_addr}\n'
f'|_{self}\n\n'
)
await self.cancel(req_chan=None) # self cancel
await self.cancel(chan=None) # self cancel
raise
async def _serve_forever(
@ -1116,7 +1109,8 @@ class Actor:
# chan whose lifetime limits the lifetime of its remotely
# requested and locally spawned RPC tasks - similar to the
# supervision semantics of a nursery wherein the actual
# implementation does start all such tasks in a sub-nursery.
# implementation does start all such tasks in
# a sub-nursery.
req_chan: Channel|None,
) -> bool:
@ -1157,7 +1151,7 @@ class Actor:
# other) repr fields instead of doing this all manual..
msg: str = (
f'Runtime cancel request from {requester_type}:\n\n'
f'<= .cancel(): {requesting_uid}\n\n'
f'<= .cancel(): {requesting_uid}\n'
)
# TODO: what happens here when we self-cancel tho?
@ -1172,8 +1166,8 @@ class Actor:
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.pformat()}'
'>> Cancelling active debugger request..\n'
f'|_{_debug.Lock}\n'
)
dbcs.cancel()
@ -1424,12 +1418,7 @@ class Actor:
'''
if self._server_n:
# TODO: obvi a different server type when we eventually
# support some others XD
server_prot: str = 'TCP'
log.runtime(
f'Cancelling {server_prot} server'
)
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
return True
@ -1613,7 +1602,6 @@ async def async_main(
assert accept_addrs
try:
# TODO: why is this not with the root nursery?
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
@ -1898,13 +1886,13 @@ class Arbiter(Actor):
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, sockaddr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_sockaddr: {sockaddr}\n\n'
for (aname, _), sockaddr in self._registry.items():
log.runtime(
f'Actor mailbox info:\n'
f'aname: {aname}\n'
f'sockaddr: {sockaddr}\n'
)
if name == uid[0]:
if name == aname:
sockaddrs.append(sockaddr)
if not sockaddrs:
@ -1916,7 +1904,6 @@ class Arbiter(Actor):
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid])
log.runtime(mailbox_info)
return sockaddrs
async def register_actor(

View File

@ -451,9 +451,10 @@ async def trio_proc(
proc: trio.Process|None = None
try:
try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd)
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd)
log.runtime(
'Started new child\n'
'Started new sub-proc\n'
f'|_{proc}\n'
)

View File

@ -19,19 +19,13 @@ Per process state
"""
from __future__ import annotations
from contextvars import (
ContextVar,
)
from typing import (
Any,
TYPE_CHECKING,
)
from trio.lowlevel import current_task
if TYPE_CHECKING:
from ._runtime import Actor
from ._context import Context
_current_actor: Actor|None = None # type: ignore # noqa
@ -116,20 +110,3 @@ def debug_mode() -> bool:
def is_root_process() -> bool:
return _runtime_vars['_is_root']
_ctxvar_Context: ContextVar[Context] = ContextVar(
'ipc_context',
default=None,
)
def current_ipc_ctx() -> Context:
ctx: Context = _ctxvar_Context.get()
if not ctx:
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'
f'|_{current_task()}\n'
)
return ctx

View File

@ -364,10 +364,14 @@ class MsgStream(trio.abc.Channel):
if not self._eoc:
message: str = (
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
f'Context stream closed by {self._ctx.side!r}\n'
f'|_{self}\n'
)
log.cancel(message)
log.cancel(
'Stream self-closed before receiving EoC\n\n'
+
message
)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?

View File

@ -30,13 +30,7 @@ from ._debug import (
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,
mk_pdb as mk_pdb,
)
from ._stackscope import (
enable_stack_on_sig as enable_stack_on_sig,
)
from .pformat import (
add_div as add_div,
pformat_caller_frame as pformat_caller_frame,
pformat_boxed_tb as pformat_boxed_tb,
)

View File

@ -23,8 +23,6 @@ from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import (
FrameType,
FunctionType,
@ -177,103 +175,3 @@ def find_caller_info(
)
return None
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
'''
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str

File diff suppressed because it is too large Load Diff

View File

@ -65,7 +65,7 @@ def dump_task_tree() -> None:
level='cancel',
)
actor: Actor = _state.current_actor()
log.devx(
log.pdb(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
@ -104,7 +104,7 @@ def signal_handler(
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.devx(
log.pdb(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'

View File

@ -1,168 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
'''
import textwrap
import traceback
from trio import CancelScope
def add_div(
message: str,
div_str: str = '------ - ------',
) -> str:
'''
Add a "divider string" to the input `message` with
a little math to center it underneath.
'''
div_offset: int = (
round(len(message)/2)+1
-
round(len(div_str)/2)+1
)
div_str: str = (
'\n' + ' '*div_offset + f'{div_str}\n'
)
return div_str
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
'''
if (
fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
tb_box: str = (
f'|\n'
f' ------ - ------\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str
def pformat_cs(
cs: CancelScope,
var_name: str = 'cs',
field_prefix: str = ' |_',
) -> str:
'''
Pretty format info about a `trio.CancelScope` including most
of its public state and `._cancel_status`.
The output can be modified to show a "var name" for the
instance as a field prefix, just a simple str before each
line more or less.
'''
fields: str = textwrap.indent(
(
f'cancel_called = {cs.cancel_called}\n'
f'cancelled_caught = {cs.cancelled_caught}\n'
f'_cancel_status = {cs._cancel_status}\n'
f'shield = {cs.shield}\n'
),
prefix=field_prefix,
)
return (
f'{var_name}: {cs}\n'
+
fields
)

View File

@ -21,11 +21,6 @@ Log like a forester!
from collections.abc import Mapping
import sys
import logging
from logging import (
LoggerAdapter,
Logger,
StreamHandler,
)
import colorlog # type: ignore
import trio
@ -58,7 +53,6 @@ LEVELS: dict[str, int] = {
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
'DEVX': 600,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
@ -68,7 +62,6 @@ STD_PALETTE = {
'CRITICAL': 'red',
'ERROR': 'red',
'PDB': 'white',
'DEVX': 'cyan',
'WARNING': 'yellow',
'INFO': 'green',
'CANCEL': 'yellow',
@ -85,7 +78,7 @@ BOLD_PALETTE = {
# TODO: this isn't showing the correct '{filename}'
# as it did before..
class StackLevelAdapter(LoggerAdapter):
class StackLevelAdapter(logging.LoggerAdapter):
def transport(
self,
@ -93,8 +86,7 @@ class StackLevelAdapter(LoggerAdapter):
) -> None:
'''
IPC transport level msg IO; generally anything below
`._ipc.Channel` and friends.
IPC level msg-ing.
'''
return self.log(5, msg)
@ -110,7 +102,7 @@ class StackLevelAdapter(LoggerAdapter):
msg: str,
) -> None:
'''
Cancellation sequencing, mostly for runtime reporting.
Cancellation logging, mostly for runtime reporting.
'''
return self.log(
@ -124,21 +116,11 @@ class StackLevelAdapter(LoggerAdapter):
msg: str,
) -> None:
'''
`pdb`-REPL (debugger) related statuses.
Debugger logging.
'''
return self.log(500, msg)
def devx(
self,
msg: str,
) -> None:
'''
"Developer experience" sub-sys statuses.
'''
return self.log(600, msg)
def log(
self,
level,
@ -242,7 +224,6 @@ def get_logger(
'''Return the package log or a sub-logger for ``name`` if provided.
'''
log: Logger
log = rlog = logging.getLogger(_root_name)
if (
@ -297,7 +278,7 @@ def get_logger(
def get_console_log(
level: str | None = None,
**kwargs,
) -> LoggerAdapter:
) -> logging.LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it.
@ -322,7 +303,7 @@ def get_console_log(
None,
)
):
handler = StreamHandler()
handler = logging.StreamHandler()
formatter = colorlog.ColoredFormatter(
LOG_FORMAT,
datefmt=DATE_FORMAT,
@ -342,19 +323,3 @@ def get_loglevel() -> str:
# global module logger for tractor itself
log = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -25,12 +25,12 @@ from contextlib import (
# asynccontextmanager as acm,
contextmanager as cm,
)
from contextvars import ContextVar
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
Union,
# Union,
)
# ------ - ------
from msgspec import (
@ -63,7 +63,7 @@ from .types import (
Started,
Stop,
Yield,
pretty_struct,
# pretty_struct,
)
@ -75,9 +75,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_def_any_pldec: MsgDec = mk_dec()
class PldRx(Struct):
'''
A "msg payload receiver".
@ -104,12 +101,9 @@ class PldRx(Struct):
'''
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_pldec: MsgDec
_ipc: Context|MsgStream|None = None
_msgdec: MsgDec = mk_dec(spec=Any)
@property
def pld_dec(self) -> MsgDec:
return self._pldec
_ipc: Context|MsgStream|None = None
@cm
def apply_to_ipc(
@ -128,29 +122,9 @@ class PldRx(Struct):
finally:
self._ipc = None
@cm
def limit_plds(
self,
spec: Union[Type[Struct]],
) -> MsgDec:
'''
Type-limit the loadable msg payloads via an applied
`MsgDec` given an input spec, revert to prior decoder on
exit.
'''
orig_dec: MsgDec = self._pldec
limit_dec: MsgDec = mk_dec(spec=spec)
try:
self._pldec = limit_dec
yield limit_dec
finally:
self._pldec = orig_dec
@property
def dec(self) -> msgpack.Decoder:
return self._pldec.dec
return self._msgdec.dec
def recv_pld_nowait(
self,
@ -161,10 +135,9 @@ class PldRx(Struct):
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
**dec_msg_kwargs,
**kwargs,
) -> Any|Raw:
__tracebackhide__: bool = True
msg: MsgType = (
ipc_msg
@ -177,7 +150,6 @@ class PldRx(Struct):
msg,
ctx=ctx,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
async def recv_pld(
@ -185,16 +157,14 @@ class PldRx(Struct):
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
**dec_msg_kwargs,
**kwargs
) -> Any|Raw:
'''
Receive a `MsgType`, then decode and return its `.pld` field.
'''
__tracebackhide__: bool = hide_tb
msg: MsgType = (
ipc_msg
or
@ -203,20 +173,16 @@ class PldRx(Struct):
await ctx._rx_chan.receive()
)
return self.dec_msg(
msg=msg,
msg,
ctx=ctx,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
def dec_msg(
self,
msg: MsgType,
ctx: Context,
expect_msg: Type[MsgType]|None,
raise_error: bool = True,
hide_tb: bool = True,
expect_msg: Type[MsgType]|None = None,
) -> PayloadT|Raw:
'''
@ -224,7 +190,6 @@ class PldRx(Struct):
return the value or raise an appropriate error.
'''
__tracebackhide__: bool = hide_tb
match msg:
# payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code.
@ -234,12 +199,11 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase
):
try:
pld: PayloadT = self._pldec.decode(pld)
pld: PayloadT = self._msgdec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
'Decode msg payload\n\n'
f'{msg}\n\n'
f'where payload is\n'
f'|_pld={pld!r}\n'
f'{pld}\n'
)
return pld
@ -247,9 +211,8 @@ class PldRx(Struct):
except ValidationError as src_err:
msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self.pld_dec,
codec=self._dec,
src_validation_error=src_err,
is_invalid_payload=True,
)
msg: Error = pack_from_raise(
local_err=msgterr,
@ -274,62 +237,8 @@ class PldRx(Struct):
case Error():
src_err = MessagingError(
'IPC ctx dialog terminated without `Return`-ing a result\n'
f'Instead it raised {msg.boxed_type_str!r}!'
'IPC dialog termination by msg'
)
# XXX NOTE XXX another super subtle runtime-y thing..
#
# - when user code (transitively) calls into this
# func (usually via a `Context/MsgStream` API) we
# generally want errors to propagate immediately
# and directly so that the user can define how it
# wants to handle them.
#
# HOWEVER,
#
# - for certain runtime calling cases, we don't want to
# directly raise since the calling code might have
# special logic around whether to raise the error
# or supress it silently (eg. a `ContextCancelled`
# received from the far end which was requested by
# this side, aka a self-cancel).
#
# SO, we offer a flag to control this.
if not raise_error:
return src_err
case Stop(cid=cid):
message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n'
f'|_cid: {cid}\n\n'
f'{pretty_struct.pformat(msg)}\n'
)
if ctx._stream is None:
explain: str = (
f'BUT, no `MsgStream` (was) open(ed) on this '
f'{ctx.side!r}-side of the IPC ctx?\n'
f'Maybe check your code for streaming phase race conditions?\n'
)
log.warning(
message
+
explain
)
# let caller decide what to do when only one
# side opened a stream, don't raise.
return msg
else:
explain: str = (
'Received a `Stop` when it should NEVER be possible!?!?\n'
)
# TODO: this is constructed inside
# `_raise_from_unexpected_msg()` but maybe we
# should pass it in?
# src_err = trio.EndOfChannel(explain)
src_err = None
case _:
src_err = InternalError(
@ -337,9 +246,6 @@ class PldRx(Struct):
f'{msg}\n'
)
# TODO: maybe use the new `.add_note()` from 3.11?
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
#
# fallthrough and raise from `src_err`
_raise_from_unexpected_msg(
ctx=ctx,
@ -347,18 +253,12 @@ class PldRx(Struct):
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=hide_tb,
hide_tb=False,
)
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
expect_msg: MsgType,
# NOTE: generally speaking only for handling `Stop`-msgs that
# arrive during a call to `drain_to_final_msg()` above!
passthrough_non_pld_msgs: bool = True,
**kwargs,
) -> tuple[MsgType, PayloadT]:
'''
@ -368,102 +268,16 @@ class PldRx(Struct):
'''
msg: MsgType = await ipc._rx_chan.receive()
if passthrough_non_pld_msgs:
match msg:
case Stop():
return msg, None
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.dec_msg(
msg,
ctx=ipc,
expect_msg=expect_msg,
**kwargs,
)
return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other then the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
@cm
def limit_plds(
spec: Union[Type[Struct]],
**kwargs,
) -> MsgDec:
'''
Apply a `MsgCodec` that will natively decode the SC-msg set's
`Msg.pld: Union[Type[Struct]]` payload fields using
tagged-unions of `msgspec.Struct`s from the `payload_types`
for all IPC contexts in use by the current `trio.Task`.
'''
__tracebackhide__: bool = True
try:
# sanity on orig settings
orig_pldrx: PldRx = current_pldrx()
orig_pldec: MsgDec = orig_pldrx.pld_dec
with orig_pldrx.limit_plds(
spec=spec,
**kwargs,
) as pldec:
log.info(
'Applying payload-decoder\n\n'
f'{pldec}\n'
)
yield pldec
finally:
log.info(
'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n'
)
assert (
(pldrx := current_pldrx()) is orig_pldrx
and
pldrx.pld_dec is orig_pldec
)
async def drain_to_final_msg(
ctx: Context,
@ -494,33 +308,67 @@ async def drain_to_final_msg(
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[MsgType] = []
result_msg: Return|Error|None = None
return_msg: Return|None = None
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
):
try:
# receive all msgs, scanning for either a final result
# or error; the underlying call should never raise any
# remote error directly!
msg, pld = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Return,
raise_error=False,
)
# ^-TODO-^ some bad ideas?
# -[ ] wrap final outcome .receive() in a scope so
# it can be cancelled out of band if needed?
# |_with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
#
# -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_from tractor.devx._debug import pause
# await pause()
# TODO: can remove?
# await trio.lowlevel.checkpoint()
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# TODO: bad idea?
# -[ ] wrap final outcome channel wait in a scope so
# it can be cancelled out of band if needed?
#
# with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
# TODO: ensure there's no more hangs, debugging the
# runtime pretty preaase!
# from .devx._debug import pause
# await pause()
# TODO: can remove this finally?
# we have no more need for the sync draining right
# since we're can kinda guarantee the async
# `.receive()` below will never block yah?
#
# if (
# ctx._cancel_called and (
# ctx.cancel_acked
# # or ctx.chan._cancel_called
# )
# # or not ctx._final_result_is_set()
# # ctx.outcome is not
# # or ctx.chan._closed
# ):
# try:
# msg: dict = await ctx._rx_chan.receive_nowait()()
# except trio.WouldBlock:
# log.warning(
# 'When draining already `.cancel_called` ctx!\n'
# 'No final msg arrived..\n'
# )
# break
# else:
# msg: dict = await ctx._rx_chan.receive()
# TODO: don't need it right jefe?
# with trio.move_on_after(1) as cs:
# if cs.cancelled_caught:
# from .devx._debug import pause
# await pause()
# pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._rx_chan.receive()
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -528,7 +376,7 @@ async def drain_to_final_msg(
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled as taskc:
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
@ -538,7 +386,7 @@ async def drain_to_final_msg(
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise taskc
raise
match msg:
@ -551,14 +399,14 @@ async def drain_to_final_msg(
ctx._result: Any = pld
log.runtime(
'Context delivered final draining msg:\n'
f'{pretty_struct.pformat(msg)}'
f'{pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
result_msg = msg
return_msg = msg
break
# far end task is still streaming to us so discard
@ -587,9 +435,12 @@ async def drain_to_final_msg(
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pretty_struct.pformat(msg)}\n'
f'{pformat(msg)}\n'
)
return (
return_msg,
pre_result_drained,
)
break
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
@ -601,7 +452,7 @@ async def drain_to_final_msg(
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pretty_struct.pformat(msg)}\n'
f'{pformat(msg)}\n'
)
continue
@ -616,7 +467,7 @@ async def drain_to_final_msg(
pre_result_drained.append(msg)
log.cancel(
'Remote stream terminated due to "stop" msg:\n\n'
f'{pretty_struct.pformat(msg)}\n'
f'{pformat(msg)}\n'
)
continue
@ -625,9 +476,9 @@ async def drain_to_final_msg(
case Error():
# TODO: can we replace this with `ctx.maybe_raise()`?
# -[ ] would this be handier for this case maybe?
# |_async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
#
re: Exception|None = ctx._remote_error
if re:
@ -661,7 +512,7 @@ async def drain_to_final_msg(
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
result_msg = msg
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
@ -707,6 +558,6 @@ async def drain_to_final_msg(
)
return (
result_msg,
return_msg,
pre_result_drained,
)

View File

@ -56,7 +56,6 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT')
# TODO: PayloadMsg
class Msg(
Struct,
Generic[PayloadT],
@ -82,7 +81,7 @@ class Msg(
tree.
'''
cid: str # call/context-id
cid: str|None # call/context-id
# ^-TODO-^: more explicit type?
# -[ ] use UNSET here?
# https://jcristharif.com/msgspec/supported-types.html#unset
@ -107,7 +106,7 @@ class Msg(
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
# approach is preferred over the generic parameterization
# approach as take by `mk_msg_spec()` below.
pld: Raw
pld: PayloadT|Raw
class Aid(
@ -144,8 +143,6 @@ class SpawnSpec(
`Aid` msg.
'''
# TODO: similar to the `Start` kwargs spec needed below, we need
# a hard `Struct` def for all of these fields!
_parent_main_data: dict
_runtime_vars: dict[str, Any]