Compare commits
10 Commits
b1fd8b2ec3
...
cf48fdecfe
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | cf48fdecfe | |
Tyler Goodlet | b341146bd1 | |
Tyler Goodlet | 2f451ab9a3 | |
Tyler Goodlet | 8e83455a78 | |
Tyler Goodlet | 38111e8d53 | |
Tyler Goodlet | aea5abdd70 | |
Tyler Goodlet | aca6503fcd | |
Tyler Goodlet | b9a61ded0a | |
Tyler Goodlet | 4cfe4979ff | |
Tyler Goodlet | 97bfbdbc1c |
|
@ -374,7 +374,7 @@ def enc_type_union(
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def send_back_nsp(
|
async def send_back_values(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
expect_debug: bool,
|
expect_debug: bool,
|
||||||
pld_spec_type_strs: list[str],
|
pld_spec_type_strs: list[str],
|
||||||
|
@ -388,6 +388,8 @@ async def send_back_nsp(
|
||||||
and ensure we can round trip a func ref with our parent.
|
and ensure we can round trip a func ref with our parent.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
uid: tuple = tractor.current_actor().uid
|
||||||
|
|
||||||
# debug mode sanity check (prolly superfluous but, meh)
|
# debug mode sanity check (prolly superfluous but, meh)
|
||||||
assert expect_debug == _state.debug_mode()
|
assert expect_debug == _state.debug_mode()
|
||||||
|
|
||||||
|
@ -414,7 +416,7 @@ async def send_back_nsp(
|
||||||
)
|
)
|
||||||
|
|
||||||
print(
|
print(
|
||||||
'CHILD attempting `Started`-bytes DECODE..\n'
|
f'{uid}: attempting `Started`-bytes DECODE..\n'
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
msg: Started = nsp_codec.decode(started_msg_bytes)
|
msg: Started = nsp_codec.decode(started_msg_bytes)
|
||||||
|
@ -436,7 +438,7 @@ async def send_back_nsp(
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
print(
|
print(
|
||||||
'CHILD (correctly) unable to DECODE `Started`-bytes\n'
|
f'{uid}: (correctly) unable to DECODE `Started`-bytes\n'
|
||||||
f'{started_msg_bytes}\n'
|
f'{started_msg_bytes}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -445,7 +447,7 @@ async def send_back_nsp(
|
||||||
for send_value, expect_send in iter_send_val_items:
|
for send_value, expect_send in iter_send_val_items:
|
||||||
try:
|
try:
|
||||||
print(
|
print(
|
||||||
f'CHILD attempting to `.started({send_value})`\n'
|
f'{uid}: attempting to `.started({send_value})`\n'
|
||||||
f'=> expect_send: {expect_send}\n'
|
f'=> expect_send: {expect_send}\n'
|
||||||
f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n'
|
f'SINCE, ipc_pld_spec: {ipc_pld_spec}\n'
|
||||||
f'AND, codec: {codec}\n'
|
f'AND, codec: {codec}\n'
|
||||||
|
@ -460,7 +462,6 @@ async def send_back_nsp(
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
|
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
# pytest.fail(
|
|
||||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||||
f'value -> {send_value}: {type(send_value)}\n'
|
f'value -> {send_value}: {type(send_value)}\n'
|
||||||
|
@ -468,54 +469,77 @@ async def send_back_nsp(
|
||||||
|
|
||||||
break # move on to streaming block..
|
break # move on to streaming block..
|
||||||
|
|
||||||
except NotImplementedError:
|
|
||||||
print('FAILED ENCODE!')
|
|
||||||
|
|
||||||
except tractor.MsgTypeError:
|
except tractor.MsgTypeError:
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
if expect_send:
|
if expect_send:
|
||||||
pytest.fail(
|
raise RuntimeError(
|
||||||
f'EXPECTED to `.started()` value given spec:\n'
|
f'EXPECTED to `.started()` value given spec:\n'
|
||||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||||
f'value -> {send_value}: {type(send_value)}\n'
|
f'value -> {send_value}: {type(send_value)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as ipc:
|
||||||
|
print(
|
||||||
|
f'{uid}: Entering streaming block to send remaining values..'
|
||||||
|
)
|
||||||
|
|
||||||
for send_value, expect_send in iter_send_val_items:
|
for send_value, expect_send in iter_send_val_items:
|
||||||
send_type: Type = type(send_value)
|
send_type: Type = type(send_value)
|
||||||
print(
|
print(
|
||||||
'CHILD report on send value\n'
|
'------ - ------\n'
|
||||||
|
f'{uid}: SENDING NEXT VALUE\n'
|
||||||
f'ipc_pld_spec: {ipc_pld_spec}\n'
|
f'ipc_pld_spec: {ipc_pld_spec}\n'
|
||||||
f'expect_send: {expect_send}\n'
|
f'expect_send: {expect_send}\n'
|
||||||
f'val: {send_value}\n'
|
f'val: {send_value}\n'
|
||||||
|
'------ - ------\n'
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
await ipc.send(send_value)
|
await ipc.send(send_value)
|
||||||
|
print(f'***\n{uid}-CHILD sent {send_value!r}\n***\n')
|
||||||
sent.append(send_value)
|
sent.append(send_value)
|
||||||
if not expect_send:
|
|
||||||
pytest.fail(
|
# NOTE: should only raise above on
|
||||||
f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
# `.started()` or a `Return`
|
||||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
# if not expect_send:
|
||||||
f'value -> {send_value}: {send_type}\n'
|
# raise RuntimeError(
|
||||||
)
|
# f'NOT-EXPECTED able to roundtrip value given spec:\n'
|
||||||
|
# f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||||
|
# f'value -> {send_value}: {send_type}\n'
|
||||||
|
# )
|
||||||
|
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
|
print(f'{uid} FAILED TO SEND {send_value}!')
|
||||||
|
|
||||||
|
# await tractor.pause()
|
||||||
if expect_send:
|
if expect_send:
|
||||||
pytest.fail(
|
raise RuntimeError(
|
||||||
f'EXPECTED to roundtrip value given spec:\n'
|
f'EXPECTED to roundtrip value given spec:\n'
|
||||||
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
f'ipc_pld_spec -> {ipc_pld_spec}\n'
|
||||||
f'value -> {send_value}: {send_type}\n'
|
f'value -> {send_value}: {send_type}\n'
|
||||||
)
|
)
|
||||||
continue
|
# continue
|
||||||
|
|
||||||
assert (
|
else:
|
||||||
len(sent)
|
print(
|
||||||
==
|
f'{uid}: finished sending all values\n'
|
||||||
len([val
|
'Should be exiting stream block!\n'
|
||||||
for val, expect in
|
|
||||||
expect_ipc_send.values()
|
|
||||||
if expect is True])
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
print(f'{uid}: exited streaming block!')
|
||||||
|
|
||||||
|
# TODO: this won't be true bc in streaming phase we DO NOT
|
||||||
|
# msgspec check outbound msgs!
|
||||||
|
# -[ ] once we implement the receiver side `InvalidMsg`
|
||||||
|
# then we can expect it here?
|
||||||
|
# assert (
|
||||||
|
# len(sent)
|
||||||
|
# ==
|
||||||
|
# len([val
|
||||||
|
# for val, expect in
|
||||||
|
# expect_ipc_send.values()
|
||||||
|
# if expect is True])
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
def ex_func(*args):
|
def ex_func(*args):
|
||||||
print(f'ex_func({args})')
|
print(f'ex_func({args})')
|
||||||
|
@ -635,7 +659,7 @@ def test_codec_hooks_mod(
|
||||||
async with (
|
async with (
|
||||||
|
|
||||||
p.open_context(
|
p.open_context(
|
||||||
send_back_nsp,
|
send_back_values,
|
||||||
expect_debug=debug_mode,
|
expect_debug=debug_mode,
|
||||||
pld_spec_type_strs=pld_spec_type_strs,
|
pld_spec_type_strs=pld_spec_type_strs,
|
||||||
add_hooks=add_codec_hooks,
|
add_hooks=add_codec_hooks,
|
||||||
|
@ -665,10 +689,13 @@ def test_codec_hooks_mod(
|
||||||
|
|
||||||
async for next_sent in ipc:
|
async for next_sent in ipc:
|
||||||
print(
|
print(
|
||||||
'Child sent next value\n'
|
'Parent: child sent next value\n'
|
||||||
f'{next_sent}: {type(next_sent)}\n'
|
f'{next_sent}: {type(next_sent)}\n'
|
||||||
)
|
)
|
||||||
|
if expect_to_send:
|
||||||
expect_to_send.remove(next_sent)
|
expect_to_send.remove(next_sent)
|
||||||
|
else:
|
||||||
|
print('PARENT should terminate stream loop + block!')
|
||||||
|
|
||||||
# all sent values should have arrived!
|
# all sent values should have arrived!
|
||||||
assert not expect_to_send
|
assert not expect_to_send
|
||||||
|
|
|
@ -796,10 +796,12 @@ async def test_callee_cancels_before_started(
|
||||||
|
|
||||||
# raises a special cancel signal
|
# raises a special cancel signal
|
||||||
except tractor.ContextCancelled as ce:
|
except tractor.ContextCancelled as ce:
|
||||||
|
_ce = ce # for debug on crash
|
||||||
ce.boxed_type == trio.Cancelled
|
ce.boxed_type == trio.Cancelled
|
||||||
|
|
||||||
# the traceback should be informative
|
# the traceback should be informative
|
||||||
assert 'itself' in ce.msgdata['tb_str']
|
assert 'itself' in ce.tb_str
|
||||||
|
assert ce.tb_str == ce.msgdata['tb_str']
|
||||||
|
|
||||||
# teardown the actor
|
# teardown the actor
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
@ -1157,7 +1159,8 @@ def test_maybe_allow_overruns_stream(
|
||||||
|
|
||||||
elif slow_side == 'parent':
|
elif slow_side == 'parent':
|
||||||
assert err.boxed_type == tractor.RemoteActorError
|
assert err.boxed_type == tractor.RemoteActorError
|
||||||
assert 'StreamOverrun' in err.msgdata['tb_str']
|
assert 'StreamOverrun' in err.tb_str
|
||||||
|
assert err.tb_str == err.msgdata['tb_str']
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# if this hits the logic blocks from above are not
|
# if this hits the logic blocks from above are not
|
||||||
|
|
|
@ -185,6 +185,10 @@ async def sleep_a_bit_then_cancel_peer(
|
||||||
await trio.sleep(cancel_after)
|
await trio.sleep(cancel_after)
|
||||||
await peer.cancel_actor()
|
await peer.cancel_actor()
|
||||||
|
|
||||||
|
# such that we're cancelled by our rent ctx-task
|
||||||
|
await trio.sleep(3)
|
||||||
|
print('CANCELLER RETURNING!')
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def stream_ints(
|
async def stream_ints(
|
||||||
|
@ -245,6 +249,12 @@ async def stream_from_peer(
|
||||||
assert peer_ctx._remote_error is ctxerr
|
assert peer_ctx._remote_error is ctxerr
|
||||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||||
|
|
||||||
|
# XXX YES, bc exact same msg instances
|
||||||
|
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
|
||||||
|
|
||||||
|
# XXX NO, bc new one always created for property accesss
|
||||||
|
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
|
||||||
|
|
||||||
# the peer ctx is the canceller even though it's canceller
|
# the peer ctx is the canceller even though it's canceller
|
||||||
# is the "canceller" XD
|
# is the "canceller" XD
|
||||||
assert peer_name in peer_ctx.canceller
|
assert peer_name in peer_ctx.canceller
|
||||||
|
|
|
@ -44,9 +44,10 @@ from ._state import (
|
||||||
is_root_process as is_root_process,
|
is_root_process as is_root_process,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
RemoteActorError as RemoteActorError,
|
|
||||||
ModuleNotExposed as ModuleNotExposed,
|
|
||||||
ContextCancelled as ContextCancelled,
|
ContextCancelled as ContextCancelled,
|
||||||
|
ModuleNotExposed as ModuleNotExposed,
|
||||||
|
MsgTypeError as MsgTypeError,
|
||||||
|
RemoteActorError as RemoteActorError,
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
breakpoint as breakpoint,
|
breakpoint as breakpoint,
|
||||||
|
|
|
@ -1207,7 +1207,7 @@ class Context:
|
||||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||||
# wait for any immediate child in debug before popping the
|
# wait for any immediate child in debug before popping the
|
||||||
# context from the runtime msg loop otherwise inside
|
# context from the runtime msg loop otherwise inside
|
||||||
# ``Actor._push_result()`` the msg will be discarded and in
|
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
|
||||||
# the case where that msg is global debugger unlock (via
|
# the case where that msg is global debugger unlock (via
|
||||||
# a "stop" msg for a stream), this can result in a deadlock
|
# a "stop" msg for a stream), this can result in a deadlock
|
||||||
# where the root is waiting on the lock to clear but the
|
# where the root is waiting on the lock to clear but the
|
||||||
|
@ -1698,11 +1698,11 @@ class Context:
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
# raise any msg type error NO MATTER WHAT!
|
||||||
except msgspec.ValidationError as verr:
|
except msgspec.ValidationError as verr:
|
||||||
from tractor._ipc import _raise_msg_type_err
|
from tractor._ipc import _mk_msg_type_err
|
||||||
_raise_msg_type_err(
|
raise _mk_msg_type_err(
|
||||||
msg=msg_bytes,
|
msg=msg_bytes,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
validation_err=verr,
|
src_validation_error=verr,
|
||||||
verb_header='Trying to send payload'
|
verb_header='Trying to send payload'
|
||||||
# > 'invalid `Started IPC msgs\n'
|
# > 'invalid `Started IPC msgs\n'
|
||||||
)
|
)
|
||||||
|
@ -2415,7 +2415,7 @@ async def open_context_from_portal(
|
||||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||||
# wait for any immediate child in debug before popping the
|
# wait for any immediate child in debug before popping the
|
||||||
# context from the runtime msg loop otherwise inside
|
# context from the runtime msg loop otherwise inside
|
||||||
# ``Actor._push_result()`` the msg will be discarded and in
|
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
|
||||||
# the case where that msg is global debugger unlock (via
|
# the case where that msg is global debugger unlock (via
|
||||||
# a "stop" msg for a stream), this can result in a deadlock
|
# a "stop" msg for a stream), this can result in a deadlock
|
||||||
# where the root is waiting on the lock to clear but the
|
# where the root is waiting on the lock to clear but the
|
||||||
|
|
|
@ -31,7 +31,10 @@ import textwrap
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from msgspec import structs
|
from msgspec import (
|
||||||
|
structs,
|
||||||
|
defstruct,
|
||||||
|
)
|
||||||
|
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -40,6 +43,8 @@ from tractor.msg import (
|
||||||
Msg,
|
Msg,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
|
pretty_struct,
|
||||||
|
types as msgtypes,
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -64,21 +69,38 @@ class InternalError(RuntimeError):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
_body_fields: list[str] = [
|
|
||||||
'boxed_type',
|
|
||||||
'src_type',
|
|
||||||
# TODO: format this better if we're going to include it.
|
|
||||||
# 'relay_path',
|
|
||||||
'src_uid',
|
|
||||||
|
|
||||||
# only in sub-types
|
# NOTE: more or less should be close to these:
|
||||||
'canceller',
|
# 'boxed_type',
|
||||||
'sender',
|
# 'src_type',
|
||||||
|
# 'src_uid',
|
||||||
|
# 'canceller',
|
||||||
|
# 'sender',
|
||||||
|
# TODO: format this better if we're going to include it.
|
||||||
|
# 'relay_path',
|
||||||
|
#
|
||||||
|
_ipcmsg_keys: list[str] = [
|
||||||
|
fi.name
|
||||||
|
for fi, k, v
|
||||||
|
in pretty_struct.iter_fields(Error)
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
_msgdata_keys: list[str] = [
|
_body_fields: list[str] = list(
|
||||||
|
set(_ipcmsg_keys)
|
||||||
|
|
||||||
|
# NOTE: don't show fields that either don't provide
|
||||||
|
# any extra useful info or that are already shown
|
||||||
|
# as part of `.__repr__()` output.
|
||||||
|
- {
|
||||||
|
'src_type_str',
|
||||||
'boxed_type_str',
|
'boxed_type_str',
|
||||||
] + _body_fields
|
'tb_str',
|
||||||
|
'relay_path',
|
||||||
|
'_msg_dict',
|
||||||
|
'cid',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_err_type(type_name: str) -> BaseException|None:
|
def get_err_type(type_name: str) -> BaseException|None:
|
||||||
|
@ -152,10 +174,40 @@ def pformat_boxed_tb(
|
||||||
+
|
+
|
||||||
body
|
body
|
||||||
)
|
)
|
||||||
# return body
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: rename to just `RemoteError`?
|
def pack_from_raise(
|
||||||
|
local_err: (
|
||||||
|
ContextCancelled
|
||||||
|
|StreamOverrun
|
||||||
|
|MsgTypeError
|
||||||
|
),
|
||||||
|
cid: str,
|
||||||
|
|
||||||
|
**rae_fields,
|
||||||
|
|
||||||
|
) -> Error:
|
||||||
|
'''
|
||||||
|
Raise the provided `RemoteActorError` subtype exception
|
||||||
|
instance locally to get a traceback and pack it into an IPC
|
||||||
|
`Error`-msg using `pack_error()` to extract the tb info.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
raise local_err
|
||||||
|
except type(local_err) as local_err:
|
||||||
|
err_msg: dict[str, dict] = pack_error(
|
||||||
|
local_err,
|
||||||
|
cid=cid,
|
||||||
|
**rae_fields,
|
||||||
|
)
|
||||||
|
return err_msg
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: better compat with IPC msg structs?
|
||||||
|
# -[ ] rename to just `RemoteError` like in `mp.manager`?
|
||||||
|
# -[ ] make a `Struct`-subtype by using the .__post_init__()`?
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#post-init-processing
|
||||||
class RemoteActorError(Exception):
|
class RemoteActorError(Exception):
|
||||||
'''
|
'''
|
||||||
A box(ing) type which bundles a remote actor `BaseException` for
|
A box(ing) type which bundles a remote actor `BaseException` for
|
||||||
|
@ -170,12 +222,28 @@ class RemoteActorError(Exception):
|
||||||
'src_uid',
|
'src_uid',
|
||||||
# 'relay_path',
|
# 'relay_path',
|
||||||
]
|
]
|
||||||
|
extra_body_fields: list[str] = [
|
||||||
|
'cid',
|
||||||
|
'boxed_type',
|
||||||
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
message: str,
|
message: str,
|
||||||
|
ipc_msg: Error|None = None,
|
||||||
boxed_type: Type[BaseException]|None = None,
|
boxed_type: Type[BaseException]|None = None,
|
||||||
**msgdata
|
|
||||||
|
# NOTE: only provided by subtypes (ctxc and overruns)
|
||||||
|
# wishing to both manually instantiate and add field
|
||||||
|
# values defined on `Error` without having to construct an
|
||||||
|
# `Error()` before the exception is processed by
|
||||||
|
# `pack_error()`.
|
||||||
|
#
|
||||||
|
# TODO: a better way to support this without the extra
|
||||||
|
# private `._extra_msgdata`?
|
||||||
|
# -[ ] ctxc constructed inside `._rpc._invoke()` L:638
|
||||||
|
# -[ ] overrun @ `._context.Context._deliver_msg()` L:1958
|
||||||
|
**extra_msgdata,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
|
@ -188,14 +256,24 @@ class RemoteActorError(Exception):
|
||||||
# - .remote_type
|
# - .remote_type
|
||||||
# also pertains to our long long oustanding issue XD
|
# also pertains to our long long oustanding issue XD
|
||||||
# https://github.com/goodboy/tractor/issues/5
|
# https://github.com/goodboy/tractor/issues/5
|
||||||
#
|
|
||||||
# TODO: always set ._boxed_type` as `None` by default
|
|
||||||
# and instead render if from `.boxed_type_str`?
|
|
||||||
self._boxed_type: BaseException = boxed_type
|
self._boxed_type: BaseException = boxed_type
|
||||||
self._src_type: BaseException|None = None
|
self._src_type: BaseException|None = None
|
||||||
|
self._ipc_msg: Error|None = ipc_msg
|
||||||
|
|
||||||
# TODO: make this a `.errmsg: Error` throughout?
|
if (
|
||||||
self.msgdata: dict[str, Any] = msgdata
|
extra_msgdata
|
||||||
|
and ipc_msg
|
||||||
|
):
|
||||||
|
# XXX mutate the orig msg directly from
|
||||||
|
# manually provided input params.
|
||||||
|
for k, v in extra_msgdata.items():
|
||||||
|
setattr(
|
||||||
|
self._ipc_msg,
|
||||||
|
k,
|
||||||
|
v,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._extra_msgdata = extra_msgdata
|
||||||
|
|
||||||
# TODO: mask out eventually or place in `pack_error()`
|
# TODO: mask out eventually or place in `pack_error()`
|
||||||
# pre-`return` lines?
|
# pre-`return` lines?
|
||||||
|
@ -214,14 +292,56 @@ class RemoteActorError(Exception):
|
||||||
# either by customizing `ContextCancelled.__init__()` or
|
# either by customizing `ContextCancelled.__init__()` or
|
||||||
# through a special factor func?
|
# through a special factor func?
|
||||||
elif boxed_type:
|
elif boxed_type:
|
||||||
if not self.msgdata.get('boxed_type_str'):
|
boxed_type_str: str = type(boxed_type).__name__
|
||||||
self.msgdata['boxed_type_str'] = str(
|
if (
|
||||||
type(boxed_type).__name__
|
ipc_msg
|
||||||
)
|
and not self._ipc_msg.boxed_type_str
|
||||||
|
):
|
||||||
|
self._ipc_msg.boxed_type_str = boxed_type_str
|
||||||
|
assert self.boxed_type_str == self._ipc_msg.boxed_type_str
|
||||||
|
|
||||||
|
else:
|
||||||
|
self._extra_msgdata['boxed_type_str'] = boxed_type_str
|
||||||
|
|
||||||
assert self.boxed_type_str == self.msgdata['boxed_type_str']
|
|
||||||
assert self.boxed_type is boxed_type
|
assert self.boxed_type is boxed_type
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ipc_msg(self) -> pretty_struct.Struct:
|
||||||
|
'''
|
||||||
|
Re-render the underlying `._ipc_msg: Msg` as
|
||||||
|
a `pretty_struct.Struct` for introspection such that the
|
||||||
|
returned value is a read-only copy of the original.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if self._ipc_msg is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
msg_type: Msg = type(self._ipc_msg)
|
||||||
|
fields: dict[str, Any] = {
|
||||||
|
k: v for _, k, v in
|
||||||
|
pretty_struct.iter_fields(self._ipc_msg)
|
||||||
|
}
|
||||||
|
return defstruct(
|
||||||
|
msg_type.__name__,
|
||||||
|
fields=fields.keys(),
|
||||||
|
bases=(msg_type, pretty_struct.Struct),
|
||||||
|
)(**fields)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def msgdata(self) -> dict[str, Any]:
|
||||||
|
'''
|
||||||
|
The (remote) error data provided by a merge of the
|
||||||
|
`._ipc_msg: Error` msg and any input `._extra_msgdata: dict`
|
||||||
|
(provided by subtypes via `.__init__()`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
msgdata: dict = (
|
||||||
|
structs.asdict(self._ipc_msg)
|
||||||
|
if self._ipc_msg
|
||||||
|
else {}
|
||||||
|
)
|
||||||
|
return self._extra_msgdata | msgdata
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def src_type_str(self) -> str:
|
def src_type_str(self) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -231,7 +351,7 @@ class RemoteActorError(Exception):
|
||||||
at the first relay/hop's receiving actor.
|
at the first relay/hop's receiving actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.msgdata['src_type_str']
|
return self._ipc_msg.src_type_str
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def src_type(self) -> str:
|
def src_type(self) -> str:
|
||||||
|
@ -241,7 +361,7 @@ class RemoteActorError(Exception):
|
||||||
'''
|
'''
|
||||||
if self._src_type is None:
|
if self._src_type is None:
|
||||||
self._src_type = get_err_type(
|
self._src_type = get_err_type(
|
||||||
self.msgdata['src_type_str']
|
self._ipc_msg.src_type_str
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._src_type
|
return self._src_type
|
||||||
|
@ -252,7 +372,7 @@ class RemoteActorError(Exception):
|
||||||
String-name of the (last hop's) boxed error type.
|
String-name of the (last hop's) boxed error type.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.msgdata['boxed_type_str']
|
return self._ipc_msg.boxed_type_str
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def boxed_type(self) -> str:
|
def boxed_type(self) -> str:
|
||||||
|
@ -262,7 +382,7 @@ class RemoteActorError(Exception):
|
||||||
'''
|
'''
|
||||||
if self._boxed_type is None:
|
if self._boxed_type is None:
|
||||||
self._boxed_type = get_err_type(
|
self._boxed_type = get_err_type(
|
||||||
self.msgdata['boxed_type_str']
|
self._ipc_msg.boxed_type_str
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._boxed_type
|
return self._boxed_type
|
||||||
|
@ -275,41 +395,45 @@ class RemoteActorError(Exception):
|
||||||
actor's hop.
|
actor's hop.
|
||||||
|
|
||||||
NOTE: a `list` field with the same name is expected to be
|
NOTE: a `list` field with the same name is expected to be
|
||||||
passed/updated in `.msgdata`.
|
passed/updated in `.ipc_msg`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.msgdata['relay_path']
|
return self._ipc_msg.relay_path
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def relay_uid(self) -> tuple[str, str]|None:
|
def relay_uid(self) -> tuple[str, str]|None:
|
||||||
return tuple(
|
return tuple(
|
||||||
self.msgdata['relay_path'][-1]
|
self._ipc_msg.relay_path[-1]
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def src_uid(self) -> tuple[str, str]|None:
|
def src_uid(self) -> tuple[str, str]|None:
|
||||||
if src_uid := (
|
if src_uid := (
|
||||||
self.msgdata.get('src_uid')
|
self._ipc_msg.src_uid
|
||||||
):
|
):
|
||||||
return tuple(src_uid)
|
return tuple(src_uid)
|
||||||
# TODO: use path lookup instead?
|
# TODO: use path lookup instead?
|
||||||
# return tuple(
|
# return tuple(
|
||||||
# self.msgdata['relay_path'][0]
|
# self._ipc_msg.relay_path[0]
|
||||||
# )
|
# )
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def tb_str(
|
def tb_str(
|
||||||
self,
|
self,
|
||||||
indent: str = ' ',
|
indent: str = '',
|
||||||
) -> str:
|
) -> str:
|
||||||
if remote_tb := self.msgdata.get('tb_str'):
|
remote_tb: str = ''
|
||||||
|
|
||||||
|
if self._ipc_msg:
|
||||||
|
remote_tb: str = self._ipc_msg.tb_str
|
||||||
|
else:
|
||||||
|
remote_tb = self.msgdata.get('tb_str')
|
||||||
|
|
||||||
return textwrap.indent(
|
return textwrap.indent(
|
||||||
remote_tb,
|
remote_tb or '',
|
||||||
prefix=indent,
|
prefix=indent,
|
||||||
)
|
)
|
||||||
|
|
||||||
return ''
|
|
||||||
|
|
||||||
def _mk_fields_str(
|
def _mk_fields_str(
|
||||||
self,
|
self,
|
||||||
fields: list[str],
|
fields: list[str],
|
||||||
|
@ -320,14 +444,17 @@ class RemoteActorError(Exception):
|
||||||
val: Any|None = (
|
val: Any|None = (
|
||||||
getattr(self, key, None)
|
getattr(self, key, None)
|
||||||
or
|
or
|
||||||
self.msgdata.get(key)
|
getattr(
|
||||||
|
self._ipc_msg,
|
||||||
|
key,
|
||||||
|
None,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
# TODO: for `.relay_path` on multiline?
|
# TODO: for `.relay_path` on multiline?
|
||||||
# if not isinstance(val, str):
|
# if not isinstance(val, str):
|
||||||
# val_str = pformat(val)
|
# val_str = pformat(val)
|
||||||
# else:
|
# else:
|
||||||
val_str: str = repr(val)
|
val_str: str = repr(val)
|
||||||
|
|
||||||
if val:
|
if val:
|
||||||
_repr += f'{key}={val_str}{end_char}'
|
_repr += f'{key}={val_str}{end_char}'
|
||||||
|
|
||||||
|
@ -358,7 +485,9 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fields: str = self._mk_fields_str(
|
fields: str = self._mk_fields_str(
|
||||||
_body_fields,
|
_body_fields
|
||||||
|
+
|
||||||
|
self.extra_body_fields,
|
||||||
)
|
)
|
||||||
body: str = pformat_boxed_tb(
|
body: str = pformat_boxed_tb(
|
||||||
tb_str=self.tb_str,
|
tb_str=self.tb_str,
|
||||||
|
@ -415,15 +544,6 @@ class RemoteActorError(Exception):
|
||||||
# raise NotImplementedError
|
# raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class InternalActorError(RemoteActorError):
|
|
||||||
'''
|
|
||||||
(Remote) internal `tractor` error indicating failure of some
|
|
||||||
primitive, machinery state or lowlevel task that should never
|
|
||||||
occur.
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
class ContextCancelled(RemoteActorError):
|
class ContextCancelled(RemoteActorError):
|
||||||
'''
|
'''
|
||||||
Inter-actor task context was cancelled by either a call to
|
Inter-actor task context was cancelled by either a call to
|
||||||
|
@ -433,6 +553,10 @@ class ContextCancelled(RemoteActorError):
|
||||||
reprol_fields: list[str] = [
|
reprol_fields: list[str] = [
|
||||||
'canceller',
|
'canceller',
|
||||||
]
|
]
|
||||||
|
extra_body_fields: list[str] = [
|
||||||
|
'cid',
|
||||||
|
'canceller',
|
||||||
|
]
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
'''
|
'''
|
||||||
|
@ -454,7 +578,7 @@ class ContextCancelled(RemoteActorError):
|
||||||
|_`._cancel_task()`
|
|_`._cancel_task()`
|
||||||
|
|
||||||
'''
|
'''
|
||||||
value = self.msgdata.get('canceller')
|
value: tuple[str, str]|None = self._ipc_msg.canceller
|
||||||
if value:
|
if value:
|
||||||
return tuple(value)
|
return tuple(value)
|
||||||
|
|
||||||
|
@ -468,6 +592,132 @@ class ContextCancelled(RemoteActorError):
|
||||||
# src_actor_uid = canceller
|
# src_actor_uid = canceller
|
||||||
|
|
||||||
|
|
||||||
|
class MsgTypeError(
|
||||||
|
RemoteActorError,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Equivalent of a runtime `TypeError` for IPC dialogs.
|
||||||
|
|
||||||
|
Raise when any IPC wire-message is decoded to have invalid
|
||||||
|
field values (due to type) or for other `MsgCodec` related
|
||||||
|
violations such as having no extension-type for a field with
|
||||||
|
a custom type but no `enc/dec_hook()` support.
|
||||||
|
|
||||||
|
Can be raised on the send or recv side of an IPC `Channel`
|
||||||
|
depending on the particular msg.
|
||||||
|
|
||||||
|
Msgs which cause this to be raised on the `.send()` side (aka
|
||||||
|
in the "ctl" dialog phase) include:
|
||||||
|
- `Start`
|
||||||
|
- `Started`
|
||||||
|
- `Return`
|
||||||
|
|
||||||
|
Those which cause it on on the `.recv()` side (aka the "nasty
|
||||||
|
streaming" dialog phase) are:
|
||||||
|
- `Yield`
|
||||||
|
- TODO: any embedded `.pld` type defined by user code?
|
||||||
|
|
||||||
|
Normally the source of an error is re-raised from some `.msg._codec`
|
||||||
|
decode which itself raises in a backend interchange
|
||||||
|
lib (eg. a `msgspec.ValidationError`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
reprol_fields: list[str] = [
|
||||||
|
'ipc_msg',
|
||||||
|
]
|
||||||
|
extra_body_fields: list[str] = [
|
||||||
|
'cid',
|
||||||
|
'payload_msg',
|
||||||
|
]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def msg_dict(self) -> dict[str, Any]:
|
||||||
|
'''
|
||||||
|
If the underlying IPC `Msg` was received from a remote
|
||||||
|
actor but was unable to be decoded to a native
|
||||||
|
`Yield`|`Started`|`Return` struct, the interchange backend
|
||||||
|
native format decoder can be used to stash a `dict`
|
||||||
|
version for introspection by the invalidating RPC task.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self.msgdata.get('_msg_dict')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def payload_msg(self) -> Msg|None:
|
||||||
|
'''
|
||||||
|
Attempt to construct what would have been the original
|
||||||
|
`Msg`-with-payload subtype (i.e. an instance from the set
|
||||||
|
of msgs in `.msg.types._payload_msgs`) which failed
|
||||||
|
validation.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg_dict: dict = self.msg_dict.copy()
|
||||||
|
name: str = msg_dict.pop('msg_type')
|
||||||
|
msg_type: Msg = getattr(
|
||||||
|
msgtypes,
|
||||||
|
name,
|
||||||
|
Msg,
|
||||||
|
)
|
||||||
|
return msg_type(**msg_dict)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cid(self) -> str:
|
||||||
|
# pre-packed using `.from_decode()` constructor
|
||||||
|
return self.msgdata.get('cid')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_decode(
|
||||||
|
cls,
|
||||||
|
message: str,
|
||||||
|
msgdict: dict,
|
||||||
|
|
||||||
|
) -> MsgTypeError:
|
||||||
|
return cls(
|
||||||
|
message=message,
|
||||||
|
|
||||||
|
# NOTE: original "vanilla decode" of the msg-bytes
|
||||||
|
# is placed inside a value readable from
|
||||||
|
# `.msgdata['_msg_dict']`
|
||||||
|
_msg_dict=msgdict,
|
||||||
|
|
||||||
|
# expand and pack all RAE compat fields
|
||||||
|
# into the `._extra_msgdata` aux `dict`.
|
||||||
|
**{
|
||||||
|
k: v
|
||||||
|
for k, v in msgdict.items()
|
||||||
|
if k in _ipcmsg_keys
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class StreamOverrun(
|
||||||
|
RemoteActorError,
|
||||||
|
trio.TooSlowError,
|
||||||
|
):
|
||||||
|
reprol_fields: list[str] = [
|
||||||
|
'sender',
|
||||||
|
]
|
||||||
|
'''
|
||||||
|
This stream was overrun by its sender and can be optionally
|
||||||
|
handled by app code using `MsgStream.send()/.receive()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
@property
|
||||||
|
def sender(self) -> tuple[str, str] | None:
|
||||||
|
value = self._ipc_msg.sender
|
||||||
|
if value:
|
||||||
|
return tuple(value)
|
||||||
|
|
||||||
|
|
||||||
|
# class InternalActorError(RemoteActorError):
|
||||||
|
# '''
|
||||||
|
# Boxed (Remote) internal `tractor` error indicating failure of some
|
||||||
|
# primitive, machinery state or lowlevel task that should never
|
||||||
|
# occur.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
|
||||||
|
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
class TransportClosed(trio.ClosedResourceError):
|
||||||
"Underlying channel transport was closed prior to use"
|
"Underlying channel transport was closed prior to use"
|
||||||
|
|
||||||
|
@ -484,23 +734,6 @@ class NoRuntime(RuntimeError):
|
||||||
"The root actor has not been initialized yet"
|
"The root actor has not been initialized yet"
|
||||||
|
|
||||||
|
|
||||||
class StreamOverrun(
|
|
||||||
RemoteActorError,
|
|
||||||
trio.TooSlowError,
|
|
||||||
):
|
|
||||||
reprol_fields: list[str] = [
|
|
||||||
'sender',
|
|
||||||
]
|
|
||||||
'''
|
|
||||||
This stream was overrun by sender
|
|
||||||
|
|
||||||
'''
|
|
||||||
@property
|
|
||||||
def sender(self) -> tuple[str, str] | None:
|
|
||||||
value = self.msgdata.get('sender')
|
|
||||||
if value:
|
|
||||||
return tuple(value)
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
class AsyncioCancelled(Exception):
|
||||||
'''
|
'''
|
||||||
|
@ -518,23 +751,12 @@ class MessagingError(Exception):
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
class MsgTypeError(MessagingError):
|
|
||||||
'''
|
|
||||||
Equivalent of a `TypeError` for an IPC wire-message
|
|
||||||
due to an invalid field value (type).
|
|
||||||
|
|
||||||
Normally this is re-raised from some `.msg._codec`
|
|
||||||
decode error raised by a backend interchange lib
|
|
||||||
like `msgspec` or `pycapnproto`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException|RemoteActorError,
|
exc: BaseException|RemoteActorError,
|
||||||
|
|
||||||
tb: str|None = None,
|
tb: str|None = None,
|
||||||
cid: str|None = None,
|
cid: str|None = None,
|
||||||
|
src_uid: tuple[str, str]|None = None,
|
||||||
|
|
||||||
) -> Error:
|
) -> Error:
|
||||||
'''
|
'''
|
||||||
|
@ -560,7 +782,8 @@ def pack_error(
|
||||||
):
|
):
|
||||||
error_msg.update(exc.msgdata)
|
error_msg.update(exc.msgdata)
|
||||||
|
|
||||||
# an onion/inception we need to pack
|
# an onion/inception we need to pack as a nested and relayed
|
||||||
|
# remotely boxed error.
|
||||||
if (
|
if (
|
||||||
type(exc) is RemoteActorError
|
type(exc) is RemoteActorError
|
||||||
and (boxed := exc.boxed_type)
|
and (boxed := exc.boxed_type)
|
||||||
|
@ -584,7 +807,7 @@ def pack_error(
|
||||||
error_msg['boxed_type_str'] = 'RemoteActorError'
|
error_msg['boxed_type_str'] = 'RemoteActorError'
|
||||||
|
|
||||||
else:
|
else:
|
||||||
error_msg['src_uid'] = our_uid
|
error_msg['src_uid'] = src_uid or our_uid
|
||||||
error_msg['src_type_str'] = type(exc).__name__
|
error_msg['src_type_str'] = type(exc).__name__
|
||||||
error_msg['boxed_type_str'] = type(exc).__name__
|
error_msg['boxed_type_str'] = type(exc).__name__
|
||||||
|
|
||||||
|
@ -596,7 +819,7 @@ def pack_error(
|
||||||
|
|
||||||
# XXX NOTE: always ensure the traceback-str is from the
|
# XXX NOTE: always ensure the traceback-str is from the
|
||||||
# locally raised error (**not** the prior relay's boxed
|
# locally raised error (**not** the prior relay's boxed
|
||||||
# content's `.msgdata`).
|
# content's in `._ipc_msg.tb_str`).
|
||||||
error_msg['tb_str'] = tb_str
|
error_msg['tb_str'] = tb_str
|
||||||
|
|
||||||
if cid is not None:
|
if cid is not None:
|
||||||
|
@ -606,7 +829,7 @@ def pack_error(
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(
|
def unpack_error(
|
||||||
msg: dict[str, Any]|Error,
|
msg: Error,
|
||||||
|
|
||||||
chan: Channel|None = None,
|
chan: Channel|None = None,
|
||||||
box_type: RemoteActorError = RemoteActorError,
|
box_type: RemoteActorError = RemoteActorError,
|
||||||
|
@ -624,16 +847,10 @@ def unpack_error(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
error_dict: dict[str, dict]|None
|
|
||||||
if not isinstance(msg, Error):
|
if not isinstance(msg, Error):
|
||||||
# if (
|
|
||||||
# error_dict := msg.get('error')
|
|
||||||
# ) is None:
|
|
||||||
# no error field, nothing to unpack.
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# retrieve the remote error's msg encoded details
|
# retrieve the remote error's encoded details from fields
|
||||||
# tb_str: str = error_dict.get('tb_str', '')
|
|
||||||
tb_str: str = msg.tb_str
|
tb_str: str = msg.tb_str
|
||||||
message: str = (
|
message: str = (
|
||||||
f'{chan.uid}\n'
|
f'{chan.uid}\n'
|
||||||
|
@ -651,6 +868,10 @@ def unpack_error(
|
||||||
box_type = ContextCancelled
|
box_type = ContextCancelled
|
||||||
assert boxed_type is box_type
|
assert boxed_type is box_type
|
||||||
|
|
||||||
|
elif boxed_type_str == 'MsgTypeError':
|
||||||
|
box_type = MsgTypeError
|
||||||
|
assert boxed_type is box_type
|
||||||
|
|
||||||
# TODO: already included by `_this_mod` in else loop right?
|
# TODO: already included by `_this_mod` in else loop right?
|
||||||
#
|
#
|
||||||
# we have an inception/onion-error so ensure
|
# we have an inception/onion-error so ensure
|
||||||
|
@ -661,12 +882,9 @@ def unpack_error(
|
||||||
# assert len(error_dict['relay_path']) >= 1
|
# assert len(error_dict['relay_path']) >= 1
|
||||||
assert len(msg.relay_path) >= 1
|
assert len(msg.relay_path) >= 1
|
||||||
|
|
||||||
# TODO: mk RAE just take the `Error` instance directly?
|
|
||||||
error_dict: dict = structs.asdict(msg)
|
|
||||||
|
|
||||||
exc = box_type(
|
exc = box_type(
|
||||||
message,
|
message,
|
||||||
**error_dict,
|
ipc_msg=msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
return exc
|
return exc
|
||||||
|
|
165
tractor/_ipc.py
165
tractor/_ipc.py
|
@ -54,7 +54,8 @@ from tractor.msg import (
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
_codec,
|
_codec,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
types,
|
types as msgtypes,
|
||||||
|
pretty_struct,
|
||||||
)
|
)
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -72,6 +73,7 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
|
||||||
MsgType = TypeVar("MsgType")
|
MsgType = TypeVar("MsgType")
|
||||||
|
|
||||||
# TODO: consider using a generic def and indexing with our eventual
|
# TODO: consider using a generic def and indexing with our eventual
|
||||||
|
@ -116,6 +118,74 @@ class MsgTransport(Protocol[MsgType]):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def _raise_msg_type_err(
|
||||||
|
msg: Any|bytes,
|
||||||
|
codec: MsgCodec,
|
||||||
|
validation_err: msgspec.ValidationError|None = None,
|
||||||
|
verb_header: str = '',
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
# if side == 'send':
|
||||||
|
if validation_err is None: # send-side
|
||||||
|
|
||||||
|
import traceback
|
||||||
|
from tractor._exceptions import pformat_boxed_tb
|
||||||
|
|
||||||
|
fmt_spec: str = '\n'.join(
|
||||||
|
map(str, codec.msg_spec.__args__)
|
||||||
|
)
|
||||||
|
fmt_stack: str = (
|
||||||
|
'\n'.join(traceback.format_stack(limit=3))
|
||||||
|
)
|
||||||
|
tb_fmt: str = pformat_boxed_tb(
|
||||||
|
tb_str=fmt_stack,
|
||||||
|
# fields_str=header,
|
||||||
|
field_prefix=' ',
|
||||||
|
indent='',
|
||||||
|
)
|
||||||
|
raise MsgTypeError(
|
||||||
|
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||||
|
f'{tb_fmt}\n'
|
||||||
|
f'Valid IPC msgs are:\n\n'
|
||||||
|
# f' ------ - ------\n'
|
||||||
|
f'{fmt_spec}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# decode the msg-bytes using the std msgpack
|
||||||
|
# interchange-prot (i.e. without any
|
||||||
|
# `msgspec.Struct` handling) so that we can
|
||||||
|
# determine what `.msg.types.Msg` is the culprit
|
||||||
|
# by reporting the received value.
|
||||||
|
msg_dict: dict = msgspec.msgpack.decode(msg)
|
||||||
|
msg_type_name: str = msg_dict['msg_type']
|
||||||
|
msg_type = getattr(msgtypes, msg_type_name)
|
||||||
|
errmsg: str = (
|
||||||
|
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||||
|
)
|
||||||
|
if verb_header:
|
||||||
|
errmsg = f'{verb_header} ' + errmsg
|
||||||
|
|
||||||
|
# XXX see if we can determine the exact invalid field
|
||||||
|
# such that we can comprehensively report the
|
||||||
|
# specific field's type problem
|
||||||
|
msgspec_msg: str = validation_err.args[0].rstrip('`')
|
||||||
|
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||||
|
obj = object()
|
||||||
|
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||||
|
field_type: Union[Type] = msg_type.__signature__.parameters[
|
||||||
|
maybe_field
|
||||||
|
].annotation
|
||||||
|
errmsg += (
|
||||||
|
f'{msg.rstrip("`")}\n\n'
|
||||||
|
f'{msg_type}\n'
|
||||||
|
f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise MsgTypeError(errmsg) from validation_err
|
||||||
|
|
||||||
|
|
||||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
# TODO: not sure why we have to inherit here, but it seems to be an
|
||||||
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
|
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
|
||||||
# probably should make a `mypy` issue?
|
# probably should make a `mypy` issue?
|
||||||
|
@ -175,9 +245,10 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
or
|
or
|
||||||
_codec._ctxvar_MsgCodec.get()
|
_codec._ctxvar_MsgCodec.get()
|
||||||
)
|
)
|
||||||
log.critical(
|
# TODO: mask out before release?
|
||||||
'!?!: USING STD `tractor` CODEC !?!?\n'
|
log.runtime(
|
||||||
f'{self._codec}\n'
|
f'New {self} created with codec\n'
|
||||||
|
f'codec: {self._codec}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||||
|
@ -221,16 +292,18 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# NOTE: lookup the `trio.Task.context`'s var for
|
# NOTE: lookup the `trio.Task.context`'s var for
|
||||||
# the current `MsgCodec`.
|
# the current `MsgCodec`.
|
||||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
|
|
||||||
|
# TODO: mask out before release?
|
||||||
if self._codec.pld_spec != codec.pld_spec:
|
if self._codec.pld_spec != codec.pld_spec:
|
||||||
# assert (
|
# assert (
|
||||||
# task := trio.lowlevel.current_task()
|
# task := trio.lowlevel.current_task()
|
||||||
# ) is not self._task
|
# ) is not self._task
|
||||||
# self._task = task
|
# self._task = task
|
||||||
self._codec = codec
|
self._codec = codec
|
||||||
log.critical(
|
log.runtime(
|
||||||
'.recv() USING NEW CODEC !?!?\n'
|
'Using new codec in {self}.recv()\n'
|
||||||
f'{self._codec}\n\n'
|
f'codec: {self._codec}\n\n'
|
||||||
f'msg_bytes -> {msg_bytes}\n'
|
f'msg_bytes: {msg_bytes}\n'
|
||||||
)
|
)
|
||||||
yield codec.decode(msg_bytes)
|
yield codec.decode(msg_bytes)
|
||||||
|
|
||||||
|
@ -252,36 +325,13 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# and always raise such that spec violations
|
# and always raise such that spec violations
|
||||||
# are never allowed to be caught silently!
|
# are never allowed to be caught silently!
|
||||||
except msgspec.ValidationError as verr:
|
except msgspec.ValidationError as verr:
|
||||||
|
# re-raise as type error
|
||||||
# decode the msg-bytes using the std msgpack
|
_raise_msg_type_err(
|
||||||
# interchange-prot (i.e. without any
|
msg=msg_bytes,
|
||||||
# `msgspec.Struct` handling) so that we can
|
codec=codec,
|
||||||
# determine what `.msg.types.Msg` is the culprit
|
validation_err=verr,
|
||||||
# by reporting the received value.
|
|
||||||
msg_dict: dict = msgspec.msgpack.decode(msg_bytes)
|
|
||||||
msg_type_name: str = msg_dict['msg_type']
|
|
||||||
msg_type = getattr(types, msg_type_name)
|
|
||||||
errmsg: str = (
|
|
||||||
f'Received invalid IPC `{msg_type_name}` msg\n\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX see if we can determine the exact invalid field
|
|
||||||
# such that we can comprehensively report the
|
|
||||||
# specific field's type problem
|
|
||||||
msgspec_msg: str = verr.args[0].rstrip('`')
|
|
||||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
|
||||||
if field_val := msg_dict.get(maybe_field):
|
|
||||||
field_type: Union[Type] = msg_type.__signature__.parameters[
|
|
||||||
maybe_field
|
|
||||||
].annotation
|
|
||||||
errmsg += (
|
|
||||||
f'{msg.rstrip("`")}\n\n'
|
|
||||||
f'{msg_type}\n'
|
|
||||||
f' |_.{maybe_field}: {field_type} = {field_val}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
raise MsgTypeError(errmsg) from verr
|
|
||||||
|
|
||||||
except (
|
except (
|
||||||
msgspec.DecodeError,
|
msgspec.DecodeError,
|
||||||
UnicodeDecodeError,
|
UnicodeDecodeError,
|
||||||
|
@ -307,12 +357,16 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
msg: Any,
|
msg: msgtypes.Msg,
|
||||||
|
|
||||||
|
strict_types: bool = True,
|
||||||
# hide_tb: bool = False,
|
# hide_tb: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Send a msgpack coded blob-as-msg over TCP.
|
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||||
|
|
||||||
|
If `strict_types == True` then a `MsgTypeError` will be raised on any
|
||||||
|
invalid msg type
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# __tracebackhide__: bool = hide_tb
|
# __tracebackhide__: bool = hide_tb
|
||||||
|
@ -321,25 +375,40 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# NOTE: lookup the `trio.Task.context`'s var for
|
# NOTE: lookup the `trio.Task.context`'s var for
|
||||||
# the current `MsgCodec`.
|
# the current `MsgCodec`.
|
||||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
# if self._codec != codec:
|
|
||||||
|
# TODO: mask out before release?
|
||||||
if self._codec.pld_spec != codec.pld_spec:
|
if self._codec.pld_spec != codec.pld_spec:
|
||||||
self._codec = codec
|
self._codec = codec
|
||||||
log.critical(
|
log.runtime(
|
||||||
'.send() using NEW CODEC !?!?\n'
|
'Using new codec in {self}.send()\n'
|
||||||
f'{self._codec}\n\n'
|
f'codec: {self._codec}\n\n'
|
||||||
f'OBJ -> {msg}\n'
|
f'msg: {msg}\n'
|
||||||
)
|
)
|
||||||
if type(msg) not in types.__spec__:
|
|
||||||
|
if type(msg) not in msgtypes.__msg_types__:
|
||||||
|
if strict_types:
|
||||||
|
_raise_msg_type_err(
|
||||||
|
msg,
|
||||||
|
codec=codec,
|
||||||
|
)
|
||||||
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Sending non-`Msg`-spec msg?\n\n'
|
'Sending non-`Msg`-spec msg?\n\n'
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
bytes_data: bytes = codec.encode(msg)
|
bytes_data: bytes = codec.encode(msg)
|
||||||
|
except TypeError as typerr:
|
||||||
|
raise MsgTypeError(
|
||||||
|
'A msg field violates the current spec\n'
|
||||||
|
f'{codec.pld_spec}\n\n'
|
||||||
|
f'{pretty_struct.Struct.pformat(msg)}'
|
||||||
|
) from typerr
|
||||||
|
|
||||||
# supposedly the fastest says,
|
# supposedly the fastest says,
|
||||||
# https://stackoverflow.com/a/54027962
|
# https://stackoverflow.com/a/54027962
|
||||||
size: bytes = struct.pack("<I", len(bytes_data))
|
size: bytes = struct.pack("<I", len(bytes_data))
|
||||||
|
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -567,7 +636,6 @@ class Channel:
|
||||||
f'{pformat(payload)}\n'
|
f'{pformat(payload)}\n'
|
||||||
) # type: ignore
|
) # type: ignore
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
|
||||||
await self._transport.send(
|
await self._transport.send(
|
||||||
payload,
|
payload,
|
||||||
# hide_tb=hide_tb,
|
# hide_tb=hide_tb,
|
||||||
|
@ -577,6 +645,11 @@ class Channel:
|
||||||
assert self._transport
|
assert self._transport
|
||||||
return await self._transport.recv()
|
return await self._transport.recv()
|
||||||
|
|
||||||
|
# TODO: auto-reconnect features like 0mq/nanomsg?
|
||||||
|
# -[ ] implement it manually with nods to SC prot
|
||||||
|
# possibly on multiple transport backends?
|
||||||
|
# -> seems like that might be re-inventing scalability
|
||||||
|
# prots tho no?
|
||||||
# try:
|
# try:
|
||||||
# return await self._transport.recv()
|
# return await self._transport.recv()
|
||||||
# except trio.BrokenResourceError:
|
# except trio.BrokenResourceError:
|
||||||
|
|
|
@ -502,7 +502,7 @@ async def open_portal(
|
||||||
'''
|
'''
|
||||||
actor = current_actor()
|
actor = current_actor()
|
||||||
assert actor
|
assert actor
|
||||||
was_connected = False
|
was_connected: bool = False
|
||||||
|
|
||||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||||
|
|
||||||
|
@ -533,9 +533,7 @@ async def open_portal(
|
||||||
await portal.aclose()
|
await portal.aclose()
|
||||||
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
# gracefully signal remote channel-msg loop
|
await channel.aclose()
|
||||||
await channel.send(None)
|
|
||||||
# await channel.aclose()
|
|
||||||
|
|
||||||
# cancel background msg loop task
|
# cancel background msg loop task
|
||||||
if msg_loop_cs:
|
if msg_loop_cs:
|
||||||
|
|
296
tractor/_rpc.py
296
tractor/_rpc.py
|
@ -55,20 +55,21 @@ from ._exceptions import (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
pause,
|
|
||||||
maybe_wait_for_debugger,
|
maybe_wait_for_debugger,
|
||||||
_debug,
|
_debug,
|
||||||
)
|
)
|
||||||
from . import _state
|
from . import _state
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
|
CancelAck,
|
||||||
|
Error,
|
||||||
|
Msg,
|
||||||
|
Return,
|
||||||
Start,
|
Start,
|
||||||
StartAck,
|
StartAck,
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
Return,
|
|
||||||
Error,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,6 +91,7 @@ async def _invoke_non_context(
|
||||||
|
|
||||||
treat_as_gen: bool,
|
treat_as_gen: bool,
|
||||||
is_rpc: bool,
|
is_rpc: bool,
|
||||||
|
return_msg: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -98,7 +100,6 @@ async def _invoke_non_context(
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# TODO: can we unify this with the `context=True` impl below?
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
# await chan.send({
|
|
||||||
await chan.send(
|
await chan.send(
|
||||||
StartAck(
|
StartAck(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -124,11 +125,6 @@ async def _invoke_non_context(
|
||||||
# to_send = await chan.recv_nowait()
|
# to_send = await chan.recv_nowait()
|
||||||
# if to_send is not None:
|
# if to_send is not None:
|
||||||
# to_yield = await coro.asend(to_send)
|
# to_yield = await coro.asend(to_send)
|
||||||
# await chan.send({
|
|
||||||
# # Yield()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'yield': item,
|
|
||||||
# })
|
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Yield(
|
Yield(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -143,11 +139,6 @@ async def _invoke_non_context(
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Stop(cid=cid)
|
Stop(cid=cid)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # Stop(
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'stop': True,
|
|
||||||
# })
|
|
||||||
|
|
||||||
# one way @stream func that gets treated like an async gen
|
# one way @stream func that gets treated like an async gen
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# TODO: can we unify this with the `context=True` impl below?
|
||||||
|
@ -158,11 +149,6 @@ async def _invoke_non_context(
|
||||||
functype='asyncgen',
|
functype='asyncgen',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # StartAck()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'functype': 'asyncgen',
|
|
||||||
# })
|
|
||||||
# XXX: the async-func may spawn further tasks which push
|
# XXX: the async-func may spawn further tasks which push
|
||||||
# back values like an async-generator would but must
|
# back values like an async-generator would but must
|
||||||
# manualy construct the response dict-packet-responses as
|
# manualy construct the response dict-packet-responses as
|
||||||
|
@ -178,11 +164,6 @@ async def _invoke_non_context(
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Stop(cid=cid)
|
Stop(cid=cid)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # Stop(
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'stop': True,
|
|
||||||
# })
|
|
||||||
else:
|
else:
|
||||||
# regular async function/method
|
# regular async function/method
|
||||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
||||||
|
@ -200,11 +181,6 @@ async def _invoke_non_context(
|
||||||
functype='asyncfunc',
|
functype='asyncfunc',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # StartAck()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'functype': 'asyncfunc',
|
|
||||||
# })
|
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
@ -238,13 +214,8 @@ async def _invoke_non_context(
|
||||||
and chan.connected()
|
and chan.connected()
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
# await chan.send({
|
|
||||||
# # Return()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'return': result,
|
|
||||||
# })
|
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Return(
|
return_msg(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
pld=result,
|
pld=result,
|
||||||
)
|
)
|
||||||
|
@ -409,6 +380,7 @@ async def _invoke(
|
||||||
|
|
||||||
is_rpc: bool = True,
|
is_rpc: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
return_msg: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -429,8 +401,6 @@ async def _invoke(
|
||||||
# XXX for .pause_from_sync()` usage we need to make sure
|
# XXX for .pause_from_sync()` usage we need to make sure
|
||||||
# `greenback` is boostrapped in the subactor!
|
# `greenback` is boostrapped in the subactor!
|
||||||
await _debug.maybe_init_greenback()
|
await _debug.maybe_init_greenback()
|
||||||
# else:
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# TODO: possibly a specially formatted traceback
|
# TODO: possibly a specially formatted traceback
|
||||||
# (not sure what typing is for this..)?
|
# (not sure what typing is for this..)?
|
||||||
|
@ -520,6 +490,7 @@ async def _invoke(
|
||||||
kwargs,
|
kwargs,
|
||||||
treat_as_gen,
|
treat_as_gen,
|
||||||
is_rpc,
|
is_rpc,
|
||||||
|
return_msg,
|
||||||
task_status,
|
task_status,
|
||||||
)
|
)
|
||||||
# below is only for `@context` funcs
|
# below is only for `@context` funcs
|
||||||
|
@ -550,11 +521,6 @@ async def _invoke(
|
||||||
functype='context',
|
functype='context',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # StartAck()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'functype': 'context',
|
|
||||||
# })
|
|
||||||
|
|
||||||
# TODO: should we also use an `.open_context()` equiv
|
# TODO: should we also use an `.open_context()` equiv
|
||||||
# for this callee side by factoring the impl from
|
# for this callee side by factoring the impl from
|
||||||
|
@ -579,16 +545,11 @@ async def _invoke(
|
||||||
|
|
||||||
# deliver final result to caller side.
|
# deliver final result to caller side.
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Return(
|
return_msg(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
pld=res,
|
pld=res,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# await chan.send({
|
|
||||||
# # Return()
|
|
||||||
# 'cid': cid,
|
|
||||||
# 'return': res,
|
|
||||||
# })
|
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
|
@ -677,7 +638,6 @@ async def _invoke(
|
||||||
ctxc = ContextCancelled(
|
ctxc = ContextCancelled(
|
||||||
msg,
|
msg,
|
||||||
boxed_type=trio.Cancelled,
|
boxed_type=trio.Cancelled,
|
||||||
# boxed_type_str='Cancelled',
|
|
||||||
canceller=canceller,
|
canceller=canceller,
|
||||||
)
|
)
|
||||||
# assign local error so that the `.outcome`
|
# assign local error so that the `.outcome`
|
||||||
|
@ -778,12 +738,12 @@ async def try_ship_error_to_remote(
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
BrokenPipeError,
|
BrokenPipeError,
|
||||||
):
|
):
|
||||||
# err_msg: dict = msg['error']['tb_str']
|
|
||||||
log.critical(
|
log.critical(
|
||||||
'IPC transport failure -> '
|
'IPC transport failure -> '
|
||||||
f'failed to ship error to {remote_descr}!\n\n'
|
f'failed to ship error to {remote_descr}!\n\n'
|
||||||
f'X=> {channel.uid}\n\n'
|
f'X=> {channel.uid}\n\n'
|
||||||
# f'{err_msg}\n'
|
|
||||||
|
# TODO: use `.msg.preetty_struct` for this!
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -825,6 +785,8 @@ async def process_messages(
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
assert actor._service_n # state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
# should use it?
|
# should use it?
|
||||||
# https://github.com/python-trio/trio/issues/467
|
# https://github.com/python-trio/trio/issues/467
|
||||||
|
@ -834,7 +796,7 @@ async def process_messages(
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
nursery_cancelled_before_task: bool = False
|
nursery_cancelled_before_task: bool = False
|
||||||
msg: dict | None = None
|
msg: Msg|None = None
|
||||||
try:
|
try:
|
||||||
# NOTE: this internal scope allows for keeping this
|
# NOTE: this internal scope allows for keeping this
|
||||||
# message loop running despite the current task having
|
# message loop running despite the current task having
|
||||||
|
@ -843,122 +805,49 @@ async def process_messages(
|
||||||
# using ``scope = Nursery.start()``
|
# using ``scope = Nursery.start()``
|
||||||
with CancelScope(shield=shield) as loop_cs:
|
with CancelScope(shield=shield) as loop_cs:
|
||||||
task_status.started(loop_cs)
|
task_status.started(loop_cs)
|
||||||
|
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
log.transport( # type: ignore
|
log.transport( # type: ignore
|
||||||
f'<= IPC msg from peer: {chan.uid}\n\n'
|
f'<= IPC msg from peer: {chan.uid}\n\n'
|
||||||
|
|
||||||
# TODO: conditionally avoid fmting depending
|
# TODO: avoid fmting depending on loglevel for perf?
|
||||||
# on log level (for perf)?
|
# -[ ] specifically `pformat()` sub-call..?
|
||||||
# => specifically `pformat()` sub-call..?
|
# -[ ] use `.msg.pretty_struct` here now instead!
|
||||||
f'{pformat(msg)}\n'
|
f'{pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
match msg:
|
match msg:
|
||||||
|
# msg for an ongoing IPC ctx session, deliver msg to
|
||||||
# if msg is None:
|
# local task.
|
||||||
# dedicated loop terminate sentinel
|
|
||||||
case None:
|
|
||||||
|
|
||||||
tasks: dict[
|
|
||||||
tuple[Channel, str],
|
|
||||||
tuple[Context, Callable, trio.Event]
|
|
||||||
] = actor._rpc_tasks.copy()
|
|
||||||
log.cancel(
|
|
||||||
f'Peer IPC channel terminated via `None` setinel msg?\n'
|
|
||||||
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
|
|
||||||
f'peer: {chan.uid}\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
)
|
|
||||||
for (channel, cid) in tasks:
|
|
||||||
if channel is chan:
|
|
||||||
await actor._cancel_task(
|
|
||||||
cid,
|
|
||||||
channel,
|
|
||||||
requesting_uid=channel.uid,
|
|
||||||
|
|
||||||
ipc_msg=msg,
|
|
||||||
)
|
|
||||||
break
|
|
||||||
|
|
||||||
# cid = msg.get('cid')
|
|
||||||
# if cid:
|
|
||||||
case (
|
case (
|
||||||
StartAck(cid=cid)
|
StartAck(cid=cid)
|
||||||
| Started(cid=cid)
|
| Started(cid=cid)
|
||||||
| Yield(cid=cid)
|
| Yield(cid=cid)
|
||||||
| Stop(cid=cid)
|
| Stop(cid=cid)
|
||||||
| Return(cid=cid)
|
| Return(cid=cid)
|
||||||
| Error(cid=cid)
|
| CancelAck(cid=cid)
|
||||||
|
| Error(cid=cid) # RPC-task ctx specific
|
||||||
):
|
):
|
||||||
# deliver response to local caller/waiter
|
# deliver response to local caller/waiter
|
||||||
# via its per-remote-context memory channel.
|
# via its per-remote-context memory channel.
|
||||||
await actor._push_result(
|
await actor._deliver_ctx_payload(
|
||||||
chan,
|
chan,
|
||||||
cid,
|
cid,
|
||||||
msg,
|
msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(
|
# `Actor`(-internal) runtime cancel requests
|
||||||
'Waiting on next IPC msg from\n'
|
|
||||||
f'peer: {chan.uid}:\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
|
|
||||||
# f'last msg: {msg}\n'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# process a 'cmd' request-msg upack
|
|
||||||
# TODO: impl with native `msgspec.Struct` support !!
|
|
||||||
# -[ ] implement with ``match:`` syntax?
|
|
||||||
# -[ ] discard un-authed msgs as per,
|
|
||||||
# <TODO put issue for typed msging structs>
|
|
||||||
case Start(
|
case Start(
|
||||||
|
ns='self',
|
||||||
|
func='cancel',
|
||||||
cid=cid,
|
cid=cid,
|
||||||
ns=ns,
|
|
||||||
func=funcname,
|
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
uid=actorid,
|
|
||||||
):
|
):
|
||||||
# try:
|
kwargs |= {'req_chan': chan}
|
||||||
# (
|
|
||||||
# ns,
|
|
||||||
# funcname,
|
|
||||||
# kwargs,
|
|
||||||
# actorid,
|
|
||||||
# cid,
|
|
||||||
# ) = msg['cmd']
|
|
||||||
|
|
||||||
# # TODO: put in `case Error():` right?
|
# XXX NOTE XXX don't start entire actor
|
||||||
# except KeyError:
|
# runtime cancellation if this actor is
|
||||||
# # This is the non-rpc error case, that is, an
|
# currently in debug mode!
|
||||||
# # error **not** raised inside a call to ``_invoke()``
|
|
||||||
# # (i.e. no cid was provided in the msg - see above).
|
|
||||||
# # Push this error to all local channel consumers
|
|
||||||
# # (normally portals) by marking the channel as errored
|
|
||||||
# assert chan.uid
|
|
||||||
# exc = unpack_error(msg, chan=chan)
|
|
||||||
# chan._exc = exc
|
|
||||||
# raise exc
|
|
||||||
|
|
||||||
log.runtime(
|
|
||||||
'Handling RPC `Start` request from\n'
|
|
||||||
f'peer: {actorid}\n'
|
|
||||||
'\n'
|
|
||||||
f'=> {ns}.{funcname}({kwargs})\n'
|
|
||||||
)
|
|
||||||
# case Start(
|
|
||||||
# ns='self',
|
|
||||||
# funcname='cancel',
|
|
||||||
# ):
|
|
||||||
if ns == 'self':
|
|
||||||
if funcname == 'cancel':
|
|
||||||
func: Callable = actor.cancel
|
|
||||||
kwargs |= {
|
|
||||||
'req_chan': chan,
|
|
||||||
}
|
|
||||||
|
|
||||||
# don't start entire actor runtime cancellation
|
|
||||||
# if this actor is currently in debug mode!
|
|
||||||
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
|
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
|
||||||
if pdb_complete:
|
if pdb_complete:
|
||||||
await pdb_complete.wait()
|
await pdb_complete.wait()
|
||||||
|
@ -973,9 +862,10 @@ async def process_messages(
|
||||||
actor,
|
actor,
|
||||||
cid,
|
cid,
|
||||||
chan,
|
chan,
|
||||||
func,
|
actor.cancel,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
|
return_msg=CancelAck,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -985,41 +875,31 @@ async def process_messages(
|
||||||
loop_cs.cancel()
|
loop_cs.cancel()
|
||||||
break
|
break
|
||||||
|
|
||||||
# case Start(
|
case Start(
|
||||||
# ns='self',
|
ns='self',
|
||||||
# funcname='_cancel_task',
|
func='_cancel_task',
|
||||||
# ):
|
cid=cid,
|
||||||
if funcname == '_cancel_task':
|
kwargs=kwargs,
|
||||||
func: Callable = actor._cancel_task
|
):
|
||||||
|
|
||||||
# we immediately start the runtime machinery
|
|
||||||
# shutdown
|
|
||||||
# with CancelScope(shield=True):
|
|
||||||
target_cid: str = kwargs['cid']
|
target_cid: str = kwargs['cid']
|
||||||
kwargs |= {
|
kwargs |= {
|
||||||
# NOTE: ONLY the rpc-task-owning
|
'requesting_uid': chan.uid,
|
||||||
|
'ipc_msg': msg,
|
||||||
|
|
||||||
|
# XXX NOTE! ONLY the rpc-task-owning
|
||||||
# parent IPC channel should be able to
|
# parent IPC channel should be able to
|
||||||
# cancel it!
|
# cancel it!
|
||||||
'parent_chan': chan,
|
'parent_chan': chan,
|
||||||
'requesting_uid': chan.uid,
|
|
||||||
'ipc_msg': msg,
|
|
||||||
}
|
}
|
||||||
# TODO: remove? already have emit in meth.
|
|
||||||
# log.runtime(
|
|
||||||
# f'Rx RPC task cancel request\n'
|
|
||||||
# f'<= canceller: {chan.uid}\n'
|
|
||||||
# f' |_{chan}\n\n'
|
|
||||||
# f'=> {actor}\n'
|
|
||||||
# f' |_cid: {target_cid}\n'
|
|
||||||
# )
|
|
||||||
try:
|
try:
|
||||||
await _invoke(
|
await _invoke(
|
||||||
actor,
|
actor,
|
||||||
cid,
|
cid,
|
||||||
chan,
|
chan,
|
||||||
func,
|
actor._cancel_task,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
|
return_msg=CancelAck,
|
||||||
)
|
)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
log.exception(
|
log.exception(
|
||||||
|
@ -1029,29 +909,44 @@ async def process_messages(
|
||||||
f'=> {actor}\n'
|
f'=> {actor}\n'
|
||||||
f' |_cid: {target_cid}\n'
|
f' |_cid: {target_cid}\n'
|
||||||
)
|
)
|
||||||
continue
|
|
||||||
|
|
||||||
# case Start(
|
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
|
||||||
# ns='self',
|
# ------ - ------
|
||||||
# funcname='register_actor',
|
# -[x] discard un-authed msgs as per,
|
||||||
# ):
|
# <TODO put issue for typed msging structs>
|
||||||
else:
|
case Start(
|
||||||
# normally registry methods, eg.
|
cid=cid,
|
||||||
# ``.register_actor()`` etc.
|
ns=ns,
|
||||||
|
func=funcname,
|
||||||
|
kwargs=kwargs, # type-spec this? see `msg.types`
|
||||||
|
uid=actorid,
|
||||||
|
):
|
||||||
|
log.runtime(
|
||||||
|
'Handling RPC `Start` request from\n'
|
||||||
|
f'peer: {actorid}\n'
|
||||||
|
'\n'
|
||||||
|
f'=> {ns}.{funcname}({kwargs})\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# runtime-internal endpoint: `Actor.<funcname>`
|
||||||
|
# only registry methods exist now yah,
|
||||||
|
# like ``.register_actor()`` etc. ?
|
||||||
|
if ns == 'self':
|
||||||
func: Callable = getattr(actor, funcname)
|
func: Callable = getattr(actor, funcname)
|
||||||
|
|
||||||
# case Start(
|
# application RPC endpoint
|
||||||
# ns=str(),
|
|
||||||
# funcname=funcname,
|
|
||||||
# ):
|
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
|
||||||
try:
|
try:
|
||||||
func = actor._get_rpc_func(ns, funcname)
|
func: Callable = actor._get_rpc_func(
|
||||||
|
ns,
|
||||||
|
funcname,
|
||||||
|
)
|
||||||
except (
|
except (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
AttributeError,
|
AttributeError,
|
||||||
) as err:
|
) as err:
|
||||||
|
# always complain to requester
|
||||||
|
# client about un-enabled modules
|
||||||
err_msg: dict[str, dict] = pack_error(
|
err_msg: dict[str, dict] = pack_error(
|
||||||
err,
|
err,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -1061,6 +956,7 @@ async def process_messages(
|
||||||
|
|
||||||
# schedule a task for the requested RPC function
|
# schedule a task for the requested RPC function
|
||||||
# in the actor's main "service nursery".
|
# in the actor's main "service nursery".
|
||||||
|
#
|
||||||
# TODO: possibly a service-tn per IPC channel for
|
# TODO: possibly a service-tn per IPC channel for
|
||||||
# supervision isolation? would avoid having to
|
# supervision isolation? would avoid having to
|
||||||
# manage RPC tasks individually in `._rpc_tasks`
|
# manage RPC tasks individually in `._rpc_tasks`
|
||||||
|
@ -1069,7 +965,7 @@ async def process_messages(
|
||||||
f'Spawning task for RPC request\n'
|
f'Spawning task for RPC request\n'
|
||||||
f'<= caller: {chan.uid}\n'
|
f'<= caller: {chan.uid}\n'
|
||||||
f' |_{chan}\n\n'
|
f' |_{chan}\n\n'
|
||||||
# TODO: maddr style repr?
|
# ^-TODO-^ maddr style repr?
|
||||||
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
|
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
|
||||||
# f'cid="{cid[-16:]} .."\n\n'
|
# f'cid="{cid[-16:]} .."\n\n'
|
||||||
|
|
||||||
|
@ -1077,7 +973,6 @@ async def process_messages(
|
||||||
f' |_cid: {cid}\n'
|
f' |_cid: {cid}\n'
|
||||||
f' |>> {func}()\n'
|
f' |>> {func}()\n'
|
||||||
)
|
)
|
||||||
assert actor._service_n # wait why? do it at top?
|
|
||||||
try:
|
try:
|
||||||
ctx: Context = await actor._service_n.start(
|
ctx: Context = await actor._service_n.start(
|
||||||
partial(
|
partial(
|
||||||
|
@ -1107,13 +1002,12 @@ async def process_messages(
|
||||||
log.warning(
|
log.warning(
|
||||||
'Task for RPC failed?'
|
'Task for RPC failed?'
|
||||||
f'|_ {func}()\n\n'
|
f'|_ {func}()\n\n'
|
||||||
|
|
||||||
f'{err}'
|
f'{err}'
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# mark that we have ongoing rpc tasks
|
# mark our global state with ongoing rpc tasks
|
||||||
actor._ongoing_rpc_tasks = trio.Event()
|
actor._ongoing_rpc_tasks = trio.Event()
|
||||||
|
|
||||||
# store cancel scope such that the rpc task can be
|
# store cancel scope such that the rpc task can be
|
||||||
|
@ -1124,23 +1018,26 @@ async def process_messages(
|
||||||
trio.Event(),
|
trio.Event(),
|
||||||
)
|
)
|
||||||
|
|
||||||
case Error()|_:
|
# XXX remote (runtime scoped) error or uknown
|
||||||
# This is the non-rpc error case, that is, an
|
# msg (type).
|
||||||
# error **not** raised inside a call to ``_invoke()``
|
case Error() | _:
|
||||||
# (i.e. no cid was provided in the msg - see above).
|
# NOTE: this is the non-rpc error case,
|
||||||
# Push this error to all local channel consumers
|
# that is, an error **not** raised inside
|
||||||
# (normally portals) by marking the channel as errored
|
# a call to ``_invoke()`` (i.e. no cid was
|
||||||
|
# provided in the msg - see above). Push
|
||||||
|
# this error to all local channel
|
||||||
|
# consumers (normally portals) by marking
|
||||||
|
# the channel as errored
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Unhandled IPC msg:\n\n'
|
f'Unhandled IPC msg:\n\n'
|
||||||
f'{msg}\n'
|
f'{msg}\n'
|
||||||
)
|
)
|
||||||
assert chan.uid
|
# assert chan.uid
|
||||||
exc = unpack_error(
|
chan._exc: Exception = unpack_error(
|
||||||
msg,
|
msg,
|
||||||
chan=chan,
|
chan=chan,
|
||||||
)
|
)
|
||||||
chan._exc = exc
|
raise chan._exc
|
||||||
raise exc
|
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Waiting on next IPC msg from\n'
|
'Waiting on next IPC msg from\n'
|
||||||
|
@ -1148,10 +1045,12 @@ async def process_messages(
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# end of async for, channel disconnect vis
|
# END-OF `async for`:
|
||||||
# ``trio.EndOfChannel``
|
# IPC disconnected via `trio.EndOfChannel`, likely
|
||||||
|
# due to a (graceful) `Channel.aclose()`.
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"{chan} for {chan.uid} disconnected, cancelling tasks"
|
f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
|
||||||
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
await actor.cancel_rpc_tasks(
|
await actor.cancel_rpc_tasks(
|
||||||
req_uid=actor.uid,
|
req_uid=actor.uid,
|
||||||
|
@ -1168,9 +1067,10 @@ async def process_messages(
|
||||||
# connection-reset) is ok since we don't have a teardown
|
# connection-reset) is ok since we don't have a teardown
|
||||||
# handshake for them (yet) and instead we simply bail out of
|
# handshake for them (yet) and instead we simply bail out of
|
||||||
# the message loop and expect the teardown sequence to clean
|
# the message loop and expect the teardown sequence to clean
|
||||||
# up.
|
# up..
|
||||||
# TODO: don't show this msg if it's an emphemeral
|
# TODO: add a teardown handshake? and,
|
||||||
# discovery ep call?
|
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
|
||||||
|
# -[ ] figure out how this will break with other transports?
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'channel closed abruptly with\n'
|
f'channel closed abruptly with\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
|
|
@ -65,7 +65,12 @@ from trio import (
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .msg import NamespacePath
|
from tractor.msg import (
|
||||||
|
pretty_struct,
|
||||||
|
NamespacePath,
|
||||||
|
types as msgtypes,
|
||||||
|
Msg,
|
||||||
|
)
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._context import (
|
from ._context import (
|
||||||
mk_context,
|
mk_context,
|
||||||
|
@ -73,9 +78,10 @@ from ._context import (
|
||||||
)
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
unpack_error,
|
|
||||||
ModuleNotExposed,
|
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
ModuleNotExposed,
|
||||||
|
MsgTypeError,
|
||||||
|
unpack_error,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
|
@ -91,10 +97,6 @@ from ._rpc import (
|
||||||
process_messages,
|
process_messages,
|
||||||
try_ship_error_to_remote,
|
try_ship_error_to_remote,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
|
||||||
types as msgtypes,
|
|
||||||
pretty_struct,
|
|
||||||
)
|
|
||||||
# from tractor.msg.types import (
|
# from tractor.msg.types import (
|
||||||
# Aid,
|
# Aid,
|
||||||
# SpawnSpec,
|
# SpawnSpec,
|
||||||
|
@ -164,18 +166,15 @@ class Actor:
|
||||||
# Information about `__main__` from parent
|
# Information about `__main__` from parent
|
||||||
_parent_main_data: dict[str, str]
|
_parent_main_data: dict[str, str]
|
||||||
_parent_chan_cs: CancelScope|None = None
|
_parent_chan_cs: CancelScope|None = None
|
||||||
_spawn_spec: SpawnSpec|None = None
|
_spawn_spec: msgtypes.SpawnSpec|None = None
|
||||||
|
|
||||||
# syncs for setup/teardown sequences
|
# syncs for setup/teardown sequences
|
||||||
_server_down: trio.Event|None = None
|
_server_down: trio.Event|None = None
|
||||||
|
|
||||||
# user toggled crash handling (including monkey-patched in
|
|
||||||
# `trio.open_nursery()` via `.trionics._supervisor` B)
|
|
||||||
_debug_mode: bool = False
|
|
||||||
|
|
||||||
# if started on ``asycio`` running ``trio`` in guest mode
|
# if started on ``asycio`` running ``trio`` in guest mode
|
||||||
_infected_aio: bool = False
|
_infected_aio: bool = False
|
||||||
|
|
||||||
|
# TODO: nursery tracking like `trio` does?
|
||||||
# _ans: dict[
|
# _ans: dict[
|
||||||
# tuple[str, str],
|
# tuple[str, str],
|
||||||
# list[ActorNursery],
|
# list[ActorNursery],
|
||||||
|
@ -396,8 +395,9 @@ class Actor:
|
||||||
|
|
||||||
raise mne
|
raise mne
|
||||||
|
|
||||||
|
# TODO: maybe change to mod-func and rename for implied
|
||||||
|
# multi-transport semantics?
|
||||||
async def _stream_handler(
|
async def _stream_handler(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
stream: trio.SocketStream,
|
stream: trio.SocketStream,
|
||||||
|
|
||||||
|
@ -559,7 +559,7 @@ class Actor:
|
||||||
cid: str|None = msg.cid
|
cid: str|None = msg.cid
|
||||||
if cid:
|
if cid:
|
||||||
# deliver response to local caller/waiter
|
# deliver response to local caller/waiter
|
||||||
await self._push_result(
|
await self._deliver_ctx_payload(
|
||||||
chan,
|
chan,
|
||||||
cid,
|
cid,
|
||||||
msg,
|
msg,
|
||||||
|
@ -716,43 +716,13 @@ class Actor:
|
||||||
# TODO: figure out why this breaks tests..
|
# TODO: figure out why this breaks tests..
|
||||||
db_cs.cancel()
|
db_cs.cancel()
|
||||||
|
|
||||||
# XXX: is this necessary (GC should do it)?
|
|
||||||
# XXX WARNING XXX
|
|
||||||
# Be AWARE OF THE INDENT LEVEL HERE
|
|
||||||
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
|
|
||||||
# EMPTY!!!!
|
|
||||||
if (
|
|
||||||
not self._peers
|
|
||||||
and chan.connected()
|
|
||||||
):
|
|
||||||
# if the channel is still connected it may mean the far
|
|
||||||
# end has not closed and we may have gotten here due to
|
|
||||||
# an error and so we should at least try to terminate
|
|
||||||
# the channel from this end gracefully.
|
|
||||||
log.runtime(
|
|
||||||
'Terminating channel with `None` setinel msg\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
# send msg loop terminate sentinel which
|
|
||||||
# triggers cancellation of all remotely
|
|
||||||
# started tasks.
|
|
||||||
await chan.send(None)
|
|
||||||
|
|
||||||
# XXX: do we want this? no right?
|
|
||||||
# causes "[104] connection reset by peer" on other end
|
|
||||||
# await chan.aclose()
|
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
|
||||||
log.runtime(f"Channel {chan.uid} was already closed")
|
|
||||||
|
|
||||||
# TODO: rename to `._deliver_payload()` since this handles
|
# TODO: rename to `._deliver_payload()` since this handles
|
||||||
# more then just `result` msgs now obvi XD
|
# more then just `result` msgs now obvi XD
|
||||||
async def _push_result(
|
async def _deliver_ctx_payload(
|
||||||
self,
|
self,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
cid: str,
|
cid: str,
|
||||||
msg: dict[str, Any],
|
msg: Msg|MsgTypeError,
|
||||||
|
|
||||||
) -> None|bool:
|
) -> None|bool:
|
||||||
'''
|
'''
|
||||||
|
@ -774,12 +744,16 @@ class Actor:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
f'<= sender: {uid}\n'
|
f'<= sender: {uid}\n'
|
||||||
f'=> cid: {cid}\n\n'
|
# XXX don't need right since it's always in msg?
|
||||||
|
# f'=> cid: {cid}\n\n'
|
||||||
|
|
||||||
f'{msg}\n'
|
f'{pretty_struct.Struct.pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# if isinstance(msg, MsgTypeError):
|
||||||
|
# return await ctx._deliver_bad_msg()
|
||||||
|
|
||||||
return await ctx._deliver_msg(msg)
|
return await ctx._deliver_msg(msg)
|
||||||
|
|
||||||
def get_context(
|
def get_context(
|
||||||
|
@ -1437,7 +1411,7 @@ class Actor:
|
||||||
)
|
)
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
||||||
def cancel_server(self) -> None:
|
def cancel_server(self) -> bool:
|
||||||
'''
|
'''
|
||||||
Cancel the internal IPC transport server nursery thereby
|
Cancel the internal IPC transport server nursery thereby
|
||||||
preventing any new inbound IPC connections establishing.
|
preventing any new inbound IPC connections establishing.
|
||||||
|
@ -1446,6 +1420,9 @@ class Actor:
|
||||||
if self._server_n:
|
if self._server_n:
|
||||||
log.runtime("Shutting down channel server")
|
log.runtime("Shutting down channel server")
|
||||||
self._server_n.cancel_scope.cancel()
|
self._server_n.cancel_scope.cancel()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addrs(self) -> list[tuple[str, int]]:
|
def accept_addrs(self) -> list[tuple[str, int]]:
|
||||||
|
|
|
@ -46,7 +46,6 @@ from .trionics import (
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
Error,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -184,7 +183,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
# - via a received `{'stop': ...}` msg from remote side.
|
# - via a received `{'stop': ...}` msg from remote side.
|
||||||
# |_ NOTE: previously this was triggered by calling
|
# |_ NOTE: previously this was triggered by calling
|
||||||
# ``._rx_chan.aclose()`` on the send side of the channel inside
|
# ``._rx_chan.aclose()`` on the send side of the channel inside
|
||||||
# `Actor._push_result()`, but now the 'stop' message handling
|
# `Actor._deliver_ctx_payload()`, but now the 'stop' message handling
|
||||||
# has been put just above inside `_raise_from_no_key_in_msg()`.
|
# has been put just above inside `_raise_from_no_key_in_msg()`.
|
||||||
except (
|
except (
|
||||||
trio.EndOfChannel,
|
trio.EndOfChannel,
|
||||||
|
@ -391,11 +390,11 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
if not self._eoc:
|
if not self._eoc:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Stream closed before it received an EoC?\n'
|
'Stream closed by self before it received an EoC?\n'
|
||||||
'Setting eoc manually..\n..'
|
'Setting eoc manually..\n..'
|
||||||
)
|
)
|
||||||
self._eoc: bool = trio.EndOfChannel(
|
self._eoc: bool = trio.EndOfChannel(
|
||||||
f'Context stream closed by {self._ctx.side}\n'
|
f'Context stream closed by self({self._ctx.side})\n'
|
||||||
f'|_{self}\n'
|
f'|_{self}\n'
|
||||||
)
|
)
|
||||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||||
|
|
|
@ -454,6 +454,10 @@ _runtime_msgs: list[Msg] = [
|
||||||
# emission from `MsgStream.aclose()`
|
# emission from `MsgStream.aclose()`
|
||||||
Stop,
|
Stop,
|
||||||
|
|
||||||
|
# `Return` sub-type that we always accept from
|
||||||
|
# runtime-internal cancel endpoints
|
||||||
|
CancelAck,
|
||||||
|
|
||||||
# box remote errors, normally subtypes
|
# box remote errors, normally subtypes
|
||||||
# of `RemoteActorError`.
|
# of `RemoteActorError`.
|
||||||
Error,
|
Error,
|
||||||
|
|
Loading…
Reference in New Issue