Compare commits
7 Commits
4fbd469c33
...
af3745684c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | af3745684c | |
Tyler Goodlet | 3907cba68e | |
Tyler Goodlet | e3d59964af | |
Tyler Goodlet | ba83bab776 | |
Tyler Goodlet | 18d440c207 | |
Tyler Goodlet | edac717613 | |
Tyler Goodlet | 7e93b81a83 |
|
@ -11,9 +11,6 @@ from typing import (
|
||||||
Type,
|
Type,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from contextvars import (
|
|
||||||
Context,
|
|
||||||
)
|
|
||||||
|
|
||||||
from msgspec import (
|
from msgspec import (
|
||||||
structs,
|
structs,
|
||||||
|
@ -27,6 +24,7 @@ import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
_state,
|
_state,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
|
Context,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_codec,
|
_codec,
|
||||||
|
@ -41,7 +39,7 @@ from tractor.msg import (
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
_payload_msgs,
|
_payload_msgs,
|
||||||
log,
|
log,
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Started,
|
Started,
|
||||||
mk_msg_spec,
|
mk_msg_spec,
|
||||||
)
|
)
|
||||||
|
@ -61,7 +59,7 @@ def mk_custom_codec(
|
||||||
uid: tuple[str, str] = tractor.current_actor().uid
|
uid: tuple[str, str] = tractor.current_actor().uid
|
||||||
|
|
||||||
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
# XXX NOTE XXX: despite defining `NamespacePath` as a type
|
||||||
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
|
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
|
||||||
# to cast to/from that type on the wire. See the docs:
|
# to cast to/from that type on the wire. See the docs:
|
||||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||||
|
|
||||||
|
@ -321,12 +319,12 @@ def dec_type_union(
|
||||||
import importlib
|
import importlib
|
||||||
types: list[Type] = []
|
types: list[Type] = []
|
||||||
for type_name in type_names:
|
for type_name in type_names:
|
||||||
for ns in [
|
for mod in [
|
||||||
typing,
|
typing,
|
||||||
importlib.import_module(__name__),
|
importlib.import_module(__name__),
|
||||||
]:
|
]:
|
||||||
if type_ref := getattr(
|
if type_ref := getattr(
|
||||||
ns,
|
mod,
|
||||||
type_name,
|
type_name,
|
||||||
False,
|
False,
|
||||||
):
|
):
|
||||||
|
@ -744,7 +742,7 @@ def chk_pld_type(
|
||||||
# 'Error', .pld: ErrorData
|
# 'Error', .pld: ErrorData
|
||||||
|
|
||||||
codec: MsgCodec = mk_codec(
|
codec: MsgCodec = mk_codec(
|
||||||
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
|
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
|
||||||
# type union.
|
# type union.
|
||||||
ipc_pld_spec=payload_spec,
|
ipc_pld_spec=payload_spec,
|
||||||
)
|
)
|
||||||
|
@ -752,7 +750,7 @@ def chk_pld_type(
|
||||||
# make a one-off dec to compare with our `MsgCodec` instance
|
# make a one-off dec to compare with our `MsgCodec` instance
|
||||||
# which does the below `mk_msg_spec()` call internally
|
# which does the below `mk_msg_spec()` call internally
|
||||||
ipc_msg_spec: Union[Type[Struct]]
|
ipc_msg_spec: Union[Type[Struct]]
|
||||||
msg_types: list[Msg[payload_spec]]
|
msg_types: list[PayloadMsg[payload_spec]]
|
||||||
(
|
(
|
||||||
ipc_msg_spec,
|
ipc_msg_spec,
|
||||||
msg_types,
|
msg_types,
|
||||||
|
@ -761,7 +759,7 @@ def chk_pld_type(
|
||||||
)
|
)
|
||||||
_enc = msgpack.Encoder()
|
_enc = msgpack.Encoder()
|
||||||
_dec = msgpack.Decoder(
|
_dec = msgpack.Decoder(
|
||||||
type=ipc_msg_spec or Any, # like `Msg[Any]`
|
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
|
||||||
)
|
)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
|
@ -806,7 +804,7 @@ def chk_pld_type(
|
||||||
'cid': '666',
|
'cid': '666',
|
||||||
'pld': pld,
|
'pld': pld,
|
||||||
}
|
}
|
||||||
enc_msg: Msg = typedef(**kwargs)
|
enc_msg: PayloadMsg = typedef(**kwargs)
|
||||||
|
|
||||||
_wire_bytes: bytes = _enc.encode(enc_msg)
|
_wire_bytes: bytes = _enc.encode(enc_msg)
|
||||||
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
wire_bytes: bytes = codec.enc.encode(enc_msg)
|
||||||
|
@ -883,25 +881,16 @@ def test_limit_msgspec():
|
||||||
debug_mode=True
|
debug_mode=True
|
||||||
):
|
):
|
||||||
|
|
||||||
# ensure we can round-trip a boxing `Msg`
|
# ensure we can round-trip a boxing `PayloadMsg`
|
||||||
assert chk_pld_type(
|
assert chk_pld_type(
|
||||||
# Msg,
|
payload_spec=Any,
|
||||||
Any,
|
pld=None,
|
||||||
None,
|
|
||||||
expect_roundtrip=True,
|
expect_roundtrip=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: don't need this any more right since
|
|
||||||
# `msgspec>=0.15` has the nice generics stuff yah??
|
|
||||||
#
|
|
||||||
# manually override the type annot of the payload
|
|
||||||
# field and ensure it propagates to all msg-subtypes.
|
|
||||||
# Msg.__annotations__['pld'] = Any
|
|
||||||
|
|
||||||
# verify that a mis-typed payload value won't decode
|
# verify that a mis-typed payload value won't decode
|
||||||
assert not chk_pld_type(
|
assert not chk_pld_type(
|
||||||
# Msg,
|
payload_spec=int,
|
||||||
int,
|
|
||||||
pld='doggy',
|
pld='doggy',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -913,18 +902,16 @@ def test_limit_msgspec():
|
||||||
value: Any
|
value: Any
|
||||||
|
|
||||||
assert not chk_pld_type(
|
assert not chk_pld_type(
|
||||||
# Msg,
|
payload_spec=CustomPayload,
|
||||||
CustomPayload,
|
|
||||||
pld='doggy',
|
pld='doggy',
|
||||||
)
|
)
|
||||||
|
|
||||||
assert chk_pld_type(
|
assert chk_pld_type(
|
||||||
# Msg,
|
payload_spec=CustomPayload,
|
||||||
CustomPayload,
|
|
||||||
pld=CustomPayload(name='doggy', value='urmom')
|
pld=CustomPayload(name='doggy', value='urmom')
|
||||||
)
|
)
|
||||||
|
|
||||||
# uhh bc we can `.pause_from_sync()` now! :surfer:
|
# yah, we can `.pause_from_sync()` now!
|
||||||
# breakpoint()
|
# breakpoint()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -1336,6 +1336,23 @@ def test_shield_pause(
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: better error for "non-ideal" usage from the root actor.
|
||||||
|
# -[ ] if called from an async scope emit a message that suggests
|
||||||
|
# using `await tractor.pause()` instead since it's less overhead
|
||||||
|
# (in terms of `greenback` and/or extra threads) and if it's from
|
||||||
|
# a sync scope suggest that usage must first call
|
||||||
|
# `ensure_portal()` in the (eventual parent) async calling scope?
|
||||||
|
def test_sync_pause_from_bg_task_in_root_actor_():
|
||||||
|
'''
|
||||||
|
When used from the root actor, normally we can only implicitly
|
||||||
|
support `.pause_from_sync()` from the main-parent-task (that
|
||||||
|
opens the runtime via `open_root_actor()`) since `greenback`
|
||||||
|
requires a `.ensure_portal()` call per `trio.Task` where it is
|
||||||
|
used.
|
||||||
|
|
||||||
|
'''
|
||||||
|
...
|
||||||
|
|
||||||
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
|
||||||
def test_correct_frames_below_hidden():
|
def test_correct_frames_below_hidden():
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -19,7 +19,7 @@ from tractor._testing import (
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def run_example_in_subproc(
|
def run_example_in_subproc(
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
testdir,
|
testdir: pytest.Testdir,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
):
|
):
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ from ._exceptions import (
|
||||||
ModuleNotExposed as ModuleNotExposed,
|
ModuleNotExposed as ModuleNotExposed,
|
||||||
MsgTypeError as MsgTypeError,
|
MsgTypeError as MsgTypeError,
|
||||||
RemoteActorError as RemoteActorError,
|
RemoteActorError as RemoteActorError,
|
||||||
|
TransportClosed as TransportClosed,
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
breakpoint as breakpoint,
|
breakpoint as breakpoint,
|
||||||
|
|
|
@ -906,8 +906,59 @@ class StreamOverrun(
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
class TransportClosed(trio.BrokenResourceError):
|
||||||
"Underlying channel transport was closed prior to use"
|
'''
|
||||||
|
IPC transport (protocol) connection was closed or broke and
|
||||||
|
indicates that the wrapping communication `Channel` can no longer
|
||||||
|
be used to send/receive msgs from the remote peer.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
loglevel: str = 'transport',
|
||||||
|
cause: BaseException|None = None,
|
||||||
|
raise_on_report: bool = False,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
self.message: str = message
|
||||||
|
self._loglevel = loglevel
|
||||||
|
super().__init__(message)
|
||||||
|
|
||||||
|
if cause is not None:
|
||||||
|
self.__cause__ = cause
|
||||||
|
|
||||||
|
# flag to toggle whether the msg loop should raise
|
||||||
|
# the exc in its `TransportClosed` handler block.
|
||||||
|
self._raise_on_report = raise_on_report
|
||||||
|
|
||||||
|
def report_n_maybe_raise(
|
||||||
|
self,
|
||||||
|
message: str|None = None,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Using the init-specified log level emit a logging report
|
||||||
|
for this error.
|
||||||
|
|
||||||
|
'''
|
||||||
|
message: str = message or self.message
|
||||||
|
# when a cause is set, slap it onto the log emission.
|
||||||
|
if cause := self.__cause__:
|
||||||
|
cause_tb_str: str = ''.join(
|
||||||
|
traceback.format_tb(cause.__traceback__)
|
||||||
|
)
|
||||||
|
message += (
|
||||||
|
f'{cause_tb_str}\n' # tb
|
||||||
|
f' {cause}\n' # exc repr
|
||||||
|
)
|
||||||
|
|
||||||
|
getattr(log, self._loglevel)(message)
|
||||||
|
|
||||||
|
# some errors we want to blow up from
|
||||||
|
# inside the RPC msg loop
|
||||||
|
if self._raise_on_report:
|
||||||
|
raise self from cause
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
|
|
284
tractor/_ipc.py
284
tractor/_ipc.py
|
@ -54,7 +54,7 @@ from tractor._exceptions import (
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
_codec,
|
# _codec, XXX see `self._codec` sanity/debug checks
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
|
@ -65,8 +65,18 @@ log = get_logger(__name__)
|
||||||
_is_windows = platform.system() == 'Windows'
|
_is_windows = platform.system() == 'Windows'
|
||||||
|
|
||||||
|
|
||||||
def get_stream_addrs(stream: trio.SocketStream) -> tuple:
|
def get_stream_addrs(
|
||||||
# should both be IP sockets
|
stream: trio.SocketStream
|
||||||
|
) -> tuple[
|
||||||
|
tuple[str, int], # local
|
||||||
|
tuple[str, int], # remote
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Return the `trio` streaming transport prot's socket-addrs for
|
||||||
|
both the local and remote sides as a pair.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# rn, should both be IP sockets
|
||||||
lsockname = stream.socket.getsockname()
|
lsockname = stream.socket.getsockname()
|
||||||
rsockname = stream.socket.getpeername()
|
rsockname = stream.socket.getpeername()
|
||||||
return (
|
return (
|
||||||
|
@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
|
# from tractor.msg.types import MsgType
|
||||||
MsgType = TypeVar("MsgType")
|
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
|
||||||
|
# => BLEH, except can't bc prots must inherit typevar or param-spec
|
||||||
# TODO: consider using a generic def and indexing with our eventual
|
# vars..
|
||||||
# msg definition/types?
|
MsgType = TypeVar('MsgType')
|
||||||
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
|
||||||
# - https://jcristharif.com/msgspec/usage.html#structs
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: break up this mod into a subpkg so we can start adding new
|
||||||
|
# backends and move this type stuff into a dedicated file.. Bo
|
||||||
|
#
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class MsgTransport(Protocol[MsgType]):
|
class MsgTransport(Protocol[MsgType]):
|
||||||
|
#
|
||||||
|
# ^-TODO-^ consider using a generic def and indexing with our
|
||||||
|
# eventual msg definition/types?
|
||||||
|
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
||||||
|
|
||||||
stream: trio.SocketStream
|
stream: trio.SocketStream
|
||||||
drained: list[MsgType]
|
drained: list[MsgType]
|
||||||
|
@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
# TODO: typing oddity.. not sure why we have to inherit here, but it
|
||||||
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
|
# seems to be an issue with `get_msg_transport()` returning
|
||||||
# probably should make a `mypy` issue?
|
# a `Type[Protocol]`; probably should make a `mypy` issue?
|
||||||
class MsgpackTCPStream(MsgTransport):
|
class MsgpackTCPStream(MsgTransport):
|
||||||
'''
|
'''
|
||||||
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
|
@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||||
#
|
#
|
||||||
# TODO: define this as a `Codec` struct which can be
|
# TODO: define this as a `Codec` struct which can be
|
||||||
# overriden dynamically by the application/runtime.
|
# overriden dynamically by the application/runtime?
|
||||||
codec: tuple[
|
codec: tuple[
|
||||||
Callable[[Any], Any]|None, # coder
|
Callable[[Any], Any]|None, # coder
|
||||||
Callable[[type, Any], Any]|None, # decoder
|
Callable[[type, Any], Any]|None, # decoder
|
||||||
|
@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
self._laddr, self._raddr = get_stream_addrs(stream)
|
self._laddr, self._raddr = get_stream_addrs(stream)
|
||||||
|
|
||||||
# create read loop instance
|
# create read loop instance
|
||||||
self._agen = self._iter_packets()
|
self._aiter_pkts = self._iter_packets()
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
self._send_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
# public i guess?
|
# public i guess?
|
||||||
|
@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# allow for custom IPC msg interchange format
|
# allow for custom IPC msg interchange format
|
||||||
# dynamic override Bo
|
# dynamic override Bo
|
||||||
self._task = trio.lowlevel.current_task()
|
self._task = trio.lowlevel.current_task()
|
||||||
self._codec: MsgCodec = (
|
|
||||||
codec
|
# XXX for ctxvar debug only!
|
||||||
or
|
# self._codec: MsgCodec = (
|
||||||
_codec._ctxvar_MsgCodec.get()
|
# codec
|
||||||
)
|
# or
|
||||||
# TODO: mask out before release?
|
# _codec._ctxvar_MsgCodec.get()
|
||||||
# log.runtime(
|
|
||||||
# f'New {self} created with codec\n'
|
|
||||||
# f'codec: {self._codec}\n'
|
|
||||||
# )
|
# )
|
||||||
|
|
||||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||||
|
@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
Yield `bytes`-blob decoded packets from the underlying TCP
|
Yield `bytes`-blob decoded packets from the underlying TCP
|
||||||
stream using the current task's `MsgCodec`.
|
stream using the current task's `MsgCodec`.
|
||||||
|
|
||||||
|
This is a streaming routine implemented as an async generator
|
||||||
|
func (which was the original design, but could be changed?)
|
||||||
|
and is allocated by a `.__call__()` inside `.__init__()` where
|
||||||
|
it is assigned to the `._aiter_pkts` attr.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
decodes_failed: int = 0
|
decodes_failed: int = 0
|
||||||
|
|
||||||
|
@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# seem to be getting racy failures here on
|
# seem to be getting racy failures here on
|
||||||
# arbiter/registry name subs..
|
# arbiter/registry name subs..
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
|
||||||
raise TransportClosed(
|
|
||||||
f'transport {self} was already closed prior ro read'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
) as trans_err:
|
||||||
|
|
||||||
|
loglevel = 'transport'
|
||||||
|
match trans_err:
|
||||||
|
# case (
|
||||||
|
# ConnectionResetError()
|
||||||
|
# ):
|
||||||
|
# loglevel = 'transport'
|
||||||
|
|
||||||
|
# peer actor (graceful??) TCP EOF but `tricycle`
|
||||||
|
# seems to raise a 0-bytes-read?
|
||||||
|
case ValueError() if (
|
||||||
|
'unclean EOF' in trans_err.args[0]
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# peer actor (task) prolly shutdown quickly due
|
||||||
|
# to cancellation
|
||||||
|
case trio.BrokenResourceError() if (
|
||||||
|
'Connection reset by peer' in trans_err.args[0]
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# unless the disconnect condition falls under "a
|
||||||
|
# normal operation breakage" we usualy console warn
|
||||||
|
# about it.
|
||||||
|
case _:
|
||||||
|
loglevel: str = 'warning'
|
||||||
|
|
||||||
|
|
||||||
|
raise TransportClosed(
|
||||||
|
message=(
|
||||||
|
f'IPC transport already closed by peer\n'
|
||||||
|
f'x)> {type(trans_err)}\n'
|
||||||
|
f' |_{self}\n'
|
||||||
|
),
|
||||||
|
loglevel=loglevel,
|
||||||
|
) from trans_err
|
||||||
|
|
||||||
|
# XXX definitely can happen if transport is closed
|
||||||
|
# manually by another `trio.lowlevel.Task` in the
|
||||||
|
# same actor; we use this in some simulated fault
|
||||||
|
# testing for ex, but generally should never happen
|
||||||
|
# under normal operation!
|
||||||
|
#
|
||||||
|
# NOTE: as such we always re-raise this error from the
|
||||||
|
# RPC msg loop!
|
||||||
|
except trio.ClosedResourceError as closure_err:
|
||||||
|
raise TransportClosed(
|
||||||
|
message=(
|
||||||
|
f'IPC transport already manually closed locally?\n'
|
||||||
|
f'x)> {type(closure_err)} \n'
|
||||||
|
f' |_{self}\n'
|
||||||
|
),
|
||||||
|
loglevel='error',
|
||||||
|
raise_on_report=(
|
||||||
|
closure_err.args[0] == 'another task closed this fd'
|
||||||
|
or
|
||||||
|
closure_err.args[0] in ['another task closed this fd']
|
||||||
|
),
|
||||||
|
) from closure_err
|
||||||
|
|
||||||
|
# graceful TCP EOF disconnect
|
||||||
if header == b'':
|
if header == b'':
|
||||||
raise TransportClosed(
|
raise TransportClosed(
|
||||||
f'transport {self} was already closed prior ro read'
|
message=(
|
||||||
|
f'IPC transport already gracefully closed\n'
|
||||||
|
f')>\n'
|
||||||
|
f'|_{self}\n'
|
||||||
|
),
|
||||||
|
loglevel='transport',
|
||||||
|
# cause=??? # handy or no?
|
||||||
)
|
)
|
||||||
|
|
||||||
|
size: int
|
||||||
size, = struct.unpack("<I", header)
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
log.transport(f'received header {size}') # type: ignore
|
log.transport(f'received header {size}') # type: ignore
|
||||||
|
@ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# the current `MsgCodec`.
|
# the current `MsgCodec`.
|
||||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
|
|
||||||
# TODO: mask out before release?
|
# XXX for ctxvar debug only!
|
||||||
if self._codec.pld_spec != codec.pld_spec:
|
# if self._codec.pld_spec != codec.pld_spec:
|
||||||
# assert (
|
# assert (
|
||||||
# task := trio.lowlevel.current_task()
|
# task := trio.lowlevel.current_task()
|
||||||
# ) is not self._task
|
# ) is not self._task
|
||||||
# self._task = task
|
# self._task = task
|
||||||
self._codec = codec
|
# self._codec = codec
|
||||||
log.runtime(
|
# log.runtime(
|
||||||
f'Using new codec in {self}.recv()\n'
|
# f'Using new codec in {self}.recv()\n'
|
||||||
f'codec: {self._codec}\n\n'
|
# f'codec: {self._codec}\n\n'
|
||||||
f'msg_bytes: {msg_bytes}\n'
|
# f'msg_bytes: {msg_bytes}\n'
|
||||||
)
|
# )
|
||||||
yield codec.decode(msg_bytes)
|
yield codec.decode(msg_bytes)
|
||||||
|
|
||||||
# TODO: remove, was only for orig draft impl
|
|
||||||
# testing.
|
|
||||||
#
|
|
||||||
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
|
|
||||||
# obj = curr_codec.decode(msg_bytes)
|
|
||||||
# if (
|
|
||||||
# curr_codec is not
|
|
||||||
# _codec._def_msgspec_codec
|
|
||||||
# ):
|
|
||||||
# print(f'OBJ: {obj}\n')
|
|
||||||
#
|
|
||||||
# yield obj
|
|
||||||
|
|
||||||
# XXX NOTE: since the below error derives from
|
# XXX NOTE: since the below error derives from
|
||||||
# `DecodeError` we need to catch is specially
|
# `DecodeError` we need to catch is specially
|
||||||
# and always raise such that spec violations
|
# and always raise such that spec violations
|
||||||
|
@ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
msg: msgtypes.MsgType,
|
msg: msgtypes.MsgType,
|
||||||
|
|
||||||
strict_types: bool = True,
|
strict_types: bool = True,
|
||||||
# hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||||
|
@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
invalid msg type
|
invalid msg type
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# __tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
# XXX see `trio._sync.AsyncContextManagerMixin` for details
|
||||||
|
# on the `.acquire()`/`.release()` sequencing..
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
|
|
||||||
# NOTE: lookup the `trio.Task.context`'s var for
|
# NOTE: lookup the `trio.Task.context`'s var for
|
||||||
# the current `MsgCodec`.
|
# the current `MsgCodec`.
|
||||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||||
|
|
||||||
# TODO: mask out before release?
|
# XXX for ctxvar debug only!
|
||||||
if self._codec.pld_spec != codec.pld_spec:
|
# if self._codec.pld_spec != codec.pld_spec:
|
||||||
self._codec = codec
|
# self._codec = codec
|
||||||
log.runtime(
|
# log.runtime(
|
||||||
f'Using new codec in {self}.send()\n'
|
# f'Using new codec in {self}.send()\n'
|
||||||
f'codec: {self._codec}\n\n'
|
# f'codec: {self._codec}\n\n'
|
||||||
f'msg: {msg}\n'
|
# f'msg: {msg}\n'
|
||||||
)
|
# )
|
||||||
|
|
||||||
if type(msg) not in msgtypes.__msg_types__:
|
if type(msg) not in msgtypes.__msg_types__:
|
||||||
if strict_types:
|
if strict_types:
|
||||||
|
@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
size: bytes = struct.pack("<I", len(bytes_data))
|
size: bytes = struct.pack("<I", len(bytes_data))
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
|
|
||||||
|
# ?TODO? does it help ever to dynamically show this
|
||||||
|
# frame?
|
||||||
|
# try:
|
||||||
|
# <the-above_code>
|
||||||
|
# except BaseException as _err:
|
||||||
|
# err = _err
|
||||||
|
# if not isinstance(err, MsgTypeError):
|
||||||
|
# __tracebackhide__: bool = False
|
||||||
|
# raise
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def laddr(self) -> tuple[str, int]:
|
def laddr(self) -> tuple[str, int]:
|
||||||
return self._laddr
|
return self._laddr
|
||||||
|
@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
return self._raddr
|
return self._raddr
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
return await self._agen.asend(None)
|
return await self._aiter_pkts.asend(None)
|
||||||
|
|
||||||
async def drain(self) -> AsyncIterator[dict]:
|
async def drain(self) -> AsyncIterator[dict]:
|
||||||
'''
|
'''
|
||||||
|
@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
yield msg
|
yield msg
|
||||||
|
|
||||||
def __aiter__(self):
|
def __aiter__(self):
|
||||||
return self._agen
|
return self._aiter_pkts
|
||||||
|
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self.stream.socket.fileno() != -1
|
return self.stream.socket.fileno() != -1
|
||||||
|
@ -433,7 +517,7 @@ class Channel:
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always uid of far end
|
||||||
self.uid: tuple[str, str]|None = None
|
self.uid: tuple[str, str]|None = None
|
||||||
|
|
||||||
self._agen = self._aiter_recv()
|
self._aiter_msgs = self._iter_msgs()
|
||||||
self._exc: Exception|None = None # set if far end actor errors
|
self._exc: Exception|None = None # set if far end actor errors
|
||||||
self._closed: bool = False
|
self._closed: bool = False
|
||||||
|
|
||||||
|
@ -497,8 +581,6 @@ class Channel:
|
||||||
)
|
)
|
||||||
return self._transport
|
return self._transport
|
||||||
|
|
||||||
# TODO: something simliar at the IPC-`Context`
|
|
||||||
# level so as to support
|
|
||||||
@cm
|
@cm
|
||||||
def apply_codec(
|
def apply_codec(
|
||||||
self,
|
self,
|
||||||
|
@ -517,6 +599,7 @@ class Channel:
|
||||||
finally:
|
finally:
|
||||||
self._transport.codec = orig
|
self._transport.codec = orig
|
||||||
|
|
||||||
|
# TODO: do a .src/.dst: str for maddrs?
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if not self._transport:
|
if not self._transport:
|
||||||
return '<Channel with inactive transport?>'
|
return '<Channel with inactive transport?>'
|
||||||
|
@ -560,27 +643,43 @@ class Channel:
|
||||||
)
|
)
|
||||||
return transport
|
return transport
|
||||||
|
|
||||||
|
# TODO: something like,
|
||||||
|
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||||
|
# instead of the `try/except` hack we have rn..
|
||||||
|
# seems like a pretty useful thing to have in general
|
||||||
|
# along with being able to filter certain stack frame(s / sets)
|
||||||
|
# possibly based on the current log-level?
|
||||||
async def send(
|
async def send(
|
||||||
self,
|
self,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
|
|
||||||
# hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Send a coded msg-blob over the transport.
|
Send a coded msg-blob over the transport.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# __tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
log.transport(
|
try:
|
||||||
'=> send IPC msg:\n\n'
|
log.transport(
|
||||||
f'{pformat(payload)}\n'
|
'=> send IPC msg:\n\n'
|
||||||
) # type: ignore
|
f'{pformat(payload)}\n'
|
||||||
assert self._transport
|
)
|
||||||
await self._transport.send(
|
# assert self._transport # but why typing?
|
||||||
payload,
|
await self._transport.send(
|
||||||
# hide_tb=hide_tb,
|
payload,
|
||||||
)
|
hide_tb=hide_tb,
|
||||||
|
)
|
||||||
|
except BaseException as _err:
|
||||||
|
err = _err # bind for introspection
|
||||||
|
if not isinstance(_err, MsgTypeError):
|
||||||
|
# assert err
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
else:
|
||||||
|
assert err.cid
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
@ -617,8 +716,11 @@ class Channel:
|
||||||
await self.aclose(*args)
|
await self.aclose(*args)
|
||||||
|
|
||||||
def __aiter__(self):
|
def __aiter__(self):
|
||||||
return self._agen
|
return self._aiter_msgs
|
||||||
|
|
||||||
|
# ?TODO? run any reconnection sequence?
|
||||||
|
# -[ ] prolly should be impl-ed as deco-API?
|
||||||
|
#
|
||||||
# async def _reconnect(self) -> None:
|
# async def _reconnect(self) -> None:
|
||||||
# """Handle connection failures by polling until a reconnect can be
|
# """Handle connection failures by polling until a reconnect can be
|
||||||
# established.
|
# established.
|
||||||
|
@ -636,7 +738,6 @@ class Channel:
|
||||||
# else:
|
# else:
|
||||||
# log.transport("Stream connection re-established!")
|
# log.transport("Stream connection re-established!")
|
||||||
|
|
||||||
# # TODO: run any reconnection sequence
|
|
||||||
# # on_recon = self._recon_seq
|
# # on_recon = self._recon_seq
|
||||||
# # if on_recon:
|
# # if on_recon:
|
||||||
# # await on_recon(self)
|
# # await on_recon(self)
|
||||||
|
@ -650,11 +751,17 @@ class Channel:
|
||||||
# " for re-establishment")
|
# " for re-establishment")
|
||||||
# await trio.sleep(1)
|
# await trio.sleep(1)
|
||||||
|
|
||||||
async def _aiter_recv(
|
async def _iter_msgs(
|
||||||
self
|
self
|
||||||
) -> AsyncGenerator[Any, None]:
|
) -> AsyncGenerator[Any, None]:
|
||||||
'''
|
'''
|
||||||
Async iterate items from underlying stream.
|
Yield `MsgType` IPC msgs decoded and deliverd from
|
||||||
|
an underlying `MsgTransport` protocol.
|
||||||
|
|
||||||
|
This is a streaming routine alo implemented as an async-gen
|
||||||
|
func (same a `MsgTransport._iter_pkts()`) gets allocated by
|
||||||
|
a `.__call__()` inside `.__init__()` where it is assigned to
|
||||||
|
the `._aiter_msgs` attr.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
@ -680,15 +787,6 @@ class Channel:
|
||||||
case _:
|
case _:
|
||||||
yield msg
|
yield msg
|
||||||
|
|
||||||
# TODO: if we were gonna do this it should be
|
|
||||||
# done up at the `MsgStream` layer!
|
|
||||||
#
|
|
||||||
# sent = yield item
|
|
||||||
# if sent is not None:
|
|
||||||
# # optimization, passing None through all the
|
|
||||||
# # time is pointless
|
|
||||||
# await self._transport.send(sent)
|
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
|
|
||||||
# if not self._autorecon:
|
# if not self._autorecon:
|
||||||
|
|
|
@ -68,7 +68,7 @@ from .msg import (
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
PayloadT,
|
PayloadT,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
pretty_struct,
|
# pretty_struct,
|
||||||
_ops as msgops,
|
_ops as msgops,
|
||||||
)
|
)
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
|
@ -89,6 +89,16 @@ if TYPE_CHECKING:
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
|
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
|
||||||
|
# func-type-cases implemented "on top of" `@context` defs:
|
||||||
|
# -[ ] std async func helper decorated with `@rpc_func`?
|
||||||
|
# -[ ] `Portal.open_stream_from()` with async-gens?
|
||||||
|
# |_ possibly a duplex form of this with a
|
||||||
|
# `sent_from_peer = yield send_to_peer` form, which would require
|
||||||
|
# syncing the send/recv side with possibly `.receive_nowait()`
|
||||||
|
# on each `yield`?
|
||||||
|
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
|
||||||
|
# user only defining a single-`yield` generator-func?
|
||||||
async def _invoke_non_context(
|
async def _invoke_non_context(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
cancel_scope: CancelScope,
|
cancel_scope: CancelScope,
|
||||||
|
@ -108,8 +118,9 @@ async def _invoke_non_context(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
|
cs: CancelScope|None = None # ref when activated
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# ?TODO? can we unify this with the `context=True` impl below?
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
await chan.send(
|
await chan.send(
|
||||||
StartAck(
|
StartAck(
|
||||||
|
@ -160,10 +171,6 @@ async def _invoke_non_context(
|
||||||
functype='asyncgen',
|
functype='asyncgen',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# XXX: the async-func may spawn further tasks which push
|
|
||||||
# back values like an async-generator would but must
|
|
||||||
# manualy construct the response dict-packet-responses as
|
|
||||||
# above
|
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
ctx._scope = cs
|
ctx._scope = cs
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
@ -175,15 +182,13 @@ async def _invoke_non_context(
|
||||||
await chan.send(
|
await chan.send(
|
||||||
Stop(cid=cid)
|
Stop(cid=cid)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# simplest function/method request-response pattern
|
||||||
|
# XXX: in the most minimally used case, just a scheduled internal runtime
|
||||||
|
# call to `Actor._cancel_task()` from the ctx-peer task since we
|
||||||
|
# don't (yet) have a dedicated IPC msg.
|
||||||
|
# ------ - ------
|
||||||
else:
|
else:
|
||||||
# regular async function/method
|
|
||||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
|
||||||
# from a remote request to cancel some `Context`.
|
|
||||||
# ------ - ------
|
|
||||||
# TODO: ideally we unify this with the above `context=True`
|
|
||||||
# block such that for any remote invocation ftype, we
|
|
||||||
# always invoke the far end RPC task scheduling the same
|
|
||||||
# way: using the linked IPC context machinery.
|
|
||||||
failed_resp: bool = False
|
failed_resp: bool = False
|
||||||
try:
|
try:
|
||||||
ack = StartAck(
|
ack = StartAck(
|
||||||
|
@ -354,8 +359,15 @@ async def _errors_relayed_via_ipc(
|
||||||
# channel.
|
# channel.
|
||||||
task_status.started(err)
|
task_status.started(err)
|
||||||
|
|
||||||
# always reraise KBIs so they propagate at the sys-process level.
|
# always propagate KBIs at the sys-process level.
|
||||||
if isinstance(err, KeyboardInterrupt):
|
if (
|
||||||
|
isinstance(err, KeyboardInterrupt)
|
||||||
|
|
||||||
|
# ?TODO? except when running in asyncio mode?
|
||||||
|
# |_ wut if you want to open a `@context` FROM an
|
||||||
|
# infected_aio task?
|
||||||
|
# and not actor.is_infected_aio()
|
||||||
|
):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# RPC task bookeeping.
|
# RPC task bookeeping.
|
||||||
|
@ -458,7 +470,6 @@ async def _invoke(
|
||||||
# tb: TracebackType = None
|
# tb: TracebackType = None
|
||||||
|
|
||||||
cancel_scope = CancelScope()
|
cancel_scope = CancelScope()
|
||||||
cs: CancelScope|None = None # ref when activated
|
|
||||||
ctx = actor.get_context(
|
ctx = actor.get_context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -607,6 +618,8 @@ async def _invoke(
|
||||||
# `@context` marked RPC function.
|
# `@context` marked RPC function.
|
||||||
# - `._portal` is never set.
|
# - `._portal` is never set.
|
||||||
try:
|
try:
|
||||||
|
tn: trio.Nursery
|
||||||
|
rpc_ctx_cs: CancelScope
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
msgops.maybe_limit_plds(
|
msgops.maybe_limit_plds(
|
||||||
|
@ -616,7 +629,7 @@ async def _invoke(
|
||||||
),
|
),
|
||||||
):
|
):
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
ctx._scope = tn.cancel_scope
|
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: better `trionics` tooling:
|
# TODO: better `trionics` tooling:
|
||||||
|
@ -642,7 +655,7 @@ async def _invoke(
|
||||||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||||
# which cancels the scope presuming the input error
|
# which cancels the scope presuming the input error
|
||||||
# is not a `.cancel_acked` pleaser.
|
# is not a `.cancel_acked` pleaser.
|
||||||
if ctx._scope.cancelled_caught:
|
if rpc_ctx_cs.cancelled_caught:
|
||||||
our_uid: tuple = actor.uid
|
our_uid: tuple = actor.uid
|
||||||
|
|
||||||
# first check for and raise any remote error
|
# first check for and raise any remote error
|
||||||
|
@ -652,9 +665,7 @@ async def _invoke(
|
||||||
if re := ctx._remote_error:
|
if re := ctx._remote_error:
|
||||||
ctx._maybe_raise_remote_err(re)
|
ctx._maybe_raise_remote_err(re)
|
||||||
|
|
||||||
cs: CancelScope = ctx._scope
|
if rpc_ctx_cs.cancel_called:
|
||||||
|
|
||||||
if cs.cancel_called:
|
|
||||||
canceller: tuple = ctx.canceller
|
canceller: tuple = ctx.canceller
|
||||||
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||||
|
|
||||||
|
@ -680,9 +691,15 @@ async def _invoke(
|
||||||
elif canceller == ctx.chan.uid:
|
elif canceller == ctx.chan.uid:
|
||||||
explain += f'its {ctx.peer_side!r}-side peer'
|
explain += f'its {ctx.peer_side!r}-side peer'
|
||||||
|
|
||||||
else:
|
elif canceller == our_uid:
|
||||||
|
explain += 'itself'
|
||||||
|
|
||||||
|
elif canceller:
|
||||||
explain += 'a remote peer'
|
explain += 'a remote peer'
|
||||||
|
|
||||||
|
else:
|
||||||
|
explain += 'an unknown cause?'
|
||||||
|
|
||||||
explain += (
|
explain += (
|
||||||
add_div(message=explain)
|
add_div(message=explain)
|
||||||
+
|
+
|
||||||
|
@ -911,7 +928,10 @@ async def process_messages(
|
||||||
f'IPC msg from peer\n'
|
f'IPC msg from peer\n'
|
||||||
f'<= {chan.uid}\n\n'
|
f'<= {chan.uid}\n\n'
|
||||||
|
|
||||||
# TODO: avoid fmting depending on loglevel for perf?
|
# TODO: use of the pprinting of structs is
|
||||||
|
# FRAGILE and should prolly not be
|
||||||
|
#
|
||||||
|
# avoid fmting depending on loglevel for perf?
|
||||||
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
|
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
|
||||||
# - how to only log-level-aware actually call this?
|
# - how to only log-level-aware actually call this?
|
||||||
# -[ ] use `.msg.pretty_struct` here now instead!
|
# -[ ] use `.msg.pretty_struct` here now instead!
|
||||||
|
@ -1177,7 +1197,7 @@ async def process_messages(
|
||||||
parent_chan=chan,
|
parent_chan=chan,
|
||||||
)
|
)
|
||||||
|
|
||||||
except TransportClosed:
|
except TransportClosed as tc:
|
||||||
# channels "breaking" (for TCP streams by EOF or 104
|
# channels "breaking" (for TCP streams by EOF or 104
|
||||||
# connection-reset) is ok since we don't have a teardown
|
# connection-reset) is ok since we don't have a teardown
|
||||||
# handshake for them (yet) and instead we simply bail out of
|
# handshake for them (yet) and instead we simply bail out of
|
||||||
|
@ -1185,12 +1205,20 @@ async def process_messages(
|
||||||
# up..
|
# up..
|
||||||
#
|
#
|
||||||
# TODO: maybe add a teardown handshake? and,
|
# TODO: maybe add a teardown handshake? and,
|
||||||
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
|
# -[x] don't show this msg if it's an ephemeral discovery ep call?
|
||||||
|
# |_ see the below `.report_n_maybe_raise()` impl as well as
|
||||||
|
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
|
||||||
|
# for different read-failure cases.
|
||||||
# -[ ] figure out how this will break with other transports?
|
# -[ ] figure out how this will break with other transports?
|
||||||
log.runtime(
|
tc.report_n_maybe_raise(
|
||||||
f'IPC channel closed abruptly\n'
|
message=(
|
||||||
f'<=x peer: {chan.uid}\n'
|
f'peer IPC channel closed abruptly?\n\n'
|
||||||
f' |_{chan.raddr}\n'
|
f'<=x {chan}\n'
|
||||||
|
f' |_{chan.raddr}\n\n'
|
||||||
|
)
|
||||||
|
+
|
||||||
|
tc.message
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# transport **WAS** disconnected
|
# transport **WAS** disconnected
|
||||||
|
@ -1238,7 +1266,7 @@ async def process_messages(
|
||||||
'Exiting IPC msg loop with final msg\n\n'
|
'Exiting IPC msg loop with final msg\n\n'
|
||||||
f'<= peer: {chan.uid}\n'
|
f'<= peer: {chan.uid}\n'
|
||||||
f' |_{chan}\n\n'
|
f' |_{chan}\n\n'
|
||||||
f'{pretty_struct.pformat(msg)}'
|
# f'{pretty_struct.pformat(msg)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(message)
|
log.runtime(message)
|
||||||
|
|
|
@ -54,11 +54,12 @@ LOG_FORMAT = (
|
||||||
DATE_FORMAT = '%b %d %H:%M:%S'
|
DATE_FORMAT = '%b %d %H:%M:%S'
|
||||||
|
|
||||||
# FYI, ERROR is 40
|
# FYI, ERROR is 40
|
||||||
|
# TODO: use a `bidict` to avoid the :155 check?
|
||||||
CUSTOM_LEVELS: dict[str, int] = {
|
CUSTOM_LEVELS: dict[str, int] = {
|
||||||
'TRANSPORT': 5,
|
'TRANSPORT': 5,
|
||||||
'RUNTIME': 15,
|
'RUNTIME': 15,
|
||||||
'DEVX': 17,
|
'DEVX': 17,
|
||||||
'CANCEL': 18,
|
'CANCEL': 22,
|
||||||
'PDB': 500,
|
'PDB': 500,
|
||||||
}
|
}
|
||||||
STD_PALETTE = {
|
STD_PALETTE = {
|
||||||
|
@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
Delegate a log call to the underlying logger, after adding
|
Delegate a log call to the underlying logger, after adding
|
||||||
contextual information from this adapter instance.
|
contextual information from this adapter instance.
|
||||||
|
|
||||||
|
NOTE: all custom level methods (above) delegate to this!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self.isEnabledFor(level):
|
if self.isEnabledFor(level):
|
||||||
stacklevel: int = 3
|
stacklevel: int = 3
|
||||||
|
|
|
@ -34,6 +34,9 @@ from pprint import (
|
||||||
saferepr,
|
saferepr,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
log = get_logger()
|
||||||
# TODO: auto-gen type sig for input func both for
|
# TODO: auto-gen type sig for input func both for
|
||||||
# type-msgs and logging of RPC tasks?
|
# type-msgs and logging of RPC tasks?
|
||||||
# taken and modified from:
|
# taken and modified from:
|
||||||
|
@ -143,7 +146,13 @@ def pformat(
|
||||||
|
|
||||||
else: # the `pprint` recursion-safe format:
|
else: # the `pprint` recursion-safe format:
|
||||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||||
val_str: str = saferepr(v)
|
try:
|
||||||
|
val_str: str = saferepr(v)
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
'Failed to `saferepr({type(struct)})` !?\n'
|
||||||
|
)
|
||||||
|
return _Struct.__repr__(struct)
|
||||||
|
|
||||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||||
|
@ -194,12 +203,20 @@ class Struct(
|
||||||
return sin_props
|
return sin_props
|
||||||
|
|
||||||
pformat = pformat
|
pformat = pformat
|
||||||
|
# __repr__ = pformat
|
||||||
# __str__ = __repr__ = pformat
|
# __str__ = __repr__ = pformat
|
||||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||||
# inside a known tty?
|
# inside a known tty?
|
||||||
# def __repr__(self) -> str:
|
# def __repr__(self) -> str:
|
||||||
# ...
|
# ...
|
||||||
__repr__ = pformat
|
def __repr__(self) -> str:
|
||||||
|
try:
|
||||||
|
return pformat(self)
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
f'Failed to `pformat({type(self)})` !?\n'
|
||||||
|
)
|
||||||
|
return _Struct.__repr__(self)
|
||||||
|
|
||||||
def copy(
|
def copy(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -156,11 +156,12 @@ class BroadcastState(Struct):
|
||||||
|
|
||||||
class BroadcastReceiver(ReceiveChannel):
|
class BroadcastReceiver(ReceiveChannel):
|
||||||
'''
|
'''
|
||||||
A memory receive channel broadcaster which is non-lossy for the
|
A memory receive channel broadcaster which is non-lossy for
|
||||||
fastest consumer.
|
the fastest consumer.
|
||||||
|
|
||||||
Additional consumer tasks can receive all produced values by registering
|
Additional consumer tasks can receive all produced values by
|
||||||
with ``.subscribe()`` and receiving from the new instance it delivers.
|
registering with ``.subscribe()`` and receiving from the new
|
||||||
|
instance it delivers.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
|
@ -18,8 +18,12 @@
|
||||||
Async context manager primitives with hard ``trio``-aware semantics
|
Async context manager primitives with hard ``trio``-aware semantics
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
import inspect
|
import inspect
|
||||||
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
|
@ -30,13 +34,16 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Sequence,
|
Sequence,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor import ActorNursery
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -46,8 +53,10 @@ T = TypeVar("T")
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_nursery(
|
async def maybe_open_nursery(
|
||||||
nursery: trio.Nursery | None = None,
|
nursery: trio.Nursery|ActorNursery|None = None,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
|
lib: ModuleType = trio,
|
||||||
|
|
||||||
) -> AsyncGenerator[trio.Nursery, Any]:
|
) -> AsyncGenerator[trio.Nursery, Any]:
|
||||||
'''
|
'''
|
||||||
Create a new nursery if None provided.
|
Create a new nursery if None provided.
|
||||||
|
@ -58,13 +67,12 @@ async def maybe_open_nursery(
|
||||||
if nursery is not None:
|
if nursery is not None:
|
||||||
yield nursery
|
yield nursery
|
||||||
else:
|
else:
|
||||||
async with trio.open_nursery() as nursery:
|
async with lib.open_nursery() as nursery:
|
||||||
nursery.cancel_scope.shield = shield
|
nursery.cancel_scope.shield = shield
|
||||||
yield nursery
|
yield nursery
|
||||||
|
|
||||||
|
|
||||||
async def _enter_and_wait(
|
async def _enter_and_wait(
|
||||||
|
|
||||||
mngr: AsyncContextManager[T],
|
mngr: AsyncContextManager[T],
|
||||||
unwrapped: dict[int, T],
|
unwrapped: dict[int, T],
|
||||||
all_entered: trio.Event,
|
all_entered: trio.Event,
|
||||||
|
@ -91,7 +99,6 @@ async def _enter_and_wait(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def gather_contexts(
|
async def gather_contexts(
|
||||||
|
|
||||||
mngrs: Sequence[AsyncContextManager[T]],
|
mngrs: Sequence[AsyncContextManager[T]],
|
||||||
|
|
||||||
) -> AsyncGenerator[
|
) -> AsyncGenerator[
|
||||||
|
@ -102,15 +109,17 @@ async def gather_contexts(
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Concurrently enter a sequence of async context managers, each in
|
Concurrently enter a sequence of async context managers (acms),
|
||||||
a separate ``trio`` task and deliver the unwrapped values in the
|
each from a separate `trio` task and deliver the unwrapped
|
||||||
same order once all managers have entered. On exit all contexts are
|
`yield`-ed values in the same order once all managers have entered.
|
||||||
subsequently and concurrently exited.
|
|
||||||
|
|
||||||
This function is somewhat similar to common usage of
|
On exit, all acms are subsequently and concurrently exited.
|
||||||
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
|
|
||||||
combo with ``asyncio.gather()`` except the managers are concurrently
|
This function is somewhat similar to a batch of non-blocking
|
||||||
entered and exited, and cancellation just works.
|
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
||||||
|
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
||||||
|
`.__aenter__()`-ed values, except the managers are both
|
||||||
|
concurrently entered and exited and *cancellation just works*(R).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
seed: int = id(mngrs)
|
seed: int = id(mngrs)
|
||||||
|
@ -210,9 +219,10 @@ async def maybe_open_context(
|
||||||
|
|
||||||
) -> AsyncIterator[tuple[bool, T]]:
|
) -> AsyncIterator[tuple[bool, T]]:
|
||||||
'''
|
'''
|
||||||
Maybe open a context manager if there is not already a _Cached
|
Maybe open an async-context-manager (acm) if there is not already
|
||||||
version for the provided ``key`` for *this* actor. Return the
|
a `_Cached` version for the provided (input) `key` for *this* actor.
|
||||||
_Cached instance on a _Cache hit.
|
|
||||||
|
Return the `_Cached` instance on a _Cache hit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fid = id(acm_func)
|
fid = id(acm_func)
|
||||||
|
@ -273,8 +283,13 @@ async def maybe_open_context(
|
||||||
else:
|
else:
|
||||||
_Cache.users += 1
|
_Cache.users += 1
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
|
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||||
f'{ctx_key!r} -> {yielded!r}\n'
|
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||||
|
|
||||||
|
# TODO: make this work with values but without
|
||||||
|
# `msgspec.Struct` causing frickin crashes on field-type
|
||||||
|
# lookups..
|
||||||
|
# f'{ctx_key!r} -> {yielded!r}\n'
|
||||||
)
|
)
|
||||||
lock.release()
|
lock.release()
|
||||||
yield True, yielded
|
yield True, yielded
|
||||||
|
|
Loading…
Reference in New Issue