Compare commits

...

7 Commits

Author SHA1 Message Date
Tyler Goodlet af3745684c More formal `TransportClosed` reporting/raising
Since it was all ad-hoc defined inside
`._ipc.MsgpackTCPStream._iter_pkts()` more or less, this starts
formalizing a way for particular transport backends to indicate whether
a disconnect condition should be re-raised in the RPC msg loop and if
not what log level to report it at (if any).

Based on our lone transport currently we try to suppress any logging
noise from ephemeral connections expected during normal actor
interaction and discovery subsys ops:
- any short lived discovery related TCP connects are only logged as
  `.transport()` level.
- both `.error()` and raise on any underlying `trio.ClosedResource`
  cause since that normally means some task touched transport layer
  internals that it shouldn't have.
- do a `.warning()` on anything else unexpected.

Impl deats:
- extend the `._exceptions.TransportClosed` to accept an input log
  level, raise-on-report toggle and custom reporting & raising via a new
  `.report_n_maybe_raise()` method.
- construct the TCs with inputs per case in (the newly named) `._iter_pkts().
- call ^ this method from the `TransportClosed` handler block inside the
  RPC msg loop thus delegating reporting levels and/or raising to the
  backend's per-case TC instantiating.

Related `._ipc` changes:
- mask out all the `MsgpackTCPStream._codec` debug helper stuff and drop
  any lingering cruft from the initial proto-ing of msg-codecs.
- rename some attrs/methods:
  |_`MsgpackTCPStream._iter_packets()` -> `._iter_pkts()` and
    `._agen` -> `_aiter_pkts`.
  |_`Channel._aiter_recv()` -> `._aiter_msgs()` and
    `._agen` -> `_aiter_msgs`.
- add `hide_tb: bool` support to `Channel.send()` and only show the
  frame on non-MTEs.
2024-07-02 12:21:26 -04:00
Tyler Goodlet 3907cba68e Refine some `.trionics` docs and logging
- allow passing and report the lib name (`trio` or `tractor`) from
  `maybe_open_nursery()`.
- use `.runtime()` level when reporting `_Cache`-hits in
  `maybe_open_context()`.
- tidy up some doc strings.
2024-06-28 19:28:12 -04:00
Tyler Goodlet e3d59964af Woops, set `.cancel()` level in custom levels table.. 2024-06-28 19:27:13 -04:00
Tyler Goodlet ba83bab776 Todo a test for sync-pausing from non-main-root-tasks 2024-06-28 19:26:35 -04:00
Tyler Goodlet 18d440c207 (Re)type annot some tests
- For the (still not finished) `test_caps_based_msging`, switch to
  using the new `PayloadMsg`.
- add `testdir` fixture type.
2024-06-28 19:24:03 -04:00
Tyler Goodlet edac717613 Use `msgspec.Struct.__repr__()` failover impl
In case the struct doesn't import a field type (which will cause the
`.pformat()` to raise) just report the issue and try to fall back to the
original `repr()` version.
2024-06-28 19:17:05 -04:00
Tyler Goodlet 7e93b81a83 Don't use pretty struct stuff in `._invoke`
It's too fragile to put in side core RPC machinery since
`msgspec.Struct` defs can fail if a field type can't be
looked up at creation time (like can easily happen if you
conditionally import using `if TYPE_CHECKING:`)

Also,
- rename `cs` to `rpc_ctx_cs: CancelScope` since it's literally
  the wrapping RPC `Context._scope`.
- report self cancellation via `explain: str` and add tail case for
  "unknown cause".
- put a ?TODO? around what to do about KBIs if a context is opened
  from an `infected_aio`-actor task.
- similar to our nursery and portal add TODO list for moving all
  `_invoke_non_context()` content out the RPC core and instead implement
  them as `.hilevel` endpoint helpers (maybe as decorators?)which under
  neath define `@context`-funcs.
2024-06-28 19:06:17 -04:00
11 changed files with 400 additions and 182 deletions

View File

@ -11,9 +11,6 @@ from typing import (
Type,
Union,
)
from contextvars import (
Context,
)
from msgspec import (
structs,
@ -27,6 +24,7 @@ import tractor
from tractor import (
_state,
MsgTypeError,
Context,
)
from tractor.msg import (
_codec,
@ -41,7 +39,7 @@ from tractor.msg import (
from tractor.msg.types import (
_payload_msgs,
log,
Msg,
PayloadMsg,
Started,
mk_msg_spec,
)
@ -61,7 +59,7 @@ def mk_custom_codec(
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
@ -321,12 +319,12 @@ def dec_type_union(
import importlib
types: list[Type] = []
for type_name in type_names:
for ns in [
for mod in [
typing,
importlib.import_module(__name__),
]:
if type_ref := getattr(
ns,
mod,
type_name,
False,
):
@ -744,7 +742,7 @@ def chk_pld_type(
# 'Error', .pld: ErrorData
codec: MsgCodec = mk_codec(
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
# type union.
ipc_pld_spec=payload_spec,
)
@ -752,7 +750,7 @@ def chk_pld_type(
# make a one-off dec to compare with our `MsgCodec` instance
# which does the below `mk_msg_spec()` call internally
ipc_msg_spec: Union[Type[Struct]]
msg_types: list[Msg[payload_spec]]
msg_types: list[PayloadMsg[payload_spec]]
(
ipc_msg_spec,
msg_types,
@ -761,7 +759,7 @@ def chk_pld_type(
)
_enc = msgpack.Encoder()
_dec = msgpack.Decoder(
type=ipc_msg_spec or Any, # like `Msg[Any]`
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
)
assert (
@ -806,7 +804,7 @@ def chk_pld_type(
'cid': '666',
'pld': pld,
}
enc_msg: Msg = typedef(**kwargs)
enc_msg: PayloadMsg = typedef(**kwargs)
_wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg)
@ -883,25 +881,16 @@ def test_limit_msgspec():
debug_mode=True
):
# ensure we can round-trip a boxing `Msg`
# ensure we can round-trip a boxing `PayloadMsg`
assert chk_pld_type(
# Msg,
Any,
None,
payload_spec=Any,
pld=None,
expect_roundtrip=True,
)
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode
assert not chk_pld_type(
# Msg,
int,
payload_spec=int,
pld='doggy',
)
@ -913,18 +902,16 @@ def test_limit_msgspec():
value: Any
assert not chk_pld_type(
# Msg,
CustomPayload,
payload_spec=CustomPayload,
pld='doggy',
)
assert chk_pld_type(
# Msg,
CustomPayload,
payload_spec=CustomPayload,
pld=CustomPayload(name='doggy', value='urmom')
)
# uhh bc we can `.pause_from_sync()` now! :surfer:
# yah, we can `.pause_from_sync()` now!
# breakpoint()
trio.run(main)

View File

@ -1336,6 +1336,23 @@ def test_shield_pause(
child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden():
'''

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture
def run_example_in_subproc(
loglevel: str,
testdir,
testdir: pytest.Testdir,
reg_addr: tuple[str, int],
):

View File

@ -49,6 +49,7 @@ from ._exceptions import (
ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError,
TransportClosed as TransportClosed,
)
from .devx import (
breakpoint as breakpoint,

View File

@ -906,8 +906,59 @@ class StreamOverrun(
'''
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class TransportClosed(trio.BrokenResourceError):
'''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
be used to send/receive msgs from the remote peer.
'''
def __init__(
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
def report_n_maybe_raise(
self,
message: str|None = None,
) -> None:
'''
Using the init-specified log level emit a logging report
for this error.
'''
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
message += (
f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
class NoResult(RuntimeError):

View File

@ -54,7 +54,7 @@ from tractor._exceptions import (
)
from tractor.msg import (
_ctxvar_MsgCodec,
_codec,
# _codec, XXX see `self._codec` sanity/debug checks
MsgCodec,
types as msgtypes,
pretty_struct,
@ -65,8 +65,18 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows'
def get_stream_addrs(stream: trio.SocketStream) -> tuple:
# should both be IP sockets
def get_stream_addrs(
stream: trio.SocketStream
) -> tuple[
tuple[str, int], # local
tuple[str, int], # remote
]:
'''
Return the `trio` streaming transport prot's socket-addrs for
both the local and remote sides as a pair.
'''
# rn, should both be IP sockets
lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername()
return (
@ -75,17 +85,22 @@ def get_stream_addrs(stream: trio.SocketStream) -> tuple:
)
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
MsgType = TypeVar("MsgType")
# TODO: consider using a generic def and indexing with our eventual
# msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs
# from tractor.msg.types import MsgType
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
MsgType = TypeVar('MsgType')
# TODO: break up this mod into a subpkg so we can start adding new
# backends and move this type stuff into a dedicated file.. Bo
#
@runtime_checkable
class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream
drained: list[MsgType]
@ -120,9 +135,9 @@ class MsgTransport(Protocol[MsgType]):
...
# TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue?
# TODO: typing oddity.. not sure why we have to inherit here, but it
# seems to be an issue with `get_msg_transport()` returning
# a `Type[Protocol]`; probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport):
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
@ -145,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
#
# TODO: define this as a `Codec` struct which can be
# overriden dynamically by the application/runtime.
# overriden dynamically by the application/runtime?
codec: tuple[
Callable[[Any], Any]|None, # coder
Callable[[type, Any], Any]|None, # decoder
@ -160,7 +175,7 @@ class MsgpackTCPStream(MsgTransport):
self._laddr, self._raddr = get_stream_addrs(stream)
# create read loop instance
self._agen = self._iter_packets()
self._aiter_pkts = self._iter_packets()
self._send_lock = trio.StrictFIFOLock()
# public i guess?
@ -174,15 +189,12 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format
# dynamic override Bo
self._task = trio.lowlevel.current_task()
self._codec: MsgCodec = (
codec
or
_codec._ctxvar_MsgCodec.get()
)
# TODO: mask out before release?
# log.runtime(
# f'New {self} created with codec\n'
# f'codec: {self._codec}\n'
# XXX for ctxvar debug only!
# self._codec: MsgCodec = (
# codec
# or
# _codec._ctxvar_MsgCodec.get()
# )
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -190,6 +202,11 @@ class MsgpackTCPStream(MsgTransport):
Yield `bytes`-blob decoded packets from the underlying TCP
stream using the current task's `MsgCodec`.
This is a streaming routine implemented as an async generator
func (which was the original design, but could be changed?)
and is allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_pkts` attr.
'''
decodes_failed: int = 0
@ -204,16 +221,82 @@ class MsgpackTCPStream(MsgTransport):
# seem to be getting racy failures here on
# arbiter/registry name subs..
trio.BrokenResourceError,
):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
) as trans_err:
loglevel = 'transport'
match trans_err:
# case (
# ConnectionResetError()
# ):
# loglevel = 'transport'
# peer actor (graceful??) TCP EOF but `tricycle`
# seems to raise a 0-bytes-read?
case ValueError() if (
'unclean EOF' in trans_err.args[0]
):
pass
# peer actor (task) prolly shutdown quickly due
# to cancellation
case trio.BrokenResourceError() if (
'Connection reset by peer' in trans_err.args[0]
):
pass
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
loglevel: str = 'warning'
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# XXX definitely can happen if transport is closed
# manually by another `trio.lowlevel.Task` in the
# same actor; we use this in some simulated fault
# testing for ex, but generally should never happen
# under normal operation!
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
loglevel='error',
raise_on_report=(
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
# graceful TCP EOF disconnect
if header == b'':
raise TransportClosed(
f'transport {self} was already closed prior ro read'
message=(
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
)
size: int
size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore
@ -225,33 +308,20 @@ class MsgpackTCPStream(MsgTransport):
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
# assert (
# task := trio.lowlevel.current_task()
# ) is not self._task
# self._task = task
self._codec = codec
log.runtime(
f'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n'
)
# XXX for ctxvar debug only!
# if self._codec.pld_spec != codec.pld_spec:
# assert (
# task := trio.lowlevel.current_task()
# ) is not self._task
# self._task = task
# self._codec = codec
# log.runtime(
# f'Using new codec in {self}.recv()\n'
# f'codec: {self._codec}\n\n'
# f'msg_bytes: {msg_bytes}\n'
# )
yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
# XXX NOTE: since the below error derives from
# `DecodeError` we need to catch is specially
# and always raise such that spec violations
@ -295,7 +365,8 @@ class MsgpackTCPStream(MsgTransport):
msg: msgtypes.MsgType,
strict_types: bool = True,
# hide_tb: bool = False,
hide_tb: bool = False,
) -> None:
'''
Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -304,21 +375,24 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type
'''
# __tracebackhide__: bool = hide_tb
__tracebackhide__: bool = hide_tb
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
self._codec = codec
log.runtime(
f'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n'
f'msg: {msg}\n'
)
# XXX for ctxvar debug only!
# if self._codec.pld_spec != codec.pld_spec:
# self._codec = codec
# log.runtime(
# f'Using new codec in {self}.send()\n'
# f'codec: {self._codec}\n\n'
# f'msg: {msg}\n'
# )
if type(msg) not in msgtypes.__msg_types__:
if strict_types:
@ -352,6 +426,16 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
# try:
# <the-above_code>
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property
def laddr(self) -> tuple[str, int]:
return self._laddr
@ -361,7 +445,7 @@ class MsgpackTCPStream(MsgTransport):
return self._raddr
async def recv(self) -> Any:
return await self._agen.asend(None)
return await self._aiter_pkts.asend(None)
async def drain(self) -> AsyncIterator[dict]:
'''
@ -378,7 +462,7 @@ class MsgpackTCPStream(MsgTransport):
yield msg
def __aiter__(self):
return self._agen
return self._aiter_pkts
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@ -433,7 +517,7 @@ class Channel:
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
self._agen = self._aiter_recv()
self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False
@ -497,8 +581,6 @@ class Channel:
)
return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm
def apply_codec(
self,
@ -517,6 +599,7 @@ class Channel:
finally:
self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def __repr__(self) -> str:
if not self._transport:
return '<Channel with inactive transport?>'
@ -560,27 +643,43 @@ class Channel:
)
return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
# seems like a pretty useful thing to have in general
# along with being able to filter certain stack frame(s / sets)
# possibly based on the current log-level?
async def send(
self,
payload: Any,
# hide_tb: bool = False,
hide_tb: bool = False,
) -> None:
'''
Send a coded msg-blob over the transport.
'''
# __tracebackhide__: bool = hide_tb
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(payload)}\n'
) # type: ignore
assert self._transport
await self._transport.send(
payload,
# hide_tb=hide_tb,
)
__tracebackhide__: bool = hide_tb
try:
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(payload)}\n'
)
# assert self._transport # but why typing?
await self._transport.send(
payload,
hide_tb=hide_tb,
)
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
async def recv(self) -> Any:
assert self._transport
@ -617,8 +716,11 @@ class Channel:
await self.aclose(*args)
def __aiter__(self):
return self._agen
return self._aiter_msgs
# ?TODO? run any reconnection sequence?
# -[ ] prolly should be impl-ed as deco-API?
#
# async def _reconnect(self) -> None:
# """Handle connection failures by polling until a reconnect can be
# established.
@ -636,7 +738,6 @@ class Channel:
# else:
# log.transport("Stream connection re-established!")
# # TODO: run any reconnection sequence
# # on_recon = self._recon_seq
# # if on_recon:
# # await on_recon(self)
@ -650,11 +751,17 @@ class Channel:
# " for re-establishment")
# await trio.sleep(1)
async def _aiter_recv(
async def _iter_msgs(
self
) -> AsyncGenerator[Any, None]:
'''
Async iterate items from underlying stream.
Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
'''
assert self._transport
@ -680,15 +787,6 @@ class Channel:
case _:
yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self._transport.send(sent)
except trio.BrokenResourceError:
# if not self._autorecon:

View File

@ -68,7 +68,7 @@ from .msg import (
MsgCodec,
PayloadT,
NamespacePath,
pretty_struct,
# pretty_struct,
_ops as msgops,
)
from tractor.msg.types import (
@ -89,6 +89,16 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
# func-type-cases implemented "on top of" `@context` defs:
# -[ ] std async func helper decorated with `@rpc_func`?
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
# user only defining a single-`yield` generator-func?
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
@ -108,8 +118,9 @@ async def _invoke_non_context(
] = trio.TASK_STATUS_IGNORED,
):
__tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# TODO: can we unify this with the `context=True` impl below?
# ?TODO? can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send(
StartAck(
@ -160,10 +171,6 @@ async def _invoke_non_context(
functype='asyncgen',
)
)
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
@ -175,15 +182,13 @@ async def _invoke_non_context(
await chan.send(
Stop(cid=cid)
)
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
ack = StartAck(
@ -354,8 +359,15 @@ async def _errors_relayed_via_ipc(
# channel.
task_status.started(err)
# always reraise KBIs so they propagate at the sys-process level.
if isinstance(err, KeyboardInterrupt):
# always propagate KBIs at the sys-process level.
if (
isinstance(err, KeyboardInterrupt)
# ?TODO? except when running in asyncio mode?
# |_ wut if you want to open a `@context` FROM an
# infected_aio task?
# and not actor.is_infected_aio()
):
raise
# RPC task bookeeping.
@ -458,7 +470,6 @@ async def _invoke(
# tb: TracebackType = None
cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context(
chan=chan,
cid=cid,
@ -607,6 +618,8 @@ async def _invoke(
# `@context` marked RPC function.
# - `._portal` is never set.
try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
@ -616,7 +629,7 @@ async def _invoke(
),
):
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
@ -642,7 +655,7 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
if ctx._scope.cancelled_caught:
if rpc_ctx_cs.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error
@ -652,9 +665,7 @@ async def _invoke(
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
cs: CancelScope = ctx._scope
if cs.cancel_called:
if rpc_ctx_cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -680,9 +691,15 @@ async def _invoke(
elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer'
else:
elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer'
else:
explain += 'an unknown cause?'
explain += (
add_div(message=explain)
+
@ -911,7 +928,10 @@ async def process_messages(
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf?
# TODO: use of the pprinting of structs is
# FRAGILE and should prolly not be
#
# avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead!
@ -1177,7 +1197,7 @@ async def process_messages(
parent_chan=chan,
)
except TransportClosed:
except TransportClosed as tc:
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
@ -1185,12 +1205,20 @@ async def process_messages(
# up..
#
# TODO: maybe add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[x] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] figure out how this will break with other transports?
log.runtime(
f'IPC channel closed abruptly\n'
f'<=x peer: {chan.uid}\n'
f' |_{chan.raddr}\n'
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
)
# transport **WAS** disconnected
@ -1238,7 +1266,7 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
f'{pretty_struct.pformat(msg)}'
# f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)

View File

@ -54,11 +54,12 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'DEVX': 17,
'CANCEL': 18,
'CANCEL': 22,
'PDB': 500,
}
STD_PALETTE = {
@ -147,6 +148,8 @@ class StackLevelAdapter(LoggerAdapter):
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
NOTE: all custom level methods (above) delegate to this!
'''
if self.isEnabledFor(level):
stacklevel: int = 3

View File

@ -34,6 +34,9 @@ from pprint import (
saferepr,
)
from tractor.log import get_logger
log = get_logger()
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
@ -143,7 +146,13 @@ def pformat(
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -194,12 +203,20 @@ class Struct(
return sin_props
pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
__repr__ = pformat
def __repr__(self) -> str:
try:
return pformat(self)
except Exception:
log.exception(
f'Failed to `pformat({type(self)})` !?\n'
)
return _Struct.__repr__(self)
def copy(
self,

View File

@ -156,11 +156,12 @@ class BroadcastState(Struct):
class BroadcastReceiver(ReceiveChannel):
'''
A memory receive channel broadcaster which is non-lossy for the
fastest consumer.
A memory receive channel broadcaster which is non-lossy for
the fastest consumer.
Additional consumer tasks can receive all produced values by registering
with ``.subscribe()`` and receiving from the new instance it delivers.
Additional consumer tasks can receive all produced values by
registering with ``.subscribe()`` and receiving from the new
instance it delivers.
'''
def __init__(

View File

@ -18,8 +18,12 @@
Async context manager primitives with hard ``trio``-aware semantics
'''
from contextlib import asynccontextmanager as acm
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
import inspect
from types import ModuleType
from typing import (
Any,
AsyncContextManager,
@ -30,13 +34,16 @@ from typing import (
Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
)
import trio
from tractor._state import current_actor
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
@ -46,8 +53,10 @@ T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery | None = None,
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
@ -58,13 +67,12 @@ async def maybe_open_nursery(
if nursery is not None:
yield nursery
else:
async with trio.open_nursery() as nursery:
async with lib.open_nursery() as nursery:
nursery.cancel_scope.shield = shield
yield nursery
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
@ -91,7 +99,6 @@ async def _enter_and_wait(
@acm
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[
@ -102,15 +109,17 @@ async def gather_contexts(
None,
]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
Concurrently enter a sequence of async context managers (acms),
each from a separate `trio` task and deliver the unwrapped
`yield`-ed values in the same order once all managers have entered.
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited, and cancellation just works.
On exit, all acms are subsequently and concurrently exited.
This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
'''
seed: int = id(mngrs)
@ -210,9 +219,10 @@ async def maybe_open_context(
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open a context manager if there is not already a _Cached
version for the provided ``key`` for *this* actor. Return the
_Cached instance on a _Cache hit.
Maybe open an async-context-manager (acm) if there is not already
a `_Cached` version for the provided (input) `key` for *this* actor.
Return the `_Cached` instance on a _Cache hit.
'''
fid = id(acm_func)
@ -273,8 +283,13 @@ async def maybe_open_context(
else:
_Cache.users += 1
log.runtime(
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
f'{ctx_key!r} -> {yielded!r}\n'
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield True, yielded