Compare commits
7 Commits
4fbd469c33
...
af3745684c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | af3745684c | |
Tyler Goodlet | 3907cba68e | |
Tyler Goodlet | e3d59964af | |
Tyler Goodlet | ba83bab776 | |
Tyler Goodlet | 18d440c207 | |
Tyler Goodlet | edac717613 | |
Tyler Goodlet | 7e93b81a83 |
|
@ -11,9 +11,6 @@ from typing import (
|
|||
Type,
|
||||
Union,
|
||||
)
|
||||
from contextvars import (
|
||||
Context,
|
||||
)
|
||||
|
||||
from msgspec import (
|
||||
structs,
|
||||
|
@ -27,6 +24,7 @@ import tractor
|
|||
from tractor import (
|
||||
_state,
|
||||
MsgTypeError,
|
||||
Context,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_codec,
|
||||
|
@ -41,7 +39,7 @@ from tractor.msg import (
|
|||
from tractor.msg.types import (
|
||||
_payload_msgs,
|
||||
log,
|
||||
Msg,
|
||||
PayloadMsg,
|
||||
Started,
|
||||
mk_msg_spec,
|
||||
)
|
||||
|
@ -61,7 +59,7 @@ def mk_custom_codec(
|
|||
uid: tuple[str, str] = tractor.current_actor().uid
|
||||
|
||||
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
||||
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
|
||||
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
|
||||
# to cast to/from that type on the wire. See the docs:
|
||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
|
||||
|
@ -321,12 +319,12 @@ def dec_type_union(
|
|||
import importlib
|
||||
types: list[Type] = []
|
||||
for type_name in type_names:
|
||||
for ns in [
|
||||
for mod in [
|
||||
typing,
|
||||
importlib.import_module(__name__),
|
||||
]:
|
||||
if type_ref := getattr(
|
||||
ns,
|
||||
mod,
|
||||
type_name,
|
||||
False,
|
||||
):
|
||||
|
@ -744,7 +742,7 @@ def chk_pld_type(
|
|||
# 'Error', .pld: ErrorData
|
||||
|
||||
codec: MsgCodec = mk_codec(
|
||||
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
|
||||
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
|
||||
# type union.
|
||||
ipc_pld_spec=payload_spec,
|
||||
)
|
||||
|
@ -752,7 +750,7 @@ def chk_pld_type(
|
|||
# make a one-off dec to compare with our `MsgCodec` instance
|
||||
# which does the below `mk_msg_spec()` call internally
|
||||
ipc_msg_spec: Union[Type[Struct]]
|
||||
msg_types: list[Msg[payload_spec]]
|
||||
msg_types: list[PayloadMsg[payload_spec]]
|
||||
(
|
||||
ipc_msg_spec,
|
||||
msg_types,
|
||||
|
@ -761,7 +759,7 @@ def chk_pld_type(
|
|||
)
|
||||
_enc = msgpack.Encoder()
|
||||
_dec = msgpack.Decoder(
|
||||
type=ipc_msg_spec or Any, # like `Msg[Any]`
|
||||
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
|
||||
)
|
||||
|
||||
assert (
|
||||
|
@ -806,7 +804,7 @@ def chk_pld_type(
|
|||
'cid': '666',
|
||||
'pld': pld,
|
||||
}
|
||||
enc_msg: Msg = typedef(**kwargs)
|
||||
enc_msg: PayloadMsg = typedef(**kwargs)
|
||||
|
||||
_wire_bytes: bytes = _enc.encode(enc_msg)
|
||||
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
||||
|
@ -883,25 +881,16 @@ def test_limit_msgspec():
|
|||
debug_mode=True
|
||||
):
|
||||
|
||||
# ensure we can round-trip a boxing `Msg`
|
||||
# ensure we can round-trip a boxing `PayloadMsg`
|
||||
assert chk_pld_type(
|
||||
# Msg,
|
||||
Any,
|
||||
None,
|
||||
payload_spec=Any,
|
||||
pld=None,
|
||||
expect_roundtrip=True,
|
||||
)
|
||||
|
||||
# TODO: don't need this any more right since
|
||||
# `msgspec>=0.15` has the nice generics stuff yah??
|
||||
#
|
||||
# manually override the type annot of the payload
|
||||
# field and ensure it propagates to all msg-subtypes.
|
||||
# Msg.__annotations__['pld'] = Any
|
||||
|
||||
# verify that a mis-typed payload value won't decode
|
||||
assert not chk_pld_type(
|
||||
# Msg,
|
||||
int,
|
||||
payload_spec=int,
|
||||
pld='doggy',
|
||||
)
|
||||
|
||||
|
@ -913,18 +902,16 @@ def test_limit_msgspec():
|
|||
value: Any
|
||||
|
||||
assert not chk_pld_type(
|
||||
# Msg,
|
||||
CustomPayload,
|
||||
payload_spec=CustomPayload,
|
||||
pld='doggy',
|
||||
)
|
||||
|
||||
assert chk_pld_type(
|
||||
# Msg,
|
||||
CustomPayload,
|
||||
payload_spec=CustomPayload,
|
||||
pld=CustomPayload(name='doggy', value='urmom')
|
||||
)
|
||||
|
||||
# uhh bc we can `.pause_from_sync()` now! :surfer:
|
||||
# yah, we can `.pause_from_sync()` now!
|
||||
# breakpoint()
|
||||
|
||||
trio.run(main)
|
||||
|
|
|
@ -1336,6 +1336,23 @@ def test_shield_pause(
|
|||
child.expect(pexpect.EOF)
|
||||
|
||||
|
||||
# TODO: better error for "non-ideal" usage from the root actor.
|
||||
# -[ ] if called from an async scope emit a message that suggests
|
||||
# using `await tractor.pause()` instead since it's less overhead
|
||||
# (in terms of `greenback` and/or extra threads) and if it's from
|
||||
# a sync scope suggest that usage must first call
|
||||
# `ensure_portal()` in the (eventual parent) async calling scope?
|
||||
def test_sync_pause_from_bg_task_in_root_actor_():
|
||||
'''
|
||||
When used from the root actor, normally we can only implicitly
|
||||
support `.pause_from_sync()` from the main-parent-task (that
|
||||
opens the runtime via `open_root_actor()`) since `greenback`
|
||||
requires a `.ensure_portal()` call per `trio.Task` where it is
|
||||
used.
|
||||
|
||||
'''
|
||||
...
|
||||
|
||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||
def test_correct_frames_below_hidden():
|
||||
'''
|
||||
|
|
|
@ -19,7 +19,7 @@ from tractor._testing import (
|
|||
@pytest.fixture
|
||||
def run_example_in_subproc(
|
||||
loglevel: str,
|
||||
testdir,
|
||||
testdir: pytest.Testdir,
|
||||
reg_addr: tuple[str, int],
|
||||
):
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ from ._exceptions import (
|
|||
ModuleNotExposed as ModuleNotExposed,
|
||||
MsgTypeError as MsgTypeError,
|
||||
RemoteActorError as RemoteActorError,
|
||||
TransportClosed as TransportClosed,
|
||||
)
|
||||
from .devx import (
|
||||
breakpoint as breakpoint,
|
||||
|
|
|
@ -906,8 +906,59 @@ class StreamOverrun(
|
|||
'''
|
||||
|
||||
|
||||
class TransportClosed(trio.ClosedResourceError):
|
||||
"Underlying channel transport was closed prior to use"
|
||||
class TransportClosed(trio.BrokenResourceError):
|
||||
'''
|
||||
IPC transport (protocol) connection was closed or broke and
|
||||
indicates that the wrapping communication `Channel` can no longer
|
||||
be used to send/receive msgs from the remote peer.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
loglevel: str = 'transport',
|
||||
cause: BaseException|None = None,
|
||||
raise_on_report: bool = False,
|
||||
|
||||
) -> None:
|
||||
self.message: str = message
|
||||
self._loglevel = loglevel
|
||||
super().__init__(message)
|
||||
|
||||
if cause is not None:
|
||||
self.__cause__ = cause
|
||||
|
||||
# flag to toggle whether the msg loop should raise
|
||||
# the exc in its `TransportClosed` handler block.
|
||||
self._raise_on_report = raise_on_report
|
||||
|
||||
def report_n_maybe_raise(
|
||||
self,
|
||||
message: str|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Using the init-specified log level emit a logging report
|
||||
for this error.
|
||||
|
||||
'''
|
||||
message: str = message or self.message
|
||||
# when a cause is set, slap it onto the log emission.
|
||||
if cause := self.__cause__:
|
||||
cause_tb_str: str = ''.join(
|
||||
traceback.format_tb(cause.__traceback__)
|
||||
)
|
||||
message += (
|
||||
f'{cause_tb_str}\n' # tb
|
||||
f' {cause}\n' # exc repr
|
||||
)
|
||||
|
||||
getattr(log, self._loglevel)(message)
|
||||
|
||||
# some errors we want to blow up from
|
||||
# inside the RPC msg loop
|
||||
if self._raise_on_report:
|
||||
raise self from cause
|
||||
|
||||
|
||||
class NoResult(RuntimeError):
|
||||
|
|
284
tractor/_ipc.py
284
tractor/_ipc.py
|
@ -54,7 +54,7 @@ from tractor._exceptions import (
|
|||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
_codec,
|
||||
# _codec, XXX see `self._codec` sanity/debug checks
|
||||
MsgCodec,
|
||||
types as msgtypes,
|
||||
pretty_struct,
|
||||
|
@ -65,8 +65,18 @@ log = get_logger(__name__)
|
|||
_is_windows = platform.system() == 'Windows'
|
||||
|
||||
|
||||
def get_stream_addrs(stream: trio.SocketStream) -> tuple:
|
||||
# should both be IP sockets
|
||||
def get_stream_addrs(
|
||||
stream: trio.SocketStream
|
||||
) -> tuple[
|
||||
tuple[str, int], # local
|
||||
tuple[str, int], # remote
|
||||
]:
|
||||
'''
|
||||
Return the `trio` streaming transport prot's socket-addrs for
|
||||
both the local and remote sides as a pair.
|
||||
|
||||
'''
|
||||
# rn, should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
rsockname = stream.socket.getpeername()
|
||||
return (
|
||||
|
@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
|
|||
)
|
||||
|
||||
|
||||
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
|
||||
MsgType = TypeVar("MsgType")
|
||||
|
||||
# TODO: consider using a generic def and indexing with our eventual
|
||||
# msg definition/types?
|
||||
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
||||
# - https://jcristharif.com/msgspec/usage.html#structs
|
||||
# from tractor.msg.types import MsgType
|
||||
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
|
||||
# => BLEH, except can't bc prots must inherit typevar or param-spec
|
||||
# vars..
|
||||
MsgType = TypeVar('MsgType')
|
||||
|
||||
|
||||
# TODO: break up this mod into a subpkg so we can start adding new
|
||||
# backends and move this type stuff into a dedicated file.. Bo
|
||||
#
|
||||
@runtime_checkable
|
||||
class MsgTransport(Protocol[MsgType]):
|
||||
#
|
||||
# ^-TODO-^ consider using a generic def and indexing with our
|
||||
# eventual msg definition/types?
|
||||
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
||||
|
||||
stream: trio.SocketStream
|
||||
drained: list[MsgType]
|
||||
|
@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]):
|
|||
...
|
||||
|
||||
|
||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
||||
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
|
||||
# probably should make a `mypy` issue?
|
||||
# TODO: typing oddity.. not sure why we have to inherit here, but it
|
||||
# seems to be an issue with `get_msg_transport()` returning
|
||||
# a `Type[Protocol]`; probably should make a `mypy` issue?
|
||||
class MsgpackTCPStream(MsgTransport):
|
||||
'''
|
||||
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
|
@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
#
|
||||
# TODO: define this as a `Codec` struct which can be
|
||||
# overriden dynamically by the application/runtime.
|
||||
# overriden dynamically by the application/runtime?
|
||||
codec: tuple[
|
||||
Callable[[Any], Any]|None, # coder
|
||||
Callable[[type, Any], Any]|None, # decoder
|
||||
|
@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
self._laddr, self._raddr = get_stream_addrs(stream)
|
||||
|
||||
# create read loop instance
|
||||
self._agen = self._iter_packets()
|
||||
self._aiter_pkts = self._iter_packets()
|
||||
self._send_lock = trio.StrictFIFOLock()
|
||||
|
||||
# public i guess?
|
||||
|
@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# allow for custom IPC msg interchange format
|
||||
# dynamic override Bo
|
||||
self._task = trio.lowlevel.current_task()
|
||||
self._codec: MsgCodec = (
|
||||
codec
|
||||
or
|
||||
_codec._ctxvar_MsgCodec.get()
|
||||
)
|
||||
# TODO: mask out before release?
|
||||
# log.runtime(
|
||||
# f'New {self} created with codec\n'
|
||||
# f'codec: {self._codec}\n'
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# self._codec: MsgCodec = (
|
||||
# codec
|
||||
# or
|
||||
# _codec._ctxvar_MsgCodec.get()
|
||||
# )
|
||||
|
||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||
|
@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport):
|
|||
Yield `bytes`-blob decoded packets from the underlying TCP
|
||||
stream using the current task's `MsgCodec`.
|
||||
|
||||
This is a streaming routine implemented as an async generator
|
||||
func (which was the original design, but could be changed?)
|
||||
and is allocated by a `.__call__()` inside `.__init__()` where
|
||||
it is assigned to the `._aiter_pkts` attr.
|
||||
|
||||
'''
|
||||
decodes_failed: int = 0
|
||||
|
||||
|
@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# seem to be getting racy failures here on
|
||||
# arbiter/registry name subs..
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
) as trans_err:
|
||||
|
||||
loglevel = 'transport'
|
||||
match trans_err:
|
||||
# case (
|
||||
# ConnectionResetError()
|
||||
# ):
|
||||
# loglevel = 'transport'
|
||||
|
||||
# peer actor (graceful??) TCP EOF but `tricycle`
|
||||
# seems to raise a 0-bytes-read?
|
||||
case ValueError() if (
|
||||
'unclean EOF' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# peer actor (task) prolly shutdown quickly due
|
||||
# to cancellation
|
||||
case trio.BrokenResourceError() if (
|
||||
'Connection reset by peer' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
case _:
|
||||
loglevel: str = 'warning'
|
||||
|
||||
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already closed by peer\n'
|
||||
f'x)> {type(trans_err)}\n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel=loglevel,
|
||||
) from trans_err
|
||||
|
||||
# XXX definitely can happen if transport is closed
|
||||
# manually by another `trio.lowlevel.Task` in the
|
||||
# same actor; we use this in some simulated fault
|
||||
# testing for ex, but generally should never happen
|
||||
# under normal operation!
|
||||
#
|
||||
# NOTE: as such we always re-raise this error from the
|
||||
# RPC msg loop!
|
||||
except trio.ClosedResourceError as closure_err:
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already manually closed locally?\n'
|
||||
f'x)> {type(closure_err)} \n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel='error',
|
||||
raise_on_report=(
|
||||
closure_err.args[0] == 'another task closed this fd'
|
||||
or
|
||||
closure_err.args[0] in ['another task closed this fd']
|
||||
),
|
||||
) from closure_err
|
||||
|
||||
# graceful TCP EOF disconnect
|
||||
if header == b'':
|
||||
raise TransportClosed(
|
||||
f'transport {self} was already closed prior ro read'
|
||||
message=(
|
||||
f'IPC transport already gracefully closed\n'
|
||||
f')>\n'
|
||||
f'|_{self}\n'
|
||||
),
|
||||
loglevel='transport',
|
||||
# cause=??? # handy or no?
|
||||
)
|
||||
|
||||
size: int
|
||||
size, = struct.unpack("<I", header)
|
||||
|
||||
log.transport(f'received header {size}') # type: ignore
|
||||
|
@ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport):
|
|||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# TODO: mask out before release?
|
||||
if self._codec.pld_spec != codec.pld_spec:
|
||||
# assert (
|
||||
# task := trio.lowlevel.current_task()
|
||||
# ) is not self._task
|
||||
# self._task = task
|
||||
self._codec = codec
|
||||
log.runtime(
|
||||
f'Using new codec in {self}.recv()\n'
|
||||
f'codec: {self._codec}\n\n'
|
||||
f'msg_bytes: {msg_bytes}\n'
|
||||
)
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# assert (
|
||||
# task := trio.lowlevel.current_task()
|
||||
# ) is not self._task
|
||||
# self._task = task
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.recv()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg_bytes: {msg_bytes}\n'
|
||||
# )
|
||||
yield codec.decode(msg_bytes)
|
||||
|
||||
# TODO: remove, was only for orig draft impl
|
||||
# testing.
|
||||
#
|
||||
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
# obj = curr_codec.decode(msg_bytes)
|
||||
# if (
|
||||
# curr_codec is not
|
||||
# _codec._def_msgspec_codec
|
||||
# ):
|
||||
# print(f'OBJ: {obj}\n')
|
||||
#
|
||||
# yield obj
|
||||
|
||||
# XXX NOTE: since the below error derives from
|
||||
# `DecodeError` we need to catch is specially
|
||||
# and always raise such that spec violations
|
||||
|
@ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport):
|
|||
msg: msgtypes.MsgType,
|
||||
|
||||
strict_types: bool = True,
|
||||
# hide_tb: bool = False,
|
||||
hide_tb: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||
|
@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport):
|
|||
invalid msg type
|
||||
|
||||
'''
|
||||
# __tracebackhide__: bool = hide_tb
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# XXX see `trio._sync.AsyncContextManagerMixin` for details
|
||||
# on the `.acquire()`/`.release()` sequencing..
|
||||
async with self._send_lock:
|
||||
|
||||
# NOTE: lookup the `trio.Task.context`'s var for
|
||||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# TODO: mask out before release?
|
||||
if self._codec.pld_spec != codec.pld_spec:
|
||||
self._codec = codec
|
||||
log.runtime(
|
||||
f'Using new codec in {self}.send()\n'
|
||||
f'codec: {self._codec}\n\n'
|
||||
f'msg: {msg}\n'
|
||||
)
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.send()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg: {msg}\n'
|
||||
# )
|
||||
|
||||
if type(msg) not in msgtypes.__msg_types__:
|
||||
if strict_types:
|
||||
|
@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport):
|
|||
size: bytes = struct.pack("<I", len(bytes_data))
|
||||
return await self.stream.send_all(size + bytes_data)
|
||||
|
||||
# ?TODO? does it help ever to dynamically show this
|
||||
# frame?
|
||||
# try:
|
||||
# <the-above_code>
|
||||
# except BaseException as _err:
|
||||
# err = _err
|
||||
# if not isinstance(err, MsgTypeError):
|
||||
# __tracebackhide__: bool = False
|
||||
# raise
|
||||
|
||||
@property
|
||||
def laddr(self) -> tuple[str, int]:
|
||||
return self._laddr
|
||||
|
@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
return self._raddr
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._agen.asend(None)
|
||||
return await self._aiter_pkts.asend(None)
|
||||
|
||||
async def drain(self) -> AsyncIterator[dict]:
|
||||
'''
|
||||
|
@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport):
|
|||
yield msg
|
||||
|
||||
def __aiter__(self):
|
||||
return self._agen
|
||||
return self._aiter_pkts
|
||||
|
||||
def connected(self) -> bool:
|
||||
return self.stream.socket.fileno() != -1
|
||||
|
@ -433,7 +517,7 @@ class Channel:
|
|||
# set after handshake - always uid of far end
|
||||
self.uid: tuple[str, str]|None = None
|
||||
|
||||
self._agen = self._aiter_recv()
|
||||
self._aiter_msgs = self._iter_msgs()
|
||||
self._exc: Exception|None = None # set if far end actor errors
|
||||
self._closed: bool = False
|
||||
|
||||
|
@ -497,8 +581,6 @@ class Channel:
|
|||
)
|
||||
return self._transport
|
||||
|
||||
# TODO: something simliar at the IPC-`Context`
|
||||
# level so as to support
|
||||
@cm
|
||||
def apply_codec(
|
||||
self,
|
||||
|
@ -517,6 +599,7 @@ class Channel:
|
|||
finally:
|
||||
self._transport.codec = orig
|
||||
|
||||
# TODO: do a .src/.dst: str for maddrs?
|
||||
def __repr__(self) -> str:
|
||||
if not self._transport:
|
||||
return '<Channel with inactive transport?>'
|
||||
|
@ -560,27 +643,43 @@ class Channel:
|
|||
)
|
||||
return transport
|
||||
|
||||
# TODO: something like,
|
||||
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||
# instead of the `try/except` hack we have rn..
|
||||
# seems like a pretty useful thing to have in general
|
||||
# along with being able to filter certain stack frame(s / sets)
|
||||
# possibly based on the current log-level?
|
||||
async def send(
|
||||
self,
|
||||
payload: Any,
|
||||
|
||||
# hide_tb: bool = False,
|
||||
hide_tb: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Send a coded msg-blob over the transport.
|
||||
|
||||
'''
|
||||
# __tracebackhide__: bool = hide_tb
|
||||
log.transport(
|
||||
'=> send IPC msg:\n\n'
|
||||
f'{pformat(payload)}\n'
|
||||
) # type: ignore
|
||||
assert self._transport
|
||||
await self._transport.send(
|
||||
payload,
|
||||
# hide_tb=hide_tb,
|
||||
)
|
||||
__tracebackhide__: bool = hide_tb
|
||||
try:
|
||||
log.transport(
|
||||
'=> send IPC msg:\n\n'
|
||||
f'{pformat(payload)}\n'
|
||||
)
|
||||
# assert self._transport # but why typing?
|
||||
await self._transport.send(
|
||||
payload,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
except BaseException as _err:
|
||||
err = _err # bind for introspection
|
||||
if not isinstance(_err, MsgTypeError):
|
||||
# assert err
|
||||
__tracebackhide__: bool = False
|
||||
else:
|
||||
assert err.cid
|
||||
|
||||
raise
|
||||
|
||||
async def recv(self) -> Any:
|
||||
assert self._transport
|
||||
|
@ -617,8 +716,11 @@ class Channel:
|
|||
await self.aclose(*args)
|
||||
|
||||
def __aiter__(self):
|
||||
return self._agen
|
||||
return self._aiter_msgs
|
||||
|
||||
# ?TODO? run any reconnection sequence?
|
||||
# -[ ] prolly should be impl-ed as deco-API?
|
||||
#
|
||||
# async def _reconnect(self) -> None:
|
||||
# """Handle connection failures by polling until a reconnect can be
|
||||
# established.
|
||||
|
@ -636,7 +738,6 @@ class Channel:
|
|||
# else:
|
||||
# log.transport("Stream connection re-established!")
|
||||
|
||||
# # TODO: run any reconnection sequence
|
||||
# # on_recon = self._recon_seq
|
||||
# # if on_recon:
|
||||
# # await on_recon(self)
|
||||
|
@ -650,11 +751,17 @@ class Channel:
|
|||
# " for re-establishment")
|
||||
# await trio.sleep(1)
|
||||
|
||||
async def _aiter_recv(
|
||||
async def _iter_msgs(
|
||||
self
|
||||
) -> AsyncGenerator[Any, None]:
|
||||
'''
|
||||
Async iterate items from underlying stream.
|
||||
Yield `MsgType` IPC msgs decoded and deliverd from
|
||||
an underlying `MsgTransport` protocol.
|
||||
|
||||
This is a streaming routine alo implemented as an async-gen
|
||||
func (same a `MsgTransport._iter_pkts()`) gets allocated by
|
||||
a `.__call__()` inside `.__init__()` where it is assigned to
|
||||
the `._aiter_msgs` attr.
|
||||
|
||||
'''
|
||||
assert self._transport
|
||||
|
@ -680,15 +787,6 @@ class Channel:
|
|||
case _:
|
||||
yield msg
|
||||
|
||||
# TODO: if we were gonna do this it should be
|
||||
# done up at the `MsgStream` layer!
|
||||
#
|
||||
# sent = yield item
|
||||
# if sent is not None:
|
||||
# # optimization, passing None through all the
|
||||
# # time is pointless
|
||||
# await self._transport.send(sent)
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
|
||||
# if not self._autorecon:
|
||||
|
|
|
@ -68,7 +68,7 @@ from .msg import (
|
|||
MsgCodec,
|
||||
PayloadT,
|
||||
NamespacePath,
|
||||
pretty_struct,
|
||||
# pretty_struct,
|
||||
_ops as msgops,
|
||||
)
|
||||
from tractor.msg.types import (
|
||||
|
@ -89,6 +89,16 @@ if TYPE_CHECKING:
|
|||
log = get_logger('tractor')
|
||||
|
||||
|
||||
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
|
||||
# func-type-cases implemented "on top of" `@context` defs:
|
||||
# -[ ] std async func helper decorated with `@rpc_func`?
|
||||
# -[ ] `Portal.open_stream_from()` with async-gens?
|
||||
# |_ possibly a duplex form of this with a
|
||||
# `sent_from_peer = yield send_to_peer` form, which would require
|
||||
# syncing the send/recv side with possibly `.receive_nowait()`
|
||||
# on each `yield`?
|
||||
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
|
||||
# user only defining a single-`yield` generator-func?
|
||||
async def _invoke_non_context(
|
||||
actor: Actor,
|
||||
cancel_scope: CancelScope,
|
||||
|
@ -108,8 +118,9 @@ async def _invoke_non_context(
|
|||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
__tracebackhide__: bool = True
|
||||
cs: CancelScope|None = None # ref when activated
|
||||
|
||||
# TODO: can we unify this with the `context=True` impl below?
|
||||
# ?TODO? can we unify this with the `context=True` impl below?
|
||||
if inspect.isasyncgen(coro):
|
||||
await chan.send(
|
||||
StartAck(
|
||||
|
@ -160,10 +171,6 @@ async def _invoke_non_context(
|
|||
functype='asyncgen',
|
||||
)
|
||||
)
|
||||
# XXX: the async-func may spawn further tasks which push
|
||||
# back values like an async-generator would but must
|
||||
# manualy construct the response dict-packet-responses as
|
||||
# above
|
||||
with cancel_scope as cs:
|
||||
ctx._scope = cs
|
||||
task_status.started(ctx)
|
||||
|
@ -175,15 +182,13 @@ async def _invoke_non_context(
|
|||
await chan.send(
|
||||
Stop(cid=cid)
|
||||
)
|
||||
|
||||
# simplest function/method request-response pattern
|
||||
# XXX: in the most minimally used case, just a scheduled internal runtime
|
||||
# call to `Actor._cancel_task()` from the ctx-peer task since we
|
||||
# don't (yet) have a dedicated IPC msg.
|
||||
# ------ - ------
|
||||
else:
|
||||
# regular async function/method
|
||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
||||
# from a remote request to cancel some `Context`.
|
||||
# ------ - ------
|
||||
# TODO: ideally we unify this with the above `context=True`
|
||||
# block such that for any remote invocation ftype, we
|
||||
# always invoke the far end RPC task scheduling the same
|
||||
# way: using the linked IPC context machinery.
|
||||
failed_resp: bool = False
|
||||
try:
|
||||
ack = StartAck(
|
||||
|
@ -354,8 +359,15 @@ async def _errors_relayed_via_ipc(
|
|||
# channel.
|
||||
task_status.started(err)
|
||||
|
||||
# always reraise KBIs so they propagate at the sys-process level.
|
||||
if isinstance(err, KeyboardInterrupt):
|
||||
# always propagate KBIs at the sys-process level.
|
||||
if (
|
||||
isinstance(err, KeyboardInterrupt)
|
||||
|
||||
# ?TODO? except when running in asyncio mode?
|
||||
# |_ wut if you want to open a `@context` FROM an
|
||||
# infected_aio task?
|
||||
# and not actor.is_infected_aio()
|
||||
):
|
||||
raise
|
||||
|
||||
# RPC task bookeeping.
|
||||
|
@ -458,7 +470,6 @@ async def _invoke(
|
|||
# tb: TracebackType = None
|
||||
|
||||
cancel_scope = CancelScope()
|
||||
cs: CancelScope|None = None # ref when activated
|
||||
ctx = actor.get_context(
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
|
@ -607,6 +618,8 @@ async def _invoke(
|
|||
# `@context` marked RPC function.
|
||||
# - `._portal` is never set.
|
||||
try:
|
||||
tn: trio.Nursery
|
||||
rpc_ctx_cs: CancelScope
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
msgops.maybe_limit_plds(
|
||||
|
@ -616,7 +629,7 @@ async def _invoke(
|
|||
),
|
||||
):
|
||||
ctx._scope_nursery = tn
|
||||
ctx._scope = tn.cancel_scope
|
||||
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: better `trionics` tooling:
|
||||
|
@ -642,7 +655,7 @@ async def _invoke(
|
|||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||
# which cancels the scope presuming the input error
|
||||
# is not a `.cancel_acked` pleaser.
|
||||
if ctx._scope.cancelled_caught:
|
||||
if rpc_ctx_cs.cancelled_caught:
|
||||
our_uid: tuple = actor.uid
|
||||
|
||||
# first check for and raise any remote error
|
||||
|
@ -652,9 +665,7 @@ async def _invoke(
|
|||
if re := ctx._remote_error:
|
||||
ctx._maybe_raise_remote_err(re)
|
||||
|
||||
cs: CancelScope = ctx._scope
|
||||
|
||||
if cs.cancel_called:
|
||||
if rpc_ctx_cs.cancel_called:
|
||||
canceller: tuple = ctx.canceller
|
||||
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||
|
||||
|
@ -680,9 +691,15 @@ async def _invoke(
|
|||
elif canceller == ctx.chan.uid:
|
||||
explain += f'its {ctx.peer_side!r}-side peer'
|
||||
|
||||
else:
|
||||
elif canceller == our_uid:
|
||||
explain += 'itself'
|
||||
|
||||
elif canceller:
|
||||
explain += 'a remote peer'
|
||||
|
||||
else:
|
||||
explain += 'an unknown cause?'
|
||||
|
||||
explain += (
|
||||
add_div(message=explain)
|
||||
+
|
||||
|
@ -911,7 +928,10 @@ async def process_messages(
|
|||
f'IPC msg from peer\n'
|
||||
f'<= {chan.uid}\n\n'
|
||||
|
||||
# TODO: avoid fmting depending on loglevel for perf?
|
||||
# TODO: use of the pprinting of structs is
|
||||
# FRAGILE and should prolly not be
|
||||
#
|
||||
# avoid fmting depending on loglevel for perf?
|
||||
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
|
||||
# - how to only log-level-aware actually call this?
|
||||
# -[ ] use `.msg.pretty_struct` here now instead!
|
||||
|
@ -1177,7 +1197,7 @@ async def process_messages(
|
|||
parent_chan=chan,
|
||||
)
|
||||
|
||||
except TransportClosed:
|
||||
except TransportClosed as tc:
|
||||
# channels "breaking" (for TCP streams by EOF or 104
|
||||
# connection-reset) is ok since we don't have a teardown
|
||||
# handshake for them (yet) and instead we simply bail out of
|
||||
|
@ -1185,12 +1205,20 @@ async def process_messages(
|
|||
# up..
|
||||
#
|
||||
# TODO: maybe add a teardown handshake? and,
|
||||
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
|
||||
# -[x] don't show this msg if it's an ephemeral discovery ep call?
|
||||
# |_ see the below `.report_n_maybe_raise()` impl as well as
|
||||
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
|
||||
# for different read-failure cases.
|
||||
# -[ ] figure out how this will break with other transports?
|
||||
log.runtime(
|
||||
f'IPC channel closed abruptly\n'
|
||||
f'<=x peer: {chan.uid}\n'
|
||||
f' |_{chan.raddr}\n'
|
||||
tc.report_n_maybe_raise(
|
||||
message=(
|
||||
f'peer IPC channel closed abruptly?\n\n'
|
||||
f'<=x {chan}\n'
|
||||
f' |_{chan.raddr}\n\n'
|
||||
)
|
||||
+
|
||||
tc.message
|
||||
|
||||
)
|
||||
|
||||
# transport **WAS** disconnected
|
||||
|
@ -1238,7 +1266,7 @@ async def process_messages(
|
|||
'Exiting IPC msg loop with final msg\n\n'
|
||||
f'<= peer: {chan.uid}\n'
|
||||
f' |_{chan}\n\n'
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
# f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
|
||||
log.runtime(message)
|
||||
|
|
|
@ -54,11 +54,12 @@ LOG_FORMAT = (
|
|||
DATE_FORMAT = '%b %d %H:%M:%S'
|
||||
|
||||
# FYI, ERROR is 40
|
||||
# TODO: use a `bidict` to avoid the :155 check?
|
||||
CUSTOM_LEVELS: dict[str, int] = {
|
||||
'TRANSPORT': 5,
|
||||
'RUNTIME': 15,
|
||||
'DEVX': 17,
|
||||
'CANCEL': 18,
|
||||
'CANCEL': 22,
|
||||
'PDB': 500,
|
||||
}
|
||||
STD_PALETTE = {
|
||||
|
@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
|
|||
Delegate a log call to the underlying logger, after adding
|
||||
contextual information from this adapter instance.
|
||||
|
||||
NOTE: all custom level methods (above) delegate to this!
|
||||
|
||||
'''
|
||||
if self.isEnabledFor(level):
|
||||
stacklevel: int = 3
|
||||
|
|
|
@ -34,6 +34,9 @@ from pprint import (
|
|||
saferepr,
|
||||
)
|
||||
|
||||
from tractor.log import get_logger
|
||||
|
||||
log = get_logger()
|
||||
# TODO: auto-gen type sig for input func both for
|
||||
# type-msgs and logging of RPC tasks?
|
||||
# taken and modified from:
|
||||
|
@ -143,7 +146,13 @@ def pformat(
|
|||
|
||||
else: # the `pprint` recursion-safe format:
|
||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||
val_str: str = saferepr(v)
|
||||
try:
|
||||
val_str: str = saferepr(v)
|
||||
except Exception:
|
||||
log.exception(
|
||||
'Failed to `saferepr({type(struct)})` !?\n'
|
||||
)
|
||||
return _Struct.__repr__(struct)
|
||||
|
||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||
|
@ -194,12 +203,20 @@ class Struct(
|
|||
return sin_props
|
||||
|
||||
pformat = pformat
|
||||
# __repr__ = pformat
|
||||
# __str__ = __repr__ = pformat
|
||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||
# inside a known tty?
|
||||
# def __repr__(self) -> str:
|
||||
# ...
|
||||
__repr__ = pformat
|
||||
def __repr__(self) -> str:
|
||||
try:
|
||||
return pformat(self)
|
||||
except Exception:
|
||||
log.exception(
|
||||
f'Failed to `pformat({type(self)})` !?\n'
|
||||
)
|
||||
return _Struct.__repr__(self)
|
||||
|
||||
def copy(
|
||||
self,
|
||||
|
|
|
@ -156,11 +156,12 @@ class BroadcastState(Struct):
|
|||
|
||||
class BroadcastReceiver(ReceiveChannel):
|
||||
'''
|
||||
A memory receive channel broadcaster which is non-lossy for the
|
||||
fastest consumer.
|
||||
A memory receive channel broadcaster which is non-lossy for
|
||||
the fastest consumer.
|
||||
|
||||
Additional consumer tasks can receive all produced values by registering
|
||||
with ``.subscribe()`` and receiving from the new instance it delivers.
|
||||
Additional consumer tasks can receive all produced values by
|
||||
registering with ``.subscribe()`` and receiving from the new
|
||||
instance it delivers.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
|
|
|
@ -18,8 +18,12 @@
|
|||
Async context manager primitives with hard ``trio``-aware semantics
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
import inspect
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncContextManager,
|
||||
|
@ -30,13 +34,16 @@ from typing import (
|
|||
Optional,
|
||||
Sequence,
|
||||
TypeVar,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
||||
from tractor._state import current_actor
|
||||
from tractor.log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor import ActorNursery
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -46,8 +53,10 @@ T = TypeVar("T")
|
|||
|
||||
@acm
|
||||
async def maybe_open_nursery(
|
||||
nursery: trio.Nursery | None = None,
|
||||
nursery: trio.Nursery|ActorNursery|None = None,
|
||||
shield: bool = False,
|
||||
lib: ModuleType = trio,
|
||||
|
||||
) -> AsyncGenerator[trio.Nursery, Any]:
|
||||
'''
|
||||
Create a new nursery if None provided.
|
||||
|
@ -58,13 +67,12 @@ async def maybe_open_nursery(
|
|||
if nursery is not None:
|
||||
yield nursery
|
||||
else:
|
||||
async with trio.open_nursery() as nursery:
|
||||
async with lib.open_nursery() as nursery:
|
||||
nursery.cancel_scope.shield = shield
|
||||
yield nursery
|
||||
|
||||
|
||||
async def _enter_and_wait(
|
||||
|
||||
mngr: AsyncContextManager[T],
|
||||
unwrapped: dict[int, T],
|
||||
all_entered: trio.Event,
|
||||
|
@ -91,7 +99,6 @@ async def _enter_and_wait(
|
|||
|
||||
@acm
|
||||
async def gather_contexts(
|
||||
|
||||
mngrs: Sequence[AsyncContextManager[T]],
|
||||
|
||||
) -> AsyncGenerator[
|
||||
|
@ -102,15 +109,17 @@ async def gather_contexts(
|
|||
None,
|
||||
]:
|
||||
'''
|
||||
Concurrently enter a sequence of async context managers, each in
|
||||
a separate ``trio`` task and deliver the unwrapped values in the
|
||||
same order once all managers have entered. On exit all contexts are
|
||||
subsequently and concurrently exited.
|
||||
Concurrently enter a sequence of async context managers (acms),
|
||||
each from a separate `trio` task and deliver the unwrapped
|
||||
`yield`-ed values in the same order once all managers have entered.
|
||||
|
||||
This function is somewhat similar to common usage of
|
||||
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
|
||||
combo with ``asyncio.gather()`` except the managers are concurrently
|
||||
entered and exited, and cancellation just works.
|
||||
On exit, all acms are subsequently and concurrently exited.
|
||||
|
||||
This function is somewhat similar to a batch of non-blocking
|
||||
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
||||
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
||||
`.__aenter__()`-ed values, except the managers are both
|
||||
concurrently entered and exited and *cancellation just works*(R).
|
||||
|
||||
'''
|
||||
seed: int = id(mngrs)
|
||||
|
@ -210,9 +219,10 @@ async def maybe_open_context(
|
|||
|
||||
) -> AsyncIterator[tuple[bool, T]]:
|
||||
'''
|
||||
Maybe open a context manager if there is not already a _Cached
|
||||
version for the provided ``key`` for *this* actor. Return the
|
||||
_Cached instance on a _Cache hit.
|
||||
Maybe open an async-context-manager (acm) if there is not already
|
||||
a `_Cached` version for the provided (input) `key` for *this* actor.
|
||||
|
||||
Return the `_Cached` instance on a _Cache hit.
|
||||
|
||||
'''
|
||||
fid = id(acm_func)
|
||||
|
@ -273,8 +283,13 @@ async def maybe_open_context(
|
|||
else:
|
||||
_Cache.users += 1
|
||||
log.runtime(
|
||||
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
|
||||
f'{ctx_key!r} -> {yielded!r}\n'
|
||||
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||
|
||||
# TODO: make this work with values but without
|
||||
# `msgspec.Struct` causing frickin crashes on field-type
|
||||
# lookups..
|
||||
# f'{ctx_key!r} -> {yielded!r}\n'
|
||||
)
|
||||
lock.release()
|
||||
yield True, yielded
|
||||
|
|
Loading…
Reference in New Issue