Compare commits

...

7 Commits

Author SHA1 Message Date
Tyler Goodlet af3745684c More formal `TransportClosed` reporting/raising
Since it was all ad-hoc defined inside
`._ipc.MsgpackTCPStream._iter_pkts()` more or less, this starts
formalizing a way for particular transport backends to indicate whether
a disconnect condition should be re-raised in the RPC msg loop and if
not what log level to report it at (if any).

Based on our lone transport currently we try to suppress any logging
noise from ephemeral connections expected during normal actor
interaction and discovery subsys ops:
- any short lived discovery related TCP connects are only logged as
  `.transport()` level.
- both `.error()` and raise on any underlying `trio.ClosedResource`
  cause since that normally means some task touched transport layer
  internals that it shouldn't have.
- do a `.warning()` on anything else unexpected.

Impl deats:
- extend the `._exceptions.TransportClosed` to accept an input log
  level, raise-on-report toggle and custom reporting & raising via a new
  `.report_n_maybe_raise()` method.
- construct the TCs with inputs per case in (the newly named) `._iter_pkts().
- call ^ this method from the `TransportClosed` handler block inside the
  RPC msg loop thus delegating reporting levels and/or raising to the
  backend's per-case TC instantiating.

Related `._ipc` changes:
- mask out all the `MsgpackTCPStream._codec` debug helper stuff and drop
  any lingering cruft from the initial proto-ing of msg-codecs.
- rename some attrs/methods:
  |_`MsgpackTCPStream._iter_packets()` -> `._iter_pkts()` and
    `._agen` -> `_aiter_pkts`.
  |_`Channel._aiter_recv()` -> `._aiter_msgs()` and
    `._agen` -> `_aiter_msgs`.
- add `hide_tb: bool` support to `Channel.send()` and only show the
  frame on non-MTEs.
2024-07-02 12:21:26 -04:00
Tyler Goodlet 3907cba68e Refine some `.trionics` docs and logging
- allow passing and report the lib name (`trio` or `tractor`) from
  `maybe_open_nursery()`.
- use `.runtime()` level when reporting `_Cache`-hits in
  `maybe_open_context()`.
- tidy up some doc strings.
2024-06-28 19:28:12 -04:00
Tyler Goodlet e3d59964af Woops, set `.cancel()` level in custom levels table.. 2024-06-28 19:27:13 -04:00
Tyler Goodlet ba83bab776 Todo a test for sync-pausing from non-main-root-tasks 2024-06-28 19:26:35 -04:00
Tyler Goodlet 18d440c207 (Re)type annot some tests
- For the (still not finished) `test_caps_based_msging`, switch to
  using the new `PayloadMsg`.
- add `testdir` fixture type.
2024-06-28 19:24:03 -04:00
Tyler Goodlet edac717613 Use `msgspec.Struct.__repr__()` failover impl
In case the struct doesn't import a field type (which will cause the
`.pformat()` to raise) just report the issue and try to fall back to the
original `repr()` version.
2024-06-28 19:17:05 -04:00
Tyler Goodlet 7e93b81a83 Don't use pretty struct stuff in `._invoke`
It's too fragile to put in side core RPC machinery since
`msgspec.Struct` defs can fail if a field type can't be
looked up at creation time (like can easily happen if you
conditionally import using `if TYPE_CHECKING:`)

Also,
- rename `cs` to `rpc_ctx_cs: CancelScope` since it's literally
  the wrapping RPC `Context._scope`.
- report self cancellation via `explain: str` and add tail case for
  "unknown cause".
- put a ?TODO? around what to do about KBIs if a context is opened
  from an `infected_aio`-actor task.
- similar to our nursery and portal add TODO list for moving all
  `_invoke_non_context()` content out the RPC core and instead implement
  them as `.hilevel` endpoint helpers (maybe as decorators?)which under
  neath define `@context`-funcs.
2024-06-28 19:06:17 -04:00
11 changed files with 400 additions and 182 deletions

View File

@ -11,9 +11,6 @@ from typing import (
Type, Type,
Union, Union,
) )
from contextvars import (
Context,
)
from msgspec import ( from msgspec import (
structs, structs,
@ -27,6 +24,7 @@ import tractor
from tractor import ( from tractor import (
_state, _state,
MsgTypeError, MsgTypeError,
Context,
) )
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
@ -41,7 +39,7 @@ from tractor.msg import (
from tractor.msg.types import ( from tractor.msg.types import (
_payload_msgs, _payload_msgs,
log, log,
Msg, PayloadMsg,
Started, Started,
mk_msg_spec, mk_msg_spec,
) )
@ -61,7 +59,7 @@ def mk_custom_codec(
uid: tuple[str, str] = tractor.current_actor().uid uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type # XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `Msg.pld`, we still need a enc/dec_hook() pair # field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs: # to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
@ -321,12 +319,12 @@ def dec_type_union(
import importlib import importlib
types: list[Type] = [] types: list[Type] = []
for type_name in type_names: for type_name in type_names:
for ns in [ for mod in [
typing, typing,
importlib.import_module(__name__), importlib.import_module(__name__),
]: ]:
if type_ref := getattr( if type_ref := getattr(
ns, mod,
type_name, type_name,
False, False,
): ):
@ -744,7 +742,7 @@ def chk_pld_type(
# 'Error', .pld: ErrorData # 'Error', .pld: ErrorData
codec: MsgCodec = mk_codec( codec: MsgCodec = mk_codec(
# NOTE: this ONLY accepts `Msg.pld` fields of a specified # NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
# type union. # type union.
ipc_pld_spec=payload_spec, ipc_pld_spec=payload_spec,
) )
@ -752,7 +750,7 @@ def chk_pld_type(
# make a one-off dec to compare with our `MsgCodec` instance # make a one-off dec to compare with our `MsgCodec` instance
# which does the below `mk_msg_spec()` call internally # which does the below `mk_msg_spec()` call internally
ipc_msg_spec: Union[Type[Struct]] ipc_msg_spec: Union[Type[Struct]]
msg_types: list[Msg[payload_spec]] msg_types: list[PayloadMsg[payload_spec]]
( (
ipc_msg_spec, ipc_msg_spec,
msg_types, msg_types,
@ -761,7 +759,7 @@ def chk_pld_type(
) )
_enc = msgpack.Encoder() _enc = msgpack.Encoder()
_dec = msgpack.Decoder( _dec = msgpack.Decoder(
type=ipc_msg_spec or Any, # like `Msg[Any]` type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
) )
assert ( assert (
@ -806,7 +804,7 @@ def chk_pld_type(
'cid': '666', 'cid': '666',
'pld': pld, 'pld': pld,
} }
enc_msg: Msg = typedef(**kwargs) enc_msg: PayloadMsg = typedef(**kwargs)
_wire_bytes: bytes = _enc.encode(enc_msg) _wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg) wire_bytes: bytes = codec.enc.encode(enc_msg)
@ -883,25 +881,16 @@ def test_limit_msgspec():
debug_mode=True debug_mode=True
): ):
# ensure we can round-trip a boxing `Msg` # ensure we can round-trip a boxing `PayloadMsg`
assert chk_pld_type( assert chk_pld_type(
# Msg, payload_spec=Any,
Any, pld=None,
None,
expect_roundtrip=True, expect_roundtrip=True,
) )
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode # verify that a mis-typed payload value won't decode
assert not chk_pld_type( assert not chk_pld_type(
# Msg, payload_spec=int,
int,
pld='doggy', pld='doggy',
) )
@ -913,18 +902,16 @@ def test_limit_msgspec():
value: Any value: Any
assert not chk_pld_type( assert not chk_pld_type(
# Msg, payload_spec=CustomPayload,
CustomPayload,
pld='doggy', pld='doggy',
) )
assert chk_pld_type( assert chk_pld_type(
# Msg, payload_spec=CustomPayload,
CustomPayload,
pld=CustomPayload(name='doggy', value='urmom') pld=CustomPayload(name='doggy', value='urmom')
) )
# uhh bc we can `.pause_from_sync()` now! :surfer: # yah, we can `.pause_from_sync()` now!
# breakpoint() # breakpoint()
trio.run(main) trio.run(main)

View File

@ -1336,6 +1336,23 @@ def test_shield_pause(
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above! # TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden(): def test_correct_frames_below_hidden():
''' '''

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture @pytest.fixture
def run_example_in_subproc( def run_example_in_subproc(
loglevel: str, loglevel: str,
testdir, testdir: pytest.Testdir,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
): ):

View File

@ -49,6 +49,7 @@ from ._exceptions import (
ModuleNotExposed as ModuleNotExposed, ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError, MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError, RemoteActorError as RemoteActorError,
TransportClosed as TransportClosed,
) )
from .devx import ( from .devx import (
breakpoint as breakpoint, breakpoint as breakpoint,

View File

@ -906,8 +906,59 @@ class StreamOverrun(
''' '''
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.BrokenResourceError):
"Underlying channel transport was closed prior to use" '''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
be used to send/receive msgs from the remote peer.
'''
def __init__(
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
def report_n_maybe_raise(
self,
message: str|None = None,
) -> None:
'''
Using the init-specified log level emit a logging report
for this error.
'''
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
message += (
f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
class NoResult(RuntimeError): class NoResult(RuntimeError):

View File

@ -54,7 +54,7 @@ from tractor._exceptions import (
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
_codec, # _codec, XXX see `self._codec` sanity/debug checks
MsgCodec, MsgCodec,
types as msgtypes, types as msgtypes,
pretty_struct, pretty_struct,
@ -65,8 +65,18 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
def get_stream_addrs(stream: trio.SocketStream) -> tuple: def get_stream_addrs(
# should both be IP sockets stream: trio.SocketStream
) -> tuple[
tuple[str, int], # local
tuple[str, int], # remote
]:
'''
Return the `trio` streaming transport prot's socket-addrs for
both the local and remote sides as a pair.
'''
# rn, should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
return ( return (
@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
) )
# TODO: this should be our `Union[*msgtypes.__spec__]` now right? # from tractor.msg.types import MsgType
MsgType = TypeVar("MsgType") # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# TODO: consider using a generic def and indexing with our eventual # vars..
# msg definition/types? MsgType = TypeVar('MsgType')
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs
# TODO: break up this mod into a subpkg so we can start adding new
# backends and move this type stuff into a dedicated file.. Bo
#
@runtime_checkable @runtime_checkable
class MsgTransport(Protocol[MsgType]): class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream stream: trio.SocketStream
drained: list[MsgType] drained: list[MsgType]
@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]):
... ...
# TODO: not sure why we have to inherit here, but it seems to be an # TODO: typing oddity.. not sure why we have to inherit here, but it
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; # seems to be an issue with `get_msg_transport()` returning
# probably should make a `mypy` issue? # a `Type[Protocol]`; probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport): class MsgpackTCPStream(MsgTransport):
''' '''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data A ``trio.SocketStream`` delivering ``msgpack`` formatted data
@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# #
# TODO: define this as a `Codec` struct which can be # TODO: define this as a `Codec` struct which can be
# overriden dynamically by the application/runtime. # overriden dynamically by the application/runtime?
codec: tuple[ codec: tuple[
Callable[[Any], Any]|None, # coder Callable[[Any], Any]|None, # coder
Callable[[type, Any], Any]|None, # decoder Callable[[type, Any], Any]|None, # decoder
@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport):
self._laddr, self._raddr = get_stream_addrs(stream) self._laddr, self._raddr = get_stream_addrs(stream)
# create read loop instance # create read loop instance
self._agen = self._iter_packets() self._aiter_pkts = self._iter_packets()
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
# public i guess? # public i guess?
@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format # allow for custom IPC msg interchange format
# dynamic override Bo # dynamic override Bo
self._task = trio.lowlevel.current_task() self._task = trio.lowlevel.current_task()
self._codec: MsgCodec = (
codec # XXX for ctxvar debug only!
or # self._codec: MsgCodec = (
_codec._ctxvar_MsgCodec.get() # codec
) # or
# TODO: mask out before release? # _codec._ctxvar_MsgCodec.get()
# log.runtime(
# f'New {self} created with codec\n'
# f'codec: {self._codec}\n'
# ) # )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport):
Yield `bytes`-blob decoded packets from the underlying TCP Yield `bytes`-blob decoded packets from the underlying TCP
stream using the current task's `MsgCodec`. stream using the current task's `MsgCodec`.
This is a streaming routine implemented as an async generator
func (which was the original design, but could be changed?)
and is allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_pkts` attr.
''' '''
decodes_failed: int = 0 decodes_failed: int = 0
@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport):
# seem to be getting racy failures here on # seem to be getting racy failures here on
# arbiter/registry name subs.. # arbiter/registry name subs..
trio.BrokenResourceError, trio.BrokenResourceError,
):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
) as trans_err:
loglevel = 'transport'
match trans_err:
# case (
# ConnectionResetError()
# ):
# loglevel = 'transport'
# peer actor (graceful??) TCP EOF but `tricycle`
# seems to raise a 0-bytes-read?
case ValueError() if (
'unclean EOF' in trans_err.args[0]
):
pass
# peer actor (task) prolly shutdown quickly due
# to cancellation
case trio.BrokenResourceError() if (
'Connection reset by peer' in trans_err.args[0]
):
pass
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
loglevel: str = 'warning'
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# XXX definitely can happen if transport is closed
# manually by another `trio.lowlevel.Task` in the
# same actor; we use this in some simulated fault
# testing for ex, but generally should never happen
# under normal operation!
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
loglevel='error',
raise_on_report=(
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
# graceful TCP EOF disconnect
if header == b'': if header == b'':
raise TransportClosed( raise TransportClosed(
f'transport {self} was already closed prior ro read' message=(
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
) )
size: int
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore log.transport(f'received header {size}') # type: ignore
@ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport):
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release? # XXX for ctxvar debug only!
if self._codec.pld_spec != codec.pld_spec: # if self._codec.pld_spec != codec.pld_spec:
# assert ( # assert (
# task := trio.lowlevel.current_task() # task := trio.lowlevel.current_task()
# ) is not self._task # ) is not self._task
# self._task = task # self._task = task
self._codec = codec # self._codec = codec
log.runtime( # log.runtime(
f'Using new codec in {self}.recv()\n' # f'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n' # f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n' # f'msg_bytes: {msg_bytes}\n'
) # )
yield codec.decode(msg_bytes) yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
# XXX NOTE: since the below error derives from # XXX NOTE: since the below error derives from
# `DecodeError` we need to catch is specially # `DecodeError` we need to catch is specially
# and always raise such that spec violations # and always raise such that spec violations
@ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport):
msg: msgtypes.MsgType, msg: msgtypes.MsgType,
strict_types: bool = True, strict_types: bool = True,
# hide_tb: bool = False, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a msgpack encoded py-object-blob-as-msg over TCP. Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type invalid msg type
''' '''
# __tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
async with self._send_lock: async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release? # XXX for ctxvar debug only!
if self._codec.pld_spec != codec.pld_spec: # if self._codec.pld_spec != codec.pld_spec:
self._codec = codec # self._codec = codec
log.runtime( # log.runtime(
f'Using new codec in {self}.send()\n' # f'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n' # f'codec: {self._codec}\n\n'
f'msg: {msg}\n' # f'msg: {msg}\n'
) # )
if type(msg) not in msgtypes.__msg_types__: if type(msg) not in msgtypes.__msg_types__:
if strict_types: if strict_types:
@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
# try:
# <the-above_code>
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property @property
def laddr(self) -> tuple[str, int]: def laddr(self) -> tuple[str, int]:
return self._laddr return self._laddr
@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport):
return self._raddr return self._raddr
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._agen.asend(None) return await self._aiter_pkts.asend(None)
async def drain(self) -> AsyncIterator[dict]: async def drain(self) -> AsyncIterator[dict]:
''' '''
@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport):
yield msg yield msg
def __aiter__(self): def __aiter__(self):
return self._agen return self._aiter_pkts
def connected(self) -> bool: def connected(self) -> bool:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
@ -433,7 +517,7 @@ class Channel:
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None self.uid: tuple[str, str]|None = None
self._agen = self._aiter_recv() self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None # set if far end actor errors self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
@ -497,8 +581,6 @@ class Channel:
) )
return self._transport return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm @cm
def apply_codec( def apply_codec(
self, self,
@ -517,6 +599,7 @@ class Channel:
finally: finally:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def __repr__(self) -> str: def __repr__(self) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel with inactive transport?>'
@ -560,27 +643,43 @@ class Channel:
) )
return transport return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
# seems like a pretty useful thing to have in general
# along with being able to filter certain stack frame(s / sets)
# possibly based on the current log-level?
async def send( async def send(
self, self,
payload: Any, payload: Any,
# hide_tb: bool = False, hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the transport. Send a coded msg-blob over the transport.
''' '''
# __tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
log.transport( try:
'=> send IPC msg:\n\n' log.transport(
f'{pformat(payload)}\n' '=> send IPC msg:\n\n'
) # type: ignore f'{pformat(payload)}\n'
assert self._transport )
await self._transport.send( # assert self._transport # but why typing?
payload, await self._transport.send(
# hide_tb=hide_tb, payload,
) hide_tb=hide_tb,
)
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self._transport
@ -617,8 +716,11 @@ class Channel:
await self.aclose(*args) await self.aclose(*args)
def __aiter__(self): def __aiter__(self):
return self._agen return self._aiter_msgs
# ?TODO? run any reconnection sequence?
# -[ ] prolly should be impl-ed as deco-API?
#
# async def _reconnect(self) -> None: # async def _reconnect(self) -> None:
# """Handle connection failures by polling until a reconnect can be # """Handle connection failures by polling until a reconnect can be
# established. # established.
@ -636,7 +738,6 @@ class Channel:
# else: # else:
# log.transport("Stream connection re-established!") # log.transport("Stream connection re-established!")
# # TODO: run any reconnection sequence
# # on_recon = self._recon_seq # # on_recon = self._recon_seq
# # if on_recon: # # if on_recon:
# # await on_recon(self) # # await on_recon(self)
@ -650,11 +751,17 @@ class Channel:
# " for re-establishment") # " for re-establishment")
# await trio.sleep(1) # await trio.sleep(1)
async def _aiter_recv( async def _iter_msgs(
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Async iterate items from underlying stream. Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
''' '''
assert self._transport assert self._transport
@ -680,15 +787,6 @@ class Channel:
case _: case _:
yield msg yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self._transport.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:

View File

@ -68,7 +68,7 @@ from .msg import (
MsgCodec, MsgCodec,
PayloadT, PayloadT,
NamespacePath, NamespacePath,
pretty_struct, # pretty_struct,
_ops as msgops, _ops as msgops,
) )
from tractor.msg.types import ( from tractor.msg.types import (
@ -89,6 +89,16 @@ if TYPE_CHECKING:
log = get_logger('tractor') log = get_logger('tractor')
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
# func-type-cases implemented "on top of" `@context` defs:
# -[ ] std async func helper decorated with `@rpc_func`?
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
# user only defining a single-`yield` generator-func?
async def _invoke_non_context( async def _invoke_non_context(
actor: Actor, actor: Actor,
cancel_scope: CancelScope, cancel_scope: CancelScope,
@ -108,8 +118,9 @@ async def _invoke_non_context(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
): ):
__tracebackhide__: bool = True __tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# TODO: can we unify this with the `context=True` impl below? # ?TODO? can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
await chan.send( await chan.send(
StartAck( StartAck(
@ -160,10 +171,6 @@ async def _invoke_non_context(
functype='asyncgen', functype='asyncgen',
) )
) )
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs: with cancel_scope as cs:
ctx._scope = cs ctx._scope = cs
task_status.started(ctx) task_status.started(ctx)
@ -175,15 +182,13 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else: else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False failed_resp: bool = False
try: try:
ack = StartAck( ack = StartAck(
@ -354,8 +359,15 @@ async def _errors_relayed_via_ipc(
# channel. # channel.
task_status.started(err) task_status.started(err)
# always reraise KBIs so they propagate at the sys-process level. # always propagate KBIs at the sys-process level.
if isinstance(err, KeyboardInterrupt): if (
isinstance(err, KeyboardInterrupt)
# ?TODO? except when running in asyncio mode?
# |_ wut if you want to open a `@context` FROM an
# infected_aio task?
# and not actor.is_infected_aio()
):
raise raise
# RPC task bookeeping. # RPC task bookeeping.
@ -458,7 +470,6 @@ async def _invoke(
# tb: TracebackType = None # tb: TracebackType = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
@ -607,6 +618,8 @@ async def _invoke(
# `@context` marked RPC function. # `@context` marked RPC function.
# - `._portal` is never set. # - `._portal` is never set.
try: try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
@ -616,7 +629,7 @@ async def _invoke(
), ),
): ):
ctx._scope_nursery = tn ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# TODO: better `trionics` tooling: # TODO: better `trionics` tooling:
@ -642,7 +655,7 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
if ctx._scope.cancelled_caught: if rpc_ctx_cs.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
# first check for and raise any remote error # first check for and raise any remote error
@ -652,9 +665,7 @@ async def _invoke(
if re := ctx._remote_error: if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
cs: CancelScope = ctx._scope if rpc_ctx_cs.cancel_called:
if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by ' explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -680,9 +691,15 @@ async def _invoke(
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer' explain += f'its {ctx.peer_side!r}-side peer'
else: elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer' explain += 'a remote peer'
else:
explain += 'an unknown cause?'
explain += ( explain += (
add_div(message=explain) add_div(message=explain)
+ +
@ -911,7 +928,10 @@ async def process_messages(
f'IPC msg from peer\n' f'IPC msg from peer\n'
f'<= {chan.uid}\n\n' f'<= {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf? # TODO: use of the pprinting of structs is
# FRAGILE and should prolly not be
#
# avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..? # -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this? # - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead! # -[ ] use `.msg.pretty_struct` here now instead!
@ -1177,7 +1197,7 @@ async def process_messages(
parent_chan=chan, parent_chan=chan,
) )
except TransportClosed: except TransportClosed as tc:
# channels "breaking" (for TCP streams by EOF or 104 # channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown # connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
@ -1185,12 +1205,20 @@ async def process_messages(
# up.. # up..
# #
# TODO: maybe add a teardown handshake? and, # TODO: maybe add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call? # -[x] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] figure out how this will break with other transports? # -[ ] figure out how this will break with other transports?
log.runtime( tc.report_n_maybe_raise(
f'IPC channel closed abruptly\n' message=(
f'<=x peer: {chan.uid}\n' f'peer IPC channel closed abruptly?\n\n'
f' |_{chan.raddr}\n' f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
) )
# transport **WAS** disconnected # transport **WAS** disconnected
@ -1238,7 +1266,7 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n' 'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n' f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n' f' |_{chan}\n\n'
f'{pretty_struct.pformat(msg)}' # f'{pretty_struct.pformat(msg)}'
) )
log.runtime(message) log.runtime(message)

View File

@ -54,11 +54,12 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40 # FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
CUSTOM_LEVELS: dict[str, int] = { CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'DEVX': 17, 'DEVX': 17,
'CANCEL': 18, 'CANCEL': 22,
'PDB': 500, 'PDB': 500,
} }
STD_PALETTE = { STD_PALETTE = {
@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
Delegate a log call to the underlying logger, after adding Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance. contextual information from this adapter instance.
NOTE: all custom level methods (above) delegate to this!
''' '''
if self.isEnabledFor(level): if self.isEnabledFor(level):
stacklevel: int = 3 stacklevel: int = 3

View File

@ -34,6 +34,9 @@ from pprint import (
saferepr, saferepr,
) )
from tractor.log import get_logger
log = get_logger()
# TODO: auto-gen type sig for input func both for # TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks? # type-msgs and logging of RPC tasks?
# taken and modified from: # taken and modified from:
@ -143,7 +146,13 @@ def pformat(
else: # the `pprint` recursion-safe format: else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v) try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -194,12 +203,20 @@ class Struct(
return sin_props return sin_props
pformat = pformat pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat # __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty? # inside a known tty?
# def __repr__(self) -> str: # def __repr__(self) -> str:
# ... # ...
__repr__ = pformat def __repr__(self) -> str:
try:
return pformat(self)
except Exception:
log.exception(
f'Failed to `pformat({type(self)})` !?\n'
)
return _Struct.__repr__(self)
def copy( def copy(
self, self,

View File

@ -156,11 +156,12 @@ class BroadcastState(Struct):
class BroadcastReceiver(ReceiveChannel): class BroadcastReceiver(ReceiveChannel):
''' '''
A memory receive channel broadcaster which is non-lossy for the A memory receive channel broadcaster which is non-lossy for
fastest consumer. the fastest consumer.
Additional consumer tasks can receive all produced values by registering Additional consumer tasks can receive all produced values by
with ``.subscribe()`` and receiving from the new instance it delivers. registering with ``.subscribe()`` and receiving from the new
instance it delivers.
''' '''
def __init__( def __init__(

View File

@ -18,8 +18,12 @@
Async context manager primitives with hard ``trio``-aware semantics Async context manager primitives with hard ``trio``-aware semantics
''' '''
from contextlib import asynccontextmanager as acm from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
import inspect import inspect
from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
@ -30,13 +34,16 @@ from typing import (
Optional, Optional,
Sequence, Sequence,
TypeVar, TypeVar,
TYPE_CHECKING,
) )
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__) log = get_logger(__name__)
@ -46,8 +53,10 @@ T = TypeVar("T")
@acm @acm
async def maybe_open_nursery( async def maybe_open_nursery(
nursery: trio.Nursery | None = None, nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False, shield: bool = False,
lib: ModuleType = trio,
) -> AsyncGenerator[trio.Nursery, Any]: ) -> AsyncGenerator[trio.Nursery, Any]:
''' '''
Create a new nursery if None provided. Create a new nursery if None provided.
@ -58,13 +67,12 @@ async def maybe_open_nursery(
if nursery is not None: if nursery is not None:
yield nursery yield nursery
else: else:
async with trio.open_nursery() as nursery: async with lib.open_nursery() as nursery:
nursery.cancel_scope.shield = shield nursery.cancel_scope.shield = shield
yield nursery yield nursery
async def _enter_and_wait( async def _enter_and_wait(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
unwrapped: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
@ -91,7 +99,6 @@ async def _enter_and_wait(
@acm @acm
async def gather_contexts( async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[ ) -> AsyncGenerator[
@ -102,15 +109,17 @@ async def gather_contexts(
None, None,
]: ]:
''' '''
Concurrently enter a sequence of async context managers, each in Concurrently enter a sequence of async context managers (acms),
a separate ``trio`` task and deliver the unwrapped values in the each from a separate `trio` task and deliver the unwrapped
same order once all managers have entered. On exit all contexts are `yield`-ed values in the same order once all managers have entered.
subsequently and concurrently exited.
This function is somewhat similar to common usage of On exit, all acms are subsequently and concurrently exited.
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently This function is somewhat similar to a batch of non-blocking
entered and exited, and cancellation just works. calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
''' '''
seed: int = id(mngrs) seed: int = id(mngrs)
@ -210,9 +219,10 @@ async def maybe_open_context(
) -> AsyncIterator[tuple[bool, T]]: ) -> AsyncIterator[tuple[bool, T]]:
''' '''
Maybe open a context manager if there is not already a _Cached Maybe open an async-context-manager (acm) if there is not already
version for the provided ``key`` for *this* actor. Return the a `_Cached` version for the provided (input) `key` for *this* actor.
_Cached instance on a _Cache hit.
Return the `_Cached` instance on a _Cache hit.
''' '''
fid = id(acm_func) fid = id(acm_func)
@ -273,8 +283,13 @@ async def maybe_open_context(
else: else:
_Cache.users += 1 _Cache.users += 1
log.runtime( log.runtime(
f'Reusing resource for `_Cache` user {_Cache.users}\n\n' f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {yielded!r}\n' f'{ctx_key!r} -> {type(yielded)}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
) )
lock.release() lock.release()
yield True, yielded yield True, yielded