Compare commits
23 Commits
a5a0e6854b
...
343b7c9712
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 343b7c9712 | |
Tyler Goodlet | 45f37870af | |
Tyler Goodlet | 4d528b76a0 | |
Tyler Goodlet | 05b143d9ef | |
Tyler Goodlet | a354732a9e | |
Tyler Goodlet | fbc21a1dec | |
Tyler Goodlet | b278164f83 | |
Tyler Goodlet | 8ffa6a5e68 | |
Tyler Goodlet | 7707e0e75a | |
Tyler Goodlet | 523c24eb72 | |
Tyler Goodlet | 544ff5ab4c | |
Tyler Goodlet | 63c23d6b82 | |
Tyler Goodlet | cca3206fd6 | |
Tyler Goodlet | 54530dcf94 | |
Tyler Goodlet | 338395346d | |
Tyler Goodlet | 30c5896d26 | |
Tyler Goodlet | 88a0e90f82 | |
Tyler Goodlet | 40c972f0ec | |
Tyler Goodlet | f139adddca | |
Tyler Goodlet | 979af79588 | |
Tyler Goodlet | a3429268ea | |
Tyler Goodlet | d285a3479a | |
Tyler Goodlet | 61db040702 |
|
@ -1,3 +1,68 @@
|
|||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
# ------ - ------
|
||||
|
||||
[tool.poetry]
|
||||
name = "tractor"
|
||||
version = "0.1.0a6dev0"
|
||||
description='structured concurrent `trio`-"actors"'
|
||||
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
|
||||
license = "AGPlv3"
|
||||
readme = "docs/README.rst"
|
||||
|
||||
# TODO: do we need this xontrib loader at all given pep420
|
||||
# and xonsh's xontrib global-autoload-via-setuptools?
|
||||
# https://xon.sh/tutorial_xontrib.html#authoring-xontribs
|
||||
packages = [
|
||||
{include = 'tractor' },
|
||||
# {include = 'tractor.experimental' },
|
||||
# {include = 'tractor.trionics' },
|
||||
# {include = 'tractor.msg' },
|
||||
# {include = 'tractor.devx' },
|
||||
]
|
||||
|
||||
# ------ - ------
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11"
|
||||
|
||||
# trio runtime related
|
||||
# proper range spec:
|
||||
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
|
||||
trio='^0.24'
|
||||
tricycle = "^0.4.1"
|
||||
trio-typing = "^0.10.0"
|
||||
|
||||
msgspec='^0.18.5' # interchange
|
||||
wrapt = "^1.16.0" # decorators
|
||||
colorlog = "^6.8.2" # logging
|
||||
|
||||
# .devx tooling
|
||||
stackscope = "^0.2.2"
|
||||
pdbp = "^1.5.0"
|
||||
|
||||
|
||||
# TODO: distributed transport using
|
||||
# linux kernel networking
|
||||
# 'pyroute2
|
||||
|
||||
# ------ - ------
|
||||
xontrib-vox = "^0.0.1"
|
||||
|
||||
[tool.poetry.group.dev]
|
||||
optional = false
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^8.2.0"
|
||||
|
||||
# only for xonsh as sh..
|
||||
xontrib-vox = "^0.0.1"
|
||||
prompt-toolkit = "^3.0.43"
|
||||
xonsh-vox-tabcomplete = "^0.5"
|
||||
|
||||
# ------ - ------
|
||||
|
||||
[tool.towncrier]
|
||||
package = "tractor"
|
||||
filename = "NEWS.rst"
|
||||
|
@ -27,6 +92,7 @@ all_bullets = true
|
|||
name = "Trivial/Internal Changes"
|
||||
showcontent = true
|
||||
|
||||
# ------ - ------
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = '6.0'
|
||||
|
@ -46,3 +112,26 @@ log_cli = false
|
|||
# TODO: maybe some of these layout choices?
|
||||
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
|
||||
# pythonpath = "src"
|
||||
|
||||
# ------ - ------
|
||||
|
||||
[project]
|
||||
keywords = [
|
||||
'trio',
|
||||
'async',
|
||||
'concurrency',
|
||||
'structured concurrency',
|
||||
'actor model',
|
||||
'distributed',
|
||||
'multiprocessing'
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Operating System :: POSIX :: Linux",
|
||||
"Framework :: Trio",
|
||||
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
|
||||
"Programming Language :: Python :: Implementation :: CPython",
|
||||
"Programming Language :: Python :: 3 :: Only",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Topic :: System :: Distributed Computing",
|
||||
]
|
||||
|
|
|
@ -25,6 +25,7 @@ from tractor._exceptions import (
|
|||
StreamOverrun,
|
||||
ContextCancelled,
|
||||
)
|
||||
from tractor._state import current_ipc_ctx
|
||||
|
||||
from tractor._testing import (
|
||||
tractor_test,
|
||||
|
@ -144,6 +145,8 @@ async def simple_setup_teardown(
|
|||
global _state
|
||||
_state = True
|
||||
|
||||
assert current_ipc_ctx() is ctx
|
||||
|
||||
# signal to parent that we're up
|
||||
await ctx.started(data + 1)
|
||||
|
||||
|
@ -204,6 +207,7 @@ def test_simple_context(
|
|||
block_forever=callee_blocks_forever,
|
||||
) as (ctx, sent),
|
||||
):
|
||||
assert current_ipc_ctx() is ctx
|
||||
assert sent == 11
|
||||
|
||||
if callee_blocks_forever:
|
||||
|
|
|
@ -37,8 +37,9 @@ import inspect
|
|||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
AsyncGenerator,
|
||||
Callable,
|
||||
Mapping,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
|
@ -59,9 +60,11 @@ from ._exceptions import (
|
|||
pack_from_raise,
|
||||
unpack_error,
|
||||
)
|
||||
from .log import get_logger
|
||||
from .log import (
|
||||
get_logger,
|
||||
at_least_level,
|
||||
)
|
||||
from .msg import (
|
||||
_codec,
|
||||
Error,
|
||||
MsgType,
|
||||
MsgCodec,
|
||||
|
@ -84,6 +87,7 @@ from ._streaming import MsgStream
|
|||
from ._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
_ctxvar_Context,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -103,7 +107,6 @@ class Unresolved:
|
|||
a final return value or raised error is resolved.
|
||||
|
||||
'''
|
||||
...
|
||||
|
||||
|
||||
# TODO: make this a .msg.types.Struct!
|
||||
|
@ -116,19 +119,19 @@ class Context:
|
|||
|
||||
NB: This class should **never be instatiated directly**, it is allocated
|
||||
by the runtime in 2 ways:
|
||||
- by entering ``Portal.open_context()`` which is the primary
|
||||
public API for any "caller" task or,
|
||||
- by entering `Portal.open_context()` which is the primary
|
||||
public API for any "parent" task or,
|
||||
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
|
||||
to a remotely scheduled "callee" function.
|
||||
to a remotely scheduled "child" function.
|
||||
|
||||
AND is always constructed using the below ``mk_context()``.
|
||||
AND is always constructed using the below `mk_context()`.
|
||||
|
||||
Allows maintaining task or protocol specific state between
|
||||
2 cancel-scope-linked, communicating and parallel executing
|
||||
`trio.Task`s. Contexts are allocated on each side of any task
|
||||
RPC-linked msg dialog, i.e. for every request to a remote
|
||||
actor from a `Portal`. On the "callee" side a context is
|
||||
always allocated inside ``._rpc._invoke()``.
|
||||
always allocated inside `._rpc._invoke()`.
|
||||
|
||||
TODO: more detailed writeup on cancellation, error and
|
||||
streaming semantics..
|
||||
|
@ -262,7 +265,13 @@ class Context:
|
|||
_strict_started: bool = False
|
||||
_cancel_on_msgerr: bool = True
|
||||
|
||||
def __str__(self) -> str:
|
||||
def pformat(
|
||||
self,
|
||||
extra_fields: dict[str, Any]|None = None,
|
||||
# ^-TODO-^ some built-in extra state fields
|
||||
# we'll want in some devx specific cases?
|
||||
|
||||
) -> str:
|
||||
ds: str = '='
|
||||
# ds: str = ': '
|
||||
|
||||
|
@ -279,11 +288,7 @@ class Context:
|
|||
outcome_str: str = self.repr_outcome(
|
||||
show_error_fields=True
|
||||
)
|
||||
outcome_typ_str: str = self.repr_outcome(
|
||||
type_only=True
|
||||
)
|
||||
|
||||
return (
|
||||
fmtstr: str = (
|
||||
f'<Context(\n'
|
||||
# f'\n'
|
||||
# f' ---\n'
|
||||
|
@ -304,12 +309,12 @@ class Context:
|
|||
# f' -----\n'
|
||||
#
|
||||
# TODO: better state `str`ids?
|
||||
# -[ ] maybe map err-types to strs like 'cancelled',
|
||||
# -[x] maybe map err-types to strs like 'cancelled',
|
||||
# 'errored', 'streaming', 'started', .. etc.
|
||||
# -[ ] as well as a final result wrapper like
|
||||
# `outcome.Value`?
|
||||
#
|
||||
f' |_state: {outcome_typ_str}\n'
|
||||
f' |_state: {self.repr_state!r}\n'
|
||||
|
||||
f' outcome{ds}{outcome_str}\n'
|
||||
f' result{ds}{self._result}\n'
|
||||
|
@ -324,6 +329,16 @@ class Context:
|
|||
# -[ ] remove this ^ right?
|
||||
|
||||
# f' _remote_error={self._remote_error}
|
||||
)
|
||||
if extra_fields:
|
||||
for key, val in extra_fields.items():
|
||||
fmtstr += (
|
||||
f' {key}{ds}{val!r}\n'
|
||||
)
|
||||
|
||||
return (
|
||||
fmtstr
|
||||
+
|
||||
')>\n'
|
||||
)
|
||||
# NOTE: making this return a value that can be passed to
|
||||
|
@ -335,7 +350,8 @@ class Context:
|
|||
# logging perspective over `eval()`-ability since we do NOT
|
||||
# target serializing non-struct instances!
|
||||
# def __repr__(self) -> str:
|
||||
__repr__ = __str__
|
||||
__str__ = pformat
|
||||
__repr__ = pformat
|
||||
|
||||
@property
|
||||
def cancel_called(self) -> bool:
|
||||
|
@ -373,8 +389,12 @@ class Context:
|
|||
|
||||
re: BaseException|None = (
|
||||
remote_error
|
||||
or self._remote_error
|
||||
or
|
||||
self._remote_error
|
||||
)
|
||||
# XXX we only report "this context" as self-cancelled
|
||||
# once we've received a ctxc from our direct-peer task
|
||||
# (aka we're `.cancel_acked`).
|
||||
if not re:
|
||||
return False
|
||||
|
||||
|
@ -385,10 +405,10 @@ class Context:
|
|||
our_canceller = self.canceller
|
||||
|
||||
return bool(
|
||||
isinstance(re, ContextCancelled)
|
||||
isinstance((ctxc := re), ContextCancelled)
|
||||
and from_uid == self.chan.uid
|
||||
and re.canceller == our_uid
|
||||
and our_canceller == from_uid
|
||||
and ctxc.canceller == our_uid
|
||||
and our_canceller == our_uid
|
||||
)
|
||||
|
||||
@property
|
||||
|
@ -608,52 +628,61 @@ class Context:
|
|||
)
|
||||
self._remote_error: BaseException = error
|
||||
|
||||
msgerr: bool = False
|
||||
|
||||
# self-cancel (ack) or,
|
||||
# peer propagated remote cancellation.
|
||||
msgerr: bool = False
|
||||
if isinstance(error, ContextCancelled):
|
||||
# NOTE in the case error is a ctxc the canceller will
|
||||
# either be another peer or us. in the case where it's us
|
||||
# we mark ourself as the canceller of ourselves (a ctx
|
||||
# "self cancel" from this side's perspective), if instead
|
||||
# the far end was cancelled by some other (inter-) peer,
|
||||
# we want to mark our canceller as the actor that was
|
||||
# cancelled, NOT their reported canceller. IOW in the
|
||||
# latter case we're cancelled by someone else getting
|
||||
# cancelled.
|
||||
if (canc := error.canceller) == self._actor.uid:
|
||||
whom: str = 'us'
|
||||
self._canceller = canc
|
||||
else:
|
||||
whom = 'a remote peer (not us)'
|
||||
self._canceller = error.src_uid
|
||||
|
||||
whom: str = (
|
||||
'us' if error.canceller == self._actor.uid
|
||||
else 'peer'
|
||||
)
|
||||
log.cancel(
|
||||
f'IPC context cancelled by {whom}!\n\n'
|
||||
f'IPC context was cancelled by {whom}!\n\n'
|
||||
f'{error}'
|
||||
)
|
||||
|
||||
elif isinstance(error, MsgTypeError):
|
||||
msgerr = True
|
||||
self._canceller = error.src_uid
|
||||
log.error(
|
||||
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
|
||||
|
||||
f'{error}\n'
|
||||
f'{pformat(self)}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
# always record the cancelling actor's uid since its
|
||||
# cancellation state is linked and we want to know
|
||||
# which process was the cause / requester of the
|
||||
# cancellation.
|
||||
maybe_error_src_uid: tuple = getattr(
|
||||
error,
|
||||
'src_uid',
|
||||
None,
|
||||
)
|
||||
# we mark the source actor as our canceller
|
||||
self._canceller = maybe_error_src_uid
|
||||
log.error(
|
||||
f'Remote context error:\n\n'
|
||||
# f'{pformat(self)}\n'
|
||||
f'{error}\n'
|
||||
)
|
||||
|
||||
# always record the cancelling actor's uid since its
|
||||
# cancellation state is linked and we want to know
|
||||
# which process was the cause / requester of the
|
||||
# cancellation.
|
||||
maybe_error_src: tuple = getattr(
|
||||
error,
|
||||
'src_uid',
|
||||
None,
|
||||
)
|
||||
self._canceller = (
|
||||
maybe_error_src
|
||||
or
|
||||
# XXX: in the case we get a non-boxed error?
|
||||
# -> wait but this should never happen right?
|
||||
self.chan.uid
|
||||
)
|
||||
if self._canceller is None:
|
||||
log.error('Ctx has no canceller set!?')
|
||||
|
||||
# Cancel the local `._scope`, catch that
|
||||
# `._scope.cancelled_caught` and re-raise any remote error
|
||||
|
@ -696,29 +725,35 @@ class Context:
|
|||
else:
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
|
||||
scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||
if cs:
|
||||
scope_info: str = (
|
||||
f'self._scope: {cs}\n'
|
||||
f'|_ .cancel_called: {cs.cancel_called}\n'
|
||||
f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
|
||||
f'|_ ._cancel_status: {cs._cancel_status}\n\n'
|
||||
|
||||
f'{self}\n'
|
||||
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
|
||||
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
|
||||
|
||||
f'msgerr: {msgerr}\n'
|
||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||
if (
|
||||
cs
|
||||
and
|
||||
at_least_level(log=log, level='cancel')
|
||||
):
|
||||
fmt_str: str = self.pformat(
|
||||
extra_fields={
|
||||
'._is_self_cancelled()': self._is_self_cancelled(),
|
||||
'._cancel_on_msgerr': self._cancel_on_msgerr,
|
||||
}
|
||||
)
|
||||
from .devx.pformat import pformat_cs
|
||||
cs_fmt: str = pformat_cs(
|
||||
cs,
|
||||
var_name='Context._scope',
|
||||
)
|
||||
fmt_str += (
|
||||
'\n'
|
||||
+
|
||||
cs_fmt
|
||||
)
|
||||
log.cancel(
|
||||
message
|
||||
+
|
||||
f'{scope_info}'
|
||||
fmt_str
|
||||
)
|
||||
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
||||
# exists to support cancelling any drain loop hangs?
|
||||
|
||||
# TODO: add to `Channel`?
|
||||
# TODO: also add to `Channel`?
|
||||
@property
|
||||
def dst_maddr(self) -> str:
|
||||
chan: Channel = self.chan
|
||||
|
@ -748,7 +783,7 @@ class Context:
|
|||
)
|
||||
return (
|
||||
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
|
||||
f'{self._nsf}() -> {outcome_str}:'
|
||||
f'{self._nsf}() -> {outcome_str}'
|
||||
)
|
||||
|
||||
@property
|
||||
|
@ -836,7 +871,7 @@ class Context:
|
|||
if not self._portal:
|
||||
raise InternalError(
|
||||
'No portal found!?\n'
|
||||
'Why is this supposed caller context missing it?'
|
||||
'Why is this supposed {self.side!r}-side ctx task missing it?!?'
|
||||
)
|
||||
|
||||
cid: str = self.cid
|
||||
|
@ -1091,7 +1126,8 @@ class Context:
|
|||
f'ctx id: {self.cid}'
|
||||
)
|
||||
|
||||
# TODO: replace all the instances of this!! XD
|
||||
# TODO: replace all the `._maybe_raise_remote_err()` usage
|
||||
# with instances of this!!
|
||||
def maybe_raise(
|
||||
self,
|
||||
hide_tb: bool = True,
|
||||
|
@ -1102,6 +1138,7 @@ class Context:
|
|||
if re := self._remote_error:
|
||||
return self._maybe_raise_remote_err(
|
||||
re,
|
||||
hide_tb=hide_tb,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
@ -1203,7 +1240,6 @@ class Context:
|
|||
# runtime frames from the tb explicitly?
|
||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||
# https://stackoverflow.com/a/24752607
|
||||
__tracebackhide__: bool = True
|
||||
raise remote_error # from None
|
||||
|
||||
# TODO: change to `.wait_for_result()`?
|
||||
|
@ -1254,8 +1290,15 @@ class Context:
|
|||
# wait for a final context result/error by "draining"
|
||||
# (by more or less ignoring) any bi-dir-stream "yield"
|
||||
# msgs still in transit from the far end.
|
||||
#
|
||||
# XXX NOTE XXX: this call shouldn't really ever raise
|
||||
# (other then internal error), instead delivering an
|
||||
# `Error`-msg and that being `.maybe_raise()`-ed below
|
||||
# since every message should be delivered via the normal
|
||||
# `._deliver_msg()` route which will appropriately set
|
||||
# any `.maybe_error`.
|
||||
(
|
||||
return_msg,
|
||||
outcome_msg,
|
||||
drained_msgs,
|
||||
) = await msgops.drain_to_final_msg(
|
||||
ctx=self,
|
||||
|
@ -1273,13 +1316,18 @@ class Context:
|
|||
f'{msg}\n'
|
||||
)
|
||||
|
||||
log.cancel(
|
||||
'Ctx drained pre-result msgs:\n'
|
||||
f'{pformat(drained_msgs)}\n\n'
|
||||
|
||||
f'Final return msg:\n'
|
||||
f'{return_msg}\n'
|
||||
drained_status: str = (
|
||||
'Ctx drained to final outcome msg\n\n'
|
||||
f'{outcome_msg}\n'
|
||||
)
|
||||
if drained_msgs:
|
||||
drained_status += (
|
||||
'\n'
|
||||
f'The pre-drained msgs are\n'
|
||||
f'{pformat(drained_msgs)}\n'
|
||||
)
|
||||
|
||||
log.cancel(drained_status)
|
||||
|
||||
self.maybe_raise(
|
||||
# NOTE: obvi we don't care if we
|
||||
|
@ -1310,7 +1358,7 @@ class Context:
|
|||
|
||||
@property
|
||||
def maybe_error(self) -> BaseException|None:
|
||||
le: Exception|None = self._local_error
|
||||
le: BaseException|None = self._local_error
|
||||
re: RemoteActorError|ContextCancelled|None = self._remote_error
|
||||
|
||||
match (le, re):
|
||||
|
@ -1338,7 +1386,7 @@ class Context:
|
|||
# ContextCancelled(canceller=),
|
||||
# ):
|
||||
|
||||
error: Exception|None = le or re
|
||||
error: BaseException|None = le or re
|
||||
if error:
|
||||
return error
|
||||
|
||||
|
@ -1443,6 +1491,76 @@ class Context:
|
|||
repr(self._result)
|
||||
)
|
||||
|
||||
@property
|
||||
def repr_state(self) -> str:
|
||||
'''
|
||||
A `str`-status describing the current state of this
|
||||
inter-actor IPC context in terms of the current "phase" state
|
||||
of the SC shuttling dialog protocol.
|
||||
|
||||
'''
|
||||
merr: Exception|None = self.maybe_error
|
||||
outcome: Unresolved|Exception|Any = self.outcome
|
||||
status: str|None = None
|
||||
match (
|
||||
outcome,
|
||||
merr,
|
||||
):
|
||||
# "graceful" ctx cancellation
|
||||
case (
|
||||
Unresolved,
|
||||
ContextCancelled(),
|
||||
):
|
||||
if self._is_self_cancelled():
|
||||
status = 'self-cancelled'
|
||||
elif (
|
||||
self.canceller
|
||||
and not self._cancel_called
|
||||
):
|
||||
status = 'peer-cancelled'
|
||||
|
||||
# (remote) error condition
|
||||
case (
|
||||
Unresolved,
|
||||
BaseException(), # any error-type
|
||||
):
|
||||
status = 'errored'
|
||||
|
||||
# result already returned
|
||||
case (
|
||||
_, # any non-unresolved value
|
||||
None,
|
||||
) if self._final_result_is_set():
|
||||
status = 'returned'
|
||||
|
||||
# normal operation but still in a pre-`Return`-result
|
||||
# dialog phase
|
||||
case (
|
||||
Unresolved, # noqa (ruff, you so weird..)
|
||||
None, # no (remote) error set
|
||||
):
|
||||
if stream := self._stream:
|
||||
if stream.closed:
|
||||
status = 'streaming-finished'
|
||||
else:
|
||||
status = 'streaming'
|
||||
|
||||
elif self._started_called:
|
||||
status = 'started'
|
||||
|
||||
else:
|
||||
if self.side == 'child':
|
||||
status = 'pre-started'
|
||||
else:
|
||||
status = 'syncing-to-child'
|
||||
|
||||
if status is None:
|
||||
status = '??unknown??'
|
||||
# from tractor.devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
|
||||
return status
|
||||
|
||||
async def started(
|
||||
self,
|
||||
|
||||
|
@ -1451,7 +1569,11 @@ class Context:
|
|||
value: PayloadT|None = None,
|
||||
|
||||
strict_parity: bool = False,
|
||||
complain_no_parity: bool = True,
|
||||
|
||||
# TODO: this will always emit now that we do `.pld: Raw`
|
||||
# passthrough.. so maybe just only complain when above strict
|
||||
# flag is set?
|
||||
complain_no_parity: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -1511,18 +1633,19 @@ class Context:
|
|||
)
|
||||
raise RuntimeError(
|
||||
'Failed to roundtrip `Started` msg?\n'
|
||||
f'{pformat(rt_started)}\n'
|
||||
f'{pretty_struct.pformat(rt_started)}\n'
|
||||
)
|
||||
|
||||
if rt_started != started_msg:
|
||||
# TODO: break these methods out from the struct subtype?
|
||||
|
||||
# TODO: make that one a mod func too..
|
||||
diff = pretty_struct.Struct.__sub__(
|
||||
rt_started,
|
||||
started_msg,
|
||||
)
|
||||
complaint: str = (
|
||||
'Started value does not match after codec rountrip?\n\n'
|
||||
'Started value does not match after roundtrip?\n\n'
|
||||
f'{diff}'
|
||||
)
|
||||
|
||||
|
@ -1538,8 +1661,6 @@ class Context:
|
|||
else:
|
||||
log.warning(complaint)
|
||||
|
||||
# started_msg = rt_started
|
||||
|
||||
await self.chan.send(started_msg)
|
||||
|
||||
# raise any msg type error NO MATTER WHAT!
|
||||
|
@ -1667,7 +1788,6 @@ class Context:
|
|||
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
|
||||
|
||||
f'{flow_body}'
|
||||
|
||||
f'{pformat(re)}\n'
|
||||
)
|
||||
self._cancel_msg: dict = msg
|
||||
|
@ -1932,6 +2052,7 @@ async def open_context_from_portal(
|
|||
)
|
||||
assert ctx._remote_func_type == 'context'
|
||||
assert ctx._caller_info
|
||||
_ctxvar_Context.set(ctx)
|
||||
|
||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||
# `Started`-msg any cancellation triggered
|
||||
|
@ -2085,7 +2206,7 @@ async def open_context_from_portal(
|
|||
|
||||
# CASE 2: context was cancelled by local task calling
|
||||
# `.cancel()`, we don't raise and the exit block should
|
||||
# exit silently.
|
||||
# finish silently.
|
||||
if (
|
||||
ctx._cancel_called
|
||||
and
|
||||
|
@ -2210,6 +2331,11 @@ async def open_context_from_portal(
|
|||
try:
|
||||
result_or_err: Exception|Any = await ctx.result()
|
||||
except BaseException as berr:
|
||||
# cancelled before (or maybe during?) final result capture
|
||||
# if isinstance(trio.Cancelled, berr):
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb.set_trace()
|
||||
|
||||
# on normal teardown, if we get some error
|
||||
# raised in `Context.result()` we still want to
|
||||
# save that error on the ctx's state to
|
||||
|
@ -2354,7 +2480,7 @@ async def open_context_from_portal(
|
|||
# FINALLY, remove the context from runtime tracking and
|
||||
# exit!
|
||||
log.runtime(
|
||||
'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
|
||||
f'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
|
||||
f'uid: {uid}\n'
|
||||
f'cid: {ctx.cid}\n'
|
||||
)
|
||||
|
@ -2390,10 +2516,8 @@ def mk_context(
|
|||
from .devx._code import find_caller_info
|
||||
caller_info: CallerInfo|None = find_caller_info()
|
||||
|
||||
pld_rx = msgops.PldRx(
|
||||
# _rx_mc=recv_chan,
|
||||
_msgdec=_codec.mk_dec(spec=pld_spec)
|
||||
)
|
||||
# TODO: when/how do we apply `.limit_plds()` from here?
|
||||
pld_rx: msgops.PldRx = msgops.current_pldrx()
|
||||
|
||||
ctx = Context(
|
||||
chan=chan,
|
||||
|
@ -2407,12 +2531,12 @@ def mk_context(
|
|||
_caller_info=caller_info,
|
||||
**kwargs,
|
||||
)
|
||||
# TODO: we can drop the old placeholder yah?
|
||||
# ctx._result: int | Any = id(ctx)
|
||||
ctx._result = Unresolved
|
||||
return ctx
|
||||
|
||||
|
||||
# TODO: use the new type-parameters to annotate this in 3.13?
|
||||
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
||||
def context(func: Callable) -> Callable:
|
||||
'''
|
||||
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
||||
|
@ -2426,8 +2550,8 @@ def context(func: Callable) -> Callable:
|
|||
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
|
||||
func._tractor_context_function = True # type: ignore
|
||||
|
||||
sig = inspect.signature(func)
|
||||
params = sig.parameters
|
||||
sig: inspect.Signature = inspect.signature(func)
|
||||
params: Mapping = sig.parameters
|
||||
if 'ctx' not in params:
|
||||
raise TypeError(
|
||||
"The first argument to the context function "
|
||||
|
|
|
@ -20,6 +20,7 @@ Sub-process entry points.
|
|||
"""
|
||||
from __future__ import annotations
|
||||
from functools import partial
|
||||
# import textwrap
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
|
@ -91,7 +92,7 @@ def _mp_main(
|
|||
pass # handle it the same way trio does?
|
||||
|
||||
finally:
|
||||
log.info(f"Actor {actor.uid} terminated")
|
||||
log.info(f"Subactor {actor.uid} terminated")
|
||||
|
||||
|
||||
def _trio_main(
|
||||
|
@ -125,9 +126,11 @@ def _trio_main(
|
|||
f' loglevel: {actor.loglevel}\n'
|
||||
)
|
||||
log.info(
|
||||
'Started new trio process:\n'
|
||||
'Started new trio subactor:\n'
|
||||
+
|
||||
actor_info
|
||||
'>\n' # like a "started/play"-icon from super perspective
|
||||
+
|
||||
actor_info,
|
||||
)
|
||||
|
||||
try:
|
||||
|
@ -146,7 +149,9 @@ def _trio_main(
|
|||
|
||||
finally:
|
||||
log.info(
|
||||
'Actor terminated\n'
|
||||
'Subactor terminated\n'
|
||||
+
|
||||
'x\n' # like a "crossed-out/killed" from super perspective
|
||||
+
|
||||
actor_info
|
||||
)
|
||||
|
|
|
@ -46,7 +46,6 @@ from tractor.msg import (
|
|||
Error,
|
||||
MsgType,
|
||||
Stop,
|
||||
Yield,
|
||||
types as msgtypes,
|
||||
MsgCodec,
|
||||
MsgDec,
|
||||
|
@ -140,71 +139,6 @@ def get_err_type(type_name: str) -> BaseException|None:
|
|||
return type_ref
|
||||
|
||||
|
||||
def pformat_boxed_tb(
|
||||
tb_str: str,
|
||||
fields_str: str|None = None,
|
||||
field_prefix: str = ' |_',
|
||||
|
||||
tb_box_indent: int|None = None,
|
||||
tb_body_indent: int = 1,
|
||||
|
||||
) -> str:
|
||||
if (
|
||||
fields_str
|
||||
and
|
||||
field_prefix
|
||||
):
|
||||
fields: str = textwrap.indent(
|
||||
fields_str,
|
||||
prefix=field_prefix,
|
||||
)
|
||||
else:
|
||||
fields = fields_str or ''
|
||||
|
||||
tb_body = tb_str
|
||||
if tb_body_indent:
|
||||
tb_body: str = textwrap.indent(
|
||||
tb_str,
|
||||
prefix=tb_body_indent * ' ',
|
||||
)
|
||||
|
||||
tb_box: str = (
|
||||
|
||||
# orig
|
||||
# f' |\n'
|
||||
# f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
# f' ------ - ------\n'
|
||||
# f' _|\n'
|
||||
|
||||
f'|\n'
|
||||
f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
f'{tb_body}'
|
||||
f' ------ - ------\n'
|
||||
f'_|\n'
|
||||
)
|
||||
tb_box_indent: str = (
|
||||
tb_box_indent
|
||||
or
|
||||
1
|
||||
|
||||
# (len(field_prefix))
|
||||
# ? ^-TODO-^ ? if you wanted another indent level
|
||||
)
|
||||
if tb_box_indent > 0:
|
||||
tb_box: str = textwrap.indent(
|
||||
tb_box,
|
||||
prefix=tb_box_indent * ' ',
|
||||
)
|
||||
|
||||
return (
|
||||
fields
|
||||
+
|
||||
tb_box
|
||||
)
|
||||
|
||||
|
||||
def pack_from_raise(
|
||||
local_err: (
|
||||
ContextCancelled
|
||||
|
@ -277,6 +211,8 @@ class RemoteActorError(Exception):
|
|||
) -> None:
|
||||
super().__init__(message)
|
||||
|
||||
# for manual display without having to muck with `Exception.args`
|
||||
self._message: str = message
|
||||
# TODO: maybe a better name?
|
||||
# - .errtype
|
||||
# - .retype
|
||||
|
@ -504,30 +440,40 @@ class RemoteActorError(Exception):
|
|||
reprol_str: str = (
|
||||
f'{type(self).__name__}' # type name
|
||||
f'[{self.boxed_type_str}]' # parameterized by boxed type
|
||||
'(' # init-style look
|
||||
)
|
||||
|
||||
_repr: str = self._mk_fields_str(
|
||||
self.reprol_fields,
|
||||
end_char=' ',
|
||||
)
|
||||
if _repr:
|
||||
reprol_str += '(' # init-style call
|
||||
|
||||
return (
|
||||
reprol_str
|
||||
+
|
||||
_repr
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
def pformat(self) -> str:
|
||||
'''
|
||||
Nicely formatted boxed error meta data + traceback.
|
||||
Nicely formatted boxed error meta data + traceback, OR just
|
||||
the normal message from `.args` (for eg. as you'd want shown
|
||||
by a locally raised `ContextCancelled`).
|
||||
|
||||
'''
|
||||
tb_str: str = self.tb_str
|
||||
if tb_str:
|
||||
fields: str = self._mk_fields_str(
|
||||
_body_fields
|
||||
+
|
||||
self.extra_body_fields,
|
||||
)
|
||||
from tractor.devx import (
|
||||
pformat_boxed_tb,
|
||||
)
|
||||
body: str = pformat_boxed_tb(
|
||||
tb_str=self.tb_str,
|
||||
tb_str=tb_str,
|
||||
fields_str=fields,
|
||||
field_prefix=' |_',
|
||||
# ^- is so that it's placed like so,
|
||||
|
@ -535,12 +481,20 @@ class RemoteActorError(Exception):
|
|||
# |___ ..
|
||||
tb_body_indent=1,
|
||||
)
|
||||
else:
|
||||
body: str = textwrap.indent(
|
||||
self._message,
|
||||
prefix=' ',
|
||||
) + '\n'
|
||||
return (
|
||||
f'<{type(self).__name__}(\n'
|
||||
f'{body}'
|
||||
')>'
|
||||
)
|
||||
|
||||
__repr__ = pformat
|
||||
__str__ = pformat
|
||||
|
||||
def unwrap(
|
||||
self,
|
||||
) -> BaseException:
|
||||
|
@ -870,12 +824,9 @@ def pack_error(
|
|||
|
||||
def unpack_error(
|
||||
msg: Error,
|
||||
|
||||
chan: Channel|None = None,
|
||||
chan: Channel,
|
||||
box_type: RemoteActorError = RemoteActorError,
|
||||
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> None|Exception:
|
||||
'''
|
||||
Unpack an 'error' message from the wire
|
||||
|
@ -885,12 +836,10 @@ def unpack_error(
|
|||
which is the responsibilitiy of the caller.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
if not isinstance(msg, Error):
|
||||
return None
|
||||
|
||||
# retrieve the remote error's encoded details from fields
|
||||
# retrieve the remote error's msg-encoded details
|
||||
tb_str: str = msg.tb_str
|
||||
message: str = (
|
||||
f'{chan.uid}\n'
|
||||
|
@ -919,7 +868,6 @@ def unpack_error(
|
|||
# original source error.
|
||||
elif boxed_type_str == 'RemoteActorError':
|
||||
assert boxed_type is RemoteActorError
|
||||
# assert len(error_dict['relay_path']) >= 1
|
||||
assert len(msg.relay_path) >= 1
|
||||
|
||||
exc = box_type(
|
||||
|
@ -1004,8 +952,6 @@ def _raise_from_unexpected_msg(
|
|||
raise unpack_error(
|
||||
msg,
|
||||
ctx.chan,
|
||||
hide_tb=hide_tb,
|
||||
|
||||
) from src_err
|
||||
|
||||
# `MsgStream` termination msg.
|
||||
|
@ -1075,6 +1021,7 @@ def _mk_msg_type_err(
|
|||
|
||||
src_validation_error: ValidationError|None = None,
|
||||
src_type_error: TypeError|None = None,
|
||||
is_invalid_payload: bool = False,
|
||||
|
||||
) -> MsgTypeError:
|
||||
'''
|
||||
|
@ -1089,17 +1036,13 @@ def _mk_msg_type_err(
|
|||
'`codec` must be a `MsgCodec` for send-side errors?'
|
||||
)
|
||||
|
||||
from tractor.devx import (
|
||||
pformat_caller_frame,
|
||||
)
|
||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||
# prolly a manual type-check on our part.
|
||||
if message is None:
|
||||
fmt_stack: str = (
|
||||
'\n'.join(traceback.format_stack(limit=3))
|
||||
)
|
||||
tb_fmt: str = pformat_boxed_tb(
|
||||
tb_str=fmt_stack,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
tb_fmt: str = pformat_caller_frame(stack_limit=3)
|
||||
message: str = (
|
||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||
f'{tb_fmt}\n'
|
||||
|
@ -1135,6 +1078,16 @@ def _mk_msg_type_err(
|
|||
return msgtyperr
|
||||
|
||||
# `Channel.recv()` case
|
||||
else:
|
||||
if is_invalid_payload:
|
||||
msg_type: str = type(msg)
|
||||
message: str = (
|
||||
f'invalid `{msg_type.__qualname__}` payload\n\n'
|
||||
f'<{type(msg).__qualname__}(\n'
|
||||
f' |_pld: {codec.pld_spec_str} = {msg.pld!r}'
|
||||
f')>\n'
|
||||
)
|
||||
|
||||
else:
|
||||
# decode the msg-bytes using the std msgpack
|
||||
# interchange-prot (i.e. without any
|
||||
|
@ -1147,12 +1100,9 @@ def _mk_msg_type_err(
|
|||
message: str = (
|
||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||
)
|
||||
if verb_header:
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
# XXX see if we can determine the exact invalid field
|
||||
# such that we can comprehensively report the
|
||||
# specific field's type problem
|
||||
# XXX be "fancy" and see if we can determine the exact
|
||||
# invalid field such that we can comprehensively report
|
||||
# the specific field's type problem.
|
||||
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||
obj = object()
|
||||
|
@ -1177,6 +1127,9 @@ def _mk_msg_type_err(
|
|||
f')>'
|
||||
)
|
||||
|
||||
if verb_header:
|
||||
message = f'{verb_header} ' + message
|
||||
|
||||
msgtyperr = MsgTypeError.from_decode(
|
||||
message=message,
|
||||
msgdict=msg_dict,
|
||||
|
|
|
@ -68,40 +68,6 @@ if TYPE_CHECKING:
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
# TODO: remove and/or rework?
|
||||
# -[ ] rename to `unwrap_result()` and use
|
||||
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
|
||||
# Channel` arg) in key block??
|
||||
# -[ ] pretty sure this is entirely covered by
|
||||
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
|
||||
# def _unwrap_msg(
|
||||
# msg: Return|Error,
|
||||
# ctx: Context,
|
||||
|
||||
# hide_tb: bool = True,
|
||||
|
||||
# ) -> Any:
|
||||
# '''
|
||||
# Unwrap a final result from a `{return: <Any>}` IPC msg.
|
||||
|
||||
# '''
|
||||
# __tracebackhide__: bool = hide_tb
|
||||
# try:
|
||||
# return msg.pld
|
||||
# except AttributeError as err:
|
||||
|
||||
# # internal error should never get here
|
||||
# # assert msg.get('cid'), (
|
||||
# assert msg.cid, (
|
||||
# "Received internal error at portal?"
|
||||
# )
|
||||
|
||||
# raise unpack_error(
|
||||
# msg,
|
||||
# ctx.chan,
|
||||
# ) from err
|
||||
|
||||
|
||||
class Portal:
|
||||
'''
|
||||
A 'portal' to a memory-domain-separated `Actor`.
|
||||
|
@ -173,12 +139,13 @@ class Portal:
|
|||
portal=self,
|
||||
)
|
||||
|
||||
# @api_frame
|
||||
async def result(self) -> Any:
|
||||
'''
|
||||
Return the result(s) from the remote actor's "main" task.
|
||||
|
||||
'''
|
||||
# __tracebackhide__ = True
|
||||
__tracebackhide__ = True
|
||||
# Check for non-rpc errors slapped on the
|
||||
# channel for which we always raise
|
||||
exc = self.channel._exc
|
||||
|
@ -254,11 +221,11 @@ class Portal:
|
|||
return False
|
||||
|
||||
reminfo: str = (
|
||||
f'`Portal.cancel_actor()` => {self.channel.uid}\n'
|
||||
f'Portal.cancel_actor() => {self.channel.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
log.cancel(
|
||||
f'Sending runtime `.cancel()` request to peer\n\n'
|
||||
f'Requesting runtime cancel for peer\n\n'
|
||||
f'{reminfo}'
|
||||
)
|
||||
|
||||
|
@ -435,7 +402,6 @@ class Portal:
|
|||
yield stream
|
||||
|
||||
finally:
|
||||
|
||||
# cancel the far end task on consumer close
|
||||
# NOTE: this is a special case since we assume that if using
|
||||
# this ``.open_fream_from()`` api, the stream is one a one
|
||||
|
@ -496,7 +462,7 @@ class LocalPortal:
|
|||
async def open_portal(
|
||||
|
||||
channel: Channel,
|
||||
nursery: trio.Nursery|None = None,
|
||||
tn: trio.Nursery|None = None,
|
||||
start_msg_loop: bool = True,
|
||||
shield: bool = False,
|
||||
|
||||
|
@ -504,15 +470,19 @@ async def open_portal(
|
|||
'''
|
||||
Open a ``Portal`` through the provided ``channel``.
|
||||
|
||||
Spawns a background task to handle message processing (normally
|
||||
done by the actor-runtime implicitly).
|
||||
Spawns a background task to handle RPC processing, normally
|
||||
done by the actor-runtime implicitly via a call to
|
||||
`._rpc.process_messages()`. just after connection establishment.
|
||||
|
||||
'''
|
||||
actor = current_actor()
|
||||
assert actor
|
||||
was_connected: bool = False
|
||||
|
||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||
async with maybe_open_nursery(
|
||||
tn,
|
||||
shield=shield,
|
||||
) as tn:
|
||||
|
||||
if not channel.connected():
|
||||
await channel.connect()
|
||||
|
@ -524,7 +494,7 @@ async def open_portal(
|
|||
msg_loop_cs: trio.CancelScope|None = None
|
||||
if start_msg_loop:
|
||||
from ._runtime import process_messages
|
||||
msg_loop_cs = await nursery.start(
|
||||
msg_loop_cs = await tn.start(
|
||||
partial(
|
||||
process_messages,
|
||||
actor,
|
||||
|
@ -544,7 +514,7 @@ async def open_portal(
|
|||
await channel.aclose()
|
||||
|
||||
# cancel background msg loop task
|
||||
if msg_loop_cs:
|
||||
if msg_loop_cs is not None:
|
||||
msg_loop_cs.cancel()
|
||||
|
||||
nursery.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
@ -124,8 +124,9 @@ async def open_root_actor(
|
|||
# usage by a clobbered TTY's stdstreams!
|
||||
def block_bps(*args, **kwargs):
|
||||
raise RuntimeError(
|
||||
'`tractor` blocks built-in `breakpoint()` calls by default!\n'
|
||||
'If you need to us it please install `greenback` and set '
|
||||
'Trying to use `breakpoint()` eh?\n'
|
||||
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
|
||||
'If you need to use it please install `greenback` and set '
|
||||
'`debug_mode=True` when opening the runtime '
|
||||
'(either via `.open_nursery()` or `open_root_actor()`)\n'
|
||||
)
|
||||
|
|
160
tractor/_rpc.py
160
tractor/_rpc.py
|
@ -57,6 +57,7 @@ from ._exceptions import (
|
|||
from .devx import (
|
||||
maybe_wait_for_debugger,
|
||||
_debug,
|
||||
add_div,
|
||||
)
|
||||
from . import _state
|
||||
from .log import get_logger
|
||||
|
@ -64,11 +65,13 @@ from .msg import (
|
|||
current_codec,
|
||||
MsgCodec,
|
||||
NamespacePath,
|
||||
pretty_struct,
|
||||
)
|
||||
from tractor.msg.types import (
|
||||
CancelAck,
|
||||
Error,
|
||||
Msg,
|
||||
MsgType,
|
||||
Return,
|
||||
Start,
|
||||
StartAck,
|
||||
|
@ -248,6 +251,9 @@ async def _errors_relayed_via_ipc(
|
|||
|
||||
) -> None:
|
||||
__tracebackhide__: bool = hide_tb
|
||||
# TODO: a debug nursery when in debug mode!
|
||||
# async with maybe_open_debugger_nursery() as debug_tn:
|
||||
# => see matching comment in side `._debug._pause()`
|
||||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
|
@ -271,6 +277,8 @@ async def _errors_relayed_via_ipc(
|
|||
|
||||
# TODO: maybe we'll want different "levels" of debugging
|
||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||
#
|
||||
# -[ ] this if check is duplicate with `._maybe_enter_pm()`..
|
||||
if not is_multi_cancelled(err):
|
||||
entered_debug: bool = False
|
||||
if (
|
||||
|
@ -294,7 +302,6 @@ async def _errors_relayed_via_ipc(
|
|||
)
|
||||
)
|
||||
):
|
||||
# await _debug.pause()
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
# => I can't think of a reason that inspecting this
|
||||
|
@ -302,7 +309,14 @@ async def _errors_relayed_via_ipc(
|
|||
# recovery logic - the only case is some kind of
|
||||
# strange bug in our transport layer itself? Going
|
||||
# to keep this open ended for now.
|
||||
entered_debug = await _debug._maybe_enter_pm(err)
|
||||
log.debug(
|
||||
'RPC task crashed, attempting to enter debugger\n'
|
||||
f'|_{ctx}'
|
||||
)
|
||||
entered_debug = await _debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
)
|
||||
if not entered_debug:
|
||||
log.exception(
|
||||
'RPC task crashed\n'
|
||||
|
@ -432,6 +446,8 @@ async def _invoke(
|
|||
)
|
||||
context: bool = False
|
||||
|
||||
assert not _state._ctxvar_Context.get()
|
||||
|
||||
# TODO: deprecate this style..
|
||||
if getattr(func, '_tractor_stream_function', False):
|
||||
# handle decorated ``@tractor.stream`` async functions
|
||||
|
@ -555,6 +571,7 @@ async def _invoke(
|
|||
async with trio.open_nursery() as tn:
|
||||
ctx._scope_nursery = tn
|
||||
ctx._scope = tn.cancel_scope
|
||||
_state._ctxvar_Context.set(ctx)
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: should would be nice to have our
|
||||
|
@ -590,7 +607,6 @@ async def _invoke(
|
|||
cs: CancelScope = ctx._scope
|
||||
|
||||
if cs.cancel_called:
|
||||
|
||||
canceller: tuple = ctx.canceller
|
||||
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||
|
||||
|
@ -619,23 +635,9 @@ async def _invoke(
|
|||
else:
|
||||
explain += 'a remote peer'
|
||||
|
||||
# TODO: move this "div centering" into
|
||||
# a helper for use elsewhere!
|
||||
div_chars: str = '------ - ------'
|
||||
div_offset: int = (
|
||||
round(len(explain)/2)+1
|
||||
+
|
||||
round(len(div_chars)/2)+1
|
||||
)
|
||||
div_str: str = (
|
||||
'\n'
|
||||
+
|
||||
' '*div_offset
|
||||
+
|
||||
f'{div_chars}\n'
|
||||
)
|
||||
explain += (
|
||||
div_str +
|
||||
add_div(message=explain)
|
||||
+
|
||||
f'<= canceller: {canceller}\n'
|
||||
f'=> cancellee: {our_uid}\n'
|
||||
# TODO: better repr for ctx tasks..
|
||||
|
@ -662,10 +664,10 @@ async def _invoke(
|
|||
boxed_type=trio.Cancelled,
|
||||
canceller=canceller,
|
||||
)
|
||||
# assign local error so that the `.outcome`
|
||||
# resolves to an error for both reporting and
|
||||
# state checks.
|
||||
ctx._local_error = ctxc
|
||||
# does this matter other then for
|
||||
# consistentcy/testing? |_ no user code should be
|
||||
# in this scope at this point..
|
||||
# ctx._local_error = ctxc
|
||||
raise ctxc
|
||||
|
||||
# XXX: do we ever trigger this block any more?
|
||||
|
@ -675,6 +677,13 @@ async def _invoke(
|
|||
BaseException,
|
||||
|
||||
) as scope_error:
|
||||
if (
|
||||
isinstance(scope_error, RuntimeError)
|
||||
and scope_error.args
|
||||
and 'Cancel scope stack corrupted' in scope_error.args[0]
|
||||
):
|
||||
log.exception('Cancel scope stack corrupted!?\n')
|
||||
# _debug.mk_pdb().set_trace()
|
||||
|
||||
# always set this (child) side's exception as the
|
||||
# local error on the context
|
||||
|
@ -708,17 +717,32 @@ async def _invoke(
|
|||
res_type_str,
|
||||
res_str,
|
||||
) = (
|
||||
('error', f'{type(merr)}',)
|
||||
if merr
|
||||
('error', f'{type(merr)}',) if merr
|
||||
else (
|
||||
'result',
|
||||
f'`{repr(ctx.outcome)}`',
|
||||
)
|
||||
)
|
||||
log.runtime(
|
||||
message: str = (
|
||||
f'IPC context terminated with a final {res_type_str}\n\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
if merr:
|
||||
from tractor import RemoteActorError
|
||||
if not isinstance(merr, RemoteActorError):
|
||||
fmt_merr: str = (
|
||||
f'\n{merr!r}\n'
|
||||
# f'{merr.args[0]!r}\n'
|
||||
)
|
||||
else:
|
||||
fmt_merr = f'\n{merr!r}'
|
||||
log.error(
|
||||
message
|
||||
+
|
||||
fmt_merr
|
||||
)
|
||||
else:
|
||||
log.runtime(message)
|
||||
|
||||
|
||||
async def try_ship_error_to_remote(
|
||||
|
@ -774,7 +798,10 @@ async def process_messages(
|
|||
shield: bool = False,
|
||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> bool:
|
||||
) -> (
|
||||
bool, # chan diconnected
|
||||
MsgType, # last msg
|
||||
):
|
||||
'''
|
||||
This is the low-level, per-IPC-channel, RPC task scheduler loop.
|
||||
|
||||
|
@ -816,11 +843,6 @@ async def process_messages(
|
|||
# |_ for ex, from `aioquic` which exposed "stream ids":
|
||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||
log.runtime(
|
||||
'Entering RPC msg loop:\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
nursery_cancelled_before_task: bool = False
|
||||
msg: Msg|None = None
|
||||
try:
|
||||
|
@ -834,12 +856,15 @@ async def process_messages(
|
|||
|
||||
async for msg in chan:
|
||||
log.transport( # type: ignore
|
||||
f'<= IPC msg from peer: {chan.uid}\n\n'
|
||||
f'IPC msg from peer\n'
|
||||
f'<= {chan.uid}\n\n'
|
||||
|
||||
# TODO: avoid fmting depending on loglevel for perf?
|
||||
# -[ ] specifically `pformat()` sub-call..?
|
||||
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
|
||||
# - how to only log-level-aware actually call this?
|
||||
# -[ ] use `.msg.pretty_struct` here now instead!
|
||||
f'{pformat(msg)}\n'
|
||||
# f'{pretty_struct.pformat(msg)}\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
match msg:
|
||||
|
@ -952,11 +977,19 @@ async def process_messages(
|
|||
kwargs=kwargs, # type-spec this? see `msg.types`
|
||||
uid=actorid,
|
||||
):
|
||||
log.runtime(
|
||||
'Handling RPC `Start` request from\n'
|
||||
f'peer: {actorid}\n'
|
||||
'\n'
|
||||
f'=> {ns}.{funcname}({kwargs})\n'
|
||||
start_status: str = (
|
||||
'Handling RPC `Start` request\n'
|
||||
f'<= peer: {actorid}\n\n'
|
||||
f' |_{chan}\n'
|
||||
f' |_cid: {cid}\n\n'
|
||||
# f' |_{ns}.{funcname}({kwargs})\n'
|
||||
f'>> {actor.uid}\n'
|
||||
f' |_{actor}\n'
|
||||
f' -> nsp: `{ns}.{funcname}({kwargs})`\n'
|
||||
|
||||
# f' |_{ns}.{funcname}({kwargs})\n\n'
|
||||
|
||||
# f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
|
||||
# runtime-internal endpoint: `Actor.<funcname>`
|
||||
|
@ -985,6 +1018,10 @@ async def process_messages(
|
|||
await chan.send(err_msg)
|
||||
continue
|
||||
|
||||
start_status += (
|
||||
f' -> func: {func}\n'
|
||||
)
|
||||
|
||||
# schedule a task for the requested RPC function
|
||||
# in the actor's main "service nursery".
|
||||
#
|
||||
|
@ -992,18 +1029,8 @@ async def process_messages(
|
|||
# supervision isolation? would avoid having to
|
||||
# manage RPC tasks individually in `._rpc_tasks`
|
||||
# table?
|
||||
log.runtime(
|
||||
f'Spawning task for RPC request\n'
|
||||
f'<= caller: {chan.uid}\n'
|
||||
f' |_{chan}\n\n'
|
||||
# ^-TODO-^ maddr style repr?
|
||||
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
|
||||
# f'cid="{cid[-16:]} .."\n\n'
|
||||
|
||||
f'=> {actor}\n'
|
||||
f' |_cid: {cid}\n'
|
||||
f' |>> {func}()\n'
|
||||
)
|
||||
start_status += ' -> scheduling new task..\n'
|
||||
log.runtime(start_status)
|
||||
try:
|
||||
ctx: Context = await actor._service_n.start(
|
||||
partial(
|
||||
|
@ -1031,8 +1058,9 @@ async def process_messages(
|
|||
# scoped exception from ``_invoke()`` itself.
|
||||
if isinstance(err := ctx, Exception):
|
||||
log.warning(
|
||||
'Task for RPC failed?'
|
||||
f'|_ {func}()\n\n'
|
||||
start_status
|
||||
+
|
||||
' -> task for RPC failed?\n\n'
|
||||
f'{err}'
|
||||
)
|
||||
continue
|
||||
|
@ -1097,25 +1125,24 @@ async def process_messages(
|
|||
parent_chan=chan,
|
||||
)
|
||||
|
||||
except (
|
||||
TransportClosed,
|
||||
):
|
||||
except TransportClosed:
|
||||
# channels "breaking" (for TCP streams by EOF or 104
|
||||
# connection-reset) is ok since we don't have a teardown
|
||||
# handshake for them (yet) and instead we simply bail out of
|
||||
# the message loop and expect the teardown sequence to clean
|
||||
# up..
|
||||
# TODO: add a teardown handshake? and,
|
||||
#
|
||||
# TODO: maybe add a teardown handshake? and,
|
||||
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
|
||||
# -[ ] figure out how this will break with other transports?
|
||||
log.runtime(
|
||||
f'channel closed abruptly with\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'IPC channel closed abruptly\n'
|
||||
f'<=x peer: {chan.uid}\n'
|
||||
f' |_{chan.raddr}\n'
|
||||
)
|
||||
|
||||
# transport **WAS** disconnected
|
||||
return True
|
||||
return (True, msg)
|
||||
|
||||
except (
|
||||
Exception,
|
||||
|
@ -1152,12 +1179,17 @@ async def process_messages(
|
|||
|
||||
finally:
|
||||
# msg debugging for when he machinery is brokey
|
||||
log.runtime(
|
||||
if msg is None:
|
||||
message: str = 'Exiting IPC msg loop without receiving a msg?'
|
||||
else:
|
||||
message: str = (
|
||||
'Exiting IPC msg loop with final msg\n\n'
|
||||
f'<= peer: {chan.uid}\n'
|
||||
f' |_{chan}\n\n'
|
||||
f'{pformat(msg)}\n\n'
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
|
||||
log.runtime(message)
|
||||
|
||||
# transport **WAS NOT** disconnected
|
||||
return False
|
||||
return (False, msg)
|
||||
|
|
|
@ -49,6 +49,7 @@ from pprint import pformat
|
|||
import signal
|
||||
import sys
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
@ -68,7 +69,7 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
NamespacePath,
|
||||
types as msgtypes,
|
||||
Msg,
|
||||
MsgType,
|
||||
)
|
||||
from ._ipc import Channel
|
||||
from ._context import (
|
||||
|
@ -96,19 +97,6 @@ from ._rpc import (
|
|||
process_messages,
|
||||
try_ship_error_to_remote,
|
||||
)
|
||||
# from tractor.msg.types import (
|
||||
# Aid,
|
||||
# SpawnSpec,
|
||||
# Start,
|
||||
# StartAck,
|
||||
# Started,
|
||||
# Yield,
|
||||
# Stop,
|
||||
# Return,
|
||||
# Error,
|
||||
# )
|
||||
|
||||
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -315,29 +303,32 @@ class Actor:
|
|||
self._reg_addrs = addrs
|
||||
|
||||
async def wait_for_peer(
|
||||
self, uid: tuple[str, str]
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
|
||||
) -> tuple[trio.Event, Channel]:
|
||||
'''
|
||||
Wait for a connection back from a spawned actor with a `uid`
|
||||
using a `trio.Event` for sync.
|
||||
Wait for a connection back from a (spawned sub-)actor with
|
||||
a `uid` using a `trio.Event` for sync.
|
||||
|
||||
'''
|
||||
log.runtime(f"Waiting for peer {uid} to connect")
|
||||
log.debug(f'Waiting for peer {uid!r} to connect')
|
||||
event = self._peer_connected.setdefault(uid, trio.Event())
|
||||
await event.wait()
|
||||
log.runtime(f"{uid} successfully connected back to us")
|
||||
log.debug(f'{uid!r} successfully connected back to us')
|
||||
return event, self._peers[uid][-1]
|
||||
|
||||
def load_modules(
|
||||
self,
|
||||
debug_mode: bool = False,
|
||||
# debug_mode: bool = False,
|
||||
) -> None:
|
||||
'''
|
||||
Load enabled RPC py-modules locally (after process fork/spawn).
|
||||
Load explicitly enabled python modules from local fs after
|
||||
process spawn.
|
||||
|
||||
Since this actor may be spawned on a different machine from
|
||||
the original nursery we need to try and load the local module
|
||||
code (presuming it exists).
|
||||
code manually (presuming it exists).
|
||||
|
||||
'''
|
||||
try:
|
||||
|
@ -350,16 +341,21 @@ class Actor:
|
|||
_mp_fixup_main._fixup_main_from_path(
|
||||
parent_data['init_main_from_path'])
|
||||
|
||||
status: str = 'Attempting to import enabled modules:\n'
|
||||
for modpath, filepath in self.enable_modules.items():
|
||||
# XXX append the allowed module to the python path which
|
||||
# should allow for relative (at least downward) imports.
|
||||
sys.path.append(os.path.dirname(filepath))
|
||||
log.runtime(f"Attempting to import {modpath}@{filepath}")
|
||||
mod = importlib.import_module(modpath)
|
||||
status += (
|
||||
f'|_{modpath!r} -> {filepath!r}\n'
|
||||
)
|
||||
mod: ModuleType = importlib.import_module(modpath)
|
||||
self._mods[modpath] = mod
|
||||
if modpath == '__main__':
|
||||
self._mods['__mp_main__'] = mod
|
||||
|
||||
log.runtime(status)
|
||||
|
||||
except ModuleNotFoundError:
|
||||
# it is expected the corresponding `ModuleNotExposed` error
|
||||
# will be raised later
|
||||
|
@ -413,21 +409,23 @@ class Actor:
|
|||
chan = Channel.from_stream(stream)
|
||||
their_uid: tuple[str, str]|None = chan.uid
|
||||
|
||||
con_msg: str = ''
|
||||
if their_uid:
|
||||
con_status: str = ''
|
||||
|
||||
# TODO: remove this branch since can never happen?
|
||||
# NOTE: `.uid` is only set after first contact
|
||||
con_msg = (
|
||||
'IPC Re-connection from already known peer? '
|
||||
if their_uid:
|
||||
con_status = (
|
||||
'IPC Re-connection from already known peer?\n'
|
||||
)
|
||||
else:
|
||||
con_msg = (
|
||||
'New IPC connection to us '
|
||||
con_status = (
|
||||
'New inbound IPC connection <=\n'
|
||||
)
|
||||
|
||||
con_msg += (
|
||||
f'<= @{chan.raddr}\n'
|
||||
con_status += (
|
||||
f'|_{chan}\n'
|
||||
# f' |_@{chan.raddr}\n\n'
|
||||
# ^-TODO-^ remove since alfready in chan.__repr__()?
|
||||
)
|
||||
# send/receive initial handshake response
|
||||
try:
|
||||
|
@ -447,13 +445,13 @@ class Actor:
|
|||
# a bound listener on the "arbiter" addr. the reset will be
|
||||
# because the handshake was never meant took place.
|
||||
log.warning(
|
||||
con_msg
|
||||
con_status
|
||||
+
|
||||
' -> But failed to handshake? Ignoring..\n'
|
||||
)
|
||||
return
|
||||
|
||||
con_msg += (
|
||||
con_status += (
|
||||
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
|
||||
)
|
||||
# IPC connection tracking for both peers and new children:
|
||||
|
@ -466,7 +464,7 @@ class Actor:
|
|||
None,
|
||||
)
|
||||
if event:
|
||||
con_msg += (
|
||||
con_status += (
|
||||
' -> Waking subactor spawn waiters: '
|
||||
f'{event.statistics().tasks_waiting}\n'
|
||||
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
||||
|
@ -477,7 +475,7 @@ class Actor:
|
|||
event.set()
|
||||
|
||||
else:
|
||||
con_msg += (
|
||||
con_status += (
|
||||
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
||||
) # type: ignore
|
||||
|
||||
|
@ -491,13 +489,18 @@ class Actor:
|
|||
# TODO: can we just use list-ref directly?
|
||||
chans.append(chan)
|
||||
|
||||
log.runtime(con_msg)
|
||||
con_status += ' -> Entering RPC msg loop..\n'
|
||||
log.runtime(con_status)
|
||||
|
||||
# Begin channel management - respond to remote requests and
|
||||
# process received reponses.
|
||||
disconnected: bool = False
|
||||
last_msg: MsgType
|
||||
try:
|
||||
disconnected: bool = await process_messages(
|
||||
(
|
||||
disconnected,
|
||||
last_msg,
|
||||
) = await process_messages(
|
||||
self,
|
||||
chan,
|
||||
)
|
||||
|
@ -598,16 +601,24 @@ class Actor:
|
|||
# that the IPC layer may have failed
|
||||
# unexpectedly since it may be the cause of
|
||||
# other downstream errors.
|
||||
entry = local_nursery._children.get(uid)
|
||||
entry: tuple|None = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and poll() is None
|
||||
and
|
||||
poll() is None # proc still alive
|
||||
):
|
||||
log.cancel(
|
||||
# TODO: change log level based on
|
||||
# detecting whether chan was created for
|
||||
# ephemeral `.register_actor()` request!
|
||||
# -[ ] also, that should be avoidable by
|
||||
# re-using any existing chan from the
|
||||
# `._discovery.get_registry()` call as
|
||||
# well..
|
||||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||
|
@ -616,17 +627,17 @@ class Actor:
|
|||
|
||||
# ``Channel`` teardown and closure sequence
|
||||
# drop ref to channel so it can be gc-ed and disconnected
|
||||
log.runtime(
|
||||
f'Disconnected IPC channel:\n'
|
||||
f'uid: {chan.uid}\n'
|
||||
f'|_{pformat(chan)}\n'
|
||||
con_teardown_status: str = (
|
||||
f'IPC channel disconnected:\n'
|
||||
f'<=x uid: {chan.uid}\n'
|
||||
f' |_{pformat(chan)}\n\n'
|
||||
)
|
||||
chans.remove(chan)
|
||||
|
||||
# TODO: do we need to be this pedantic?
|
||||
if not chans:
|
||||
log.runtime(
|
||||
f'No more channels with {chan.uid}'
|
||||
con_teardown_status += (
|
||||
f'-> No more channels with {chan.uid}'
|
||||
)
|
||||
self._peers.pop(uid, None)
|
||||
|
||||
|
@ -640,15 +651,16 @@ class Actor:
|
|||
f' |_[{i}] {pformat(chan)}\n'
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
f'Remaining IPC {len(self._peers)} peers:\n'
|
||||
+ peers_str
|
||||
con_teardown_status += (
|
||||
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
|
||||
)
|
||||
|
||||
# No more channels to other actors (at all) registered
|
||||
# as connected.
|
||||
if not self._peers:
|
||||
log.runtime("Signalling no more peer channel connections")
|
||||
con_teardown_status += (
|
||||
'Signalling no more peer channel connections'
|
||||
)
|
||||
self._no_more_peers.set()
|
||||
|
||||
# NOTE: block this actor from acquiring the
|
||||
|
@ -723,13 +735,16 @@ class Actor:
|
|||
# TODO: figure out why this breaks tests..
|
||||
db_cs.cancel()
|
||||
|
||||
log.runtime(con_teardown_status)
|
||||
# finally block closure
|
||||
|
||||
# TODO: rename to `._deliver_payload()` since this handles
|
||||
# more then just `result` msgs now obvi XD
|
||||
async def _deliver_ctx_payload(
|
||||
self,
|
||||
chan: Channel,
|
||||
cid: str,
|
||||
msg: Msg|MsgTypeError,
|
||||
msg: MsgType|MsgTypeError,
|
||||
|
||||
) -> None|bool:
|
||||
'''
|
||||
|
@ -754,7 +769,7 @@ class Actor:
|
|||
# XXX don't need right since it's always in msg?
|
||||
# f'=> cid: {cid}\n\n'
|
||||
|
||||
f'{pretty_struct.Struct.pformat(msg)}\n'
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
return
|
||||
|
||||
|
@ -896,9 +911,11 @@ class Actor:
|
|||
cid=cid,
|
||||
)
|
||||
log.runtime(
|
||||
'Sending RPC start msg\n\n'
|
||||
'Sending RPC `Start`\n\n'
|
||||
f'=> peer: {chan.uid}\n'
|
||||
f' |_ {ns}.{func}({kwargs})\n'
|
||||
f' |_ {ns}.{func}({kwargs})\n\n'
|
||||
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
await chan.send(msg)
|
||||
|
||||
|
@ -955,31 +972,29 @@ class Actor:
|
|||
|
||||
if self._spawn_method == "trio":
|
||||
|
||||
# Receive runtime state from our parent
|
||||
# parent_data: dict[str, Any]
|
||||
# parent_data = await chan.recv()
|
||||
|
||||
# TODO: maybe we should just wrap this directly
|
||||
# in a `Actor.spawn_info: SpawnInfo` struct?
|
||||
# Receive post-spawn runtime state from our parent.
|
||||
spawnspec: msgtypes.SpawnSpec = await chan.recv()
|
||||
self._spawn_spec = spawnspec
|
||||
|
||||
log.runtime(
|
||||
'Received runtime spec from parent:\n\n'
|
||||
|
||||
# TODO: eventually all these msgs as
|
||||
# `msgspec.Struct` with a special mode that
|
||||
# pformats them in multi-line mode, BUT only
|
||||
# if "trace"/"util" mode is enabled?
|
||||
log.runtime(
|
||||
'Received runtime spec from parent:\n\n'
|
||||
f'{pformat(spawnspec)}\n'
|
||||
f'{pretty_struct.pformat(spawnspec)}\n'
|
||||
)
|
||||
# accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
|
||||
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
|
||||
|
||||
# rvs = parent_data.pop('_runtime_vars')
|
||||
rvs = spawnspec._runtime_vars
|
||||
# TODO: another `Struct` for rtvs..
|
||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||
if rvs['_debug_mode']:
|
||||
try:
|
||||
log.info(
|
||||
# TODO: maybe return some status msgs upward
|
||||
# to that we can emit them in `con_status`
|
||||
# instead?
|
||||
log.devx(
|
||||
'Enabling `stackscope` traces on SIGUSR1'
|
||||
)
|
||||
from .devx import enable_stack_on_sig
|
||||
|
@ -989,7 +1004,6 @@ class Actor:
|
|||
'`stackscope` not installed for use in debug mode!'
|
||||
)
|
||||
|
||||
log.runtime(f'Runtime vars are: {rvs}')
|
||||
rvs['_is_root'] = False
|
||||
_state._runtime_vars.update(rvs)
|
||||
|
||||
|
@ -1006,18 +1020,12 @@ class Actor:
|
|||
for val in spawnspec.reg_addrs
|
||||
]
|
||||
|
||||
# for attr, value in parent_data.items():
|
||||
# TODO: better then monkey patching..
|
||||
# -[ ] maybe read the actual f#$-in `._spawn_spec` XD
|
||||
for _, attr, value in pretty_struct.iter_fields(
|
||||
spawnspec,
|
||||
):
|
||||
setattr(self, attr, value)
|
||||
# if (
|
||||
# attr == 'reg_addrs'
|
||||
# and value
|
||||
# ):
|
||||
# self.reg_addrs = [tuple(val) for val in value]
|
||||
# else:
|
||||
# setattr(self, attr, value)
|
||||
|
||||
return (
|
||||
chan,
|
||||
|
@ -1026,12 +1034,11 @@ class Actor:
|
|||
|
||||
except OSError: # failed to connect
|
||||
log.warning(
|
||||
f'Failed to connect to parent!?\n\n'
|
||||
'Closing IPC [TCP] transport server to\n'
|
||||
f'{parent_addr}\n'
|
||||
f'Failed to connect to spawning parent actor!?\n'
|
||||
f'x=> {parent_addr}\n'
|
||||
f'|_{self}\n\n'
|
||||
)
|
||||
await self.cancel(chan=None) # self cancel
|
||||
await self.cancel(req_chan=None) # self cancel
|
||||
raise
|
||||
|
||||
async def _serve_forever(
|
||||
|
@ -1109,8 +1116,7 @@ class Actor:
|
|||
# chan whose lifetime limits the lifetime of its remotely
|
||||
# requested and locally spawned RPC tasks - similar to the
|
||||
# supervision semantics of a nursery wherein the actual
|
||||
# implementation does start all such tasks in
|
||||
# a sub-nursery.
|
||||
# implementation does start all such tasks in a sub-nursery.
|
||||
req_chan: Channel|None,
|
||||
|
||||
) -> bool:
|
||||
|
@ -1151,7 +1157,7 @@ class Actor:
|
|||
# other) repr fields instead of doing this all manual..
|
||||
msg: str = (
|
||||
f'Runtime cancel request from {requester_type}:\n\n'
|
||||
f'<= .cancel(): {requesting_uid}\n'
|
||||
f'<= .cancel(): {requesting_uid}\n\n'
|
||||
)
|
||||
|
||||
# TODO: what happens here when we self-cancel tho?
|
||||
|
@ -1166,8 +1172,8 @@ class Actor:
|
|||
dbcs = _debug.DebugStatus.req_cs
|
||||
if dbcs is not None:
|
||||
msg += (
|
||||
'>> Cancelling active debugger request..\n'
|
||||
f'|_{_debug.Lock}\n'
|
||||
'-> Cancelling active debugger request..\n'
|
||||
f'|_{_debug.Lock.pformat()}'
|
||||
)
|
||||
dbcs.cancel()
|
||||
|
||||
|
@ -1418,7 +1424,12 @@ class Actor:
|
|||
|
||||
'''
|
||||
if self._server_n:
|
||||
log.runtime("Shutting down channel server")
|
||||
# TODO: obvi a different server type when we eventually
|
||||
# support some others XD
|
||||
server_prot: str = 'TCP'
|
||||
log.runtime(
|
||||
f'Cancelling {server_prot} server'
|
||||
)
|
||||
self._server_n.cancel_scope.cancel()
|
||||
return True
|
||||
|
||||
|
@ -1602,6 +1613,7 @@ async def async_main(
|
|||
assert accept_addrs
|
||||
|
||||
try:
|
||||
# TODO: why is this not with the root nursery?
|
||||
actor._server_n = await service_nursery.start(
|
||||
partial(
|
||||
actor._serve_forever,
|
||||
|
@ -1886,13 +1898,13 @@ class Arbiter(Actor):
|
|||
sockaddrs: list[tuple[str, int]] = []
|
||||
sockaddr: tuple[str, int]
|
||||
|
||||
for (aname, _), sockaddr in self._registry.items():
|
||||
log.runtime(
|
||||
f'Actor mailbox info:\n'
|
||||
f'aname: {aname}\n'
|
||||
f'sockaddr: {sockaddr}\n'
|
||||
mailbox_info: str = 'Actor registry contact infos:\n'
|
||||
for uid, sockaddr in self._registry.items():
|
||||
mailbox_info += (
|
||||
f'|_uid: {uid}\n'
|
||||
f'|_sockaddr: {sockaddr}\n\n'
|
||||
)
|
||||
if name == aname:
|
||||
if name == uid[0]:
|
||||
sockaddrs.append(sockaddr)
|
||||
|
||||
if not sockaddrs:
|
||||
|
@ -1904,6 +1916,7 @@ class Arbiter(Actor):
|
|||
if not isinstance(uid, trio.Event):
|
||||
sockaddrs.append(self._registry[uid])
|
||||
|
||||
log.runtime(mailbox_info)
|
||||
return sockaddrs
|
||||
|
||||
async def register_actor(
|
||||
|
|
|
@ -451,10 +451,9 @@ async def trio_proc(
|
|||
proc: trio.Process|None = None
|
||||
try:
|
||||
try:
|
||||
# TODO: needs ``trio_typing`` patch?
|
||||
proc = await trio.lowlevel.open_process(spawn_cmd)
|
||||
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd)
|
||||
log.runtime(
|
||||
'Started new sub-proc\n'
|
||||
'Started new child\n'
|
||||
f'|_{proc}\n'
|
||||
)
|
||||
|
||||
|
|
|
@ -19,13 +19,19 @@ Per process state
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from contextvars import (
|
||||
ContextVar,
|
||||
)
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from trio.lowlevel import current_task
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from ._context import Context
|
||||
|
||||
|
||||
_current_actor: Actor|None = None # type: ignore # noqa
|
||||
|
@ -110,3 +116,20 @@ def debug_mode() -> bool:
|
|||
|
||||
def is_root_process() -> bool:
|
||||
return _runtime_vars['_is_root']
|
||||
|
||||
|
||||
_ctxvar_Context: ContextVar[Context] = ContextVar(
|
||||
'ipc_context',
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
def current_ipc_ctx() -> Context:
|
||||
ctx: Context = _ctxvar_Context.get()
|
||||
if not ctx:
|
||||
from ._exceptions import InternalError
|
||||
raise InternalError(
|
||||
'No IPC context has been allocated for this task yet?\n'
|
||||
f'|_{current_task()}\n'
|
||||
)
|
||||
return ctx
|
||||
|
|
|
@ -364,14 +364,10 @@ class MsgStream(trio.abc.Channel):
|
|||
|
||||
if not self._eoc:
|
||||
message: str = (
|
||||
f'Context stream closed by {self._ctx.side!r}\n'
|
||||
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
|
||||
f'|_{self}\n'
|
||||
)
|
||||
log.cancel(
|
||||
'Stream self-closed before receiving EoC\n\n'
|
||||
+
|
||||
message
|
||||
)
|
||||
log.cancel(message)
|
||||
self._eoc = trio.EndOfChannel(message)
|
||||
|
||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||
|
|
|
@ -30,7 +30,13 @@ from ._debug import (
|
|||
open_crash_handler as open_crash_handler,
|
||||
maybe_open_crash_handler as maybe_open_crash_handler,
|
||||
post_mortem as post_mortem,
|
||||
mk_pdb as mk_pdb,
|
||||
)
|
||||
from ._stackscope import (
|
||||
enable_stack_on_sig as enable_stack_on_sig,
|
||||
)
|
||||
from .pformat import (
|
||||
add_div as add_div,
|
||||
pformat_caller_frame as pformat_caller_frame,
|
||||
pformat_boxed_tb as pformat_boxed_tb,
|
||||
)
|
||||
|
|
|
@ -23,6 +23,8 @@ from __future__ import annotations
|
|||
import inspect
|
||||
# import msgspec
|
||||
# from pprint import pformat
|
||||
import textwrap
|
||||
import traceback
|
||||
from types import (
|
||||
FrameType,
|
||||
FunctionType,
|
||||
|
@ -175,3 +177,103 @@ def find_caller_info(
|
|||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def pformat_boxed_tb(
|
||||
tb_str: str,
|
||||
fields_str: str|None = None,
|
||||
field_prefix: str = ' |_',
|
||||
|
||||
tb_box_indent: int|None = None,
|
||||
tb_body_indent: int = 1,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Create a "boxed" looking traceback string.
|
||||
|
||||
Useful for emphasizing traceback text content as being an
|
||||
embedded attribute of some other object (like
|
||||
a `RemoteActorError` or other boxing remote error shuttle
|
||||
container).
|
||||
|
||||
Any other parent/container "fields" can be passed in the
|
||||
`fields_str` input along with other prefix/indent settings.
|
||||
|
||||
'''
|
||||
if (
|
||||
fields_str
|
||||
and
|
||||
field_prefix
|
||||
):
|
||||
fields: str = textwrap.indent(
|
||||
fields_str,
|
||||
prefix=field_prefix,
|
||||
)
|
||||
else:
|
||||
fields = fields_str or ''
|
||||
|
||||
tb_body = tb_str
|
||||
if tb_body_indent:
|
||||
tb_body: str = textwrap.indent(
|
||||
tb_str,
|
||||
prefix=tb_body_indent * ' ',
|
||||
)
|
||||
|
||||
tb_box: str = (
|
||||
|
||||
# orig
|
||||
# f' |\n'
|
||||
# f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
# f' ------ - ------\n'
|
||||
# f' _|\n'
|
||||
|
||||
f'|\n'
|
||||
f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
f'{tb_body}'
|
||||
f' ------ - ------\n'
|
||||
f'_|\n'
|
||||
)
|
||||
tb_box_indent: str = (
|
||||
tb_box_indent
|
||||
or
|
||||
1
|
||||
|
||||
# (len(field_prefix))
|
||||
# ? ^-TODO-^ ? if you wanted another indent level
|
||||
)
|
||||
if tb_box_indent > 0:
|
||||
tb_box: str = textwrap.indent(
|
||||
tb_box,
|
||||
prefix=tb_box_indent * ' ',
|
||||
)
|
||||
|
||||
return (
|
||||
fields
|
||||
+
|
||||
tb_box
|
||||
)
|
||||
|
||||
|
||||
def pformat_caller_frame(
|
||||
stack_limit: int = 1,
|
||||
box_tb: bool = True,
|
||||
) -> str:
|
||||
'''
|
||||
Capture and return the traceback text content from
|
||||
`stack_limit` call frames up.
|
||||
|
||||
'''
|
||||
tb_str: str = (
|
||||
'\n'.join(
|
||||
traceback.format_stack(limit=stack_limit)
|
||||
)
|
||||
)
|
||||
if box_tb:
|
||||
tb_str: str = pformat_boxed_tb(
|
||||
tb_str=tb_str,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
return tb_str
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -65,7 +65,7 @@ def dump_task_tree() -> None:
|
|||
level='cancel',
|
||||
)
|
||||
actor: Actor = _state.current_actor()
|
||||
log.pdb(
|
||||
log.devx(
|
||||
f'Dumping `stackscope` tree for actor\n'
|
||||
f'{actor.name}: {actor}\n'
|
||||
f' |_{mp.current_process()}\n\n'
|
||||
|
@ -104,7 +104,7 @@ def signal_handler(
|
|||
subproc: ProcessType
|
||||
subactor: Actor
|
||||
for subactor, subproc, _ in an._children.values():
|
||||
log.pdb(
|
||||
log.devx(
|
||||
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
|
||||
f'{subactor}\n'
|
||||
f' |_{subproc}\n'
|
||||
|
|
|
@ -0,0 +1,168 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Pretty formatters for use throughout the code base.
|
||||
Mostly handy for logging and exception message content.
|
||||
|
||||
'''
|
||||
import textwrap
|
||||
import traceback
|
||||
|
||||
from trio import CancelScope
|
||||
|
||||
|
||||
def add_div(
|
||||
message: str,
|
||||
div_str: str = '------ - ------',
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Add a "divider string" to the input `message` with
|
||||
a little math to center it underneath.
|
||||
|
||||
'''
|
||||
div_offset: int = (
|
||||
round(len(message)/2)+1
|
||||
-
|
||||
round(len(div_str)/2)+1
|
||||
)
|
||||
div_str: str = (
|
||||
'\n' + ' '*div_offset + f'{div_str}\n'
|
||||
)
|
||||
return div_str
|
||||
|
||||
|
||||
def pformat_boxed_tb(
|
||||
tb_str: str,
|
||||
fields_str: str|None = None,
|
||||
field_prefix: str = ' |_',
|
||||
|
||||
tb_box_indent: int|None = None,
|
||||
tb_body_indent: int = 1,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Create a "boxed" looking traceback string.
|
||||
|
||||
Useful for emphasizing traceback text content as being an
|
||||
embedded attribute of some other object (like
|
||||
a `RemoteActorError` or other boxing remote error shuttle
|
||||
container).
|
||||
|
||||
Any other parent/container "fields" can be passed in the
|
||||
`fields_str` input along with other prefix/indent settings.
|
||||
|
||||
'''
|
||||
if (
|
||||
fields_str
|
||||
and
|
||||
field_prefix
|
||||
):
|
||||
fields: str = textwrap.indent(
|
||||
fields_str,
|
||||
prefix=field_prefix,
|
||||
)
|
||||
else:
|
||||
fields = fields_str or ''
|
||||
|
||||
tb_body = tb_str
|
||||
if tb_body_indent:
|
||||
tb_body: str = textwrap.indent(
|
||||
tb_str,
|
||||
prefix=tb_body_indent * ' ',
|
||||
)
|
||||
|
||||
tb_box: str = (
|
||||
f'|\n'
|
||||
f' ------ - ------\n'
|
||||
f'{tb_body}'
|
||||
f' ------ - ------\n'
|
||||
f'_|\n'
|
||||
)
|
||||
tb_box_indent: str = (
|
||||
tb_box_indent
|
||||
or
|
||||
1
|
||||
|
||||
# (len(field_prefix))
|
||||
# ? ^-TODO-^ ? if you wanted another indent level
|
||||
)
|
||||
if tb_box_indent > 0:
|
||||
tb_box: str = textwrap.indent(
|
||||
tb_box,
|
||||
prefix=tb_box_indent * ' ',
|
||||
)
|
||||
|
||||
return (
|
||||
fields
|
||||
+
|
||||
tb_box
|
||||
)
|
||||
|
||||
|
||||
def pformat_caller_frame(
|
||||
stack_limit: int = 1,
|
||||
box_tb: bool = True,
|
||||
) -> str:
|
||||
'''
|
||||
Capture and return the traceback text content from
|
||||
`stack_limit` call frames up.
|
||||
|
||||
'''
|
||||
tb_str: str = (
|
||||
'\n'.join(
|
||||
traceback.format_stack(limit=stack_limit)
|
||||
)
|
||||
)
|
||||
if box_tb:
|
||||
tb_str: str = pformat_boxed_tb(
|
||||
tb_str=tb_str,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
return tb_str
|
||||
|
||||
|
||||
def pformat_cs(
|
||||
cs: CancelScope,
|
||||
var_name: str = 'cs',
|
||||
field_prefix: str = ' |_',
|
||||
) -> str:
|
||||
'''
|
||||
Pretty format info about a `trio.CancelScope` including most
|
||||
of its public state and `._cancel_status`.
|
||||
|
||||
The output can be modified to show a "var name" for the
|
||||
instance as a field prefix, just a simple str before each
|
||||
line more or less.
|
||||
|
||||
'''
|
||||
|
||||
fields: str = textwrap.indent(
|
||||
(
|
||||
f'cancel_called = {cs.cancel_called}\n'
|
||||
f'cancelled_caught = {cs.cancelled_caught}\n'
|
||||
f'_cancel_status = {cs._cancel_status}\n'
|
||||
f'shield = {cs.shield}\n'
|
||||
),
|
||||
prefix=field_prefix,
|
||||
)
|
||||
return (
|
||||
f'{var_name}: {cs}\n'
|
||||
+
|
||||
fields
|
||||
)
|
|
@ -21,6 +21,11 @@ Log like a forester!
|
|||
from collections.abc import Mapping
|
||||
import sys
|
||||
import logging
|
||||
from logging import (
|
||||
LoggerAdapter,
|
||||
Logger,
|
||||
StreamHandler,
|
||||
)
|
||||
import colorlog # type: ignore
|
||||
|
||||
import trio
|
||||
|
@ -53,6 +58,7 @@ LEVELS: dict[str, int] = {
|
|||
'RUNTIME': 15,
|
||||
'CANCEL': 16,
|
||||
'PDB': 500,
|
||||
'DEVX': 600,
|
||||
}
|
||||
# _custom_levels: set[str] = {
|
||||
# lvlname.lower for lvlname in LEVELS.keys()
|
||||
|
@ -62,6 +68,7 @@ STD_PALETTE = {
|
|||
'CRITICAL': 'red',
|
||||
'ERROR': 'red',
|
||||
'PDB': 'white',
|
||||
'DEVX': 'cyan',
|
||||
'WARNING': 'yellow',
|
||||
'INFO': 'green',
|
||||
'CANCEL': 'yellow',
|
||||
|
@ -78,7 +85,7 @@ BOLD_PALETTE = {
|
|||
|
||||
# TODO: this isn't showing the correct '{filename}'
|
||||
# as it did before..
|
||||
class StackLevelAdapter(logging.LoggerAdapter):
|
||||
class StackLevelAdapter(LoggerAdapter):
|
||||
|
||||
def transport(
|
||||
self,
|
||||
|
@ -86,7 +93,8 @@ class StackLevelAdapter(logging.LoggerAdapter):
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
IPC level msg-ing.
|
||||
IPC transport level msg IO; generally anything below
|
||||
`._ipc.Channel` and friends.
|
||||
|
||||
'''
|
||||
return self.log(5, msg)
|
||||
|
@ -102,7 +110,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
|
|||
msg: str,
|
||||
) -> None:
|
||||
'''
|
||||
Cancellation logging, mostly for runtime reporting.
|
||||
Cancellation sequencing, mostly for runtime reporting.
|
||||
|
||||
'''
|
||||
return self.log(
|
||||
|
@ -116,11 +124,21 @@ class StackLevelAdapter(logging.LoggerAdapter):
|
|||
msg: str,
|
||||
) -> None:
|
||||
'''
|
||||
Debugger logging.
|
||||
`pdb`-REPL (debugger) related statuses.
|
||||
|
||||
'''
|
||||
return self.log(500, msg)
|
||||
|
||||
def devx(
|
||||
self,
|
||||
msg: str,
|
||||
) -> None:
|
||||
'''
|
||||
"Developer experience" sub-sys statuses.
|
||||
|
||||
'''
|
||||
return self.log(600, msg)
|
||||
|
||||
def log(
|
||||
self,
|
||||
level,
|
||||
|
@ -224,6 +242,7 @@ def get_logger(
|
|||
'''Return the package log or a sub-logger for ``name`` if provided.
|
||||
|
||||
'''
|
||||
log: Logger
|
||||
log = rlog = logging.getLogger(_root_name)
|
||||
|
||||
if (
|
||||
|
@ -278,7 +297,7 @@ def get_logger(
|
|||
def get_console_log(
|
||||
level: str | None = None,
|
||||
**kwargs,
|
||||
) -> logging.LoggerAdapter:
|
||||
) -> LoggerAdapter:
|
||||
'''Get the package logger and enable a handler which writes to stderr.
|
||||
|
||||
Yeah yeah, i know we can use ``DictConfig``. You do it.
|
||||
|
@ -303,7 +322,7 @@ def get_console_log(
|
|||
None,
|
||||
)
|
||||
):
|
||||
handler = logging.StreamHandler()
|
||||
handler = StreamHandler()
|
||||
formatter = colorlog.ColoredFormatter(
|
||||
LOG_FORMAT,
|
||||
datefmt=DATE_FORMAT,
|
||||
|
@ -323,3 +342,19 @@ def get_loglevel() -> str:
|
|||
|
||||
# global module logger for tractor itself
|
||||
log = get_logger('tractor')
|
||||
|
||||
|
||||
def at_least_level(
|
||||
log: Logger|LoggerAdapter,
|
||||
level: int|str,
|
||||
) -> bool:
|
||||
'''
|
||||
Predicate to test if a given level is active.
|
||||
|
||||
'''
|
||||
if isinstance(level, str):
|
||||
level: int = LEVELS[level.upper()]
|
||||
|
||||
if log.getEffectiveLevel() <= level:
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -25,12 +25,12 @@ from contextlib import (
|
|||
# asynccontextmanager as acm,
|
||||
contextmanager as cm,
|
||||
)
|
||||
from pprint import pformat
|
||||
from contextvars import ContextVar
|
||||
from typing import (
|
||||
Any,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
# Union,
|
||||
Union,
|
||||
)
|
||||
# ------ - ------
|
||||
from msgspec import (
|
||||
|
@ -63,7 +63,7 @@ from .types import (
|
|||
Started,
|
||||
Stop,
|
||||
Yield,
|
||||
# pretty_struct,
|
||||
pretty_struct,
|
||||
)
|
||||
|
||||
|
||||
|
@ -75,6 +75,9 @@ if TYPE_CHECKING:
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_def_any_pldec: MsgDec = mk_dec()
|
||||
|
||||
|
||||
class PldRx(Struct):
|
||||
'''
|
||||
A "msg payload receiver".
|
||||
|
@ -101,10 +104,13 @@ class PldRx(Struct):
|
|||
'''
|
||||
# TODO: better to bind it here?
|
||||
# _rx_mc: trio.MemoryReceiveChannel
|
||||
_msgdec: MsgDec = mk_dec(spec=Any)
|
||||
|
||||
_pldec: MsgDec
|
||||
_ipc: Context|MsgStream|None = None
|
||||
|
||||
@property
|
||||
def pld_dec(self) -> MsgDec:
|
||||
return self._pldec
|
||||
|
||||
@cm
|
||||
def apply_to_ipc(
|
||||
self,
|
||||
|
@ -122,9 +128,29 @@ class PldRx(Struct):
|
|||
finally:
|
||||
self._ipc = None
|
||||
|
||||
@cm
|
||||
def limit_plds(
|
||||
self,
|
||||
spec: Union[Type[Struct]],
|
||||
|
||||
) -> MsgDec:
|
||||
'''
|
||||
Type-limit the loadable msg payloads via an applied
|
||||
`MsgDec` given an input spec, revert to prior decoder on
|
||||
exit.
|
||||
|
||||
'''
|
||||
orig_dec: MsgDec = self._pldec
|
||||
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||
try:
|
||||
self._pldec = limit_dec
|
||||
yield limit_dec
|
||||
finally:
|
||||
self._pldec = orig_dec
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
return self._msgdec.dec
|
||||
return self._pldec.dec
|
||||
|
||||
def recv_pld_nowait(
|
||||
self,
|
||||
|
@ -135,9 +161,10 @@ class PldRx(Struct):
|
|||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
|
||||
**kwargs,
|
||||
**dec_msg_kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
__tracebackhide__: bool = True
|
||||
|
||||
msg: MsgType = (
|
||||
ipc_msg
|
||||
|
@ -150,6 +177,7 @@ class PldRx(Struct):
|
|||
msg,
|
||||
ctx=ctx,
|
||||
expect_msg=expect_msg,
|
||||
**dec_msg_kwargs,
|
||||
)
|
||||
|
||||
async def recv_pld(
|
||||
|
@ -157,14 +185,16 @@ class PldRx(Struct):
|
|||
ctx: Context,
|
||||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
hide_tb: bool = True,
|
||||
|
||||
**kwargs
|
||||
**dec_msg_kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
'''
|
||||
Receive a `MsgType`, then decode and return its `.pld` field.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
msg: MsgType = (
|
||||
ipc_msg
|
||||
or
|
||||
|
@ -173,16 +203,20 @@ class PldRx(Struct):
|
|||
await ctx._rx_chan.receive()
|
||||
)
|
||||
return self.dec_msg(
|
||||
msg,
|
||||
msg=msg,
|
||||
ctx=ctx,
|
||||
expect_msg=expect_msg,
|
||||
**dec_msg_kwargs,
|
||||
)
|
||||
|
||||
def dec_msg(
|
||||
self,
|
||||
msg: MsgType,
|
||||
ctx: Context,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
expect_msg: Type[MsgType]|None,
|
||||
|
||||
raise_error: bool = True,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> PayloadT|Raw:
|
||||
'''
|
||||
|
@ -190,6 +224,7 @@ class PldRx(Struct):
|
|||
return the value or raise an appropriate error.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
match msg:
|
||||
# payload-data shuttle msg; deliver the `.pld` value
|
||||
# directly to IPC (primitive) client-consumer code.
|
||||
|
@ -199,11 +234,12 @@ class PldRx(Struct):
|
|||
|Return(pld=pld) # termination phase
|
||||
):
|
||||
try:
|
||||
pld: PayloadT = self._msgdec.decode(pld)
|
||||
pld: PayloadT = self._pldec.decode(pld)
|
||||
log.runtime(
|
||||
'Decode msg payload\n\n'
|
||||
'Decoded msg payload\n\n'
|
||||
f'{msg}\n\n'
|
||||
f'{pld}\n'
|
||||
f'where payload is\n'
|
||||
f'|_pld={pld!r}\n'
|
||||
)
|
||||
return pld
|
||||
|
||||
|
@ -211,8 +247,9 @@ class PldRx(Struct):
|
|||
except ValidationError as src_err:
|
||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||
msg=msg,
|
||||
codec=self._dec,
|
||||
codec=self.pld_dec,
|
||||
src_validation_error=src_err,
|
||||
is_invalid_payload=True,
|
||||
)
|
||||
msg: Error = pack_from_raise(
|
||||
local_err=msgterr,
|
||||
|
@ -237,8 +274,62 @@ class PldRx(Struct):
|
|||
|
||||
case Error():
|
||||
src_err = MessagingError(
|
||||
'IPC dialog termination by msg'
|
||||
'IPC ctx dialog terminated without `Return`-ing a result\n'
|
||||
f'Instead it raised {msg.boxed_type_str!r}!'
|
||||
)
|
||||
# XXX NOTE XXX another super subtle runtime-y thing..
|
||||
#
|
||||
# - when user code (transitively) calls into this
|
||||
# func (usually via a `Context/MsgStream` API) we
|
||||
# generally want errors to propagate immediately
|
||||
# and directly so that the user can define how it
|
||||
# wants to handle them.
|
||||
#
|
||||
# HOWEVER,
|
||||
#
|
||||
# - for certain runtime calling cases, we don't want to
|
||||
# directly raise since the calling code might have
|
||||
# special logic around whether to raise the error
|
||||
# or supress it silently (eg. a `ContextCancelled`
|
||||
# received from the far end which was requested by
|
||||
# this side, aka a self-cancel).
|
||||
#
|
||||
# SO, we offer a flag to control this.
|
||||
if not raise_error:
|
||||
return src_err
|
||||
|
||||
case Stop(cid=cid):
|
||||
message: str = (
|
||||
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
||||
f'{ctx.peer_side!r} peer ?\n'
|
||||
f'|_cid: {cid}\n\n'
|
||||
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
if ctx._stream is None:
|
||||
explain: str = (
|
||||
f'BUT, no `MsgStream` (was) open(ed) on this '
|
||||
f'{ctx.side!r}-side of the IPC ctx?\n'
|
||||
f'Maybe check your code for streaming phase race conditions?\n'
|
||||
)
|
||||
log.warning(
|
||||
message
|
||||
+
|
||||
explain
|
||||
)
|
||||
# let caller decide what to do when only one
|
||||
# side opened a stream, don't raise.
|
||||
return msg
|
||||
|
||||
else:
|
||||
explain: str = (
|
||||
'Received a `Stop` when it should NEVER be possible!?!?\n'
|
||||
)
|
||||
# TODO: this is constructed inside
|
||||
# `_raise_from_unexpected_msg()` but maybe we
|
||||
# should pass it in?
|
||||
# src_err = trio.EndOfChannel(explain)
|
||||
src_err = None
|
||||
|
||||
case _:
|
||||
src_err = InternalError(
|
||||
|
@ -246,6 +337,9 @@ class PldRx(Struct):
|
|||
f'{msg}\n'
|
||||
)
|
||||
|
||||
# TODO: maybe use the new `.add_note()` from 3.11?
|
||||
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
||||
#
|
||||
# fallthrough and raise from `src_err`
|
||||
_raise_from_unexpected_msg(
|
||||
ctx=ctx,
|
||||
|
@ -253,12 +347,18 @@ class PldRx(Struct):
|
|||
src_err=src_err,
|
||||
log=log,
|
||||
expect_msg=expect_msg,
|
||||
hide_tb=False,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
|
||||
async def recv_msg_w_pld(
|
||||
self,
|
||||
ipc: Context|MsgStream,
|
||||
expect_msg: MsgType,
|
||||
|
||||
# NOTE: generally speaking only for handling `Stop`-msgs that
|
||||
# arrive during a call to `drain_to_final_msg()` above!
|
||||
passthrough_non_pld_msgs: bool = True,
|
||||
**kwargs,
|
||||
|
||||
) -> tuple[MsgType, PayloadT]:
|
||||
'''
|
||||
|
@ -268,16 +368,102 @@ class PldRx(Struct):
|
|||
'''
|
||||
msg: MsgType = await ipc._rx_chan.receive()
|
||||
|
||||
if passthrough_non_pld_msgs:
|
||||
match msg:
|
||||
case Stop():
|
||||
return msg, None
|
||||
|
||||
# TODO: is there some way we can inject the decoded
|
||||
# payload into an existing output buffer for the original
|
||||
# msg instance?
|
||||
pld: PayloadT = self.dec_msg(
|
||||
msg,
|
||||
ctx=ipc,
|
||||
expect_msg=expect_msg,
|
||||
**kwargs,
|
||||
)
|
||||
return msg, pld
|
||||
|
||||
|
||||
# Always maintain a task-context-global `PldRx`
|
||||
_def_pld_rx: PldRx = PldRx(
|
||||
_pldec=_def_any_pldec,
|
||||
)
|
||||
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
|
||||
'pld_rx',
|
||||
default=_def_pld_rx,
|
||||
)
|
||||
|
||||
|
||||
def current_pldrx() -> PldRx:
|
||||
'''
|
||||
Return the current `trio.Task.context`'s msg-payload-receiver.
|
||||
|
||||
A payload receiver is the IPC-msg processing sub-sys which
|
||||
filters inter-actor-task communicated payload data, i.e. the
|
||||
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
|
||||
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
|
||||
up from `tractor`'s transport layer but BEFORE the data is
|
||||
yielded to application code, normally via an IPC primitive API
|
||||
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
|
||||
|
||||
Modification of the current payload spec via `limit_plds()`
|
||||
allows a `tractor` application to contextually filter IPC
|
||||
payload content with a type specification as supported by
|
||||
the interchange backend.
|
||||
|
||||
- for `msgspec` see <PUTLINKHERE>.
|
||||
|
||||
NOTE that the `PldRx` itself is a per-`Context` global sub-system
|
||||
that normally does not change other then the applied pld-spec
|
||||
for the current `trio.Task`.
|
||||
|
||||
'''
|
||||
# ctx: context = current_ipc_ctx()
|
||||
# return ctx._pld_rx
|
||||
return _ctxvar_PldRx.get()
|
||||
|
||||
|
||||
@cm
|
||||
def limit_plds(
|
||||
spec: Union[Type[Struct]],
|
||||
**kwargs,
|
||||
|
||||
) -> MsgDec:
|
||||
'''
|
||||
Apply a `MsgCodec` that will natively decode the SC-msg set's
|
||||
`Msg.pld: Union[Type[Struct]]` payload fields using
|
||||
tagged-unions of `msgspec.Struct`s from the `payload_types`
|
||||
for all IPC contexts in use by the current `trio.Task`.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
try:
|
||||
# sanity on orig settings
|
||||
orig_pldrx: PldRx = current_pldrx()
|
||||
orig_pldec: MsgDec = orig_pldrx.pld_dec
|
||||
|
||||
with orig_pldrx.limit_plds(
|
||||
spec=spec,
|
||||
**kwargs,
|
||||
) as pldec:
|
||||
log.info(
|
||||
'Applying payload-decoder\n\n'
|
||||
f'{pldec}\n'
|
||||
)
|
||||
yield pldec
|
||||
finally:
|
||||
log.info(
|
||||
'Reverted to previous payload-decoder\n\n'
|
||||
f'{orig_pldec}\n'
|
||||
)
|
||||
assert (
|
||||
(pldrx := current_pldrx()) is orig_pldrx
|
||||
and
|
||||
pldrx.pld_dec is orig_pldec
|
||||
)
|
||||
|
||||
|
||||
async def drain_to_final_msg(
|
||||
ctx: Context,
|
||||
|
||||
|
@ -308,67 +494,33 @@ async def drain_to_final_msg(
|
|||
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||
# from the far end.
|
||||
pre_result_drained: list[MsgType] = []
|
||||
return_msg: Return|None = None
|
||||
result_msg: Return|Error|None = None
|
||||
while not (
|
||||
ctx.maybe_error
|
||||
and not ctx._final_result_is_set()
|
||||
):
|
||||
try:
|
||||
# TODO: can remove?
|
||||
# await trio.lowlevel.checkpoint()
|
||||
|
||||
# NOTE: this REPL usage actually works here dawg! Bo
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: bad idea?
|
||||
# -[ ] wrap final outcome channel wait in a scope so
|
||||
# receive all msgs, scanning for either a final result
|
||||
# or error; the underlying call should never raise any
|
||||
# remote error directly!
|
||||
msg, pld = await ctx._pld_rx.recv_msg_w_pld(
|
||||
ipc=ctx,
|
||||
expect_msg=Return,
|
||||
raise_error=False,
|
||||
)
|
||||
# ^-TODO-^ some bad ideas?
|
||||
# -[ ] wrap final outcome .receive() in a scope so
|
||||
# it can be cancelled out of band if needed?
|
||||
#
|
||||
# with trio.CancelScope() as res_cs:
|
||||
# |_with trio.CancelScope() as res_cs:
|
||||
# ctx._res_scope = res_cs
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
# if res_cs.cancelled_caught:
|
||||
|
||||
# TODO: ensure there's no more hangs, debugging the
|
||||
# runtime pretty preaase!
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: can remove this finally?
|
||||
# we have no more need for the sync draining right
|
||||
# since we're can kinda guarantee the async
|
||||
# `.receive()` below will never block yah?
|
||||
#
|
||||
# if (
|
||||
# ctx._cancel_called and (
|
||||
# ctx.cancel_acked
|
||||
# # or ctx.chan._cancel_called
|
||||
# )
|
||||
# # or not ctx._final_result_is_set()
|
||||
# # ctx.outcome is not
|
||||
# # or ctx.chan._closed
|
||||
# ):
|
||||
# try:
|
||||
# msg: dict = await ctx._rx_chan.receive_nowait()()
|
||||
# except trio.WouldBlock:
|
||||
# log.warning(
|
||||
# 'When draining already `.cancel_called` ctx!\n'
|
||||
# 'No final msg arrived..\n'
|
||||
# )
|
||||
# break
|
||||
# else:
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
|
||||
# TODO: don't need it right jefe?
|
||||
# with trio.move_on_after(1) as cs:
|
||||
# if cs.cancelled_caught:
|
||||
# from .devx._debug import pause
|
||||
# -[ ] make sure pause points work here for REPLing
|
||||
# the runtime itself; i.e. ensure there's no hangs!
|
||||
# |_from tractor.devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# pray to the `trio` gawds that we're corrent with this
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
|
@ -376,7 +528,7 @@ async def drain_to_final_msg(
|
|||
# SHOULD NOT raise that far end error,
|
||||
# 2. WE DID NOT REQUEST that cancel and thus
|
||||
# SHOULD RAISE HERE!
|
||||
except trio.Cancelled:
|
||||
except trio.Cancelled as taskc:
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
|
@ -386,7 +538,7 @@ async def drain_to_final_msg(
|
|||
|
||||
# CASE 1: we DID request the cancel we simply
|
||||
# continue to bubble up as normal.
|
||||
raise
|
||||
raise taskc
|
||||
|
||||
match msg:
|
||||
|
||||
|
@ -399,14 +551,14 @@ async def drain_to_final_msg(
|
|||
ctx._result: Any = pld
|
||||
log.runtime(
|
||||
'Context delivered final draining msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
# XXX: only close the rx mem chan AFTER
|
||||
# a final result is retreived.
|
||||
# if ctx._rx_chan:
|
||||
# await ctx._rx_chan.aclose()
|
||||
# TODO: ^ we don't need it right?
|
||||
return_msg = msg
|
||||
result_msg = msg
|
||||
break
|
||||
|
||||
# far end task is still streaming to us so discard
|
||||
|
@ -435,12 +587,9 @@ async def drain_to_final_msg(
|
|||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
return (
|
||||
return_msg,
|
||||
pre_result_drained,
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
break
|
||||
|
||||
# drain up to the `msg_limit` hoping to get
|
||||
# a final result or error/ctxc.
|
||||
|
@ -452,7 +601,7 @@ async def drain_to_final_msg(
|
|||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
|
@ -467,7 +616,7 @@ async def drain_to_final_msg(
|
|||
pre_result_drained.append(msg)
|
||||
log.cancel(
|
||||
'Remote stream terminated due to "stop" msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
|
@ -476,7 +625,7 @@ async def drain_to_final_msg(
|
|||
case Error():
|
||||
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||
# -[ ] would this be handier for this case maybe?
|
||||
# async with maybe_raise_on_exit() as raises:
|
||||
# |_async with maybe_raise_on_exit() as raises:
|
||||
# if raises:
|
||||
# log.error('some msg about raising..')
|
||||
#
|
||||
|
@ -512,7 +661,7 @@ async def drain_to_final_msg(
|
|||
# raise_overrun_from_self=False,
|
||||
raise_overrun_from_self=raise_overrun,
|
||||
)
|
||||
|
||||
result_msg = msg
|
||||
break # OOOOOF, yeah obvi we need this..
|
||||
|
||||
# XXX we should never really get here
|
||||
|
@ -558,6 +707,6 @@ async def drain_to_final_msg(
|
|||
)
|
||||
|
||||
return (
|
||||
return_msg,
|
||||
result_msg,
|
||||
pre_result_drained,
|
||||
)
|
||||
|
|
|
@ -56,6 +56,7 @@ log = get_logger('tractor.msgspec')
|
|||
PayloadT = TypeVar('PayloadT')
|
||||
|
||||
|
||||
# TODO: PayloadMsg
|
||||
class Msg(
|
||||
Struct,
|
||||
Generic[PayloadT],
|
||||
|
@ -81,7 +82,7 @@ class Msg(
|
|||
tree.
|
||||
|
||||
'''
|
||||
cid: str|None # call/context-id
|
||||
cid: str # call/context-id
|
||||
# ^-TODO-^: more explicit type?
|
||||
# -[ ] use UNSET here?
|
||||
# https://jcristharif.com/msgspec/supported-types.html#unset
|
||||
|
@ -106,7 +107,7 @@ class Msg(
|
|||
# TODO: could also be set to `msgspec.Raw` if the sub-decoders
|
||||
# approach is preferred over the generic parameterization
|
||||
# approach as take by `mk_msg_spec()` below.
|
||||
pld: PayloadT|Raw
|
||||
pld: Raw
|
||||
|
||||
|
||||
class Aid(
|
||||
|
@ -143,6 +144,8 @@ class SpawnSpec(
|
|||
`Aid` msg.
|
||||
|
||||
'''
|
||||
# TODO: similar to the `Start` kwargs spec needed below, we need
|
||||
# a hard `Struct` def for all of these fields!
|
||||
_parent_main_data: dict
|
||||
_runtime_vars: dict[str, Any]
|
||||
|
||||
|
|
Loading…
Reference in New Issue