Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet cf48fdecfe Unify `MsgTypeError` as a `RemoteActorError` subtype
Since in the receive-side error case the source of the exception is the
sender side (normally causing a local `TypeError` at decode time), might
as well bundle the error in remote-capture-style using boxing semantics
around the causing local type error raised from the
`msgspec.msgpack.Decoder.decode()` and with a traceback packed from
`msgspec`-specific knowledge of any field-type spec matching failure.

Deats on new `MsgTypeError` interface:
- includes a `.msg_dict` to get access to any `Decoder.type`-applied
  load of the original (underlying and offending) IPC msg into
  a `dict` form using a vanilla decoder which is normally packed into
  the instance as a `._msg_dict`.
- a public getter to the "supposed offending msg" via `.payload_msg`
  which attempts to take the above `.msg_dict` and load it manually into
  the corresponding `.msg.types.MsgType` struct.
- a constructor `.from_decode()` to make it simple to build out error
  instances from a failed decode scope where the aforementioned
  `msgdict: dict` from the vanilla decode can be provided directly.
- ALSO, we now pack into `MsgTypeError` directly just like ctxc in
  `unpack_error()`

This also completes the while-standing todo for `RemoteActorError` to
contain a ref to the underlying `Error` msg as `._ipc_msg` with public
`@property` access that `defstruct()`-creates a pretty struct version
via `.ipc_msg`.

Internal tweaks for this include:
- `._ipc_msg` is the internal literal `Error`-msg instance if provided
  with `.ipc_msg` the dynamic wrapper as mentioned above.
- `.__init__()` now can still take variable `**extra_msgdata` (similar
  to the `dict`-msgdata as before) to maintain support for subtypes
  which are constructed manually (not only by `pack_error()`) and insert
  their own attrs which get placed in a `._extra_msgdata: dict` if no
  `ipc_msg: Error` is provided as input.
- the `.msgdata` is now a merge of any `._extra_msgdata` and
  a `dict`-casted form of any `._ipc_msg`.
- adjust all previous `.msgdata` field lookups to try equivalent field
  reads on `._ipc_msg: Error`.
- drop default single ws indent from `.tb_str` and do a failover lookup
  to `.msgdata` when `._ipc_msg is None` for the manually constructed
  subtype-instance case.
- add a new class attr `.extra_body_fields: list[str]` to allow subtypes
  to declare attrs they want shown in the `.__repr__()` output, eg.
  `ContextCancelled.canceller`, `StreamOverrun.sender` and
  `MsgTypeError.payload_msg`.
- ^-rework defaults pertaining to-^ with rename from
  `_msgdata_keys` -> `_ipcmsg_keys` with latter now just loading directly
  from the `Error` fields def and `_body_fields: list[str]` just taking
  that value and removing the not-so-useful-in-REPL or already shown
  (i.e. `.tb_str: str`) field names.
- add a new mod level `.pack_from_raise()` helper for auto-boxing RAE
  subtypes constructed manually into `Error`s which is normally how
  `StreamOverrun` and `MsgTypeError` get created in the runtime.
- in support of the above expose a `src_uid: tuple` override to
  `pack_error()` such that the runtime can provide any remote actor id
  when packing a locally-created yet remotely-caused RAE subtype.
- adjust all typing to expect `Error`s over `dict`-msgs.

Adjust some tests to match these changes:
- context and inter-peer-cancel tests to make their `.msgdata` related
  checks against the new `.ipc_msg` as well and `.tb_str` directly.
- toss in an extra sleep to `sleep_a_bit_then_cancel_peer()` to keep the
  'canceller' ctx child task cancelled by it's parent in the 'root' for
  the rte-raised-during-ctxc-handling case (apparently now it's
  returning too fast, cool?).
2024-04-09 10:07:10 -04:00
Tyler Goodlet b341146bd1 Rename `Actor._push_result()` -> `._deliver_ctx_payload()`
Better describes the internal RPC impl/latest-architecture with the msgs
delivered being those which either define a `.pld: PayloadT` that gets
passed up to user code, or the error-msg subset that similarly is raised
in a ctx-linked task.
2024-04-08 10:25:57 -04:00
Tyler Goodlet 2f451ab9a3 Caps-msging test tweaks to get correct failures
These are likely temporary changes but still needed to actually see the
desired/correct failures (of which 5 of 6 tests are supposed to fail rn)
mostly to do with `Start` and `Return` msgs which are invalid under each
test's applied msg-spec.

Tweak set here:
- bit more `print()`s in root and sub for grokin test flow.
- never use `pytes.fail()` in subactor.. should know this by now XD
- comment out some bits that can't ever pass rn and make the underlying
  expected failues harder to grok:
  - the sub's child-side-of-ctx task doing sends should only fail
    for certain msg types like `Started` + `Return`, `Yield`s are
    processed receiver/parent side.
  - don't expect `sent` list to match predicate set for the same reason
    as last bullet.

The outstanding msg-type-semantic validation questions are:
- how to handle `.open_context()` with an input `kwargs` set that
  doesn't adhere to the currently applied msg-spec?
  - should the initial `@acm` entry fail before sending to the child
    side?
- where should received `MsgTypeError`s be raised, at the `MsgStream`
  `.receive()` or lower in the stack?
  - i'm thinking we should mk `MsgTypeError` derive from
    `RemoteActorError` and then have it be delivered as an error to the
    `Context`/`MsgStream` for per-ctx-task handling; would lead to more
    flexible/modular policy overrides in user code outside any defaults
    we provide.
2024-04-08 10:13:14 -04:00
Tyler Goodlet 8e83455a78 Finally drop masked `chan.send(None)` related code blocks 2024-04-07 18:54:03 -04:00
Tyler Goodlet 38111e8d53 Detail out EoC-by-self log msg 2024-04-07 16:35:00 -04:00
Tyler Goodlet aea5abdd70 Use `object()` when checking for error field value
Since the field value could be `None` or some other type with
truthy-ness evaluating to `False`..
2024-04-07 16:29:21 -04:00
Tyler Goodlet aca6503fcd Flatten out RPC loop with `match:`/`case:`
Mainly expanding out the runtime endpoints for cancellation to separate
cases and flattening them with the main RPC-request-invoke block, moving
the non-cancel runtime case (where we call `getattr(actor, funcname)`)
inside the main `Start` case (for now) which branches on `ns=="self"`.

