Compare commits

..

No commits in common. "cf48fdecfeb76cdf67a3f779d98c086e75659dd6" and "b1fd8b2ec36d12cedc79d39e8da27207b964340f" have entirely different histories.

12 changed files with 482 additions and 692 deletions

View File

@ -374,7 +374,7 @@ def enc_type_union(
@tractor.context @tractor.context
async def send_back_values( async def send_back_nsp(
ctx: Context, ctx: Context,
expect_debug: bool, expect_debug: bool,
pld_spec_type_strs: list[str], pld_spec_type_strs: list[str],
@ -388,8 +388,6 @@ async def send_back_values(
and ensure we can round trip a func ref with our parent. and ensure we can round trip a func ref with our parent.
''' '''
uid: tuple = tractor.current_actor().uid
# debug mode sanity check (prolly superfluous but, meh) # debug mode sanity check (prolly superfluous but, meh)
assert expect_debug == _state.debug_mode() assert expect_debug == _state.debug_mode()
@ -416,7 +414,7 @@ async def send_back_values(
) )
print( print(
f'{uid}: attempting `Started`-bytes DECODE..\n' 'CHILD attempting `Started`-bytes DECODE..\n'
) )
try: try:
msg: Started = nsp_codec.decode(started_msg_bytes) msg: Started = nsp_codec.decode(started_msg_bytes)
@ -438,7 +436,7 @@ async def send_back_values(
raise raise
else: else:
print( print(
f'{uid}: (correctly) unable to DECODE `Started`-bytes\n' 'CHILD (correctly) unable to DECODE `Started`-bytes\n'
f'{started_msg_bytes}\n' f'{started_msg_bytes}\n'
) )
@ -447,7 +445,7 @@ async def send_back_values(
for send_value, expect_send in iter_send_val_items: for send_value, expect_send in iter_send_val_items:
try: try:
print( print(
f'{uid}: attempting to `.started({send_value})`\n' f'CHILD attempting to `.started({send_value})`\n'
f'=> expect_send: {expect_send}\n' f'=> expect_send: {expect_send}\n'
f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n' f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n'
f'AND, codec: {codec}\n' f'AND, codec: {codec}\n'
@ -462,6 +460,7 @@ async def send_back_values(
# await tractor.pause() # await tractor.pause()
raise RuntimeError( raise RuntimeError(
# pytest.fail(
f'NOT-EXPECTED able to roundtrip value given spec:\n' f'NOT-EXPECTED able to roundtrip value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n' f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {type(send_value)}\n' f'value -> {send_value}: {type(send_value)}\n'
@ -469,77 +468,54 @@ async def send_back_values(
break # move on to streaming block.. break # move on to streaming block..
except NotImplementedError:
print('FAILED ENCODE!')
except tractor.MsgTypeError: except tractor.MsgTypeError:
# await tractor.pause() # await tractor.pause()
if expect_send: if expect_send:
raise RuntimeError( pytest.fail(
f'EXPECTED to `.started()` value given spec:\n' f'EXPECTED to `.started()` value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n' f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {type(send_value)}\n' f'value -> {send_value}: {type(send_value)}\n'
) )
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
print(
f'{uid}: Entering streaming block to send remaining values..'
)
for send_value, expect_send in iter_send_val_items: for send_value, expect_send in iter_send_val_items:
send_type: Type = type(send_value) send_type: Type = type(send_value)
print( print(
'------ - ------\n' 'CHILD report on send value\n'
f'{uid}: SENDING NEXT VALUE\n'
f'ipc_pld_spec: {ipc_pld_spec}\n' f'ipc_pld_spec: {ipc_pld_spec}\n'
f'expect_send: {expect_send}\n' f'expect_send: {expect_send}\n'
f'val: {send_value}\n' f'val: {send_value}\n'
'------ - ------\n'
) )
try: try:
await ipc.send(send_value) await ipc.send(send_value)
print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n')
sent.append(send_value) sent.append(send_value)
if not expect_send:
# NOTE: should only raise above on pytest.fail(
# `.started()` or a `Return` f'NOT-EXPECTED able to roundtrip value given spec:\n'
# if not expect_send: f'ipc_pld_spec -> {ipc_pld_spec}\n'
# raise RuntimeError( f'value -> {send_value}: {send_type}\n'
# f'NOT-EXPECTED able to roundtrip value given spec:\n' )
# f'ipc_pld_spec -> {ipc_pld_spec}\n'
# f'value -> {send_value}: {send_type}\n'
# )
except ValidationError: except ValidationError:
print(f'{uid} FAILED TO SEND {send_value}!')
# await tractor.pause()
if expect_send: if expect_send:
raise RuntimeError( pytest.fail(
f'EXPECTED to roundtrip value given spec:\n' f'EXPECTED to roundtrip value given spec:\n'
f'ipc_pld_spec -> {ipc_pld_spec}\n' f'ipc_pld_spec -> {ipc_pld_spec}\n'
f'value -> {send_value}: {send_type}\n' f'value -> {send_value}: {send_type}\n'
) )
# continue continue
else: assert (
print( len(sent)
f'{uid}: finished sending all values\n' ==
'Should be exiting stream block!\n' len([val
for val, expect in
expect_ipc_send.values()
if expect is True])
) )
print(f'{uid}: exited streaming block!')
# TODO: this won't be true bc in streaming phase we DO NOT
# msgspec check outbound msgs!
# -[ ] once we implement the receiver side `InvalidMsg`
# then we can expect it here?
# assert (
# len(sent)
# ==
# len([val
# for val, expect in
# expect_ipc_send.values()
# if expect is True])
# )
def ex_func(*args): def ex_func(*args):
print(f'ex_func({args})') print(f'ex_func({args})')
@ -659,7 +635,7 @@ def test_codec_hooks_mod(
async with ( async with (
p.open_context( p.open_context(
send_back_values, send_back_nsp,
expect_debug=debug_mode, expect_debug=debug_mode,
pld_spec_type_strs=pld_spec_type_strs, pld_spec_type_strs=pld_spec_type_strs,
add_hooks=add_codec_hooks, add_hooks=add_codec_hooks,
@ -689,13 +665,10 @@ def test_codec_hooks_mod(
async for next_sent in ipc: async for next_sent in ipc:
print( print(
'Parent: child sent next value\n' 'Child sent next value\n'
f'{next_sent}: {type(next_sent)}\n' f'{next_sent}: {type(next_sent)}\n'
) )
if expect_to_send:
expect_to_send.remove(next_sent) expect_to_send.remove(next_sent)
else:
print('PARENT should terminate stream loop + block!')
# all sent values should have arrived! # all sent values should have arrived!
assert not expect_to_send assert not expect_to_send

View File

@ -796,12 +796,10 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal # raises a special cancel signal
except tractor.ContextCancelled as ce: except tractor.ContextCancelled as ce:
_ce = ce # for debug on crash
ce.boxed_type == trio.Cancelled ce.boxed_type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'itself' in ce.tb_str assert 'itself' in ce.msgdata['tb_str']
assert ce.tb_str == ce.msgdata['tb_str']
# teardown the actor # teardown the actor
await portal.cancel_actor() await portal.cancel_actor()
@ -1159,8 +1157,7 @@ def test_maybe_allow_overruns_stream(
elif slow_side == 'parent': elif slow_side == 'parent':
assert err.boxed_type == tractor.RemoteActorError assert err.boxed_type == tractor.RemoteActorError
assert 'StreamOverrun' in err.tb_str assert 'StreamOverrun' in err.msgdata['tb_str']
assert err.tb_str == err.msgdata['tb_str']
else: else:
# if this hits the logic blocks from above are not # if this hits the logic blocks from above are not

View File

@ -185,10 +185,6 @@ async def sleep_a_bit_then_cancel_peer(
await trio.sleep(cancel_after) await trio.sleep(cancel_after)
await peer.cancel_actor() await peer.cancel_actor()
# such that we're cancelled by our rent ctx-task
await trio.sleep(3)
print('CANCELLER RETURNING!')
@tractor.context @tractor.context
async def stream_ints( async def stream_ints(
@ -249,12 +245,6 @@ async def stream_from_peer(
assert peer_ctx._remote_error is ctxerr assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
# XXX YES, bc exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
# XXX NO, bc new one always created for property accesss
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
# the peer ctx is the canceller even though it's canceller # the peer ctx is the canceller even though it's canceller
# is the "canceller" XD # is the "canceller" XD
assert peer_name in peer_ctx.canceller assert peer_name in peer_ctx.canceller

View File

@ -44,10 +44,9 @@ from ._state import (
is_root_process as is_root_process, is_root_process as is_root_process,
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled as ContextCancelled,
ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError, RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
) )
from .devx import ( from .devx import (
breakpoint as breakpoint, breakpoint as breakpoint,

View File

@ -1207,7 +1207,7 @@ class Context:
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in # ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via # the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock # a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the
@ -1698,11 +1698,11 @@ class Context:
# raise any msg type error NO MATTER WHAT! # raise any msg type error NO MATTER WHAT!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
from tractor._ipc import _mk_msg_type_err from tractor._ipc import _raise_msg_type_err
raise _mk_msg_type_err( _raise_msg_type_err(
msg=msg_bytes, msg=msg_bytes,
codec=codec, codec=codec,
src_validation_error=verr, validation_err=verr,
verb_header='Trying to send payload' verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n' # > 'invalid `Started IPC msgs\n'
) )
@ -2415,7 +2415,7 @@ async def open_context_from_portal(
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in # ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via # the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock # a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the

View File

@ -31,10 +31,7 @@ import textwrap
import traceback import traceback
import trio import trio
from msgspec import ( from msgspec import structs
structs,
defstruct,
)
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
@ -43,8 +40,6 @@ from tractor.msg import (
Msg, Msg,
Stop, Stop,
Yield, Yield,
pretty_struct,
types as msgtypes,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -69,38 +64,21 @@ class InternalError(RuntimeError):
''' '''
_body_fields: list[str] = [
'boxed_type',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# NOTE: more or less should be close to these: # only in sub-types
# 'boxed_type', 'canceller',
# 'src_type', 'sender',
# 'src_uid',
# 'canceller',
# 'sender',
# TODO: format this better if we're going to include it.
# 'relay_path',
#
_ipcmsg_keys: list[str] = [
fi.name
for fi, k, v
in pretty_struct.iter_fields(Error)
] ]
_body_fields: list[str] = list( _msgdata_keys: list[str] = [
set(_ipcmsg_keys)
# NOTE: don't show fields that either don't provide
# any extra useful info or that are already shown
# as part of `.__repr__()` output.
- {
'src_type_str',
'boxed_type_str', 'boxed_type_str',
'tb_str', ] + _body_fields
'relay_path',
'_msg_dict',
'cid',
}
)
def get_err_type(type_name: str) -> BaseException|None: def get_err_type(type_name: str) -> BaseException|None:
@ -174,40 +152,10 @@ def pformat_boxed_tb(
+ +
body body
) )
# return body
def pack_from_raise( # TODO: rename to just `RemoteError`?
local_err: (
ContextCancelled
|StreamOverrun
|MsgTypeError
),
cid: str,
**rae_fields,
) -> Error:
'''
Raise the provided `RemoteActorError` subtype exception
instance locally to get a traceback and pack it into an IPC
`Error`-msg using `pack_error()` to extract the tb info.
'''
try:
raise local_err
except type(local_err) as local_err:
err_msg: dict[str, dict] = pack_error(
local_err,
cid=cid,
**rae_fields,
)
return err_msg
# TODO: better compat with IPC msg structs?
# -[ ] rename to just `RemoteError` like in `mp.manager`?
# -[ ] make a `Struct`-subtype by using the .__post_init__()`?
# https://jcristharif.com/msgspec/structs.html#post-init-processing
class RemoteActorError(Exception): class RemoteActorError(Exception):
''' '''
A box(ing) type which bundles a remote actor `BaseException` for A box(ing) type which bundles a remote actor `BaseException` for
@ -222,28 +170,12 @@ class RemoteActorError(Exception):
'src_uid', 'src_uid',
# 'relay_path', # 'relay_path',
] ]
extra_body_fields: list[str] = [
'cid',
'boxed_type',
]
def __init__( def __init__(
self, self,
message: str, message: str,
ipc_msg: Error|None = None,
boxed_type: Type[BaseException]|None = None, boxed_type: Type[BaseException]|None = None,
**msgdata
# NOTE: only provided by subtypes (ctxc and overruns)
# wishing to both manually instantiate and add field
# values defined on `Error` without having to construct an
# `Error()` before the exception is processed by
# `pack_error()`.
#
# TODO: a better way to support this without the extra
# private `._extra_msgdata`?
# -[ ] ctxc constructed inside `._rpc._invoke()` L:638
# -[ ] overrun @ `._context.Context._deliver_msg()` L:1958
**extra_msgdata,
) -> None: ) -> None:
super().__init__(message) super().__init__(message)
@ -256,24 +188,14 @@ class RemoteActorError(Exception):
# - .remote_type # - .remote_type
# also pertains to our long long oustanding issue XD # also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5 # https://github.com/goodboy/tractor/issues/5
#
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None self._src_type: BaseException|None = None
self._ipc_msg: Error|None = ipc_msg
if ( # TODO: make this a `.errmsg: Error` throughout?
extra_msgdata self.msgdata: dict[str, Any] = msgdata
and ipc_msg
):
# XXX mutate the orig msg directly from
# manually provided input params.
for k, v in extra_msgdata.items():
setattr(
self._ipc_msg,
k,
v,
)
else:
self._extra_msgdata = extra_msgdata
# TODO: mask out eventually or place in `pack_error()` # TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines? # pre-`return` lines?
@ -292,55 +214,13 @@ class RemoteActorError(Exception):
# either by customizing `ContextCancelled.__init__()` or # either by customizing `ContextCancelled.__init__()` or
# through a special factor func? # through a special factor func?
elif boxed_type: elif boxed_type:
boxed_type_str: str = type(boxed_type).__name__ if not self.msgdata.get('boxed_type_str'):
if ( self.msgdata['boxed_type_str'] = str(
ipc_msg type(boxed_type).__name__
and not self._ipc_msg.boxed_type_str
):
self._ipc_msg.boxed_type_str = boxed_type_str
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
else:
self._extra_msgdata['boxed_type_str'] = boxed_type_str
assert self.boxed_type is boxed_type
@property
def ipc_msg(self) -> pretty_struct.Struct:
'''
Re-render the underlying `._ipc_msg: Msg` as
a `pretty_struct.Struct` for introspection such that the
returned value is a read-only copy of the original.
'''
if self._ipc_msg is None:
return None
msg_type: Msg = type(self._ipc_msg)
fields: dict[str, Any] = {
k: v for _, k, v in
pretty_struct.iter_fields(self._ipc_msg)
}
return defstruct(
msg_type.__name__,
fields=fields.keys(),
bases=(msg_type, pretty_struct.Struct),
)(**fields)
@property
def msgdata(self) -> dict[str, Any]:
'''
The (remote) error data provided by a merge of the
`._ipc_msg: Error` msg and any input `._extra_msgdata: dict`
(provided by subtypes via `.__init__()`).
'''
msgdata: dict = (
structs.asdict(self._ipc_msg)
if self._ipc_msg
else {}
) )
return self._extra_msgdata | msgdata
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property @property
def src_type_str(self) -> str: def src_type_str(self) -> str:
@ -351,7 +231,7 @@ class RemoteActorError(Exception):
at the first relay/hop's receiving actor. at the first relay/hop's receiving actor.
''' '''
return self._ipc_msg.src_type_str return self.msgdata['src_type_str']
@property @property
def src_type(self) -> str: def src_type(self) -> str:
@ -361,7 +241,7 @@ class RemoteActorError(Exception):
''' '''
if self._src_type is None: if self._src_type is None:
self._src_type = get_err_type( self._src_type = get_err_type(
self._ipc_msg.src_type_str self.msgdata['src_type_str']
) )
return self._src_type return self._src_type
@ -372,7 +252,7 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type. String-name of the (last hop's) boxed error type.
''' '''
return self._ipc_msg.boxed_type_str return self.msgdata['boxed_type_str']
@property @property
def boxed_type(self) -> str: def boxed_type(self) -> str:
@ -382,7 +262,7 @@ class RemoteActorError(Exception):
''' '''
if self._boxed_type is None: if self._boxed_type is None:
self._boxed_type = get_err_type( self._boxed_type = get_err_type(
self._ipc_msg.boxed_type_str self.msgdata['boxed_type_str']
) )
return self._boxed_type return self._boxed_type
@ -395,45 +275,41 @@ class RemoteActorError(Exception):
actor's hop. actor's hop.
NOTE: a `list` field with the same name is expected to be NOTE: a `list` field with the same name is expected to be
passed/updated in `.ipc_msg`. passed/updated in `.msgdata`.
''' '''
return self._ipc_msg.relay_path return self.msgdata['relay_path']
@property @property
def relay_uid(self) -> tuple[str, str]|None: def relay_uid(self) -> tuple[str, str]|None:
return tuple( return tuple(
self._ipc_msg.relay_path[-1] self.msgdata['relay_path'][-1]
) )
@property @property
def src_uid(self) -> tuple[str, str]|None: def src_uid(self) -> tuple[str, str]|None:
if src_uid := ( if src_uid := (
self._ipc_msg.src_uid self.msgdata.get('src_uid')
): ):
return tuple(src_uid) return tuple(src_uid)
# TODO: use path lookup instead? # TODO: use path lookup instead?
# return tuple( # return tuple(
# self._ipc_msg.relay_path[0] # self.msgdata['relay_path'][0]
# ) # )
@property @property
def tb_str( def tb_str(
self, self,
indent: str = '', indent: str = ' ',
) -> str: ) -> str:
remote_tb: str = '' if remote_tb := self.msgdata.get('tb_str'):
if self._ipc_msg:
remote_tb: str = self._ipc_msg.tb_str
else:
remote_tb = self.msgdata.get('tb_str')
return textwrap.indent( return textwrap.indent(
remote_tb or '', remote_tb,
prefix=indent, prefix=indent,
) )
return ''
def _mk_fields_str( def _mk_fields_str(
self, self,
fields: list[str], fields: list[str],
@ -444,17 +320,14 @@ class RemoteActorError(Exception):
val: Any|None = ( val: Any|None = (
getattr(self, key, None) getattr(self, key, None)
or or
getattr( self.msgdata.get(key)
self._ipc_msg,
key,
None,
)
) )
# TODO: for `.relay_path` on multiline? # TODO: for `.relay_path` on multiline?
# if not isinstance(val, str): # if not isinstance(val, str):
# val_str = pformat(val) # val_str = pformat(val)
# else: # else:
val_str: str = repr(val) val_str: str = repr(val)
if val: if val:
_repr += f'{key}={val_str}{end_char}' _repr += f'{key}={val_str}{end_char}'
@ -485,9 +358,7 @@ class RemoteActorError(Exception):
''' '''
fields: str = self._mk_fields_str( fields: str = self._mk_fields_str(
_body_fields _body_fields,
+
self.extra_body_fields,
) )
body: str = pformat_boxed_tb( body: str = pformat_boxed_tb(
tb_str=self.tb_str, tb_str=self.tb_str,
@ -544,6 +415,15 @@ class RemoteActorError(Exception):
# raise NotImplementedError # raise NotImplementedError
class InternalActorError(RemoteActorError):
'''
(Remote) internal `tractor` error indicating failure of some
primitive, machinery state or lowlevel task that should never
occur.
'''
class ContextCancelled(RemoteActorError): class ContextCancelled(RemoteActorError):
''' '''
Inter-actor task context was cancelled by either a call to Inter-actor task context was cancelled by either a call to
@ -553,10 +433,6 @@ class ContextCancelled(RemoteActorError):
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'canceller', 'canceller',
] ]
extra_body_fields: list[str] = [
'cid',
'canceller',
]
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
''' '''
@ -578,7 +454,7 @@ class ContextCancelled(RemoteActorError):
|_`._cancel_task()` |_`._cancel_task()`
''' '''
value: tuple[str, str]|None = self._ipc_msg.canceller value = self.msgdata.get('canceller')
if value: if value:
return tuple(value) return tuple(value)
@ -592,132 +468,6 @@ class ContextCancelled(RemoteActorError):
# src_actor_uid = canceller # src_actor_uid = canceller
class MsgTypeError(
RemoteActorError,
):
'''
Equivalent of a runtime `TypeError` for IPC dialogs.
Raise when any IPC wire-message is decoded to have invalid
field values (due to type) or for other `MsgCodec` related
violations such as having no extension-type for a field with
a custom type but no `enc/dec_hook()` support.
Can be raised on the send or recv side of an IPC `Channel`
depending on the particular msg.
Msgs which cause this to be raised on the `.send()` side (aka
in the "ctl" dialog phase) include:
- `Start`
- `Started`
- `Return`
Those which cause it on on the `.recv()` side (aka the "nasty
streaming" dialog phase) are:
- `Yield`
- TODO: any embedded `.pld` type defined by user code?
Normally the source of an error is re-raised from some `.msg._codec`
decode which itself raises in a backend interchange
lib (eg. a `msgspec.ValidationError`).
'''
reprol_fields: list[str] = [
'ipc_msg',
]
extra_body_fields: list[str] = [
'cid',
'payload_msg',
]
@property
def msg_dict(self) -> dict[str, Any]:
'''
If the underlying IPC `Msg` was received from a remote
actor but was unable to be decoded to a native
`Yield`|`Started`|`Return` struct, the interchange backend
native format decoder can be used to stash a `dict`
version for introspection by the invalidating RPC task.
'''
return self.msgdata.get('_msg_dict')
@property
def payload_msg(self) -> Msg|None:
'''
Attempt to construct what would have been the original
`Msg`-with-payload subtype (i.e. an instance from the set
of msgs in `.msg.types._payload_msgs`) which failed
validation.
'''
msg_dict: dict = self.msg_dict.copy()
name: str = msg_dict.pop('msg_type')
msg_type: Msg = getattr(
msgtypes,
name,
Msg,
)
return msg_type(**msg_dict)
@property
def cid(self) -> str:
# pre-packed using `.from_decode()` constructor
return self.msgdata.get('cid')
@classmethod
def from_decode(
cls,
message: str,
msgdict: dict,
) -> MsgTypeError:
return cls(
message=message,
# NOTE: original "vanilla decode" of the msg-bytes
# is placed inside a value readable from
# `.msgdata['_msg_dict']`
_msg_dict=msgdict,
# expand and pack all RAE compat fields
# into the `._extra_msgdata` aux `dict`.
**{
k: v
for k, v in msgdict.items()
if k in _ipcmsg_keys
},
)
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
reprol_fields: list[str] = [
'sender',
]
'''
This stream was overrun by its sender and can be optionally
handled by app code using `MsgStream.send()/.receive()`.
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self._ipc_msg.sender
if value:
return tuple(value)
# class InternalActorError(RemoteActorError):
# '''
# Boxed (Remote) internal `tractor` error indicating failure of some
# primitive, machinery state or lowlevel task that should never
# occur.
# '''
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use" "Underlying channel transport was closed prior to use"
@ -734,6 +484,23 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class StreamOverrun(
RemoteActorError,
trio.TooSlowError,
):
reprol_fields: list[str] = [
'sender',
]
'''
This stream was overrun by sender
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self.msgdata.get('sender')
if value:
return tuple(value)
class AsyncioCancelled(Exception): class AsyncioCancelled(Exception):
''' '''
@ -751,12 +518,23 @@ class MessagingError(Exception):
''' '''
class MsgTypeError(MessagingError):
'''
Equivalent of a `TypeError` for an IPC wire-message
due to an invalid field value (type).
Normally this is re-raised from some `.msg._codec`
decode error raised by a backend interchange lib
like `msgspec` or `pycapnproto`.
'''
def pack_error( def pack_error(
exc: BaseException|RemoteActorError, exc: BaseException|RemoteActorError,
tb: str|None = None, tb: str|None = None,
cid: str|None = None, cid: str|None = None,
src_uid: tuple[str, str]|None = None,
) -> Error: ) -> Error:
''' '''
@ -782,8 +560,7 @@ def pack_error(
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
# an onion/inception we need to pack as a nested and relayed # an onion/inception we need to pack
# remotely boxed error.
if ( if (
type(exc) is RemoteActorError type(exc) is RemoteActorError
and (boxed := exc.boxed_type) and (boxed := exc.boxed_type)
@ -807,7 +584,7 @@ def pack_error(
error_msg['boxed_type_str'] = 'RemoteActorError' error_msg['boxed_type_str'] = 'RemoteActorError'
else: else:
error_msg['src_uid'] = src_uid or our_uid error_msg['src_uid'] = our_uid
error_msg['src_type_str'] = type(exc).__name__ error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__ error_msg['boxed_type_str'] = type(exc).__name__
@ -819,7 +596,7 @@ def pack_error(
# XXX NOTE: always ensure the traceback-str is from the # XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed # locally raised error (**not** the prior relay's boxed
# content's in `._ipc_msg.tb_str`). # content's `.msgdata`).
error_msg['tb_str'] = tb_str error_msg['tb_str'] = tb_str
if cid is not None: if cid is not None:
@ -829,7 +606,7 @@ def pack_error(
def unpack_error( def unpack_error(
msg: Error, msg: dict[str, Any]|Error,
chan: Channel|None = None, chan: Channel|None = None,
box_type: RemoteActorError = RemoteActorError, box_type: RemoteActorError = RemoteActorError,
@ -847,10 +624,16 @@ def unpack_error(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
error_dict: dict[str, dict]|None
if not isinstance(msg, Error): if not isinstance(msg, Error):
# if (
# error_dict := msg.get('error')
# ) is None:
# no error field, nothing to unpack.
return None return None
# retrieve the remote error's encoded details from fields # retrieve the remote error's msg encoded details
# tb_str: str = error_dict.get('tb_str', '')
tb_str: str = msg.tb_str tb_str: str = msg.tb_str
message: str = ( message: str = (
f'{chan.uid}\n' f'{chan.uid}\n'
@ -868,10 +651,6 @@ def unpack_error(
box_type = ContextCancelled box_type = ContextCancelled
assert boxed_type is box_type assert boxed_type is box_type
elif boxed_type_str == 'MsgTypeError':
box_type = MsgTypeError
assert boxed_type is box_type
# TODO: already included by `_this_mod` in else loop right? # TODO: already included by `_this_mod` in else loop right?
# #
# we have an inception/onion-error so ensure # we have an inception/onion-error so ensure
@ -882,9 +661,12 @@ def unpack_error(
# assert len(error_dict['relay_path']) >= 1 # assert len(error_dict['relay_path']) >= 1
assert len(msg.relay_path) >= 1 assert len(msg.relay_path) >= 1
# TODO: mk RAE just take the `Error` instance directly?
error_dict: dict = structs.asdict(msg)
exc = box_type( exc = box_type(
message, message,
ipc_msg=msg, **error_dict,
) )
return exc return exc

View File

@ -54,8 +54,7 @@ from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
_codec, _codec,
MsgCodec, MsgCodec,
types as msgtypes, types,
pretty_struct,
) )
log = get_logger(__name__) log = get_logger(__name__)
@ -73,7 +72,6 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
) )
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
MsgType = TypeVar("MsgType") MsgType = TypeVar("MsgType")
# TODO: consider using a generic def and indexing with our eventual # TODO: consider using a generic def and indexing with our eventual
@ -118,74 +116,6 @@ class MsgTransport(Protocol[MsgType]):
... ...
def _raise_msg_type_err(
msg: Any|bytes,
codec: MsgCodec,
validation_err: msgspec.ValidationError|None = None,
verb_header: str = '',
) -> None:
# if side == 'send':
if validation_err is None: # send-side
import traceback
from tractor._exceptions import pformat_boxed_tb
fmt_spec: str = '\n'.join(
map(str, codec.msg_spec.__args__)
)
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
raise MsgTypeError(
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n'
)
else:
# decode the msg-bytes using the std msgpack
# interchange-prot (i.e. without any
# `msgspec.Struct` handling) so that we can
# determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(msgtypes, msg_type_name)
errmsg: str = (
f'invalid `{msg_type_name}` IPC msg\n\n'
)
if verb_header:
errmsg = f'{verb_header} ' + errmsg
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = validation_err.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n'
f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
)
raise MsgTypeError(errmsg) from validation_err
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue? # probably should make a `mypy` issue?
@ -245,10 +175,9 @@ class MsgpackTCPStream(MsgTransport):
or or
_codec._ctxvar_MsgCodec.get() _codec._ctxvar_MsgCodec.get()
) )
# TODO: mask out before release? log.critical(
log.runtime( '!?!: USING STD `tractor` CODEC !?!?\n'
f'New {self} created with codec\n' f'{self._codec}\n'
f'codec: {self._codec}\n'
) )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -292,18 +221,16 @@ class MsgpackTCPStream(MsgTransport):
# NOTE: lookup the `trio.Task.context`'s var for # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec: if self._codec.pld_spec != codec.pld_spec:
# assert ( # assert (
# task := trio.lowlevel.current_task() # task := trio.lowlevel.current_task()
# ) is not self._task # ) is not self._task
# self._task = task # self._task = task
self._codec = codec self._codec = codec
log.runtime( log.critical(
'Using new codec in {self}.recv()\n' '.recv() USING NEW CODEC !?!?\n'
f'codec: {self._codec}\n\n' f'{self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n' f'msg_bytes -> {msg_bytes}\n'
) )
yield codec.decode(msg_bytes) yield codec.decode(msg_bytes)
@ -325,13 +252,36 @@ class MsgpackTCPStream(MsgTransport):
# and always raise such that spec violations # and always raise such that spec violations
# are never allowed to be caught silently! # are never allowed to be caught silently!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
# re-raise as type error
_raise_msg_type_err( # decode the msg-bytes using the std msgpack
msg=msg_bytes, # interchange-prot (i.e. without any
codec=codec, # `msgspec.Struct` handling) so that we can
validation_err=verr, # determine what `.msg.types.Msg` is the culprit
# by reporting the received value.
msg_dict: dict = msgspec.msgpack.decode(msg_bytes)
msg_type_name: str = msg_dict['msg_type']
msg_type = getattr(types, msg_type_name)
errmsg: str = (
f'Received invalid IPC `{msg_type_name}` msg\n\n'
) )
# XXX see if we can determine the exact invalid field
# such that we can comprehensively report the
# specific field's type problem
msgspec_msg: str = verr.args[0].rstrip('`')
msg, _, maybe_field = msgspec_msg.rpartition('$.')
if field_val := msg_dict.get(maybe_field):
field_type: Union[Type] = msg_type.__signature__.parameters[
maybe_field
].annotation
errmsg += (
f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n'
f' |_.{maybe_field}: {field_type} = {field_val}\n'
)
raise MsgTypeError(errmsg) from verr
except ( except (
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
@ -357,16 +307,12 @@ class MsgpackTCPStream(MsgTransport):
async def send( async def send(
self, self,
msg: msgtypes.Msg, msg: Any,
strict_types: bool = True,
# hide_tb: bool = False, # hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a msgpack encoded py-object-blob-as-msg over TCP. Send a msgpack coded blob-as-msg over TCP.
If `strict_types == True` then a `MsgTypeError` will be raised on any
invalid msg type
''' '''
# __tracebackhide__: bool = hide_tb # __tracebackhide__: bool = hide_tb
@ -375,40 +321,25 @@ class MsgpackTCPStream(MsgTransport):
# NOTE: lookup the `trio.Task.context`'s var for # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# if self._codec != codec:
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec: if self._codec.pld_spec != codec.pld_spec:
self._codec = codec self._codec = codec
log.runtime( log.critical(
'Using new codec in {self}.send()\n' '.send() using NEW CODEC !?!?\n'
f'codec: {self._codec}\n\n' f'{self._codec}\n\n'
f'msg: {msg}\n' f'OBJ -> {msg}\n'
) )
if type(msg) not in types.__spec__:
if type(msg) not in msgtypes.__msg_types__:
if strict_types:
_raise_msg_type_err(
msg,
codec=codec,
)
else:
log.warning( log.warning(
'Sending non-`Msg`-spec msg?\n\n' 'Sending non-`Msg`-spec msg?\n\n'
f'{msg}\n' f'{msg}\n'
) )
try:
bytes_data: bytes = codec.encode(msg) bytes_data: bytes = codec.encode(msg)
except TypeError as typerr:
raise MsgTypeError(
'A msg field violates the current spec\n'
f'{codec.pld_spec}\n\n'
f'{pretty_struct.Struct.pformat(msg)}'
) from typerr
# supposedly the fastest says, # supposedly the fastest says,
# https://stackoverflow.com/a/54027962 # https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
@property @property
@ -636,6 +567,7 @@ class Channel:
f'{pformat(payload)}\n' f'{pformat(payload)}\n'
) # type: ignore ) # type: ignore
assert self._transport assert self._transport
await self._transport.send( await self._transport.send(
payload, payload,
# hide_tb=hide_tb, # hide_tb=hide_tb,
@ -645,11 +577,6 @@ class Channel:
assert self._transport assert self._transport
return await self._transport.recv() return await self._transport.recv()
# TODO: auto-reconnect features like 0mq/nanomsg?
# -[ ] implement it manually with nods to SC prot
# possibly on multiple transport backends?
# -> seems like that might be re-inventing scalability
# prots tho no?
# try: # try:
# return await self._transport.recv() # return await self._transport.recv()
# except trio.BrokenResourceError: # except trio.BrokenResourceError:

View File

@ -502,7 +502,7 @@ async def open_portal(
''' '''
actor = current_actor() actor = current_actor()
assert actor assert actor
was_connected: bool = False was_connected = False
async with maybe_open_nursery(nursery, shield=shield) as nursery: async with maybe_open_nursery(nursery, shield=shield) as nursery:
@ -533,7 +533,9 @@ async def open_portal(
await portal.aclose() await portal.aclose()
if was_connected: if was_connected:
await channel.aclose() # gracefully signal remote channel-msg loop
await channel.send(None)
# await channel.aclose()
# cancel background msg loop task # cancel background msg loop task
if msg_loop_cs: if msg_loop_cs:

View File

@ -55,21 +55,20 @@ from ._exceptions import (
TransportClosed, TransportClosed,
) )
from .devx import ( from .devx import (
pause,
maybe_wait_for_debugger, maybe_wait_for_debugger,
_debug, _debug,
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck,
Error,
Msg,
Return,
Start, Start,
StartAck, StartAck,
Started, Started,
Stop, Stop,
Yield, Yield,
Return,
Error,
) )
@ -91,7 +90,6 @@ async def _invoke_non_context(
treat_as_gen: bool, treat_as_gen: bool,
is_rpc: bool, is_rpc: bool,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -100,6 +98,7 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
# await chan.send({
await chan.send( await chan.send(
StartAck( StartAck(
cid=cid, cid=cid,
@ -125,6 +124,11 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait() # to_send = await chan.recv_nowait()
# if to_send is not None: # if to_send is not None:
# to_yield = await coro.asend(to_send) # to_yield = await coro.asend(to_send)
# await chan.send({
# # Yield()
# 'cid': cid,
# 'yield': item,
# })
await chan.send( await chan.send(
Yield( Yield(
cid=cid, cid=cid,
@ -139,6 +143,11 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
# one way @stream func that gets treated like an async gen # one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
@ -149,6 +158,11 @@ async def _invoke_non_context(
functype='asyncgen', functype='asyncgen',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncgen',
# })
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
@ -164,6 +178,11 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
else: else:
# regular async function/method # regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()` # XXX: possibly just a scheduled `Actor._cancel_task()`
@ -181,6 +200,11 @@ async def _invoke_non_context(
functype='asyncfunc', functype='asyncfunc',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncfunc',
# })
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -214,8 +238,13 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': result,
# })
await chan.send( await chan.send(
return_msg( Return(
cid=cid, cid=cid,
pld=result, pld=result,
) )
@ -380,7 +409,6 @@ async def _invoke(
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = True,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -401,6 +429,8 @@ async def _invoke(
# XXX for .pause_from_sync()` usage we need to make sure # XXX for .pause_from_sync()` usage we need to make sure
# `greenback` is boostrapped in the subactor! # `greenback` is boostrapped in the subactor!
await _debug.maybe_init_greenback() await _debug.maybe_init_greenback()
# else:
# await pause()
# TODO: possibly a specially formatted traceback # TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)? # (not sure what typing is for this..)?
@ -490,7 +520,6 @@ async def _invoke(
kwargs, kwargs,
treat_as_gen, treat_as_gen,
is_rpc, is_rpc,
return_msg,
task_status, task_status,
) )
# below is only for `@context` funcs # below is only for `@context` funcs
@ -521,6 +550,11 @@ async def _invoke(
functype='context', functype='context',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'context',
# })
# TODO: should we also use an `.open_context()` equiv # TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from # for this callee side by factoring the impl from
@ -545,11 +579,16 @@ async def _invoke(
# deliver final result to caller side. # deliver final result to caller side.
await chan.send( await chan.send(
return_msg( Return(
cid=cid, cid=cid,
pld=res, pld=res,
) )
) )
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': res,
# })
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -638,6 +677,7 @@ async def _invoke(
ctxc = ContextCancelled( ctxc = ContextCancelled(
msg, msg,
boxed_type=trio.Cancelled, boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # assign local error so that the `.outcome`
@ -738,12 +778,12 @@ async def try_ship_error_to_remote(
trio.BrokenResourceError, trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
): ):
# err_msg: dict = msg['error']['tb_str']
log.critical( log.critical(
'IPC transport failure -> ' 'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n' f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n' f'X=> {channel.uid}\n\n'
# f'{err_msg}\n'
# TODO: use `.msg.preetty_struct` for this!
f'{msg}\n' f'{msg}\n'
) )
@ -785,8 +825,6 @@ async def process_messages(
''' '''
assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
# https://github.com/python-trio/trio/issues/467 # https://github.com/python-trio/trio/issues/467
@ -796,7 +834,7 @@ async def process_messages(
f'|_{chan}\n' f'|_{chan}\n'
) )
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: Msg|None = None msg: dict | None = None
try: try:
# NOTE: this internal scope allows for keeping this # NOTE: this internal scope allows for keeping this
# message loop running despite the current task having # message loop running despite the current task having
@ -805,49 +843,122 @@ async def process_messages(
# using ``scope = Nursery.start()`` # using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs: with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
log.transport( # type: ignore log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n' f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf? # TODO: conditionally avoid fmting depending
# -[ ] specifically `pformat()` sub-call..? # on log level (for perf)?
# -[ ] use `.msg.pretty_struct` here now instead! # => specifically `pformat()` sub-call..?
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
match msg: match msg:
# msg for an ongoing IPC ctx session, deliver msg to
# local task. # if msg is None:
# dedicated loop terminate sentinel
case None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f'Peer IPC channel terminated via `None` setinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
requesting_uid=channel.uid,
ipc_msg=msg,
)
break
# cid = msg.get('cid')
# if cid:
case ( case (
StartAck(cid=cid) StartAck(cid=cid)
| Started(cid=cid) | Started(cid=cid)
| Yield(cid=cid) | Yield(cid=cid)
| Stop(cid=cid) | Stop(cid=cid)
| Return(cid=cid) | Return(cid=cid)
| CancelAck(cid=cid) | Error(cid=cid)
| Error(cid=cid) # RPC-task ctx specific
): ):
# deliver response to local caller/waiter # deliver response to local caller/waiter
# via its per-remote-context memory channel. # via its per-remote-context memory channel.
await actor._deliver_ctx_payload( await actor._push_result(
chan, chan,
cid, cid,
msg, msg,
) )
# `Actor`(-internal) runtime cancel requests log.runtime(
case Start( 'Waiting on next IPC msg from\n'
ns='self', f'peer: {chan.uid}:\n'
func='cancel', f'|_{chan}\n'
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor # f'last msg: {msg}\n'
# runtime cancellation if this actor is )
# currently in debug mode! continue
# process a 'cmd' request-msg upack
# TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per,
# <TODO put issue for typed msging structs>
case Start(
cid=cid,
ns=ns,
func=funcname,
kwargs=kwargs,
uid=actorid,
):
# try:
# (
# ns,
# funcname,
# kwargs,
# actorid,
# cid,
# ) = msg['cmd']
# # TODO: put in `case Error():` right?
# except KeyError:
# # This is the non-rpc error case, that is, an
# # error **not** raised inside a call to ``_invoke()``
# # (i.e. no cid was provided in the msg - see above).
# # Push this error to all local channel consumers
# # (normally portals) by marking the channel as errored
# assert chan.uid
# exc = unpack_error(msg, chan=chan)
# chan._exc = exc
# raise exc
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# case Start(
# ns='self',
# funcname='cancel',
# ):
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
kwargs |= {
'req_chan': chan,
}
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete: if pdb_complete:
await pdb_complete.wait() await pdb_complete.wait()
@ -862,10 +973,9 @@ async def process_messages(
actor, actor,
cid, cid,
chan, chan,
actor.cancel, func,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg=CancelAck,
) )
log.runtime( log.runtime(
@ -875,31 +985,41 @@ async def process_messages(
loop_cs.cancel() loop_cs.cancel()
break break
case Start( # case Start(
ns='self', # ns='self',
func='_cancel_task', # funcname='_cancel_task',
cid=cid, # ):
kwargs=kwargs, if funcname == '_cancel_task':
): func: Callable = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
target_cid: str = kwargs['cid'] target_cid: str = kwargs['cid']
kwargs |= { kwargs |= {
'requesting_uid': chan.uid, # NOTE: ONLY the rpc-task-owning
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
# parent IPC channel should be able to # parent IPC channel should be able to
# cancel it! # cancel it!
'parent_chan': chan, 'parent_chan': chan,
'requesting_uid': chan.uid,
'ipc_msg': msg,
} }
# TODO: remove? already have emit in meth.
# log.runtime(
# f'Rx RPC task cancel request\n'
# f'<= canceller: {chan.uid}\n'
# f' |_{chan}\n\n'
# f'=> {actor}\n'
# f' |_cid: {target_cid}\n'
# )
try: try:
await _invoke( await _invoke(
actor, actor,
cid, cid,
chan, chan,
actor._cancel_task, func,
kwargs, kwargs,
is_rpc=False, is_rpc=False,
return_msg=CancelAck,
) )
except BaseException: except BaseException:
log.exception( log.exception(
@ -909,44 +1029,29 @@ async def process_messages(
f'=> {actor}\n' f'=> {actor}\n'
f' |_cid: {target_cid}\n' f' |_cid: {target_cid}\n'
) )
continue
# the "MAIN" RPC endpoint to schedule-a-`trio.Task` # case Start(
# ------ - ------ # ns='self',
# -[x] discard un-authed msgs as per, # funcname='register_actor',
# <TODO put issue for typed msging structs> # ):
case Start( else:
cid=cid, # normally registry methods, eg.
ns=ns, # ``.register_actor()`` etc.
func=funcname,
kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid,
):
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
# only registry methods exist now yah,
# like ``.register_actor()`` etc. ?
if ns == 'self':
func: Callable = getattr(actor, funcname) func: Callable = getattr(actor, funcname)
# application RPC endpoint # case Start(
# ns=str(),
# funcname=funcname,
# ):
else: else:
# complain to client about restricted modules
try: try:
func: Callable = actor._get_rpc_func( func = actor._get_rpc_func(ns, funcname)
ns,
funcname,
)
except ( except (
ModuleNotExposed, ModuleNotExposed,
AttributeError, AttributeError,
) as err: ) as err:
# always complain to requester
# client about un-enabled modules
err_msg: dict[str, dict] = pack_error( err_msg: dict[str, dict] = pack_error(
err, err,
cid=cid, cid=cid,
@ -956,7 +1061,6 @@ async def process_messages(
# schedule a task for the requested RPC function # schedule a task for the requested RPC function
# in the actor's main "service nursery". # in the actor's main "service nursery".
#
# TODO: possibly a service-tn per IPC channel for # TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to # supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks` # manage RPC tasks individually in `._rpc_tasks`
@ -965,7 +1069,7 @@ async def process_messages(
f'Spawning task for RPC request\n' f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n' f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n' f' |_{chan}\n\n'
# ^-TODO-^ maddr style repr? # TODO: maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n' # f'cid="{cid[-16:]} .."\n\n'
@ -973,6 +1077,7 @@ async def process_messages(
f' |_cid: {cid}\n' f' |_cid: {cid}\n'
f' |>> {func}()\n' f' |>> {func}()\n'
) )
assert actor._service_n # wait why? do it at top?
try: try:
ctx: Context = await actor._service_n.start( ctx: Context = await actor._service_n.start(
partial( partial(
@ -1002,12 +1107,13 @@ async def process_messages(
log.warning( log.warning(
'Task for RPC failed?' 'Task for RPC failed?'
f'|_ {func}()\n\n' f'|_ {func}()\n\n'
f'{err}' f'{err}'
) )
continue continue
else: else:
# mark our global state with ongoing rpc tasks # mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event() actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
@ -1018,26 +1124,23 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
# XXX remote (runtime scoped) error or uknown case Error()|_:
# msg (type). # This is the non-rpc error case, that is, an
case Error() | _: # error **not** raised inside a call to ``_invoke()``
# NOTE: this is the non-rpc error case, # (i.e. no cid was provided in the msg - see above).
# that is, an error **not** raised inside # Push this error to all local channel consumers
# a call to ``_invoke()`` (i.e. no cid was # (normally portals) by marking the channel as errored
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
log.exception( log.exception(
f'Unhandled IPC msg:\n\n' f'Unhandled IPC msg:\n\n'
f'{msg}\n' f'{msg}\n'
) )
# assert chan.uid assert chan.uid
chan._exc: Exception = unpack_error( exc = unpack_error(
msg, msg,
chan=chan, chan=chan,
) )
raise chan._exc chan._exc = exc
raise exc
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' 'Waiting on next IPC msg from\n'
@ -1045,12 +1148,10 @@ async def process_messages(
f'|_{chan}\n' f'|_{chan}\n'
) )
# END-OF `async for`: # end of async for, channel disconnect vis
# IPC disconnected via `trio.EndOfChannel`, likely # ``trio.EndOfChannel``
# due to a (graceful) `Channel.aclose()`.
log.runtime( log.runtime(
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n' f"{chan} for {chan.uid} disconnected, cancelling tasks"
f'|_{chan}\n'
) )
await actor.cancel_rpc_tasks( await actor.cancel_rpc_tasks(
req_uid=actor.uid, req_uid=actor.uid,
@ -1067,10 +1168,9 @@ async def process_messages(
# connection-reset) is ok since we don't have a teardown # connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean # the message loop and expect the teardown sequence to clean
# up.. # up.
# TODO: add a teardown handshake? and, # TODO: don't show this msg if it's an emphemeral
# -[ ] don't show this msg if it's an ephemeral discovery ep call? # discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime( log.runtime(
f'channel closed abruptly with\n' f'channel closed abruptly with\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'

View File

@ -65,12 +65,7 @@ from trio import (
TaskStatus, TaskStatus,
) )
from tractor.msg import ( from .msg import NamespacePath
pretty_struct,
NamespacePath,
types as msgtypes,
Msg,
)
from ._ipc import Channel from ._ipc import Channel
from ._context import ( from ._context import (
mk_context, mk_context,
@ -78,10 +73,9 @@ from ._context import (
) )
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
ContextCancelled,
ModuleNotExposed,
MsgTypeError,
unpack_error, unpack_error,
ModuleNotExposed,
ContextCancelled,
TransportClosed, TransportClosed,
) )
from .devx import ( from .devx import (
@ -97,6 +91,10 @@ from ._rpc import (
process_messages, process_messages,
try_ship_error_to_remote, try_ship_error_to_remote,
) )
from tractor.msg import (
types as msgtypes,
pretty_struct,
)
# from tractor.msg.types import ( # from tractor.msg.types import (
# Aid, # Aid,
# SpawnSpec, # SpawnSpec,
@ -166,15 +164,18 @@ class Actor:
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: dict[str, str] _parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope|None = None _parent_chan_cs: CancelScope|None = None
_spawn_spec: msgtypes.SpawnSpec|None = None _spawn_spec: SpawnSpec|None = None
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_server_down: trio.Event|None = None _server_down: trio.Event|None = None
# user toggled crash handling (including monkey-patched in
# `trio.open_nursery()` via `.trionics._supervisor` B)
_debug_mode: bool = False
# if started on ``asycio`` running ``trio`` in guest mode # if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False _infected_aio: bool = False
# TODO: nursery tracking like `trio` does?
# _ans: dict[ # _ans: dict[
# tuple[str, str], # tuple[str, str],
# list[ActorNursery], # list[ActorNursery],
@ -395,9 +396,8 @@ class Actor:
raise mne raise mne
# TODO: maybe change to mod-func and rename for implied
# multi-transport semantics?
async def _stream_handler( async def _stream_handler(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
@ -559,7 +559,7 @@ class Actor:
cid: str|None = msg.cid cid: str|None = msg.cid
if cid: if cid:
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._deliver_ctx_payload( await self._push_result(
chan, chan,
cid, cid,
msg, msg,
@ -716,13 +716,43 @@ class Actor:
# TODO: figure out why this breaks tests.. # TODO: figure out why this breaks tests..
db_cs.cancel() db_cs.cancel()
# XXX: is this necessary (GC should do it)?
# XXX WARNING XXX
# Be AWARE OF THE INDENT LEVEL HERE
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
# EMPTY!!!!
if (
not self._peers
and chan.connected()
):
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.runtime(
'Terminating channel with `None` setinel msg\n'
f'|_{chan}\n'
)
try:
# send msg loop terminate sentinel which
# triggers cancellation of all remotely
# started tasks.
await chan.send(None)
# XXX: do we want this? no right?
# causes "[104] connection reset by peer" on other end
# await chan.aclose()
except trio.BrokenResourceError:
log.runtime(f"Channel {chan.uid} was already closed")
# TODO: rename to `._deliver_payload()` since this handles # TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD # more then just `result` msgs now obvi XD
async def _deliver_ctx_payload( async def _push_result(
self, self,
chan: Channel, chan: Channel,
cid: str, cid: str,
msg: Msg|MsgTypeError, msg: dict[str, Any],
) -> None|bool: ) -> None|bool:
''' '''
@ -744,16 +774,12 @@ class Actor:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n' f'<= sender: {uid}\n'
# XXX don't need right since it's always in msg? f'=> cid: {cid}\n\n'
# f'=> cid: {cid}\n\n'
f'{pretty_struct.Struct.pformat(msg)}\n' f'{msg}\n'
) )
return return
# if isinstance(msg, MsgTypeError):
# return await ctx._deliver_bad_msg()
return await ctx._deliver_msg(msg) return await ctx._deliver_msg(msg)
def get_context( def get_context(
@ -1411,7 +1437,7 @@ class Actor:
) )
await self._ongoing_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> bool: def cancel_server(self) -> None:
''' '''
Cancel the internal IPC transport server nursery thereby Cancel the internal IPC transport server nursery thereby
preventing any new inbound IPC connections establishing. preventing any new inbound IPC connections establishing.
@ -1420,9 +1446,6 @@ class Actor:
if self._server_n: if self._server_n:
log.runtime("Shutting down channel server") log.runtime("Shutting down channel server")
self._server_n.cancel_scope.cancel() self._server_n.cancel_scope.cancel()
return True
return False
@property @property
def accept_addrs(self) -> list[tuple[str, int]]: def accept_addrs(self) -> list[tuple[str, int]]:

View File

@ -46,6 +46,7 @@ from .trionics import (
from tractor.msg import ( from tractor.msg import (
Stop, Stop,
Yield, Yield,
Error,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -183,7 +184,7 @@ class MsgStream(trio.abc.Channel):
# - via a received `{'stop': ...}` msg from remote side. # - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling # |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside # ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._deliver_ctx_payload()`, but now the 'stop' message handling # `Actor._push_result()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`. # has been put just above inside `_raise_from_no_key_in_msg()`.
except ( except (
trio.EndOfChannel, trio.EndOfChannel,
@ -390,11 +391,11 @@ class MsgStream(trio.abc.Channel):
if not self._eoc: if not self._eoc:
log.cancel( log.cancel(
'Stream closed by self before it received an EoC?\n' 'Stream closed before it received an EoC?\n'
'Setting eoc manually..\n..' 'Setting eoc manually..\n..'
) )
self._eoc: bool = trio.EndOfChannel( self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by self({self._ctx.side})\n' f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n' f'|_{self}\n'
) )
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?

View File

@ -454,10 +454,6 @@ _runtime_msgs: list[Msg] = [
# emission from `MsgStream.aclose()` # emission from `MsgStream.aclose()`
Stop, Stop,
# `Return` sub-type that we always accept from
# runtime-internal cancel endpoints
CancelAck,
# box remote errors, normally subtypes # box remote errors, normally subtypes
# of `RemoteActorError`. # of `RemoteActorError`.
Error, Error,