Also, add a new IPC msg `class CancelAck(Return):` which is always
included in the default msg-spec such that runtime cancellation (and
eventually all) endpoints return that msg (instead of a `Return`) and
thus sidestep any currently applied `MsgCodec` such that the results
(`bool`s for most cancel methods) are never violating the current type
limit(s) on `Msg.pld`. To support this expose a new variable
`return_msg: Return|CancelAck` param from
`_invoke()`/`_invoke_non_context)()` and set it to `CancelAck` in the
appropriate endpoint case-blocks of the msg loop.

Clean out all the lingering legacy `chan.send(<dict-msg>)` commented
codez from the invoker funcs, with more cleaning likely to come B)
2024-04-07 10:40:01 -04:00
Tyler Goodlet b9a61ded0a Drop `None`-sentinel cancels RPC loop mechanism
Pretty sure we haven't *needed it* for a while, it was always generally
hazardous in terms of IPC msg types, AND it's definitely incompatible
with a dynamically applied typed msg spec: you can't just expect
a `None` to be willy nilly handled all the time XD

For now I'm masking out all the code and leaving very detailed
surrounding notes but am not removing it quite yet in case for strange
reason it is needed by some edge case (though I haven't found according
to the test suite).

Backstory:
------ - ------
Originally (i'm pretty sure anyway) it was added as a super naive
"remote cancellation" mechanism (back before there were specific `Actor`
methods for such things) that was mostly (only?) used before IPC
`Channel` closures to "more gracefully cancel" the connection's parented
RPC tasks. Since we now have explicit runtime-RPC endpoints for
conducting remote cancellation of both tasks and full actors, it should
really be removed anyway, because:
- a `None`-msg setinel is inconsistent with other RPC endpoint handling
  input patterns which (even prior to typed msging) had specific
  msg-value triggers.
- the IPC endpoint's (block) implementation should use
  `Actor.cancel_rpc_tasks(parent_chan=chan)` instead of a manual loop
  through a `Actor._rpc_tasks.copy()`..

Deats:
- mask the `Channel.send(None)` calls from both the `Actor._stream_handler()` tail
  as well as from the `._portal.open_portal()` was connected block.
- mask the msg loop endpoint block and toss in lotsa notes.

Unrelated tweaks:
- drop `Actor._debug_mode`; unused.
- make `Actor.cancel_server()` return a `bool`.
- use `.msg.pretty_struct.Struct.pformat()` to show any msg that is
  ignored (bc invalid) in `._push_result()`.
2024-04-05 19:07:12 -04:00
Tyler Goodlet 4cfe4979ff Factor `MsgpackTCPStream` msg-type checks
Add both the `.send()` and `.recv()` handling blocks to a common
`_raise_msg_type_err()` which includes detailed error msg formatting:

- the `.recv()` side case does introspection of the `Msg` fields and
  attempting to report the exact (field type related) issue
- `.send()` side does some boxed-error style tb formatting like
  `RemoteActorError`.
- add a `strict_types: bool` to `.send()` to allow for just
  warning on bad inputs versus raising, but always raise from any
  `Encoder` type error.
2024-04-05 18:33:46 -04:00
Tyler Goodlet 97bfbdbc1c Expose `MsgTypeError` from pkg 2024-04-05 16:32:15 -04:00
12 changed files with 691 additions and 481 deletions

View File

@ -374,7 +374,7 @@ def enc_type_union(
@tractor.context
async def send_back_nsp(
async def send_back_values(
ctx: Context,
expect_debug: bool,
pld_spec_type_strs: list[str],
@ -388,6 +388,8 @@ async def send_back_nsp(
and ensure we can round trip a func ref with our parent.
'''
uid: tuple = tractor.current_actor().uid
# debug mode sanity check (prolly superfluous but, meh)
assert expect_debug == _state.debug_mode()
@ -414,7 +416,7 @@ async def send_back_nsp(
)
print(
'CHILD attempting `Started`-bytes DECODE..\n'
f'{uid}: attempting `Started`-bytes DECODE..\n'
)
try:
msg: Started = nsp_codec.decode(started_msg_bytes)
@ -436,7 +438,7 @@ async def send_back_nsp(
raise
else:
print(
'CHILD (correctly) unable to DECODE `Started`-bytes\n'
f'{uid}: (correctly) unable to DECODE `Started`-bytes\n'
f'{started_msg_bytes}\n'
)
@ -445,7 +447,7 @@ async def send_back_nsp(
for send_value, expect_send in iter_send_val_items:
try:
print(
f'CHILD attempting to `.started({send_value})`\n'
f'{uid}: attempting to `.started({send_value})`\n'
f'=> expect_send: {expect_send}\n'
f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n'
f'AND, codec: {codec}\n'
@ -460,7 +462,6 @@ async def send_back_nsp(
# await tractor.pause()
raise RuntimeError(
# pytest.fail(
f'NOT-EXPECTED able to roundtrip value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {type(send_value)}\n'
@ -468,53 +469,76 @@ async def send_back_nsp(
break # move on to streaming block..
except NotImplementedError:
print('FAILED ENCODE!')
except tractor.MsgTypeError:
# await tractor.pause()
if expect_send:
pytest.fail(
raise RuntimeError(
f'EXPECTED to `.started()` value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {type(send_value)}\n'
)
async with ctx.open_stream() as ipc:
print(
f'{uid}: Entering streaming block to send remaining values..'
)
for send_value, expect_send in iter_send_val_items:
send_type: Type = type(send_value)
print(
'CHILD report on send value\n'
'------ - ------\n'
f'{uid}: SENDING NEXT VALUE\n'
f'ipc_pld_spec: {ipc_pld_spec}\n'
f'expect_send: {expect_send}\n'
f'val: {send_value}\n'
'------ - ------\n'
)
try:
await ipc.send(send_value)
print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n')
sent.append(send_value)
if not expect_send:
pytest.fail(
f'NOT-EXPECTED able to roundtrip value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {send_type}\n'
)
# NOTE: should only raise above on
# `.started()` or a `Return`
# if not expect_send:
# raise RuntimeError(
# f'NOT-EXPECTED able to roundtrip value given spec:\n'
# f'ipc_pld_spec -> {ipc_pld_spec}\n'
# f'value -> {send_value}: {send_type}\n'
# )
except ValidationError:
print(f'{uid} FAILED TO SEND {send_value}!')
# await tractor.pause()
if expect_send:
pytest.fail(
raise RuntimeError(
f'EXPECTED to roundtrip value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {send_type}\n'
)
continue
# continue
assert (
len(sent)
==
len([val
for val, expect in
expect_ipc_send.values()
if expect is True])
)
else:
print(
f'{uid}: finished sending all values\n'
'Should be exiting stream block!\n'
)
print(f'{uid}: exited streaming block!')
# TODO: this won't be true bc in streaming phase we DO NOT
# msgspec check outbound msgs!
# -[ ] once we implement the receiver side `InvalidMsg`
# then we can expect it here?
# assert (
# len(sent)
# ==
# len([val
# for val, expect in
# expect_ipc_send.values()
# if expect is True])
# )
def ex_func(*args):
@ -635,7 +659,7 @@ def test_codec_hooks_mod(
async with (
p.open_context(
send_back_nsp,
send_back_values,
expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks,
@ -665,10 +689,13 @@ def test_codec_hooks_mod(
async for next_sent in ipc:
print(
'Child sent next value\n'
'Parent: child sent next value\n'
f'{next_sent}: {type(next_sent)}\n'
)
expect_to_send.remove(next_sent)
if expect_to_send:
expect_to_send.remove(next_sent)
else:
print('PARENT should terminate stream loop + block!')
# all sent values should have arrived!
assert not expect_to_send

View File

@ -796,10 +796,12 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal
except tractor.ContextCancelled as ce:
_ce = ce # for debug on crash
ce.boxed_type == trio.Cancelled
# the traceback should be informative
assert 'itself' in ce.msgdata['tb_str']
assert 'itself' in ce.tb_str
assert ce.tb_str == ce.msgdata['tb_str']
# teardown the actor
await portal.cancel_actor()
@ -1157,7 +1159,8 @@ def test_maybe_allow_overruns_stream(
elif slow_side == 'parent':
assert err.boxed_type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str']
assert 'StreamOverrun' in err.tb_str
assert err.tb_str == err.msgdata['tb_str']
else:
# if this hits the logic blocks from above are not

View File

@ -185,6 +185,10 @@ async def sleep_a_bit_then_cancel_peer(
await trio.sleep(cancel_after)
await peer.cancel_actor()
# such that we're cancelled by our rent ctx-task
await trio.sleep(3)
print('CANCELLER RETURNING!')
@tractor.context
async def stream_ints(
@ -245,6 +249,12 @@ async def stream_from_peer(
assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
# XXX YES, bc exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
# XXX NO, bc new one always created for property accesss
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
# the peer ctx is the canceller even though it's canceller
# is the "canceller" XD
assert peer_name in peer_ctx.canceller

View File

@ -44,9 +44,10 @@ from ._state import (
is_root_process as is_root_process,
)
from ._exceptions import (
RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError,
)
from .devx import (
breakpoint as breakpoint,

View File

@ -1207,7 +1207,7 @@ class Context:
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
@ -1698,11 +1698,11 @@ class Context:
# raise any msg type error NO MATTER WHAT!
except msgspec.ValidationError as verr:
from tractor._ipc import _raise_msg_type_err
_raise_msg_type_err(
from tractor._ipc import _mk_msg_type_err
raise _mk_msg_type_err(
msg=msg_bytes,
codec=codec,
validation_err=verr,
src_validation_error=verr,
verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n'
)
@ -2415,7 +2415,7 @@ async def open_context_from_portal(
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the

View File

@ -31,7 +31,10 @@ import textwrap
import traceback
import trio
from msgspec import structs
from msgspec import (
structs,
defstruct,
)
from tractor._state import current_actor
from tractor.log import get_logger
@ -40,6 +43,8 @@ from tractor.msg import (
Msg,
Stop,
Yield,
pretty_struct,
types as msgtypes,
)
if TYPE_CHECKING:
@ -64,21 +69,38 @@ class InternalError(RuntimeError):
'''
_body_fields: list[str] = [
'boxed_type',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# only in sub-types
'canceller',
'sender',
# NOTE: more or less should be close to these:
# 'boxed_type',
# 'src_type',
# 'src_uid',
# 'canceller',
# 'sender',
# TODO: format this better if we're going to include it.
# 'relay_path',
#
_ipcmsg_keys: list[str] = [
fi.name
for fi, k, v
in pretty_struct.iter_fields(Error)
]
_msgdata_keys: list[str] = [
'boxed_type_str',
] + _body_fields
_body_fields: list[str] = list(
set(_ipcmsg_keys)
# NOTE: don't show fields that either don't provide
# any extra useful info or that are already shown
# as part of `.__repr__()` output.
- {
'src_type_str',
'boxed_type_str',
'tb_str',
'relay_path',
'_msg_dict',
'cid',
}
)
def get_err_type(type_name: str) -> BaseException|None:
@ -137,7 +159,7 @@ def pformat_boxed_tb(
f'|\n'
f' ------ - ------\n\n'
f'{tb_str}\n'
f' ------ - ------\n'
f' ------ - ------\n'
f'_|\n'
)
if len(indent):
@ -152,10 +174,40 @@ def pformat_boxed_tb(
+
body
)
# return body
# TODO: rename to just `RemoteError`?
def pack_from_raise(
local_err: (
ContextCancelled
|StreamOverrun
|MsgTypeError
),
cid: str,
**rae_fields,
) -> Error:
'''
Raise the provided `RemoteActorError` subtype exception
instance locally to get a traceback and pack it into an IPC
`Error`-msg using `pack_error()` to extract the tb info.
'''
try:
raise local_err
except type(local_err) as local_err:
err_msg: dict[str, dict] = pack_error(
local_err,
cid=cid,
**rae_fields,
)
return err_msg
# TODO: better compat with IPC msg structs?
# -[ ] rename to just `RemoteError` like in `mp.manager`?
# -[ ] make a `Struct`-subtype by using the .__post_init__()`?
# https://jcristharif.com/msgspec/structs.html#post-init-processing
class RemoteActorError(Exception):
'''
A box(ing) type which bundles a remote actor `BaseException` for
@ -170,12 +222,28 @@ class RemoteActorError(Exception):
'src_uid',
# 'relay_path',
]
extra_body_fields: list[str] = [
'cid',
'boxed_type',
]
def __init__(
self,
message: str,
ipc_msg: Error|None = None,
boxed_type: Type[BaseException]|None = None,
**msgdata
# NOTE: only provided by subtypes (ctxc and overruns)
# wishing to both manually instantiate and add field
# values defined on `Error` without having to construct an
# `Error()` before the exception is processed by
# `pack_error()`.
#
# TODO: a better way to support this without the extra
# private `._extra_msgdata`?
# -[ ] ctxc constructed inside `._rpc._invoke()` L:638
# -[ ] overrun @ `._context.Context._deliver_msg()` L:1958
**extra_msgdata,
) -> None:
super().__init__(message)
@ -188,14 +256,24 @@ class RemoteActorError(Exception):
# - .remote_type
# also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5
#
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg
# TODO: make this a `.errmsg: Error` throughout?
self.msgdata: dict[str, Any] = msgdata
if (
extra_msgdata
and ipc_msg
):
# XXX mutate the orig msg directly from
# manually provided input params.
for k, v in extra_msgdata.items():
setattr(
self._ipc_msg,
k,
v,
)
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines?
@ -214,14 +292,56 @@ class RemoteActorError(Exception):
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
if not self.msgdata.get('boxed_type_str'):
self.msgdata['boxed_type_str'] = str(
type(boxed_type).__name__
)
boxed_type_str: str = type(boxed_type).__name__
if (
ipc_msg
and not self._ipc_msg.boxed_type_str
):
self._ipc_msg.boxed_type_str = boxed_type_str
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
else:
self._extra_msgdata['boxed_type_str'] = boxed_type_str
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property
def ipc_msg(self) -> pretty_struct.Struct:
'''
Re-render the underlying `._ipc_msg: Msg` as
a `pretty_struct.Struct` for introspection such that the
returned value is a read-only copy of the original.
'''
if self._ipc_msg is None:
return None
msg_type: Msg = type(self._ipc_msg)
fields: dict[str, Any] = {
k: v for _, k, v in
pretty_struct.iter_fields(self._ipc_msg)
}
return defstruct(
msg_type.__name__,
fields=fields.keys(),
bases=(msg_type, pretty_struct.Struct),
)(**fields)
@property
def msgdata(self) -> dict[str, Any]:
'''
The (remote) error data provided by a merge of the
`._ipc_msg: Error` msg and any input `._extra_msgdata: dict`
(provided by subtypes via `.__init__()`).
'''
msgdata: dict = (
structs.asdict(self._ipc_msg)
if self._ipc_msg
else {}
)
return self._extra_msgdata | msgdata
@property
def src_type_str(self) -> str:
'''
@ -231,7 +351,7 @@ class RemoteActorError(Exception):
at the first relay/hop's receiving actor.
'''
return self.msgdata['src_type_str']
return self._ipc_msg.src_type_str
@property
def src_type(self) -> str:
@ -241,7 +361,7 @@ class RemoteActorError(Exception):
'''
if self._src_type is None:
self._src_type = get_err_type(
self.msgdata['src_type_str']
self._ipc_msg.src_type_str
)
return self._src_type
@ -252,7 +372,7 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type.
'''
return self.msgdata['boxed_type_str']
return self._ipc_msg.boxed_type_str
@property
def boxed_type(self) -> str:
@ -262,7 +382,7 @@ class RemoteActorError(Exception):
'''
if self._boxed_type is None:
self._boxed_type = get_err_type(
self.msgdata['boxed_type_str']
self._ipc_msg.boxed_type_str
)
return self._boxed_type
@ -275,40 +395,44 @@ class RemoteActorError(Exception):
actor's hop.
NOTE: a `list` field with the same name is expected to be
passed/updated in `.msgdata`.
passed/updated in `.ipc_msg`.
'''
return self.msgdata['relay_path']
return self._ipc_msg.relay_path
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self.msgdata['relay_path'][-1]
self._ipc_msg.relay_path[-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
if src_uid := (
self.msgdata.get('src_uid')
self._ipc_msg.src_uid
):
return tuple(src_uid)
# TODO: use path lookup instead?
# return tuple(
# self.msgdata['relay_path'][0]
# self._ipc_msg.relay_path[0]
# )
@property
def tb_str(
self,
indent: str = ' ',
indent: str = '',
) -> str:
if remote_tb := self.msgdata.get('tb_str'):
return textwrap.indent(
remote_tb,
prefix=indent,
)
remote_tb: str = ''
return ''
if self._ipc_msg:
remote_tb: str = self._ipc_msg.tb_str
else:
remote_tb = self.msgdata.get('tb_str')
return textwrap.indent(
remote_tb or '',
prefix=indent,
)
def _mk_fields_str(
self,
@ -320,14 +444,17 @@ class RemoteActorError(Exception):
val: Any|None = (
getattr(self, key, None)
or
self.msgdata.get(key)
getattr(
self._ipc_msg,
key,
None,
)
)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
# else:
val_str: str = repr(val)
if val:
_repr += f'{key}={val_str}{end_char}'
@ -358,7 +485,9 @@ class RemoteActorError(Exception):
'''
fields: str = self._mk_fields_str(
_body_fields,
_body_fields
+
self.extra_body_fields,
)
body: str = pformat_boxed_tb(
tb_str=self.tb_str,
@ -415,15 +544,6 @@ class RemoteActorError(Exception):
# raise NotImplementedError
class InternalActorError(RemoteActorError):
'''
(Remote) internal `tractor` error indicating failure of some
primitive, machinery state or lowlevel task that should never
occur.
'''
class ContextCancelled(RemoteActorError):
'''
Inter-actor task context was cancelled by either a call to
@ -433,6 +553,10 @@ class ContextCancelled(RemoteActorError):
reprol_fields: list[str] = [
'canceller',
]
extra_body_fields: list[str] = [
'cid',
'canceller',
]
@property
def canceller(self) -> tuple[str, str]|None:
'''
@ -454,7 +578,7 @@ class ContextCancelled(RemoteActorError):
|_`._cancel_task()`
'''
value = self.msgdata.get('canceller')
value: tuple[str, str]|None = self._ipc_msg.canceller
if value:
return tuple(value)
@ -468,6 +592,132 @@ class ContextCancelled(RemoteActorError):
# src_actor_uid = canceller
class MsgTypeError(
RemoteActorError,
):
'''
Equivalent of a runtime `TypeError` for IPC dialogs.
Raise when any IPC wire-message is decoded to have invalid
field values (due to type) or for other `MsgCodec` related
violations such as having no extension-type for a field with
a custom type but no `enc/dec_hook()` support.
Can be raised on the send or recv side of an IPC `Channel`
depending on the particular msg.
Msgs which cause this to be raised on the `.send()` side (aka
in the "ctl" dialog phase) include:
- `Start`
- `Started`
- `Return`
Those which cause it on on the `.recv()` side (aka the "nasty
streaming" dialog phase) are:
- `Yield`
- TODO: any embedded `.pld` type defined by user code?
Normally the source of an error is re-raised from some `.msg._codec`
decode which itself raises in a backend interchange
lib (eg. a `msgspec.ValidationError`).
'''
reprol_fields: list[str] = [
'ipc_msg',
]
extra_body_fields: list[str] = [
'cid',
'payload_msg',
]
@property
def msg_dict(self) -> dict[str, Any]:
'''
If the underlying IPC `Msg` was received from a remote
actor but was unable to be decoded to a native
`Yield`|`Started`|`Return` struct, the interchange backend
native format decoder can be used to stash a `dict`
version for introspection by the invalidating RPC task.
'''
return self.msgdata.get('_msg_dict')
@property
def payload_msg(self) -> Msg|None:
'''
Attempt to construct what would have been the original
`Msg`-with-payload subtype (i.e. an instance from the set
of msgs in `.msg.types._payload_msgs`) which failed
validation.
'''
msg_dict: dict = self.msg_dict.copy()
name: str = msg_dict.pop('msg_type')
msg_type: Msg = getattr(
msgtypes,
name,
Msg,
)
return msg_type(**msg_dict)
@property
def cid(self) -> str:
# pre-packed using `.from_decode()` constructor
return self.msgdata.get('cid')
@classmethod
def from_decode(
cls,
message: str,
msgdict: dict,
) -> MsgTypeError:
return cls(
message=message,
# NOTE: original "vanilla decode" of the msg-bytes
# is placed inside a value readable from
# `.msgdata['_msg_dict']`
_msg_dict=msgdict,
# expand and pack all RAE compat fields
# into the `._extra_msgdata` aux `dict`.
**{
k: v
for k, v in msgdict.items()
if k in _ipcmsg_keys
},
)
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
reprol_fields: list[str] = [
'sender',
]
'''
This stream was overrun by its sender and can be optionally
handled by app code using `MsgStream.send()/.receive()`.
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self._ipc_msg.sender
if value:
return tuple(value)
# class InternalActorError(RemoteActorError):
# '''
# Boxed (Remote) internal `tractor` error indicating failure of some
# primitive, machinery state or lowlevel task that should never
# occur.
# '''
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
@ -484,23 +734,6 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
reprol_fields: list[str] = [
'sender',
]
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception):
'''
@ -518,23 +751,12 @@ class MessagingError(Exception):
'''
class MsgTypeError(MessagingError):
'''
Equivalent of a `TypeError` for an IPC wire-message
due to an invalid field value (type).
Normally this is re-raised from some `.msg._codec`
decode error raised by a backend interchange lib
like `msgspec` or `pycapnproto`.
'''
def pack_error(
exc: BaseException|RemoteActorError,
tb: str|None = None,
cid: str|None = None,
src_uid: tuple[str, str]|None = None,
) -> Error:
'''
@ -560,7 +782,8 @@ def pack_error(
):
error_msg.update(exc.msgdata)
# an onion/inception we need to pack
# an onion/inception we need to pack as a nested and relayed
# remotely boxed error.
if (
type(exc) is RemoteActorError
and (boxed := exc.boxed_type)
@ -584,7 +807,7 @@ def pack_error(
error_msg['boxed_type_str'] = 'RemoteActorError'
else:
error_msg['src_uid'] = our_uid
error_msg['src_uid'] = src_uid or our_uid
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
@ -596,7 +819,7 @@ def pack_error(
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's `.msgdata`).
# content's in `._ipc_msg.tb_str`).
error_msg['tb_str'] = tb_str
if cid is not None:
@ -606,7 +829,7 @@ def pack_error(
def unpack_error(
msg: dict[str, Any]|Error,
msg: Error,
chan: Channel|None = None,
box_type: RemoteActorError = RemoteActorError,
@ -624,16 +847,10 @@ def unpack_error(
'''
__tracebackhide__: bool = hide_tb
error_dict: dict[str, dict]|None
if not isinstance(msg, Error):
# if (
# error_dict := msg.get('error')
# ) is None:
# no error field, nothing to unpack.
return None
# retrieve the remote error's msg encoded details
# tb_str: str = error_dict.get('tb_str', '')
# retrieve the remote error's encoded details from fields
tb_str: str = msg.tb_str
message: str = (
f'{chan.uid}\n'
@ -651,6 +868,10 @@ def unpack_error(
box_type = ContextCancelled
assert boxed_type is box_type
elif boxed_type_str == 'MsgTypeError':
box_type = MsgTypeError
assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right?
#
# we have an inception/onion-error so ensure
@ -661,12 +882,9 @@ def unpack_error(
# assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1
# TODO: mk RAE just take the `Error` instance directly?
error_dict: dict = structs.asdict(msg)
exc = box_type(
message,
**error_dict,
ipc_msg=msg,
)
return exc

View File

@ -54,7 +54,8 @@ from tractor.msg import (
_ctxvar_MsgCodec,
_codec,
MsgCodec,
types,
types as msgtypes,
pretty_struct,
)
log = get_logger(__name__)
@ -72,6 +73,7 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
)
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
MsgType = TypeVar("MsgType")
# TODO: consider using a generic def and indexing with our eventual
@ -116,6 +118,74 @@ class MsgTransport(Protocol[MsgType]):
...
def _raise_msg_type_err(
msg: Any|bytes,
codec: MsgCodec,
validation_err: msgspec.ValidationError|None = None,
verb_header: str = '',
) -> None:
# if side == 'send':
if validation_err is None: # send-side
import traceback
from tractor._exceptions import pformat_boxed_tb
fmt_spec: str = '\n'.join(
map(str, codec.msg_spec.__args__)
)
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
raise MsgTypeError(
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n'
)
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
errmsg: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
errmsg = f'{verb_header} ' + errmsg
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = validation_err.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n'
f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
)
raise MsgTypeError(errmsg) from validation_err
# TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue?
@ -175,9 +245,10 @@ class MsgpackTCPStream(MsgTransport):
or
_codec._ctxvar_MsgCodec.get()
)
log.critical(
'!?!: USING STD `tractor` CODEC !?!?\n'
f'{self._codec}\n'
# TODO: mask out before release?
log.runtime(
f'New {self} created with codec\n'
f'codec: {self._codec}\n'
)
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -221,16 +292,18 @@ class MsgpackTCPStream(MsgTransport):
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
# assert (
# task := trio.lowlevel.current_task()
# ) is not self._task
# self._task = task
self._codec = codec
log.critical(
'.recv() USING NEW CODEC !?!?\n'
f'{self._codec}\n\n'
f'msg_bytes -> {msg_bytes}\n'
log.runtime(
'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n'
)
yield codec.decode(msg_bytes)
@ -252,36 +325,13 @@ class MsgpackTCPStream(MsgTransport):
# and always raise such that spec violations
# are never allowed to be caught silently!
except msgspec.ValidationError as verr:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg_bytes)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(types, msg_type_name)
errmsg: str = (
f'Received invalid IPC `{msg_type_name}` msg\n\n'
# re-raise as type error
_raise_msg_type_err(
msg=msg_bytes,
codec=codec,
validation_err=verr,
)
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = verr.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
if field_val := msg_dict.get(maybe_field):
field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n'
f' |_.{maybe_field}: {field_type} = {field_val}\n'
)
raise MsgTypeError(errmsg) from verr
except (
msgspec.DecodeError,
UnicodeDecodeError,
@ -307,12 +357,16 @@ class MsgpackTCPStream(MsgTransport):
async def send(
self,
msg: Any,
msg: msgtypes.Msg,
strict_types: bool = True,
# hide_tb: bool = False,
) -> None:
'''
Send a msgpack coded blob-as-msg over TCP.
Send a msgpack encoded py-object-blob-as-msg over TCP.
If `strict_types == True` then a `MsgTypeError` will be raised on any
invalid msg type
'''
# __tracebackhide__: bool = hide_tb
@ -321,25 +375,40 @@ class MsgpackTCPStream(MsgTransport):
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# if self._codec != codec:
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
self._codec = codec
log.critical(
'.send() using NEW CODEC !?!?\n'
f'{self._codec}\n\n'
f'OBJ -> {msg}\n'
log.runtime(
'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n'
f'msg: {msg}\n'
)
if type(msg) not in types.__spec__:
log.warning(
'Sending non-`Msg`-spec msg?\n\n'
f'{msg}\n'
)
bytes_data: bytes = codec.encode(msg)
if type(msg) not in msgtypes.__msg_types__:
if strict_types:
_raise_msg_type_err(
msg,
codec=codec,
)
else:
log.warning(
'Sending non-`Msg`-spec msg?\n\n'
f'{msg}\n'
)
try:
bytes_data: bytes = codec.encode(msg)
except TypeError as typerr:
raise MsgTypeError(
'A msg field violates the current spec\n'
f'{codec.pld_spec}\n\n'
f'{pretty_struct.Struct.pformat(msg)}'
) from typerr
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
@property
@ -567,7 +636,6 @@ class Channel:
f'{pformat(payload)}\n'
) # type: ignore
assert self._transport
await self._transport.send(
payload,
# hide_tb=hide_tb,
@ -577,6 +645,11 @@ class Channel:
assert self._transport
return await self._transport.recv()
# TODO: auto-reconnect features like 0mq/nanomsg?
# -[ ] implement it manually with nods to SC prot
# possibly on multiple transport backends?
# -> seems like that might be re-inventing scalability
# prots tho no?
# try:
# return await self._transport.recv()
# except trio.BrokenResourceError:

View File

@ -502,7 +502,7 @@ async def open_portal(
'''
actor = current_actor()
assert actor
was_connected = False
was_connected: bool = False
async with maybe_open_nursery(nursery, shield=shield) as nursery:
@ -533,9 +533,7 @@ async def open_portal(
await portal.aclose()
if was_connected:
# gracefully signal remote channel-msg loop
await channel.send(None)
# await channel.aclose()
await channel.aclose()
# cancel background msg loop task
if msg_loop_cs:

View File

@ -55,20 +55,21 @@ from ._exceptions import (
TransportClosed,
)
from .devx import (
pause,
maybe_wait_for_debugger,
_debug,
)
from . import _state
from .log import get_logger
from tractor.msg.types import (
CancelAck,
Error,
Msg,
Return,
Start,
StartAck,
Started,
Stop,
Yield,
Return,
Error,
)
@ -90,6 +91,7 @@ async def _invoke_non_context(
treat_as_gen: bool,
is_rpc: bool,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -98,7 +100,6 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
# await chan.send({
await chan.send(
StartAck(
cid=cid,
@ -124,11 +125,6 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
# await chan.send({
# # Yield()
# 'cid': cid,
# 'yield': item,
# })
await chan.send(
Yield(
cid=cid,
@ -143,11 +139,6 @@ async def _invoke_non_context(
await chan.send(
Stop(cid=cid)
)
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
@ -158,11 +149,6 @@ async def _invoke_non_context(
functype='asyncgen',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncgen',
# })
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
@ -178,11 +164,6 @@ async def _invoke_non_context(
await chan.send(
Stop(cid=cid)
)
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
@ -200,11 +181,6 @@ async def _invoke_non_context(
functype='asyncfunc',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncfunc',
# })
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
@ -238,13 +214,8 @@ async def _invoke_non_context(
and chan.connected()
):
try:
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': result,
# })
await chan.send(
Return(
return_msg(
cid=cid,
pld=result,
)
@ -409,6 +380,7 @@ async def _invoke(
is_rpc: bool = True,
hide_tb: bool = True,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[
Context | BaseException
@ -429,8 +401,6 @@ async def _invoke(
# XXX for .pause_from_sync()` usage we need to make sure
# `greenback` is boostrapped in the subactor!
await _debug.maybe_init_greenback()
# else:
# await pause()
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
@ -520,6 +490,7 @@ async def _invoke(
kwargs,
treat_as_gen,
is_rpc,
return_msg,
task_status,
)
# below is only for `@context` funcs
@ -550,11 +521,6 @@ async def _invoke(
functype='context',
)
)
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'context',
# })
# TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from
@ -579,16 +545,11 @@ async def _invoke(
# deliver final result to caller side.
await chan.send(
Return(
return_msg(
cid=cid,
pld=res,
)
)
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': res,
# })
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -677,7 +638,6 @@ async def _invoke(
ctxc = ContextCancelled(
msg,
boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller,
)
# assign local error so that the `.outcome`
@ -778,12 +738,12 @@ async def try_ship_error_to_remote(
trio.BrokenResourceError,
BrokenPipeError,
):
# err_msg: dict = msg['error']['tb_str']
log.critical(
'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n'
# f'{err_msg}\n'
# TODO: use `.msg.preetty_struct` for this!
f'{msg}\n'
)
@ -825,6 +785,8 @@ async def process_messages(
'''
assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
# https://github.com/python-trio/trio/issues/467
@ -834,7 +796,7 @@ async def process_messages(
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: dict | None = None
msg: Msg|None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
@ -843,215 +805,148 @@ async def process_messages(
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs)
async for msg in chan:
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
# => specifically `pformat()` sub-call..?
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pformat()` sub-call..?
# -[ ] use `.msg.pretty_struct` here now instead!
f'{pformat(msg)}\n'
)
match msg:
# if msg is None:
# dedicated loop terminate sentinel
case None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f'Peer IPC channel terminated via `None` setinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
requesting_uid=channel.uid,
ipc_msg=msg,
)
break
# cid = msg.get('cid')
# if cid:
# msg for an ongoing IPC ctx session, deliver msg to
# local task.
case (
StartAck(cid=cid)
| Started(cid=cid)
| Yield(cid=cid)
| Stop(cid=cid)
| Return(cid=cid)
| Error(cid=cid)
| CancelAck(cid=cid)
| Error(cid=cid) # RPC-task ctx specific
):
# deliver response to local caller/waiter
# via its per-remote-context memory channel.
await actor._push_result(
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
# `Actor`(-internal) runtime cancel requests
case Start(
ns='self',
func='cancel',
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
actor.cancel,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
log.runtime(
'Waiting on next IPC msg from\n'
f'peer: {chan.uid}:\n'
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
# f'last msg: {msg}\n'
)
continue
loop_cs.cancel()
break
# process a 'cmd' request-msg upack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
ns='self',
func='_cancel_task',
cid=cid,
kwargs=kwargs,
):
target_cid: str = kwargs['cid']
kwargs |= {
'requesting_uid': chan.uid,
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
}
try:
await _invoke(
actor,
cid,
chan,
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
# ------ - ------
# -[x] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
cid=cid,
ns=ns,
func=funcname,
kwargs=kwargs,
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
# try:
# (
# ns,
# funcname,
# kwargs,
# actorid,
# cid,
# ) = msg['cmd']
# # TODO: put in `case Error():` right?
# except KeyError:
# # This is the non-rpc error case, that is, an
# # error **not** raised inside a call to ``_invoke()``
# # (i.e. no cid was provided in the msg - see above).
# # Push this error to all local channel consumers
# # (normally portals) by marking the channel as errored
# assert chan.uid
# exc = unpack_error(msg, chan=chan)
# chan._exc = exc
# raise exc
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# case Start(
# ns='self',
# funcname='cancel',
# ):
# runtime-internal endpoint: `Actor.<funcname>`
# only registry methods exist now yah,
# like ``.register_actor()`` etc. ?
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs |= {
'req_chan': chan,
}
func: Callable = getattr(actor, funcname)
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
# case Start(
# ns='self',
# funcname='_cancel_task',
# ):
if funcname == '_cancel_task':
func: Callable = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
target_cid: str = kwargs['cid']
kwargs |= {
# NOTE: ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
'requesting_uid': chan.uid,
'ipc_msg': msg,
}
# TODO: remove? already have emit in meth.
# log.runtime(
# f'Rx RPC task cancel request\n'
# f'<= canceller: {chan.uid}\n'
# f' |_{chan}\n\n'
# f'=> {actor}\n'
# f' |_cid: {target_cid}\n'
# )
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
continue
# case Start(
# ns='self',
# funcname='register_actor',
# ):
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
# case Start(
# ns=str(),
# funcname=funcname,
# ):
# application RPC endpoint
else:
# complain to client about restricted modules
try:
func = actor._get_rpc_func(ns, funcname)
func: Callable = actor._get_rpc_func(
ns,
funcname,
)
except (
ModuleNotExposed,
AttributeError,
) as err:
# always complain to requester
# client about un-enabled modules
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
@ -1061,6 +956,7 @@ async def process_messages(
# schedule a task for the requested RPC function
# in the actor's main "service nursery".
#
# TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks`
@ -1069,7 +965,7 @@ async def process_messages(
f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n'
# TODO: maddr style repr?
# ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n'
@ -1077,7 +973,6 @@ async def process_messages(
f' |_cid: {cid}\n'
f' |>> {func}()\n'
)
assert actor._service_n # wait why? do it at top?
try:
ctx: Context = await actor._service_n.start(
partial(
@ -1107,13 +1002,12 @@ async def process_messages(
log.warning(
'Task for RPC failed?'
f'|_ {func}()\n\n'
f'{err}'
)
continue
else:
# mark that we have ongoing rpc tasks
# mark our global state with ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be
@ -1124,23 +1018,26 @@ async def process_messages(
trio.Event(),
)
case Error()|_:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
# XXX remote (runtime scoped) error or uknown
# msg (type).
case Error() | _:
# NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
log.exception(
f'Unhandled IPC msg:\n\n'
f'{msg}\n'
)
assert chan.uid
exc = unpack_error(
# assert chan.uid
chan._exc: Exception = unpack_error(
msg,
chan=chan,
)
chan._exc = exc
raise exc
raise chan._exc
log.runtime(
'Waiting on next IPC msg from\n'
@ -1148,10 +1045,12 @@ async def process_messages(
f'|_{chan}\n'
)
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
# END-OF `async for`:
# IPC disconnected via `trio.EndOfChannel`, likely
# due to a (graceful) `Channel.aclose()`.
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
)
await actor.cancel_rpc_tasks(
req_uid=actor.uid,
@ -1168,9 +1067,10 @@ async def process_messages(
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
# TODO: don't show this msg if it's an emphemeral
# discovery ep call?
# up..
# TODO: add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime(
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'

View File

@ -65,7 +65,12 @@ from trio import (
TaskStatus,
)
from .msg import NamespacePath
from tractor.msg import (
pretty_struct,
NamespacePath,
types as msgtypes,
Msg,
)
from ._ipc import Channel
from ._context import (
mk_context,
@ -73,9 +78,10 @@ from ._context import (
)
from .log import get_logger
from ._exceptions import (
unpack_error,
ModuleNotExposed,
ContextCancelled,
ModuleNotExposed,
MsgTypeError,
unpack_error,
TransportClosed,
)
from .devx import (
@ -91,10 +97,6 @@ from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
# from tractor.msg.types import (
# Aid,
# SpawnSpec,
@ -164,18 +166,15 @@ class Actor:
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope|None = None
_spawn_spec: SpawnSpec|None = None
_spawn_spec: msgtypes.SpawnSpec|None = None
# syncs for setup/teardown sequences
_server_down: trio.Event|None = None
# user toggled crash handling (including monkey-patched in
# `trio.open_nursery()` via `.trionics._supervisor` B)
_debug_mode: bool = False
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
# TODO: nursery tracking like `trio` does?
# _ans: dict[
# tuple[str, str],
# list[ActorNursery],
@ -396,8 +395,9 @@ class Actor:
raise mne
# TODO: maybe change to mod-func and rename for implied
# multi-transport semantics?
async def _stream_handler(
self,
stream: trio.SocketStream,
@ -559,7 +559,7 @@ class Actor:
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await self._push_result(
await self._deliver_ctx_payload(
chan,
cid,
msg,
@ -716,43 +716,13 @@ class Actor:
# TODO: figure out why this breaks tests..
db_cs.cancel()
# XXX: is this necessary (GC should do it)?
# XXX WARNING XXX
# Be AWARE OF THE INDENT LEVEL HERE
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
# EMPTY!!!!
if (
not self._peers
and chan.connected()
):
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.runtime(
'Terminating channel with `None` setinel msg\n'
f'|_{chan}\n'
)
try:
# send msg loop terminate sentinel which
# triggers cancellation of all remotely
# started tasks.
await chan.send(None)
# XXX: do we want this? no right?
# causes "[104] connection reset by peer" on other end
# await chan.aclose()
except trio.BrokenResourceError:
log.runtime(f"Channel {chan.uid} was already closed")
# TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD
async def _push_result(
async def _deliver_ctx_payload(
self,
chan: Channel,
cid: str,
msg: dict[str, Any],
msg: Msg|MsgTypeError,
) -> None|bool:
'''
@ -774,12 +744,16 @@ class Actor:
log.warning(
'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n'
f'=> cid: {cid}\n\n'
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
f'{msg}\n'
f'{pretty_struct.Struct.pformat(msg)}\n'
)
return
# if isinstance(msg, MsgTypeError):
# return await ctx._deliver_bad_msg()
return await ctx._deliver_msg(msg)
def get_context(
@ -1437,7 +1411,7 @@ class Actor:
)
await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None:
def cancel_server(self) -> bool:
'''
Cancel the internal IPC transport server nursery thereby
preventing any new inbound IPC connections establishing.
@ -1446,6 +1420,9 @@ class Actor:
if self._server_n:
log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel()
return True
return False
@property
def accept_addrs(self) -> list[tuple[str, int]]:

View File

@ -46,7 +46,6 @@ from .trionics import (
from tractor.msg import (
Stop,
Yield,
Error,
)
if TYPE_CHECKING:
@ -184,7 +183,7 @@ class MsgStream(trio.abc.Channel):
# - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._push_result()`, but now the 'stop' message handling
# `Actor._deliver_ctx_payload()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`.
except (
trio.EndOfChannel,
@ -391,11 +390,11 @@ class MsgStream(trio.abc.Channel):
if not self._eoc:
log.cancel(
'Stream closed before it received an EoC?\n'
'Stream closed by self before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n'
f'Context stream closed by self({self._ctx.side})\n'
f'|_{self}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?

View File

@ -454,6 +454,10 @@ _runtime_msgs: list[Msg] = [
# emission from `MsgStream.aclose()`
Stop,
# `Return` sub-type that we always accept from
# runtime-internal cancel endpoints
CancelAck,
# box remote errors, normally subtypes
# of `RemoteActorError`.
Error,